分布式通信与并行策略
1.通信方式
1.1 Broadcast

1.2 AllGather

1.3 AllReduce

1.4 Reduce

1.5 ReduceScatter



目前比较常用的就是一个ring结构;reduceScater遵循以下规则,假设有n 个rank,首先会将每个rank上的tensor,切分成个块,紧接着将其按照如下规则,每一个时间步,都按以下规则发送和接收块tensor,并且接收的块做reduce,最终rank_i得到了第(rank_i+1)%num_ranks的结果。
规则:
第 k 步 (k 从 0 到 N-2, 这里 N=4):
- 发送:将本地缓冲区的第
(rank - k) % N块发送给下一个邻居(rank+1)。 - 接收:从上一个邻居(
rank-1)接收一个数据块,并把这个数据块与本地缓冲区的第(rank - k - 1) % N块相加(Reduce)。
1.6 总结

这里怎么理解AllReduce = ReduceScatter + AllGather,主要是因为ReduceScatter 把每张卡的值汇总起来,做reduce(可能是sum,average等),然后把结果分成rank个sub_result部分,分别给每个rank,而AllGather再把这rank个sub_result给聚合起来到每个rank上,则每个rank都有完整的result。其机制就跟直接做allreduce一样,对每个rank的数据汇总做reduce,然后把result分到每个rank上。
reduce_scatter使得每个rank_i获得了第(rank_i+1)%num_ranks的结果。这是需要n-1步,然后all gather需要把每张卡上reduce的结果,分发到其它卡,再走n-1步,所以一共的通信步是2*(n-1)。
2. Paddle三类通信区分动手,动半,静态全自动

同时,在动手的案例中不会出现dist_tensor,shard这些东西,因为这是动半特有的,对通信进行标记使用shard,而tensor被shard处理后,即变成了dist_tensor。
3.分布式并行方式
分布式并行方式包括:数据并行(DP),张量并行(tp),流水并行(pp),专家并行(ep),序列并行(sp),上下文并行(Context Parallel)。
3.1 基础并行层
3.1.1 ColumnParallelLayer与RowParallelLayer同时使用的关系
ColumnParallelLayer

RowParallelLayer

可以看到,RowParallelLayer在计算的过程中,需要把输入拆分成两列分别在两张卡上做计算,最终两张卡都得到Parital状态的数据,而如果上一层是ColumnParallel则其计算的结果刚好分配到两个设备上(即结果被按列切分),而此结果正是RowParallelLayer需要的输入,那么就无需做通信,直接继续计算最后再做allreduce即可。
3.1.2 ColumnParallelLayer与RowParallelLayer的w和bias的切分

注意,在做y=x*W^T+b的计算时,首先乘积得到的数据是[batchsize,output_size],每一行表示一个数据,而bias是分别和每一行相加,因此bias是一个一维的向量,因此,当W按列切分时,bias需要按行切分,从而保持正确的计算关系。
当添加了bias的时候,做RowParallelLayer和ColumnParallelLayer情况如下:
RowParallelLayer:

RowParallelLayer只切w,不切bias
ColumnParallelLayer:

ColumnParallelLayer切w的axis=1,切bias的axis=0
3.1.3 Vocab Parallel Embedding
Vocab Embedding
示例:
文本输入
用户输入: "Hello world, how are you?"
分词(Tokenization)
分词结果: ["Hello", "world", ",", "how", "are", "you", "?"]
词汇表映射(Vocabulary Mapping)
词汇表: {"<PAD>": 0, "<UNK>": 1, "<BOS>": 2, "<EOS>": 3,
"Hello": 4, "world": 5, ",": 6, "how": 7, "are": 8, "you": 9, "?": 10, ...}
映射结果: [4, 5, 6, 7, 8, 9, 10]
输入到模型为词汇ID序列
模型接收的输入: x = [4, 5, 6, 7, 8, 9, 10] (词汇ID序列)
因此,Vocab Embedding接收到的输入x是[batch_size,seq_length],即多组词汇ID序列。Vocab Embedding的大小为(vocab_size,hidden_dim),vocab_size这个维度即表示了所有token的ID,根据token_id找到对应的行,该行所有的列则表示当前这个token的向量化表示,即包含其数学维度信息的特征向量。初始时,这些数据是没有意义的,经过训练后,则可以表示每个词的空间特征,越相近的词,在空间上离得越近。
根据 x,在vocab表中查找对应token的向量化表示,最终输出为[batch_size,sequence_lenth,hidden_dim]。
.png)
如图,以batch_size=1为例,查表后,通过当前输入的sequence_lenth中存在的token_ids,选择k行,最终构成[batch_size,sequence_lenth,hidden_dim]的输出。
反向的grad形状与forward输出时形状相同,而因为Vocab_Embedding的操作是从整个词表中选出k行数据,因此最后更新grad的时候,也是指有这k行数据有grad,而其它行均默认为0,若有相同token_id,则做梯度累加。
Vocab Parallel Embedding

