# Distributed Library

namespace fl

This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree.

Logging is a lightweight, multi-level logging infrastructure that can be filtered at compile time and whose output format is similar to glog's. It defines two logging macros: LOG for general logging and VLOG for more verbose logging. The compile-time filter is applied to each of the two separately.

Output format: LMMDD HH:MM:SS.uuuuuu tid filename:##] Log message …

• L: log level (Fatal, Critical, Error, Warning, Info)

• MMDD: month and day

• HH:MM:SS.uuuuuu: time (24-hour format) with microseconds

• tid: thread ID

• filename:##: the basename of the source file and the line number of the LOG message
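
The following minimal C++ sketch assembles the prefix from already-extracted fields to show how the fields are laid out. formatLogPrefix is a hypothetical helper written for illustration, not part of flashlight's logging code. It follows the format line above, including the ] after the line number.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper (not flashlight code): builds the glog-style prefix
// "LMMDD HH:MM:SS.uuuuuu tid filename:##] " from already-extracted fields.
std::string formatLogPrefix(
    char level, int month, int day, int hour, int minute, int second,
    int micros, long tid, const std::string& path, int line) {
  // Keep only the basename of the source file, as the format describes.
  const auto slash = path.find_last_of('/');
  const std::string base =
      (slash == std::string::npos) ? path : path.substr(slash + 1);

  char buf[64];
  // Zero-padded month/day/time fields and a 6-digit microsecond field.
  std::snprintf(
      buf, sizeof(buf), "%c%02d%02d %02d:%02d:%02d.%06d %ld ",
      level, month, day, hour, minute, second, micros, tid);
  return std::string(buf) + base + ":" + std::to_string(line) + "] ";
}
```

For example, formatLogPrefix('I', 2, 6, 10, 42, 21, 47293, 87072, "src/Logging.h", 15) yields "I0206 10:42:21.047293 87072 Logging.h:15] ".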

LOG use example: LOG(INFO) << "foo bar n=" << 42;

Output example: I0206 10:42:21.047293 87072 Logging.h:15 foo bar n=42

Note that LOG(level) only prints when level is less than or equal to the value set with Logging::setMaxLoggingLevel(level).

VLOG use example: VLOG(1) << "foo bar n=" << 42;

Output example: vlog(1)0206 10:42:21.005439 87072 Logging.h:23 foo bar n=42

Note that VLOG(level) only prints when level is less than or equal to the value set with VerboseLogging::setMaxLoggingLevel(level).
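
The filtering rule is a simple comparison, as this sketch shows. It assumes that levels run from most severe (Fatal) to least severe (Info) with increasing integer values. The Level enum and the shouldLog/shouldVlog names are hypothetical and are not flashlight's actual declarations.

```cpp
#include <cassert>

// Hypothetical level ordering (assumed): most severe gets the smallest value.
enum class Level : int { kFatal = 0, kCritical, kError, kWarning, kInfo };

// Mirrors the rule "LOG(level) only prints when level <= the max level".
bool shouldLog(Level level, Level maxLevel) {
  return static_cast<int>(level) <= static_cast<int>(maxLevel);
}

// VLOG levels are plain integers: VLOG(n) prints when n <= the verbose max.
bool shouldVlog(int verbosity, int maxVerbosity) {
  return verbosity <= maxVerbosity;
}
```

When the maximum is set to Warning, Fatal through Warning messages are printed and Info messages are dropped.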

The configurable memory allocator is obtained by calling std::unique_ptr<MemoryAllocator> CreateMemoryAllocator(config). The config defines a set of allocators that are assembled into a CompositeMemoryAllocator.
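
The routing policy is not spelled out here. The sketch below shows one way a composite might dispatch requests. It assumes that each configured sub-allocator has a maximum allocation size and that a request goes to the first one that fits. SubAllocator and route are hypothetical names, and CompositeMemoryAllocator's real policy may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical description of one configured allocator (not flashlight's API).
struct SubAllocator {
  std::string name;
  std::size_t maxAllocationSize;  // largest request this allocator accepts
};

// Assumed policy: route a request to the first sub-allocator, in configured
// order, whose size limit fits. Returns nullptr if no allocator can serve it.
const SubAllocator* route(const std::vector<SubAllocator>& allocators,
                          std::size_t requestBytes) {
  for (const auto& a : allocators) {
    if (requestBytes <= a.maxAllocationSize) {
      return &a;
    }
  }
  return nullptr;
}
```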

Functions

void distributedInit(DistributedInit initMethod, int worldRank, int worldSize, const std::unordered_map<std::string, std::string> &params = {})

Initialize the distributed environment.

Note that worldSize and worldRank are ignored if DistributedInit::MPI is used.

Parameters
• initMethod: Initialization method used for setting up the rendezvous

• worldRank: 0-indexed rank of the current process

• worldSize: Total number of processes in the communication group

• params: Additional parameters (if any) needed for initialization

bool isDistributedInit()

Returns whether the distributed environment has been initialized.

DistributedBackend distributedBackend()

Returns the backend used for distributed setup.

int getWorldRank()

Returns rank of the current process in the communication group (zero-based).

Returns 0 if the distributed environment is not initialized.

int getWorldSize()

Returns the total number of processes in the communication group.

Returns 1 if the distributed environment is not initialized.
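
The documented fallbacks (rank 0 and size 1 before initialization) can be modeled with a small piece of process-local state. DistState, initState, currentRank and currentSize are hypothetical names used only for illustration. The range check on worldRank is an assumption that follows from the parameter being a 0-indexed rank.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical process-local state; flashlight's internals may differ.
struct DistState {
  bool initialized = false;
  int rank = 0;
  int size = 1;
};

// Records rank/size after checking that the rank is valid for the world size.
void initState(DistState& s, int worldRank, int worldSize) {
  if (worldSize < 1 || worldRank < 0 || worldRank >= worldSize) {
    throw std::invalid_argument("worldRank must be in [0, worldSize)");
  }
  s.initialized = true;
  s.rank = worldRank;
  s.size = worldSize;
}

// Documented fallbacks: rank 0 and size 1 before initialization.
int currentRank(const DistState& s) { return s.initialized ? s.rank : 0; }
int currentSize(const DistState& s) { return s.initialized ? s.size : 1; }
```

Because of these defaults, code guarded by a rank-0 check also runs unchanged in a non-distributed, single-process setting.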

void allReduce(Variable &var, double scale = 1.0, bool async = false)

Synchronizes the array wrapped by the Variable with allreduce.

Parameters
• [in] var: a variable whose array will be synchronized

• [in] scale: scale the Variable after allreduce by this factor

• [in] async: perform the allReduce operation asynchronously in a separate compute stream to the ArrayFire compute stream. NB: if true, syncDistributed must be called in order to ensure the ArrayFire CUDA stream waits until allReduce is complete and uses updated values.
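
The effect of allReduce with a scale factor can be simulated in a single process. This sketch assumes the reduction is an element-wise sum, which is the usual choice for gradient synchronization. simulatedAllReduce is a hypothetical helper, and each inner vector stands in for one rank's array.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-process simulation: buffers[r] plays the role of rank r's array.
// Afterwards every rank holds (element-wise sum over ranks) * scale.
void simulatedAllReduce(std::vector<std::vector<double>>& buffers,
                        double scale) {
  assert(!buffers.empty());
  const std::size_t n = buffers[0].size();
  std::vector<double> sum(n, 0.0);
  for (const auto& b : buffers) {
    assert(b.size() == n);  // every rank must contribute a same-sized array
    for (std::size_t i = 0; i < n; ++i) sum[i] += b[i];
  }
  // Broadcast the scaled result back to every rank.
  for (auto& b : buffers) {
    for (std::size_t i = 0; i < n; ++i) b[i] = sum[i] * scale;
  }
}
```

Passing scale = 1.0 / worldSize turns the sum into an average, which is a common way to combine gradients across processes.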

void allReduce(af::array &arr, bool async = false)

Synchronizes a single ArrayFire array with allreduce.

Parameters
• arr: an array which will be synchronized

• [in] async: perform the allReduce operation asynchronously in a separate compute stream to the ArrayFire compute stream. NB: if used, syncDistributed must be called in order to ensure the ArrayFire CUDA stream waits until allReduce is complete and uses updated values.

void allReduceMultiple(std::vector<Variable> vars, double scale = 1.0, bool async = false, bool contiguous = false)

Synchronizes the arrays wrapped by a vector of Variables with allreduce.

Parameters
• [in] vars: Variables whose arrays will be synchronized

• [in] scale: factor by which to scale each Variable after allreduce

• [in] async: perform the allReduce operation asynchronously in a separate compute stream to the ArrayFire compute stream. NB: if used, syncDistributed must be called in order to ensure the ArrayFire CUDA stream waits until allReduce is complete and uses updated values.

• [in] contiguous: copy data for each Variable into a contiguous buffer before performing the allReduce operation
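
You can think of the contiguous option as three steps: pack several arrays into one flat buffer, reduce that buffer once, and copy the results back. The pack and unpack helpers below are hypothetical and only illustrate the idea. The likely trade-off is fewer collective calls in exchange for two extra copies.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: concatenate all arrays into one flat buffer so that a
// single reduction call can cover every array.
std::vector<double> pack(const std::vector<std::vector<double>*>& arrays) {
  std::vector<double> flat;
  for (const auto* a : arrays) flat.insert(flat.end(), a->begin(), a->end());
  return flat;
}

// Hypothetical helper: copy reduced values back, in the same order as pack().
void unpack(const std::vector<double>& flat,
            const std::vector<std::vector<double>*>& arrays) {
  std::size_t offset = 0;
  for (auto* a : arrays) {
    for (auto& x : *a) x = flat[offset++];
  }
  assert(offset == flat.size());  // every packed element was consumed
}
```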

void allReduceMultiple(std::vector<af::array *> arrs, bool async = false, bool contiguous = false)

Synchronizes a vector of pointers to arrays with allreduce.

Parameters
• [in] arrs: a vector of pointers to arrays which will be synchronized

• [in] async: perform the allReduce operation asynchronously in a separate compute stream to the ArrayFire compute stream. NB: if used, syncDistributed must be called in order to ensure the ArrayFire CUDA stream waits until allReduce is complete and uses updated values.

• [in] contiguous: copy data for each array into a contiguous buffer before performing the allReduce operation

void syncDistributed()

Synchronizes operations in the ArrayFire compute stream with operations in the distributed compute stream, if applicable.

That is, operations subsequently enqueued on the ArrayFire compute stream will not execute until the operations currently enqueued on the distributed compute stream have finished.

Note that if asynchronous allReduce is not used, this operation will be a no-op, since no operations will be enqueued on the distributed compute stream.
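
An asynchronous allReduce followed by syncDistributed works like enqueueing work elsewhere and waiting for it before reading the results. The host-side analogy below uses a worker thread in place of the distributed compute stream. AsyncReducerSim is hypothetical, and the documentation above describes synchronizing GPU compute streams rather than calling std::async.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Host-side analogy only: a worker thread stands in for the distributed
// compute stream, and waiting on its futures stands in for syncDistributed().
class AsyncReducerSim {
 public:
  // Enqueue a stand-in "reduction" (scaling in place) without blocking.
  void enqueue(std::vector<double>& data, double scale) {
    pending_.push_back(std::async(std::launch::async, [&data, scale] {
      for (auto& x : data) x *= scale;
    }));
  }

  // Block until all enqueued work has finished. With nothing pending this is
  // a no-op, like syncDistributed() when async allReduce was never used.
  void sync() {
    for (auto& f : pending_) f.wait();
    pending_.clear();
  }

 private:
  std::vector<std::future<void>> pending_;
};
```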

## Reducer Framework

class Reducer

An interface for creating tensor reduction algorithms/rules.

In flashlight, a Reducer instance is typically used for gradient synchronization across devices/processes during training, although the API is general.

Subclassed by fl::CoalescingReducer, fl::InlineReducer

Public Functions

virtual ~Reducer()
virtual void add(Variable &var) = 0

Have the Reducer ingest a Variable.

What happens next is implementation-specific; the implementation may cache the value, process/synchronize immediately, or ignore the value.

Parameters
• [in] var: a Variable to be ingested

virtual void finalize() = 0

Forces a reduction/synchronization of the Reducer.

For some implementations, this may be a no-op if the Reducer immediately processes or synchronizes all gradients that are added.
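
The sketch below shows the interface contract. It declares an abstract class with the same two operations and implements one permitted strategy: cache every added gradient and reduce them all in finalize(). Grad, ReducerSketch, DeferredReducer and the reduceAll callback are hypothetical stand-ins, so no real Variable or collective is involved.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for fl::Variable: just a gradient buffer.
using Grad = std::vector<double>;

// Same shape as the documented interface: add() ingests, finalize() forces
// the reduction. Illustrative only, not flashlight code.
class ReducerSketch {
 public:
  virtual ~ReducerSketch() = default;
  virtual void add(Grad& g) = 0;
  virtual void finalize() = 0;
};

// One permitted strategy: cache everything and reduce only in finalize().
// `reduceAll` is a hypothetical hook standing in for a real collective.
class DeferredReducer : public ReducerSketch {
 public:
  explicit DeferredReducer(std::function<void(std::vector<Grad*>&)> reduceAll)
      : reduceAll_(std::move(reduceAll)) {}

  void add(Grad& g) override { cache_.push_back(&g); }

  void finalize() override {
    if (!cache_.empty()) reduceAll_(cache_);
    cache_.clear();
  }

 private:
  std::function<void(std::vector<Grad*>&)> reduceAll_;
  std::vector<Grad*> cache_;
};
```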

## Reducers

class InlineReducer : public fl::Reducer

A Reducer that calls allReduce directly on each gradient it processes.

All synchronized gradients are scaled by a pre-specified factor.

Public Functions

InlineReducer(double scale)

Creates a new InlineReducer with a given scaling factor.

Parameters
• [in] scale: the factor by which to scale gradients after synchronization

void add(Variable &var)

Ingest a Variable and immediately call allReduce on it.

Parameters
• [in] var: the Variable to process for synchronization

void finalize()

Forces a reduction/synchronization of the Reducer.

For some implementations, this may be a no-op if the Reducer immediately processes or synchronizes all gradients that are added.
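
Below is a minimal sketch of the inline strategy. It assumes that allReduce sums across ranks and that scaling happens afterwards. InlineReducerSketch and the injected allReduceFn hook are hypothetical stand-ins for fl::InlineReducer and fl::allReduce, which lets the example run without a distributed backend.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using Grad = std::vector<double>;  // hypothetical stand-in for a gradient

// Inline strategy: every add() triggers an immediate reduction followed by
// scaling, so nothing is ever cached.
class InlineReducerSketch {
 public:
  InlineReducerSketch(double scale, std::function<void(Grad&)> allReduceFn)
      : scale_(scale), allReduce_(std::move(allReduceFn)) {}

  void add(Grad& g) {
    allReduce_(g);                  // synchronize right away
    for (auto& x : g) x *= scale_;  // then apply the scaling factor
  }

  void finalize() {}  // no-op: there is no cache to flush

 private:
  double scale_;
  std::function<void(Grad&)> allReduce_;
};
```

Because this sketch never caches anything, its finalize() does nothing. This matches the note that finalize may be a no-op for reducers that synchronize on add.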

class CoalescingReducer : public fl::Reducer

A Reducer which coalesces added Variables in a cache until some maximum cache size is reached, after which all Variables in the cache are reduced and the cache is emptied.

Since the Reducer executes allReduceMultiple operations asynchronously, to guarantee that synchronized values are available after reduction, finalize must be called before using a given value.

Public Functions

CoalescingReducer(double scale, bool async, bool contiguous)

Creates a new coalescing reducer.

Parameters
• [in] scale: the factor by which to scale gradients after synchronization

• [in] async: determines whether or not the distributed compute stream runs asynchronously to the AF stream.

• [in] contiguous: forces synchronization of the set of Variables to occur in a contiguous buffer, which may improve performance.

~CoalescingReducer()

Destroy the Reducer.

Calls finalize() before returning.

void add(Variable &var)

Add a Variable to Reducer.

Behaves as follows:

• if the Variable exceeds the size of the coalescing cache, call allReduce immediately to synchronize.

• if the Variable is smaller than the cache but adding it would push the cache past its size limit, flush the cache and synchronize its contents with allReduceMultiple

• otherwise, add the Variable to the cache.
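
You can trace the add() policy above with a size-only sketch. CoalescingSketch is hypothetical. It tracks byte counts rather than Variables and records each reduction as a batch. It also assumes that after a flush the incoming Variable starts the new cache, a detail the bullets do not state.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Size-only sketch of the documented add() policy. Each entry in `batches`
// is one reduction call: a lone oversized Variable or a flushed cache.
class CoalescingSketch {
 public:
  explicit CoalescingSketch(std::size_t capacityBytes)
      : capacity_(capacityBytes) {}

  void add(std::size_t bytes) {
    if (bytes > capacity_) {          // larger than the whole cache:
      batches.push_back({bytes});     // reduce it on its own right away
      return;
    }
    if (used_ + bytes > capacity_) {  // would overflow: flush first
      flush();
    }
    cache_.push_back(bytes);          // assumption: starts/extends the cache
    used_ += bytes;
  }

  void finalize() { flush(); }        // reduce whatever is left

  std::vector<std::vector<std::size_t>> batches;

 private:
  void flush() {
    if (!cache_.empty()) batches.push_back(cache_);
    cache_.clear();
    used_ = 0;
  }

  std::size_t capacity_;
  std::size_t used_ = 0;
  std::vector<std::size_t> cache_;
};
```

Take a 100-byte cache and add 40, 50, 30 and then 150 bytes. The sketch produces the batch {40, 50}, flushed when 30 arrives, and the batch {150}, reduced immediately. finalize() then reduces the remaining {30}.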

void finalize()

Flush any remaining Variables in the cache and synchronize.