CUDA-MODE课程笔记|GPU集合通信(NCCL)

↑ 点击蓝字 关注极市平台
作者丨BBuf
来源丨GiantPandaCV
编辑丨极市平台

极市导读

 

本文详细介绍了NVIDIA的NCCL库,包括其在分布式深度学习中的应用,特别是如何通过PyTorch DDP实例实现高效的梯度同步。文章还讲解了NCCL的基本概念、API使用、通信器初始化方式,并深入分析了Ring AllReduce算法的工作原理,提供了对NCCL库的全面理解。 >>加入极市CV技术交流群,走在计算机视觉的最前沿

我的课程笔记,欢迎关注:https://github.com/BBuf/how-to-optim-algorithm-in-cuda/tree/master/cuda-mode 。

这节课介绍了NVIDIA的NCCL(NVIDIA Collective Communications Library)通信库,重点讲解了其在分布式深度学习中的应用。首先通过PyTorch DDP的实例,展示了NCCL如何实现高效的梯度同步。接着介绍了下NCCL的基本概念、API使用、通信器初始化方式,并深入分析了Ring AllReduce算法的工作原理。

第17课,GPU集合通信(NCCL)

课程笔记

这张Slides介绍了 NVIDIA 的 NCCL (NVIDIA Collective Communications Library) 通信库,它是一个专门用于 GPU 之间快速数据通信的库,支持点对点和集体通信两种模式,提供了包括 Scatter、Gather、All-to-all、AllReduce、Broadcast、Reduce、AllGather 和 ReduceScatter 等多种通信原语,Slides下方的图展示了 AllGather 操作的工作流程,然后在上方展示了一下Broadcast和Scatter的示意图。

这张Slides简单展示了一下nccl AllReduce(Reduce Sum)的操作。图片分为”Before”和”After”两个部分,显示了在3个GPU(GPU 0、GPU 1和GPU 2)上的数据处理过程。在初始状态下,每个GPU都包含3个不同的数据块(GPU 0有A、B、C;GPU 1有D、E、F;GPU 2有G、H、I)。经过AllReduce操作后,每个GPU都得到了相同位置数据的总和(即A+D+G、B+E+H、C+F+I),这样三个GPU最终都具有相同的计算结果。

这张Slides讲了一下DDP里面需要nccl的地方,也就是同步全局梯度的时候。具体来说,在这个例子中,数据被分成两部分(x₀和x₁)分别在两个GPU上处理。每个GPU运行相同的模型,计算各自的局部梯度(Local Gradients),然后通过NCCL的AllReduce操作来同步和平均所有GPU上的梯度。最后,每个GPU使用这个平均梯度来更新自己的模型参数,确保所有GPU上的模型保持同步。

这张Slides更具体了一些,用一个 y = w * 7 * x 的例子,展示了 DDP 里面同步梯度的时候,如何使用 NCCL 的 AllReduce 操作来同步和平均所有 GPU 上的梯度。这个例子作者也提供了一个代码,代码如下:

# modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html  

import torch  
import torch.distributed as dist  
import torch.nn as nn  
from torch.profiler import profile  

from torch.nn.parallel import DistributedDataParallel as DDP  

# 定义一个简单的玩具模型类  
class ToyModel(nn.Module):  
    def __init__(self):  
        super(ToyModel, self).__init__()  
        # 定义一个可训练参数w,初始值为5.0  
        self.w = nn.Parameter(torch.tensor(5.0))  

    def forward(self, x):  
        # 前向传播: y = w * 7 * x  
        return self.w * 7.0 * x  