如果是词表并行,则按sequence_lenth做切分,多路并行时,在forward阶段需要做all_reduce,因为只在本rank的词表查找,若查找到,则返回词表的这一行值,即当前token的向量化表示;若超过此范围,则返回的是默认值0。all_reduce之后,每个rank都能拿到全部token的向量化表示。反向梯度更新的时候无需通信,直接选当前rank的词表范围内的梯度进行累积,并更新词表对应行的数据即可。
3.2 数据并行。

1通信为了让数据并行模型参数保持一致,3通信是为了让梯度累加;在反向过程中,本层的∇x梯度计算完毕后,可以继续计算下一层的∇x梯度,而不需要等待∇w的梯度计算,当对∇x梯度做allreduce通信时,可以做∇w的梯度计算;而做∇x梯度的计算时,又可以做∇w的allreduce通信,从而达到通信与计算重叠。
注意: 数据并行分两种,上面是DDP,即Distribute_DP是基于多进程实现的,还有一种是DP,是单进程多线程实现,如下图所示,它使用一个进程来计算模型权重。

注意: 二者效果是等价的,因为假设我们DP实现batchsize为64的数据量,对于DP来说,它会在计算loss时,汇总64条数据的output,然后和lable计算出loss,并得到一个平均的loss,再传给各GPU进行反向传播,此时每个grad就是平均之后的,即除以了64的。而DDP 则分给两个GPU各自去计算梯度,每个GPU计算的梯度也是mean的,不过是除以32的,所以要再allreduce一下,两个相加除以2,即同样是除以64。
补充说明:ZeRo(DeepSpeed,微软提出) 以及FSDP(Pytorch最新数据并行方案,ZeRo3和FSDP思想等价)
Group Sharded:
(这里存在一个组shard的概念,即按param的属性,将其划分到一个group,这些param在group里面同步和通信,与其它group相对独立,可以看一下记录的Dynamic optimizer shardingV1,V2的Fused_Buffer的概念)

ZeRO-1(pytorch:OSS):
ZeRO-1没有将模型本身进行分片,也没有将Gradient进行分片,而是只将优化器进行分片。训练过程与DDP类似。
- forward过程由每个rank的GPU独自完整的完成,然后进行backward过程。在backward过程中,梯度通过allReduce进行同步。
- Optimizer state 使用贪心策略基于参数量进行分片,以此确保每个rank几乎拥有相同大小的优化器内存。
- 每个rank只负责更新当前优化器分片的部分,由于每个rank只有分片的优化器state,所以当前rank忽略其余的state。
- 在更新过后,通过广播或者allGather的方式确保所有的rank都收到最新更新过后的模型参数。
ZeRO-1 非常适合使用类似Adam进行优化的模型训练,因为Adam拥有额外的参数m(momentum)与v(variance),特别是FP16混合精度训练。ZeRO-1 不适合使用SGD类似的优化器进行模型训练,因为SGD只有较少的参数内存,并且由于需要更新模型参数,导致额外的通讯成本。ZeRO-1只是解决了Optimizer state的冗余。
ZeRO-2(pytorch:SDP):
相比于ZeRO-1,ZeRO-2除了对optimizer state进行切分,还对Gradient进行了切分。
像ZeRO-1一样将optimizer的参数进行分片,并安排在不同的rank上。在backward过程中,gradients被reduce操作到对应的rank上,取代了all-reduce,以此减少了通讯开销。 每个rank独自更新各自负责的参数。在更新操作之后,广播或allGather保证所有的ranks接收到更新后的参数。
ZeRO-3(pytorch:FSDP):
为了进一步节省更多的内存,ZeRO-3提出进行模型参数的分片。类似以上两种分片方式,ranks负责模型参数的切片。可以进行参数切片的原因主要有以下两点:
- All-Reduce操作可以被拆分为Reduce与allgather操作的结合。
- 模型的每一层拥有该层的完整参数,并且整个层能够直接被一个GPU装下。所以计算前向的时候,除了当前rank需要的层之外,其余的层的参数可以抛弃。从这个层面上来说,Zero相当于数据并行+模型并行。
FSDP(Fully Sharded Data Parallel):
FSDP 是一种新型数据并行训练方法,但与传统的数据并行不同,传统的数据并行维护模型参数、梯度和优化器状态的每个 GPU 副本,而 FSDP 将所有这些状态跨数据并行工作线程进行分片,并且可以选择将模型参数分片卸载到 CPU。
下图显示了 FSDP 如何在 2 个数据并行进程中工作流程:

通常,模型层以嵌套方式用 FSDP 包装,因此,只有单个 FSDP 实例中的层需要在前向或后向计算期间将完整参数收集到单个设备。 计算完成后,收集到的完整参数将立即释放,释放的内存可用于下一层的计算。 通过这种方式,可以节省峰值 GPU 内存,从而可以扩展训练以使用更大的模型大小或更大的批量大小。 为了进一步最大化内存效率,当实例在计算中不活动时,FSDP 可以将参数、梯度和优化器状态卸载到 CPU。
3.3 张量并行(TP,MP)

