Distributed Training¶
flashlight provides an easy-to-use API for distributed training. It uses Facebook’s Gloo library when using the CPU backend, and NVIDIA’s NCCL library when using the CUDA backend. In the sections below, we briefly detail the API and document its use.
See examples/DistributedTraining.cpp for examples.
Setup¶
To initialize the distributed environment, participating processes must first perform an initial coordination step. flashlight supports multiple initialization methods, detailed below.
DistributedInit::MPI¶
Use this initialization if spawning processes using MPI. MPI jobs are typically started from the command line using mpirun -n <NUM_PROC> [...]. MPI assigns a rank to each process so that information can be broadcast for initial coordination.
fl::distributedInit(
fl::DistributedInit::MPI,
-1, // worldRank - unused. Automatically derived from `MPI_Comm_Rank`
-1, // worldSize - unused. Automatically derived from `MPI_Comm_Size`
{} // params
);
DistributedInit::FILE_SYSTEM¶
Use this initialization if all participating devices and processes have access to a shared filesystem. A shared file in that filesystem is used to initially coordinate participating processes. This shared file is specified via the fl::DistributedConstants::kFilePath key in the parameter map:
fl::distributedInit(
fl::DistributedInit::FILE_SYSTEM,
[...], // worldRank. Each process calls with its corresponding rank.
4, // worldSize
{{fl::DistributedConstants::kFilePath, "/path/to/shared/filesystem/file"}});
When using the CUDA backend, fl::DistributedConstants::kMaxDevicePerNode must be passed as an additional required value in the parameter map, specifying the maximum number of GPU devices per node, from which a device id is derived.
Once initialized, the distributed backend and process topology can be queried:
std::cout << (int)fl::distributedBackend(); // 1 - fl::DistributedBackend::NCCL
auto rank = fl::getWorldRank();
std::cout << rank; // 0/1/2/3 depending on the process
auto size = fl::getWorldSize();
std::cout << size; // 4
Synchronizing Parameters¶
Now, we demonstrate the implementation of a data-parallel model: during training, data is distributed equally amongst all devices, and each device completes a full forward and backward pass independently before synchronizing state via an allReduce operation. Below, we call allReduce on a Flashlight Tensor:
auto a = fl::full({3, 3}, rank);
fl::allReduce(a);
std::cout << "a" << a << std::endl;
// a
// [3 3 1 1]
// 6.0000 6.0000 6.0000
// 6.0000 6.0000 6.0000
// 6.0000 6.0000 6.0000
flashlight’s distributed API also includes specific functions to synchronize Module parameters and register them for gradient synchronization. allReduceParameters synchronizes parameters of a Module across all processes (which is important in the case of random initialization), and distributeModuleGrads registers gradients in the Module for synchronization after each iteration of the backward pass:
auto model = std::make_shared<Sequential>();
// (add other modules to the Sequential)
// synchronize parameters across processes
fl::allReduceParameters(model);
// add a hook to synchronize gradients of model parameters as they're computed
fl::distributeModuleGrads(model, 1.0 / worldSize);
// ...
Distributed Training¶
In this section, we build on the Perceptron training example to run data-parallel distributed training using synchronous stochastic gradient descent (SGD).
First things first - initialize the distributed environment:
// Uses MPI (Run with `mpirun -n 2`), CUDA backend
fl::distributedInit(
fl::DistributedInit::MPI,
-1, // worldRank - unused. Automatically derived from `MPI_Comm_Rank`
-1, // worldSize - unused. Automatically derived from `MPI_Comm_Size`
{{fl::DistributedConstants::kMaxDevicePerNode, "8"}} // param
);
auto worldSize = fl::getWorldSize();
auto worldRank = fl::getWorldRank();
bool isMaster = (worldRank == 0);
fl::setSeed(worldRank);
Create the dataset. Samples are divided equally among all processes.
// Create dataset
const int nSamples = 10000 / worldSize;
const int nFeat = 10;
auto X = fl::rand({nFeat, nSamples}) + 1; // X elements in [1, 2]
auto Y = fl::sum(fl::power(X, 3), {0}).T() + // signal
fl::sin(2 * M_PI * fl::rand({nSamples})); // noise
// Create Dataset to simplify the code for iterating over samples
TensorDataset data({X, Y});
const int inputIdx = 0, targetIdx = 1;
Create a Module, synchronize its parameters, and register gradients for synchronization:
// Model definition - 2-layer Perceptron with ReLU activation
auto model = std::make_shared<Sequential>();
model->add(Linear(nFeat, 100));
model->add(ReLU());
model->add(Linear(100, 1));
// MSE loss
auto loss = MeanSquaredError();
// synchronize parameters across processes
fl::allReduceParameters(model);
// register gradients for synchronization
fl::distributeModuleGrads(model, 1.0 / worldSize);
Create an Optimizer and a Meter, then start training:
// Optimizer definition
const float learningRate = 0.0001;
const float momentum = 0.9;
auto sgd = SGDOptimizer(model->params(), learningRate, momentum);
// Meter definition
AverageValueMeter meter;
// Start training
if (isMaster) {
std::cout << "[Multi-layer Perceptron] Started..." << std::endl;
}
const int nEpochs = 100;
for (int e = 1; e <= nEpochs; ++e) {
meter.reset();
for (auto& sample : data) {
sgd.zeroGrad();
// Forward propagation
auto result = model->forward(input(sample[inputIdx]));
// Calculate loss
auto l = loss(result, noGrad(sample[targetIdx]));
// Backward propagation
l.backward();
// Update parameters
sgd.step();
meter.add(l.scalar<float>());
}
auto mse = meter.value();
auto mseArr = fl::Tensor(1, &mse[0]);
fl::allReduce(mseArr);
if (isMaster) {
std::cout << "Epoch: " << e << " Mean Squared Error: "
<< mseArr.scalar<double>() / worldSize << std::endl;
}
}
if (isMaster) {
std::cout << "[Multi-layer Perceptron] Done!" << std::endl;
}
// I1208 19:47:27.683432 3049001 DistributedBackend.cpp:190] Initialized NCCL successfully! Compiled with NCCL 2.2
// [Multi-layer Perceptron] Started...
// Epoch: 1 Mean Squared Error: 20.2124
// Epoch: 2 Mean Squared Error: 5.28266
// Epoch: 3 Mean Squared Error: 2.91948
// Epoch: 4 Mean Squared Error: 2.50887
// Epoch: 5 Mean Squared Error: 2.25293
// ...
// ...
// ...
// Epoch: 97 Mean Squared Error: 0.925514
// Epoch: 98 Mean Squared Error: 0.922071
// Epoch: 99 Mean Squared Error: 0.923678
// Epoch: 100 Mean Squared Error: 0.922085
// [Multi-layer Perceptron] Done!
On NVIDIA Tesla M40 GPUs, the above code runs in 3 min 17 sec with distributed training on two GPUs, and in 5 min 30 sec without distributed training.