def demo_basic():  
    # 初始化进程组,使用NCCL后端  
    dist.init_process_group("nccl")  
    # 获取当前进程的rank  
    rank = dist.get_rank()  
    print(f"Start running basic DDP example on rank {rank}.")  

    # 创建模型实例并移到对应GPU  
    model = ToyModel().to(rank)  
    # 用DDP包装模型  
    ddp_model = DDP(model, device_ids=[rank])  

    # 使用PyTorch profiler收集性能数据  
    with profile() as prof:  
        # 创建输入张量,值为当前进程的rank  
        x = torch.tensor(dist.get_rank(), dtype=torch.float)  
        # 前向传播  
        y = ddp_model(x)  
        # 打印计算结果  
        print(f"rank {rank}: y=w*7*x: {y.item()}={ddp_model.module.w.item()}*7*{x.item()}")  
        # 打印关于w的导数  
        print(f"rank {rank}: dy/dw=7*x: {7.0*x.item()}")  
        # 反向传播  
        y.backward()  
        # 打印经过AllReduce后的梯度  
        print(f"rank {rank}: reduced dy/dw: {ddp_model.module.w.grad.item()}")  
    # rank 0负责导出性能跟踪文件  
    if rank == 0:  
        print("exporting trace")  
        prof.export_chrome_trace("trace_ddp_simple.json")  
    # 清理进程组  
    dist.destroy_process_group()  

if __name__ == "__main__":  
    print("Running")  
    demo_basic()  

# torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=localhost:29400 ddp_simple.py  

接着作者给出了一个稍微完善一些的例子,由Linear和ReLU组成的网络,有optimizer更新参数的过程,代码如下:

# modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html  

import torch  
import torch.distributed as dist  
import torch.nn as nn  

from torch.nn.parallel import DistributedDataParallel as DDP  
from torch.profiler import profile  
import torch.optim as optim  

SIZE = 4000  

class ToyModel(nn.Module):  
    def __init__(self):  
        super(ToyModel, self).__init__()  
        self.net1 = nn.Linear(SIZE, SIZE)  
        self.relu = nn.ReLU()  
        self.net2 = nn.Linear(SIZE, SIZE)  
        self.net3 = nn.Linear(SIZE, SIZE)  

    def forward(self, x):  
        return self.net3(self.relu(self.net2(self.relu(self.net1(x)))))  

def demo_basic():  
    dist.init_process_group("nccl")  
    rank = dist.get_rank()  
    print(f"Start running basic DDP example on rank {rank}.")  

    model = ToyModel().to(rank)  
    ddp_model = DDP(model, bucket_cap_mb=25, device_ids=[rank])  

    loss_fn = nn.MSELoss()  
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)  

    with profile(  
        record_shapes=True,  
        activities=[  
            torch.profiler.ProfilerActivity.CPU,  
            torch.profiler.ProfilerActivity.CUDA,  
        ],  
    ) as prof:  
        for i in range(10):  
            optimizer.zero_grad()  
            outputs = ddp_model(torch.randn(1000, SIZE, device=rank))  
            labels = torch.randn(1000, SIZE, device=rank)  
            loss_fn(outputs, labels).backward()  
            optimizer.step()  
    if rank == 0:  
        prof.export_chrome_trace("trace_ddp_example.json")  

if __name__ == "__main__":  
    demo_basic()  

# torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=localhost:29400 ddp_example.py  

作者分析了几分钟这个代码中一个iter的pytorch profiler结果,我们可以看到前向Pass,反向Pass,优化器更新参数,以及AllReduce的通信时间以及部分AllReduce被重叠到了反向计算中。这就引入到了下一张slides。

这里作者讲了一下DDP里面的AllReduce是怎么和Backward Pass重叠的,这个建议阅读这篇博客:https://zhuanlan.zhihu.com/p/485208899 ,从这张Slides的PyTorch Profiler图我们也可以发现一些其它信息,例如在同一个Stream上的kernel是顺序执行,所以为了重叠计算和通信这里使用了两个Stream。由于网络最开始的几个层必须等待梯度计算完毕才能开始AllReduce,所以存在无法重叠的层。

这张Slides提了一下yTorch DDP的内部机制,包括:

  • DDP的梯度同步机制:

    • 使用 autograd hooks 在构建时注册,用于触发梯度同步
    • Reducer 组件会异步执行 allreduce 操作来计算所有进程间的梯度平均值
    • 计算完成后,平均后的梯度会被写入所有参数的 param.grad 字段
    • 在反向传播完成后,不同 DDP 进程中相同参数的梯度值应该是一致的
  • 通信后端支持:

    • NCCL
    • MPI
    • Gloo
    • DDP 支持多种通信后端,包括:
  • 具体实现:

    • NCCL API 的调用是在 PyTorch 的 ProcessGroupNCCL.cpp 文件中通过 Reducer 完成的