1.初始化:
各卡训练参数为partial状态,即部分参数,对于切分的参数,分别初始化;对于没有切分的参数,初始化后同步为一致的参数(此时需要通信同步)。
2.Forward
各卡的网络使用切分后的网络,即网络层数不变,但shape为切分后的shape。如果是column parallel,则得到的每个rank的output也是经过了column parallel的output,因此需要做allgather,将输出拼接;而如果是row paralle,则得到的每个rank的output,形状与output相同,但数据属于partial状态,需要做all-reduce获得完整张量。
看上图,假设Linear1是ColumnParallel,Linear2是RowParallel,则列切计算出来后的结果,可以直接作为行切的输入(如果tp数相同,即Y1,Y2),因此无需通信(注意,若此时下一层做全参数计算,而不是RowParallel,则需要做all-gather聚合输出,拼接成完整的输出Y),而Linear2的结果要作为完整参数输入到Linear3(和单卡视角对齐),因此需要对Z1,Z2做allreduce,得到完整输入Z。这里需要做一次通信。
3.Backward
在反向传播过程中,因为Linear3以后的层都是完整参数,所以纯TP下,反向计算的梯度都是一样的,无需通信,
Linear2的反向梯度计算:
完整参数下:
$$ \frac{\partial L}{\partial B}={Y^T}*\frac{\partial L}{\partial Z} $$
$$ \frac{\partial L}{\partial Y}=\frac{\partial L}{\partial Y}*B^T $$
而由于卡1和卡2分别占据部分参数B1、B2,因此只需要B1、B2部分参数分别对应的梯度即可(Y1,Y2分别是卡1,卡2此时的输入,因此分别计算自己卡上的数据,无需通信):
即$$ \frac{\partial L}{\partial B_1}=\frac{\partial L}{\partial Z}\frac{\partial Z}{\partial B_1}=\frac{\partial L}{\partial Z}\frac{\partial Y_1B_1}{\partial B_1}=Y_1^T\frac{\partial L}{\partial Z} $$
同理,B2的梯度也是,所以求B1的梯度无需通信。
对于输入Y,由于对称性,可知:
$$ \frac{\partial L}{\partial Y_1}=\frac{\partial L}{\partial Z}\frac{\partial Z}{\partial Y_1}=\frac{\partial L}{\partial Z}\frac{\partial Y_1B_1}{\partial Y_1}=\frac{\partial L}{\partial Z}B_1^T $$
Y2同理,此时计算出来的梯度无需通信,因为Y1,Y2即分别对应rank0,rank1的上一层Linear1的输出,上一层也是切分状态,所以,求输入的梯度Y1,Y2也无需通信。注意:若上一层为完成的参数做forward,即rank0,rank1输入都是完整的Y,则backward时,需要做allgather将Y1_grad,Y2_grad做聚合
Linear1的反向梯度计算:
X*A1=Y1,X*A2=Y2
则$$ \frac{\partial L}{\partial A1}=X^T*\frac{\partial L}{\partial Y1} $$
每个rank都有全量X,因此也无需通信。
但是,如果X需要继续传播梯度:
对于rank1:$$ \frac{\partial L}{\partial X}=\frac{\partial L}{\partial Y1}*A_1^T $$
对于rank2:$$ \frac{\partial L}{\partial X}=\frac{\partial L}{\partial Y1}*A_2^T $$
此时两个rank的$$ \frac{\partial L}{\partial X} $$是partial的状态,因此需要做allreduce,才能得到X的完整梯度。如果要继续往下传递grad,且接下来的参数都是未切分的,则这里需要做all-reduce通信合并成完整梯度。
4.Optimizer
每张卡获得grad后,在optimizer中进行参数更新,无需通信。
3.4 流水并行(PP)

流水并行是在模型层间实现并行,以层为粒度将不同的层和参数划分到不同卡上并行计算。如上图所示,将Linear1和Linear2切分到0号卡,Linear3和Relu切分到1号卡。流水并行钟,loss层和准确率计算都在前向计算的最后1卡上,也仅有这张卡上能获取到loss值。
同样进行通信分析:
1.各卡的训练参数按流水线切分后的参数分配,分别初始化,不需要通信。
2.各卡的网络使用切分后的网络,在前向计算中,当前卡计算完成后的数据要send到下一阶段的卡上,同时需要从其它卡通过recv接收数据。此时需要通信。
3.反向计算过程与前向一样,需要发送和接收grad数据,因此也需要通信。
4.由于optimizer不做切分,因此各卡都有完整优化器,用于更新参数,无需通信。
流水并行的基本示意图如下:

而为了优化流水线并行中设备的计算效率,可以进一步将mini-batch切分成粒度更小的micro-batch,来提升流水并行的并发度,进而达到提升设备利用率和计算效率的目的。如下是以更小粒度的micro-batch做编排:

上图这样的方法是最基础的micro-batch方法