0x00 摘要
Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。
本系列将通过源码分析来带领大家了解 Horovod。本文是系列第六篇,看看 Horovod 后台线程架构。
因为掘金字数限制,因此本文分为两篇发出,望见谅。
前面几篇链接如下:
[源码解析] 深度学习分布式训练框架 Horovod (1) — 基础知识
[源码解析] 深度学习分布式训练框架 horovod (2) — 从使用者角度切入
[源码解析] 深度学习分布式训练框架 horovod (3) — Horovodrun背后做了什么
[源码解析] 深度学习分布式训练框架 horovod (4) — 网络基础 & Driver
[源码解析] 深度学习分布式训练框架 horovod (5) — 融合框架
0x01 引子
在前文我们看到,当训练时,Execution Thread 会通过一系列操作,把 Tensor & Operation 传递给后台线程,其流程大致如下:左面是 执行线程,就是训练线程,右面是后台线程,用来做 ring-allreduce:
我们下面继续看看后台是如何运作的。
0x02 设计要点
2.1 问题
因为计算框架往往采用多线程执行训练的计算图,所以在多节点情况下,拿allreduce操作来举例,我们不能保证每个节点上的 allreduce 请求是有序的。因此MPI_Allreduce并不能直接用。
2.2 方案
为了解决这个问题,hvd 设计了一个主从模式,rank 0 为 master 节点,rank 1 ~ rank n 为 worker 节点。
- master 节点进行同步协调,保证对于某些 tensor 的 allreduce 请求最终有序 & 完备,可以继续处理。
- 在决定了哪些 tensor 以后,master又会将可以进行通信的tensor 名字和顺序发还给各个节点。
- 当所有的节点都得到了即将进行的MPI的tensor和顺序,MPI通信得以进行。
首先回顾下同步梯度更新这个概念,其表示的是等待 所有Rank的梯度都计算完毕后,再统一做全局梯度累加,这就涉及到在集群中做消息通信,为此HVD做了两个方面的工作。
- 在Horovod中,每张卡都对应一个训练进程,称之为rank。如4张卡,对应的各个进程的rank则为 [0,1,2,3]。
- 协调工作:HVD里面将 Rank0 作为coordinator(master),其余的进程为worker。由Rank0来协调所有Rank的进度。
- 后台线程:为了不block正常OP的计算,HVD里面创建 background communication 线程,专门用来Rank间的消息同步和AllReduce操作。
在 Horovod 中,训练进程是平等的参与者,每个进程既负责梯度的分发,也负责具体的梯度计算。如下图所示,三个 Worker 中的梯度被均衡地划分为三份,通过 4 次通信,能够完成集群梯度的计算和同步。
2.3 协调
2.3.1 设计
对于协调的过程,文档中也有非常详细的讲述,我也一起翻译。
coordinator 目前采用master-worker paradigm。Rank 0 作为master(即 “coordinator”),其他的rank是 worker。每个 rank 在自己的后台线程中运行,时间片循环调度处理。在每个时间片中会进行如下操作:
Workers 会发送 MPIRequests 给 coordinator。MPIRequests 显式注明 worker 希望做什么(比如在哪个 tensor 上做什么操作,是 gather 还是 reduce,以及 tensor 的形状和类型)。在 tensor 的 collective op 已经执行完 ComputeAsync 之后,worker 就会对于每个 tensor 发送MPIRequest。
当没有更多处理的 tensors 之后,workers 会向 coordinator 发送一个空的 “DONE” 消息;
coordinator 从 worker 收到 MPIRequests 以及 coordinator本身的 TensorFlow ops 之后,将它们存储在请求表中(request table)。协调器继续接收MPIRequest,直到收到了MPI_SIZE 个 “DONE” 消息;
Coordinator 收集所有准备缩减,gather 的张量,或所有导致错误的操作。对于每一个向量或者操作。Coordinator 向所有工作人员发送MPIResponse。当没有更多的MPIResponse时,Coordinator将向工人发送“完成”响应。如果进程正在关闭,它将发送一个“shutdown”响应。
Workers 监听MPIResponse消息,逐个做所要求的reduce或gather操作,直到他们收到”DONE” resposne。此时,时间片结束。如果接收到的不是“DONE”,而是“SHUTDOWN”,则退出background loop
简单来讲就是:
- Coordinator 收集所有 worker(包括Coordinator自己,因为自己也在进行训练)的MPIRequests,把他们放入request table。
- 当收集到 MPI_SIZE 个 “DONE” 消息之后,Coordinator 会找出就绪的 tensor (在 message_table 里面查找)构造出一个 read_to_reduce 的列表,然后发出 size 个 MPIResponse 告知进程进行计算。
- worker 接受到 response 开始真正的计算过程(通过 op_manager 具体执行)。
- 这是整体同步的过程,如果打开 horovod 的 trace log(
HOROVOD_LOG_LEVEL=trace
) 就能看到同步的过程。
2.3.2 实现
我们再具体看看实现。
在Horovod中,每张卡都对应一个训练进程,称之为rank。如4张卡,对应的各个进程的rank则为[0,1,2,3]。
hvd 设计了一个主从模式,将 Rank0 作为coordinator(master),其余的进程为worker,由Rank0来协调所有Rank的进度。每个worker节点上都有一个消息队列,而在master节点上除了一个消息队列,还有一个消息map。
每当计算框架发来通信请求时,hvd并不直接执行MPI,而是封装了这个消息并推入自己的消息队列。
- 整体采用消息的 Request 和 Response 机制;
- 当某个 OP 的 gradient 计算完成并且等待全局的 AllReduce,该 Rank 就会包装一个 Request 请求,调用 ComputeResponseList 将 Request (就是说,这是个 ready tensor)放入这个 rank 的 message_queue 中,每个 Rank 的 后台线程 定期轮训自己的 message_queue,然后把 queue 里面的 request 发送到 Rank 0。因为是同步MPI,所以每个节点会阻塞等待MPI完成。
- Rank 0 拥有 message_table,用来保存其他 rank 的 request 信息,rank 0 会处理 message_table 里面所有的 request。
- 当 rank 0 收到 所有 rank 对于某个 op allreduce 的 request 之后,就说明 这个 tensor 在所有的rank中都已经ready。说明 所有的节点都已经发出了对该tensor的通信请求,那这个tensor就需要且能够进行通信。
- 决定了tensor以后,master又会将可以进行通信的tensor 名字和顺序发还给各个节点。
- Rank 0 节点会挑选出所有符合要求的tensor进行MPI通信:
- 不符合要求的tensor继续留在消息map中,等待条件符合。
- 当有符合要求的 tensor,Rank 0 然后就会发送 Response 给其他 rank,表明当前 op & tensor 的所有局部梯度已经 Ready,可以对这个tensor执行collective操作,比如可以执行 allReduce 操作。
- 至此,所有的节点都得到了即将进行的MPI的tensor和顺序,MPI通信得以进行。
大致逻辑如下:
Rank 0 Rank 1 Rank 2
+ + +
| | |
| | |
| | |
+ Tensor 1 request | |
message_table <---------------------+ |
+ | |
| | |
| | |
v | |
| |
message_table[tensor 1] | |
+ | |
| | |
| Tensor 1 request | |
| <--------------------------------------------+
+ | |
message_table[tensor 1, tensor 1] | |
+ | |
| | |
| Tensor 1 request | |
+-------------------------+ | |
| | | |
| | | |
| <-----------------------+ | |
| | |
v | |
message_table[tensor 1, tensor 1, tensor 1] | |
+ | |
| | |
| | |
| Tensor 1 response | |
+-----------------------------> | |
| | |
| Tensor 1 response | |
+--------------------------------------------> |
| | |
| Tensor 1 response | |
+-------------------------v | |
| | | |
| | | |
| <-----------------------+ | |
| | |
| | |
v v v
复制代码
2.4 Background Thread
每个rank有两个thread,我们通常在python文件中使用hvd.init()来初始化hvd,实际上是开了一个后台线程和一个MPI线程。
- Execution thread(MPI线程) 是用来做机器学习计算的。
- background thread 是 rank 之间同步通讯和做allreduce操作的。百度在设计时候,就有了一个MPI background thread,Horovod沿用了这个设计,名字就是BackgroundThreadLoop。
2.4.1 设计
关于设计的思考,百度在源码注释(tensorflow-allreduce-master/tensorflow/contrib/mpi_collectives/mpi_ops.cc)里面写的非常清楚,我大致翻译出来。
MPI background thread 是为了协调所有的 MPI 进程和tensor reduction。这个设计是处于几个考虑:
- 一些MPI实现要求所有的MPI调用必须在一个单独线程中。因为 Tensorflow 在处理图的时候可能会用到几个线程,所以我们必须使用自己的特定的线程来处理MPI;
- 对于某些错误(比如不匹配的types),MPI 有时候会没有一个确定的处理方式,但是我们还想优雅的处理这些错误。为了做到优雅处理,就要求 MPI 进程需要知道其他进程上tensor的形状和类型;
- MPI reductions and gathers 也许会和其他操作一起并行处理。因为 MPI 使用一个与TF GPUDevice streams分离的内部(inaccessible)的GPU stream,我们不能显式进行同步memcpys或者kernels。因此,MPIAllreduce and MPIAllgather 必须是 AsyncOpKernels 类型 以便 确保memcpys或者kernels的合理顺序;
- 注意:我们无法确保所有的MPI进程以同样的顺序reduce他们的tensors。因此,必须有一个办法来确保可以同时跨越所有的ranks来做reduction memcpys and kernels。我们使用 rank ID 0 作为 coordinator 来协调那些已经准备好的,可以执行的操作(gather and trigger the reduction operations);
精简下:
- 一些MPI的实现机制要求所有的MPI调用必须在一个单独线程中。
- 为了处理错误,MPI 进程需要知道其他进程上tensor的形状和类型。
- MPIAllreduce and MPIAllgather 必须是 AsyncOpKernels 类型 以便 确保memcpys或者kernels的合理顺序。
因此,一个后台线程是有必要的。horovod_global.message_queue 以及 horovod_global.tensor_table 都是在Horovod的后台线程BackgroundThreadLoop 中被处理的。
2.4.2 实现
在底层,AllReduce 被注册为 Op,在 ComputeAsync 中,计算请求被入队到一个队列中。这一队列会被一个统一的后台线程处理。
在这个后台线程的初始化过程中,它会利用进程内共享的全局状态在自己的内存里创建一些对象,以及一些逻辑判断。比如要不要进行 Hierarchical AllReduce,要不要 AutoTune等。这里是初始化阶段的日志。
在初始化的过程中,有一些比较重要的对象会被构造出来,比如各种 Controller。
我们接下来就具体分析后台线程。
0x03 辅助功能
我们首先介绍一些辅助功能。
3.1 如何判断是 coordinator
因为后台线程代码是所有worker公用,所以需要区分 rank0 还是其他 worker,从而执行不同的代码流程。
这里采用 is_coordinator 用来判断是否是 Rank0。
is_coordinator_ 的赋值如下:
void MPIController::DoInitialization() {
......
// Get MPI rank to determine if we are rank zero.
MPI_Comm_rank(mpi_ctx_.mpi_comm, &rank_);
is_coordinator_ = rank_ == 0;
复制代码
is_coordinator_ 的使用方式示例如下,可以看出来,在同步参数的时候,是从 rank 0 获取参数,然后广播给其他 rank,即 workers:
void Controller::SynchronizeParameters() {
ParameterManager::Params param;
if (is_coordinator_) { // rank 0 执行操作
param = parameter_manager_.GetParams();
}
void* buffer = (void*)(¶m);
size_t param_size = sizeof(param);
Bcast(buffer, param_size, 0, Communicator::GLOBAL);
if (!is_coordinator_) { // worker 执行操作
parameter_manager_.SetParams(param);
}
}
复制代码
3.2 协调缓存&信息
在 ComputeResponseList 函数中,会使用以下代码来协调缓存,作用就是整理出来所有 rank 共有的 tensor。
CoordinateCacheAndState(cache_coordinator);
复制代码
主要还是用到了cache_coordinator 操作。
void Controller::CoordinateCacheAndState(CacheCoordinator& cache_coordinator) {
// Sync cache and state information across workers.
cache_coordinator.sync(shared_from_this(), timeline_enabled_);
}
复制代码
3.2.1 计算共有 tensor
CoordinateCacheAndState 函数如下:
- 每个worker都整理自己的bitvector;
- 使用 CrossRankBitwiseAnd 整理出来共有的 tensor;
- 使用 CrossRankBitwiseOr 整理出来共有的无效 tensor;
void CacheCoordinator::sync(std::shared_ptr<Controller> controller,
bool timeline_enabled) {
// Resize and initialize bit vector.
int nbits = num_active_bits_ + NUM_STATUS_BITS;
int count = (nbits + sizeof(long long) * CHAR_BIT - 1) /
(sizeof(long long) * CHAR_BIT);
......
// 每个worker都整理自己的bitvector
// For each cache hit on this worker, flip associated bit in bit vector.
for (auto bit : cache_hits_) {
int shifted_bit = bit + NUM_STATUS_BITS;
int shift = shifted_bit / (sizeof(long long) * CHAR_BIT);
bitvector_[shift] |=
(1ull << (shifted_bit % (sizeof(long long) * CHAR_BIT)));
if (timeline_enabled) {
// Set corresponding bit in extended section for timeline if needed.
bitvector_[count + shift] ^=
(1ull << (shifted_bit % (sizeof(long long) * CHAR_BIT)));
}
}
// 整理出来共有的 tensor
// Global AND operation to get intersected bit array.
controller->CrossRankBitwiseAnd(bitvector_, fullcount);
// Search for flipped bits to populate common cache hit set. There will never
// be invalid bits in this set.
cache_hits_.clear();
for (int i = 0; i < count; ++i) {
int shift = i * sizeof(long long) * CHAR_BIT;
long long ll = bitvector_[i];
while (ll) {
int idx = __builtin_ffsll(ll);
int shifted_bit = shift + idx - 1;
cache_hits_.insert(shifted_bit - NUM_STATUS_BITS);
ll &= ~(1ull << (idx - 1));
}
}
......
// If any worker has invalid cache entries, communicate invalid bits across
// workers using a second bit-wise allreduce operation.
if (invalid_in_queue_) {
std::memset(&bitvector_[0], 0, count * sizeof(long long));
for (auto bit : invalid_bits_) {
int shift = bit / (sizeof(long long) * CHAR_BIT);
bitvector_[shift] |= (1ull << (bit % (sizeof(long long) * CHAR_BIT)));
}
// Global OR operation to get common invalid bits.
controller->CrossRankBitwiseOr(bitvector_, count);
// Search for flipped bits to populate common invalid bit set.
invalid_bits_.clear();
for (int i = 0; i < count; ++i) {
int shift = i * sizeof(long long) * CHAR_BIT;
long long ll = bitvector_[i];
while (ll) {
int idx = __builtin_ffsll(ll);
int bit = shift + idx - 1;
invalid_bits_.insert(bit);
ll &= ~(1ull << (idx - 1));
}
}
}
synced_ = true;
}
复制代码
3.2.2 MPI操作
CrossRankBitwiseAnd 作用是 调用 MPI 归并 共有的 bitvector。
void MPIController::CrossRankBitwiseAnd(std::vector<long long>& bitvector,
int count) {
int ret_code = MPI_Allreduce(MPI_IN_PLACE, bitvector.data(), count,
MPI_LONG_LONG_INT, MPI_BAND, mpi_ctx_.mpi_comm);
}
复制代码
3.3 MPIContext
mpi_context 是在加载 C++ 的代码时候就已经创建了,同时创建的还有其他 context( nccl_context, gpu_context),主要是维护一些节点上 mpi 通信的必要环境信息和设置,如:
- 3 个 MPI communicator,mpi_comm,local_comm,cross_comm 分别负责 horovod mpi 传输,节点内传输,和节点间分层传输(主要用于 hierarchical allreduce)。
- mpi_float16_t :horovod 主要以 float16 传输。
- mpi_float16_sum: float16 对应的sum 操作。
在 horovod 使用 mpi 的时候,都会使用上面的 communicator 进行数据传输。
3.4 Parameter_manager
Parameter_manager 主要是 GlobalState 的一个用于管理一些调节 horovod 性能的参数的管理器,在 BackgroundThreadLoop 中跟其他的 GlobalState 的元素一同初始化,然后会读取下面这些对应的环境变量,然后进行设置。
-
HOROVOD_FUSION_THRESHOLD :指传输数据切片的大小,默认是64M,如果切片太大,传输的时候就不能很好地 pipeline 传输,如果太小,一个 tensor 需要传输多次,增加 IO 的 overhead。
-
HOROVOD_CYCLE_TIME :指 RunLoopOnce 的睡眠时长,默认是 5ms,比较理想的睡眠时间应该是 RunLoopOnce 其余逻辑处理的时间 + HOROVOD_CYCLE_TIME 刚好等于一次前向传播和后向传播所用的时间,因为睡太久前端会在等 RunLoopOnce 睡醒;如果睡太短,不断地跑一次 RunLoopOnce,tensor_queue 也不会有新的元素,只是白跑。
-
HOROVOD_CACHE_CAPACITY:指 cache 的大小,这个可能跟 model 层数参数量相关了。
-
HOROVOD_HIERARCHICAL_ALLGATHER:是否使用分层的 allgather 的方式等
Parameter_manager 也提供了对这些参数自动调节的功能。通过 Parameter_manager.SetAutoTuning 进行设置,设置后会在初始的几个 batch 尝试不同的参数组合进行通信,后面会收敛到一组最优的参数值。
0xEE 个人信息
★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。
0xFF 参考
Scaling model training in PyTorch using distributed data parallel
A developer-friendly guide to mixed precision training with PyTorch
It’s 2020, why isn’t deep learning 100% on the cloud yet?
到了2020年,为什么还不可以在云上进行100%的深度学习?
在 Amazon SageMaker 管道模式下使用 Horovod 实现多 GPU 分布式训练