这张Slides开始介绍NCCL库中的nccl AllReduce API函数。该函数用于对长度为count的数据数组进行规约(reduce)操作,使用指定的op操作符进行计算,并将相同的结果复制到每个recvbuff中。当sendbuff和recvbuff指向相同位置时,会执行原地操作。这是一个在分布式深度学习中常用的集合通信操作,用于在多个GPU之间同步和聚合数据。

这张Slides介绍了NCCL通信器对象的两种使用场景:一种是每个CPU进程对应一个GPU的情况,此时root进程会生成唯一ID并广播给所有进程,所有进程用相同的ID和唯一的rank初始化通信器例如MPI;另一种是单个CPU进程管理多个GPU的情况,这时不需要广播ID,而是通过循环来初始化每个rank,并可以使用封装好的ncclCommInitAll函数来简化这个过程。Slides右侧的代码示例展示了这些初始化操作的具体实现方式。

这张Slides展示了错误处理宏定义

#define CUDACHECK(cmd) {                      
    cudaError_t err = cmd;                    
    if (err != cudaSuccess) {                
        printf("Failed: Cuda error %s:%dn",  
            __FILE__,__LINE__,cudaGetErrorString(err));  
        exit(EXIT_FAILURE);                 
    }  
}  

#define NCCLCHECK(cmd) {                      
    ncclResult_t res = cmd;                 
    if (res != ncclSuccess) {               
        printf("Failed: NCCL error %s:%dn",  
            __FILE__,__LINE__,ncclGetErrorString(res));  
        exit(EXIT_FAILURE);                 
    }  
}  

这部分定义了两个错误处理宏:

  • CUDACHECK: 用于检查CUDA API调用的错误
  • NCCLCHECK: 用于检查NCCL操作的错误
int main(int argc, char* argv[]) {  
    ncclComm_t comms[4];  

    //管理4个设备  
    int nDev = 4;  
    int size = 32*1024*1024;  
    int devs[4] = { 0, 1, 2, 3 };  

    //分配和初始化设备缓冲区  
    float** sendbuff = (float**)malloc(nDev * sizeof(float*));  
    float** recvbuff = (float**)malloc(nDev * sizeof(float*));  
    cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);  

这里的代码创建了NCCL通信器数组,设置4个GPU设备,定义数据大小(32MB),分配发送和接收缓冲区的内存并为每个设备创建CUDA流。然后还有下面的循环

for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));  
    CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));  
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));  
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));  
    CUDACHECK(cudaStreamCreate(s+i));  
}  

这个循环给每个GPU设置当前设备,然后分配发送和接收缓冲区的GPU内存,初始化发送缓冲区为1,接收缓冲区为0,最后为每个设备创建CUDA流。

//初始化NCCL  
NCCLCHECK(ncclCommInitAll(comms, nDev, devs));  

//调用NCCL通信API  
NCCLCHECK(ncclGroupStart());  
for (int i = 0; i < nDev; ++i)  
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,  
        comms[i], s[i]));  
NCCLCHECK(ncclGroupEnd());  

//同步CUDA流等待NCCL操作完成  
for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaStreamSynchronize(s[i]));  
}  

这部分代码展示了初始化NCCL通信器,执行AllReduce操作(将所有设备的数据求和并分发给所有设备),最后同步所有CUDA流确保操作完成。

//释放设备缓冲区  
for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaFree(sendbuff[i]));  
    CUDACHECK(cudaFree(recvbuff[i]));  
}  

//终止NCCL  
for(int i = 0; i < nDev; ++i)  
    ncclCommDestroy(comms[i]);  

最后进行资源清理包括释放GPU上分配的内存,销毁NCCL通信器。上面4张slides放在一起展示了一个如何在单个进程中使用NCCL进行AllReduce操作。

这张Slides展示了”每个CPU进程一个GPU”的场景下的实现。代码有以下步骤:

  • 获取NCCL唯一ID并在所有进程间广播
  • 基于本地rank选择GPU并分配设备缓冲区
  • 初始化NCCL通信器
  • 使用NCCL执行AllReduce集合通信操作(从代码可以看到是每个rank都发起了这个操作)
  • 同步CUDA流来完成NCCL操作

实际上这个例子对应的就是PyTorch Distributed Data Parallel里面的AllReduce操作,而上面的Single Process的例子对应的就是PyTorch Data Parallel里面的AllReduce操作。

这里展示了一下环状的AllReduce算法的原理,它由两个操作组成:

  • ReduceScatter 操作: 输入数据分布在不同的 rank (进程/节点) 上 (rank 0 到 rank 3);每个 rank 负责对一部分数据进行规约(reduction)操作;规约结果被分散到不同的 rank 上;图中显示 out[i] = sum(in[j]^count+i))
  • AllGather 操作: 在 ReduceScatter 之后执行;每个 rank 将自己的部分结果广播给其他所有 rank;最终每个 rank 都获得完整的规约结果;图中显示 out[Ycount+i] = in[Y][i]

这张Slides截了一下Ring Allreduce的cuda代码实现,可以粗略的浏览一下代码:

// Ring AllReduce算法实现 (结合了ReduceScatter和AllGather操作)  
template<typename T, typename RedOp, typename Proto>  
__device__ __forceinline__ void run(ncclWorkElem *args) {  
    const int tid = threadIdx.x;      // 获取当前线程ID  
    const int nthreads = args->nWarps*WARP_SIZE;  // 计算总线程数  
    const int bid = args->bid;        // 获取块ID  
    const int nChannels = args->nChannels;  // 获取通道数  
    ncclRing *ring = &ncclShmem.channel.ring;  // 获取环形通信结构的指针  
    int ringIx = ring->index;         // 获取环形索引  

    // 计算每步处理的数据块大小  
    const size_t chunkSize = int(Proto::calcBytePerStep()/sizeof(T)) * (Proto::Id == NCCL_PROTO_SIMPLE ? ALLREDUCE_CHUNKSTEPS : 1));  
    const int nranks = ncclShmem.comm.nRanks;  // 获取总进程数  
    const size_t loopSize = nChannels*nranks*chunkSize;  // 计算循环大小  
    const size_t size = args->count;  // 获取需要处理的总数据量  

    int minChunkSize;  // 最小数据块大小  
    if (Proto::Id == NCCL_PROTO_LL) {  
        // LL协议下计算最小数据块大小  
        minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T));  
    }  
    if (Proto::Id == NCCL_PROTO_LL128) {  
        // LL128协议下的特殊处理  
        // 注释说明这里的除2可能是个bug,但能提高性能  
        minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T))/2;  
    }  

    // 使用Primitives模板类处理规约操作  
    Primitives<T, RedOp, FanSymmetric<1>, Proto, 0> prims  
        (tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);  
}  
// Ring AllReduce实现 (ReduceScatter + AllGather)  
for (size_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {  
    size_t realChunkSize;  

    // 处理NCCL协议简单模式  
    if (Proto::id == NCCL_PROTO_SIMPLE) {  
        // 计算实际的chunk大小,考虑网格偏移和通道数  
        realChunkSize = min(chunkSize, divide(size-gridOffset, nChannels*nranks));  
        // 根据线程数和数据类型大小调整chunk大小  
        realChunkSize = roundUp(realChunkSize, (nthreads*WARP_SIZE)*sizeof(uint64_t)/sizeof(T));  
    } else {  
        // 非简单模式下的chunk大小计算  
        realChunkSize = min(chunkSize, divide(size-gridOffset, nChannels*nranks*minChunkSize));  
        realChunkSize = int(realChunkSize);  
    }  

    // 计算每个chunk的偏移量  
    auto calcOffset = [&]__device__(int chunk)->size_t {  
        if (Proto::id == NCCL_PROTO_SIMPLE)  
            return gridOffset + bid*nranks*realChunkSize + chunk*realChunkSize;  
        else  
            return gridOffset + (chunk*nChannels + bid)*realChunkSize;  
    };  

    // 计算每个rank的修改位置  
    auto modRanks = [&]__device__(int r)->int {  
        return r >= nranks ? r-nranks : r;  
    };  

    // 声明变量  
    size_t offset;  
    int nelem;  
    int chunk;  

    // step 0: 将数据推送到下一个GPU  
    chunk = modRanks(ringIx + nranks-1);  // 计算chunk索引  
    offset = calcOffset(chunk);           // 计算偏移量  
    nelem = min(realChunkSize, size-offset); // 计算元素数量  
    prims.send(offset, nelem);           // 发送数据  
}  

这几张Slides展示了Ring AllReduce(环形全规约)算法的工作原理,它是通过组合ReduceScatter和AllGather两个操作来实现的。第一张Slides的图展示了初始状态:

  • 有3个GPU (GPU 0, 1, 2)
  • 每个GPU上有3个数据块(A/B/C, D/E/F, G/H/I)

第二张Slides的图展示了数据传输的模式:

  • 数据以环形方式在GPU之间传递
  • GPU 0 向 GPU 1 传输
  • GPU 1 向 GPU 2 传输
  • GPU 2 回传到 GPU 0,形成一个环
// k-2步: 执行规约操作并将结果复制到下一个GPU  
for (int j=2; j<nranks; ++j) {  
    // 计算当前需要处理的数据块索引  
    // ringIx是当前GPU的索引,通过模运算确保索引在有效范围内  
    chunk = modRanks(ringIx + nranks-j);  

    // 根据chunk计算在缓冲区中的偏移量  
    offset = calcOffset(chunk);  

    // 计算本次需要传输的实际元素数量  
    // 取实际块大小和剩余大小中的较小值,避免越界  
    nelem = min(realChunkSize, size-offset);  

    // 执行接收-规约-发送操作  
    // 从上一个GPU接收数据,与本地数据进行规约,然后发送给下一个GPU  
    prims.recvReduceSend(offset, nelem);  
}  

这里展示了Ring AllReduce 第k-1步做的事:

// step k-1: 在当前GPU上规约缓冲区和数据  
// 规约结果将存储在当前数据中并传送到下一个GPU  

// 计算当前要处理的数据块索引  
// ringIx 是环形通信中的索引位置  
chunk = ringIx + 0;  

// 根据chunk计算在内存中的偏移量  
// 用于确定数据在缓冲区中的具体位置  
offset = calcOffset(chunk);  

// 计算本次需要处理的实际元素数量  
// realChunkSize: 标准块大小  
// size-offset: 剩余可处理的元素数量  
// 取两者的最小值以防止越界  
nelem = min(realChunkSize, size-offset);  

// 执行接收-规约-复制-发送操作  
// offset: 源数据偏移量  
// offset: 目标数据偏移量  
// nelem: 要处理的元素数量  
// true: postOp参数,表示是否执行后续操作  
prims.directRecvReduceCopySend(offset, offset, nelem, /*postOp=*/true);  

上面的过程实际上就对应了ReduceScatter操作。

这几张图涉及到的就是AllGather操作,只有数据复制,没有数据的Reduce操作。操作完成之后我们可以看到所有的rank上的数据都拥有一样的求和值。

这里提一些有趣的知识

  • 除了Ring Allreduce之外还有其它的AllReduce算法,如Tree AllReduce(树形归约)算法。可以参考https://developer.nvidia.com/blog/massively-scale-deep-learning-training-nccl-2-4/
  • 其他集体通信操作(Other Collectives)
  • 网络拓扑相关技术,包括NVLink、Infiniband/RoCE(提供了NVIDIA官方白皮书链接)以及IP网络
  • 集体操作原语(Collective Operation Primitives)

最后这张Slides介绍了 CUDA 中其它的集体操作原语(Collective Operations Prims),主要说明了 prims.send、prims.recvReduceSend 等函数是如何在 GPU 之间进行集体操作数据传输的。这些原语实现了三种不同的协议:Simple(简单协议)、LL(低延迟协议,8字节原子存储,4字节数据和4字节标志)以及 LL128(低延迟128位协议,128字节原子存储,120字节数据和8字节标志)。另外,AllReduce 操作通过组合3种算法和3种协议,总共可以有9种不同的运行方式,这些原语为 GPU 集群中的并行计算和数据通信提供了灵活的性能选择。

总结

这节课介绍了NVIDIA的NCCL(NVIDIA Collective Communications Library)通信库,重点讲解了其在分布式深度学习中的应用。首先通过PyTorch DDP的实例,展示了NCCL如何实现高效的梯度同步。接着介绍了下NCCL的基本概念、API使用、通信器初始化方式,并深入分析了Ring AllReduce算法的工作原理。

(文:极市干货)

欢迎分享

发表评论