NCCL Source Code Analysis ⑦: Connecting Channels Between Machines

Author | KIDGINBROOK


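The previous section completed the channel search inside a single machine. Taking ringGraph as the example again, this means a set of rings has been found inside each machine. The next step is to connect the rings across machines.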


To make this easier to follow, assume two machines with sixteen GPUs in total, where one ring on the first machine is:


graph->intra: GPU/0 GPU/7 GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1
graph->inter: NET/0 NET/0

The corresponding ring on the second machine is:


graph->intra: GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11
graph->inter: NET/0 NET/0

allGather3Data is used to aggregate channel information across ranks, and ncclGraphInfo records information about the rings, such as speed and type.


struct ncclGraphInfo {
  int sameChannels;
  float speedIntra;
  float speedInter;
  int typeIntra;
};

struct {
  int cudaCompCap;
  int fullCudaCompCap;
  int nChannels;
  struct ncclGraphInfo tree;
  struct ncclGraphInfo ring;
  struct ncclGraphInfo collNet;
  struct ncclTopoRanks topoRanks;
} *allGather3Data;

NCCLCHECK(ncclCalloc(&allGather3Data, nranks));
allGather3Data[rank].cudaCompCap = ncclCudaCompCap();
allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels =
  std::min(treeGraph.nChannels, ringGraph.nChannels);
...
allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels;
allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra;
allGather3Data[rank].ring.speedInter = ringGraph.speedInter;
allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra;
...

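Next, ncclTopoRanks is filled in. For each ring, the current rank's prev and next are recorded, where the prev of the first rank and the next of the last rank are -1. For example, rank 6 has prev 7 and next 3. The ring's ringRecv and ringSend, that is, the first and last nodes of the ring, are recorded as well. Finally, the rings found by the search are duplicated once. According to an explanation in an official issue, this allows further parallelism so that the bandwidth is fully used.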


struct ncclTopoRanks {
  int ringRecv[MAXCHANNELS];
  int ringSend[MAXCHANNELS];
  int ringPrev[MAXCHANNELS];
  int ringNext[MAXCHANNELS];
  int treeUpRecv[MAXCHANNELS];
  int treeUpSend[MAXCHANNELS];
  int treeDnRecv[MAXCHANNELS];
  int treeDnSend[MAXCHANNELS];
};

ncclResult_t ncclTopoPreset(struct ncclComm* comm,
    struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph,
    struct ncclTopoRanks* topoRanks) {
  int rank = comm->rank;
  int localRanks = comm->localRanks;
  int nChannels = comm->nChannels;

  for (int c=0; c<nChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    channel->ring.prev = channel->ring.next = -1;
    ...
    int* ringIntra = ringGraph->intra+c*localRanks;
    int* treeIntra = treeGraph->intra+c*localRanks;
    int* collNetIntra = collNetGraph->intra+c*localRanks;

    for (int i=0; i<localRanks; i++) {
      if (ringIntra[i] == rank) {
        topoRanks->ringRecv[c] = ringIntra[0];
        topoRanks->ringSend[c] = ringIntra[localRanks-1];
        channel->ring.prev = (i == 0) ? -1 : ringIntra[i-1];
        channel->ring.next = (i == localRanks-1) ? -1 : ringIntra[i+1];
      }
      ...
    }
    topoRanks->ringPrev[c] = channel->ring.prev;
    topoRanks->ringNext[c] = channel->ring.next;
  }

  // Duplicate channels rings/trees
  struct ncclChannel* channel0 = comm->channels;
  struct ncclChannel* channel1 = channel0+nChannels;
  memcpy(channel1, channel0, nChannels*sizeof(struct ncclChannel));
  return ncclSuccess;
}
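To check the rank 6 example by hand, the ring part of the preset step can be reduced to a small standalone function. This is only a sketch: `RingLinks` and `presetRing` are hypothetical names that stand in for the per-channel fields of ncclTopoRanks, and the ring is passed as a plain vector instead of `ringGraph->intra`.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified stand-in for one channel's entries in ncclTopoRanks.
struct RingLinks {
  int recv;  // first rank of the intra-node ring (ringRecv)
  int send;  // last rank of the intra-node ring (ringSend)
  int prev;  // rank before `rank` in the ring, -1 if `rank` is first
  int next;  // rank after `rank` in the ring, -1 if `rank` is last
};

// Mirrors the ring branch of the preset loop for a single channel.
RingLinks presetRing(const std::vector<int>& ringIntra, int rank) {
  assert(!ringIntra.empty());
  RingLinks links{ringIntra.front(), ringIntra.back(), -1, -1};
  const int n = static_cast<int>(ringIntra.size());
  for (int i = 0; i < n; i++) {
    if (ringIntra[i] != rank) continue;
    links.prev = (i == 0) ? -1 : ringIntra[i - 1];
    links.next = (i == n - 1) ? -1 : ringIntra[i + 1];
  }
  return links;
}
```

For the first machine's ring `{0, 7, 6, 3, 2, 5, 4, 1}`, calling `presetRing(ring, 6)` gives prev 7 and next 3, while rank 0 gets prev -1 and rank 1 gets next -1. Those two open ends are what the inter-machine step fills in later.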

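Then bootstrapAllGather collects the global allGather3Data. From it, the node that the current rank belongs to is computed and stored in comm->node, and the first rank of each node is stored in nodesFirstRank. In the example: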


nodesFirstRank[0]: 0
nodesFirstRank[1]: 10
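The article does not show the code for this step, so the following is only a sketch of one way to derive these values. It assumes that every rank on a node reports the same ringRecv for channel 0 and that this value identifies the node, which is consistent with the example output (10 is the first rank of the second machine's ring, not its lowest rank). `NodeMap` and `assignNodes` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result type: per-node first rank plus each rank's node index.
struct NodeMap {
  std::vector<int> nodesFirstRank;  // one entry per discovered node
  std::vector<int> nodeOfRank;      // node index for every rank
};

// ringRecv0[i] is assumed to be rank i's ringRecv for channel 0,
// as gathered from all ranks.
NodeMap assignNodes(const std::vector<int>& ringRecv0) {
  assert(!ringRecv0.empty());
  NodeMap m;
  m.nodeOfRank.resize(ringRecv0.size());
  for (std::size_t i = 0; i < ringRecv0.size(); i++) {
    const int firstRank = ringRecv0[i];
    // Linear search for a node already identified by this first rank.
    std::size_t node = 0;
    while (node < m.nodesFirstRank.size() && m.nodesFirstRank[node] != firstRank) node++;
    if (node == m.nodesFirstRank.size()) m.nodesFirstRank.push_back(firstRank);
    m.nodeOfRank[i] = static_cast<int>(node);
  }
  return m;
}
```

With ranks 0 to 7 reporting 0 and ranks 8 to 15 reporting 10, the result is `nodesFirstRank = {0, 10}`, and the entry of `nodeOfRank` for the current rank plays the role of comm->node.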


Next, the rings of the individual machines are joined head to tail to form one large ring.


ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct ncclTopoRanks** allTopoRanks, int* rings) {
  // Gather data from all ranks
  int *ringRecv, *ringSend, *ringPrev, *ringNext, *treeUpRecv, *treeUpSend, *treeDnRecv,*treeDnSend;
  int nranks = comm->nRanks;
  int nChannels = comm->nChannels;
  NCCLCHECK(ncclCalloc(&ringRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringPrev, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringNext, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnSend, nranks*MAXCHANNELS));
  for (int i=0; i<nranks; i++) {
    for (int c=0; c<nChannels;c++) {
      ringRecv[c*nranks+i] = allTopoRanks[i]->ringRecv[c];
      ringSend[c*nranks+i] = allTopoRanks[i]->ringSend[c];
      ringPrev[c*nranks+i] = allTopoRanks[i]->ringPrev[c];
      ringNext[c*nranks+i] = allTopoRanks[i]->ringNext[c];
      treeUpRecv[c*nranks+i] = allTopoRanks[i]->treeUpRecv[c];
      treeUpSend[c*nranks+i] = allTopoRanks[i]->treeUpSend[c];
      treeDnRecv[c*nranks+i] = allTopoRanks[i]->treeDnRecv[c];
      treeDnSend[c*nranks+i] = allTopoRanks[i]->treeDnSend[c];
    }
  }

  // Connect rings and trees. This should also duplicate the channels.
  NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks));
  NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks));

  // Duplicate ringPrev/ringNext for ncclBuildRing
  memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
  memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));

  // Duplication should be complete now
  nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);

  // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS.
  // We permit combining max, then min, to only use the first channels, then duplicate them.
  nChannels = comm->nChannels = std::min((int)ncclMaxNchannels(), nChannels);
  int c;
  for (c=nChannels; c<ncclMinNchannels(); c++) {
    memcpy(ringPrev+c*nranks, ringPrev+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(ringNext+c*nranks, ringNext+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(comm->channels+c, comm->channels+c-nChannels, sizeof(struct ncclChannel));
  }
  nChannels = comm->nChannels = c;

  // Create rings array and check all is fine
  NCCLCHECK(ncclBuildRings(nChannels, rings, comm->rank, comm->nRanks, ringPrev, ringNext));

  free(ringRecv);
  free(ringSend);
  free(ringPrev);
  free(ringNext);
  free(treeUpRecv);
  free(treeUpSend);
  free(treeDnRecv);
  free(treeDnSend);

  return ncclSuccess;
}

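Here the prev, next, send and recv information of all channels is flattened into arrays, laid out as channel index times nranks plus rank. For example, recv[0] tells which rank is the recv of rank 0 in the first ring. connectRings then computes the prev of the first rank and the next of the last rank on each machine.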


static ncclResult_t connectRings(struct ncclComm* comm, int* ringRecv, int* ringSend, int* ringPrev, int* ringNext, int* firstRanks) {
  int nChannels = comm->nChannels;
  int nNodes = comm->nNodes;
  for (int c=0; c<nChannels; c++) {
    int* recv = ringRecv+c*comm->nRanks;
    int* send = ringSend+c*comm->nRanks;
    int* prev = ringPrev+c*comm->nRanks;
    int* next = ringNext+c*comm->nRanks;
    struct ncclChannel* channel0 = comm->channels+c;
    struct ncclChannel* channel1 = channel0+nChannels;
    for (int n=0; n<nNodes; n++) {
      int recvRank = recv[firstRanks[n]];
      int prevSendRank = send[firstRanks[(n-1+nNodes)%nNodes]];
      prev[recvRank] = prevSendRank;
      if (comm->rank == recvRank) {
        channel0->ring.prev = prevSendRank;
        channel1->ring.prev = prevSendRank;
      }
      int sendRank = send[firstRanks[n]];
      int nextRecvRank = recv[firstRanks[(n+1)%nNodes]];
      next[sendRank] = nextRecvRank;
      if (comm->rank == sendRank) {
        channel0->ring.next = nextRecvRank;
        channel1->ring.next = nextRecvRank;
      }
    }
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c, channel0->ring.prev, comm->rank, channel0->ring.next);
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c+nChannels, channel1->ring.prev, comm->rank, channel1->ring.next);
  }
  return ncclSuccess;
}
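The stitching logic can be tried out on the two-machine example without any NCCL types. The sketch below handles a single channel and uses plain vectors indexed by global rank. `fillIntra` and `stitchRings` are hypothetical helper names, and the updates to channel0 and channel1 are left out.

```cpp
#include <cassert>
#include <vector>

// Fills recv/send/prev/next for every rank of one intra-node ring,
// leaving the two open ends at -1 as the preset step does.
void fillIntra(const std::vector<int>& ring, std::vector<int>& recv,
               std::vector<int>& send, std::vector<int>& prev,
               std::vector<int>& next) {
  assert(!ring.empty());
  const int n = static_cast<int>(ring.size());
  for (int i = 0; i < n; i++) {
    const int r = ring[i];
    recv[r] = ring.front();
    send[r] = ring.back();
    prev[r] = (i == 0) ? -1 : ring[i - 1];
    next[r] = (i == n - 1) ? -1 : ring[i + 1];
  }
}

// Same index arithmetic as the inner loop of connectRings, for one channel:
// each node's recv rank is linked to the previous node's send rank, and
// each node's send rank to the next node's recv rank.
void stitchRings(const std::vector<int>& recv, const std::vector<int>& send,
                 const std::vector<int>& firstRanks,
                 std::vector<int>& prev, std::vector<int>& next) {
  const int nNodes = static_cast<int>(firstRanks.size());
  assert(nNodes > 0);
  for (int n = 0; n < nNodes; n++) {
    const int recvRank = recv[firstRanks[n]];
    prev[recvRank] = send[firstRanks[(n - 1 + nNodes) % nNodes]];
    const int sendRank = send[firstRanks[n]];
    next[sendRank] = recv[firstRanks[(n + 1) % nNodes]];
  }
}
```

After filling in both example rings and calling `stitchRings` with `firstRanks = {0, 10}`, rank 1 gets next 10, rank 10 gets prev 1, rank 11 gets next 0, and rank 0 gets prev 11. All other entries keep their intra-node values.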

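In other words, the prev of the current machine's recv rank is the send rank of the previous machine, and the next of the current machine's send rank is the recv rank of the next machine. ncclBuildRings then records the ranks into rings, following the order of the large ring.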


ncclResult_t ncclBuildRings(int nrings, int* rings, int rank, int nranks, int* prev, int* next) {
  for (int r=0; r<nrings; r++) {
    char prefix[30];

    int current = rank;
    for (int i=0; i<nranks; i++) {
      rings[r*nranks+i] = current;
      current = next[r*nranks+current];
    }
    ...
    // Check that all ranks are there
    for (int i=0; i<nranks; i++) {
      int found = 0;
      for (int j=0; j<nranks; j++) {
        if (rings[r*nranks+j] == i) {
          found = 1;
          break;
        }
      }
      if (found == 0) {
        WARN("Error : ring %d does not contain rank %d", r, i);
        return ncclInternalError;
      }
    }
  }
  return ncclSuccess;
}


Continuing with the example above, the first large ring in the rings recorded by rank 6 is:

 

GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1 GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11 GPU/0 GPU/7
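This ordering can be reproduced by following next pointers from rank 6. The sketch below walks a single ring. `walkRing` is a hypothetical name, and its validity check is slightly stricter than the one in ncclBuildRings, since it also rejects a walk that does not return to the starting rank.

```cpp
#include <cassert>
#include <vector>

// Walks one ring starting at `rank` by following next[] and writes the
// visiting order into `ring`. Returns false if a rank is out of range,
// visited twice, or the walk does not close back on `rank`.
bool walkRing(const std::vector<int>& next, int rank, std::vector<int>& ring) {
  assert(!next.empty());
  const int nranks = static_cast<int>(next.size());
  if (rank < 0 || rank >= nranks) return false;
  ring.assign(nranks, -1);
  std::vector<bool> seen(nranks, false);
  int current = rank;
  for (int i = 0; i < nranks; i++) {
    if (current < 0 || current >= nranks || seen[current]) return false;
    seen[current] = true;
    ring[i] = current;
    current = next[current];
  }
  return current == rank;
}
```

With the stitched next array of the example, `next = {7, 10, 5, 2, 1, 4, 3, 6, 13, 8, 9, 0, 15, 12, 11, 14}`, a walk from rank 6 yields exactly the sequence listed above.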


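This completes the construction of the large ring across machines. Every rank now knows which rank comes before it and which comes after it, so the actual communication links can be set up.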


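Next, every rank has to allocate some memory for communication. To improve performance, the CPU affinity is set before the buffers are allocated, so that the allocated memory is, as far as possible, local to the current NUMA node.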


cpu_set_t affinitySave;
sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank));

ncclResult_t ncclTopoSetAffinity(struct ncclTopoSystem* system, int rank) {
  struct ncclTopoNode* cpu = NULL, *gpu = NULL;
  for (int g=0; g<system->nodes[GPU].count; g++) {
    if (system->nodes[GPU].nodes[g].gpu.rank == rank) {
      gpu = system->nodes[GPU].nodes+g;
      // Find closer CPU
      int cpuIndex = -1, minHops = 0;
      for (int c=0; c<system->nodes[CPU].count; c++) {
        int nHops = system->nodes[GPU].nodes[g].paths[CPU][c].count;
        if (cpuIndex == -1 || nHops < minHops) {
          cpuIndex = c;
          minHops = nHops;
        }
      }
      cpu = system->nodes[CPU].nodes+cpuIndex;
    }
  }
  if (cpu == NULL) {
    WARN("Set CPU affinity : unable to find GPU/CPU for rank %d", rank);
    return ncclInternalError;
  }

  // Query the CPU affinity set we were provided
  cpu_set_t mask;
  SYSCHECK(sched_getaffinity(0, sizeof(cpu_set_t), &mask), "sched_getaffinity");

  // Get the affinity of the CPU close to our GPU.
  cpu_set_t cpuMask = cpu->cpu.affinity;

  cpu_set_t finalMask;
  if (ncclParamIgnoreCpuAffinity())
    // Ignore the CPU affinity set and use the GPU one instead
    finalMask = cpuMask;
  else
    // Use a subset of the GPU affinity set
    CPU_AND(&finalMask, &mask, &cpuMask);

  // If there is a non empty set, use it to set affinity
  if (CPU_COUNT(&finalMask)) {
    char affinityStr[sizeof(cpu_set_t)*2];
    NCCLCHECK(ncclCpusetToStr(&finalMask, affinityStr));
    INFO(NCCL_INIT, "Setting affinity for GPU %d to %s", gpu->gpu.dev, affinityStr);
    SYSCHECK(sched_setaffinity(0, sizeof(cpu_set_t), &finalMask), "sched_setaffinity");
  }
  return ncclSuccess;
}


First, the current thread's CPU affinity is read and saved in affinitySave. Once the buffers have been allocated, affinitySave is used to restore the affinity.


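Then ncclTopoSetAffinity sets the CPU affinity. After the CPU node closest to the current rank's GPU has been found, the cores belonging to that CPU are available as cpuMask. The current thread's own affinity is read into mask. By default the intersection of cpuMask and mask is taken as finalMask, and if that intersection is not empty, finalMask is applied to the current thread.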
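The mask selection can be isolated from the topology lookup and the system calls. The sketch below uses only the glibc `cpu_set_t` macros on Linux. `chooseAffinity` and `makeMask` are hypothetical helpers, and the `ignoreThreadMask` flag stands in for the ncclParamIgnoreCpuAffinity() check in the listing above.

```cpp
#ifndef _GNU_SOURCE
#define _GNU_SOURCE  // needed for the CPU_* macros with some compilers
#endif
#include <sched.h>
#include <cassert>

// Returns the mask that would be applied to the thread: either the
// GPU-local CPU mask as is, or its intersection with the thread's mask.
cpu_set_t chooseAffinity(cpu_set_t threadMask, cpu_set_t cpuMask, bool ignoreThreadMask) {
  if (ignoreThreadMask) return cpuMask;
  cpu_set_t finalMask;
  CPU_ZERO(&finalMask);
  CPU_AND(&finalMask, &threadMask, &cpuMask);
  return finalMask;
}

// Builds a mask containing cores first..last (inclusive), for examples only.
cpu_set_t makeMask(int first, int last) {
  assert(first >= 0 && last < CPU_SETSIZE && first <= last);
  cpu_set_t mask;
  CPU_ZERO(&mask);
  for (int c = first; c <= last; c++) CPU_SET(c, &mask);
  return mask;
}
```

If the thread is allowed on cores 0 to 7 and the GPU-local CPU owns cores 4 to 11, the default result contains cores 4 to 7. A caller would pass the result to sched_setaffinity only when `CPU_COUNT` of it is non-zero, as the listing does, so an empty intersection leaves the thread's affinity unchanged.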


struct ncclConnect {
  char data[CONNECT_SIZE];
};

struct ncclConnect *connect;
NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore);
for (int c=0; c<comm->nChannels; c++) {
  struct ncclChannel* channel = comm->channels+c;
  NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore);
  if (comm->nRanks == 1) continue;
  NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore);
  ...
}


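Now a brief look at the ncclChannel data structure. collectives holds the communication operations that the user submits to NCCL: calls such as ncclSend and ncclRecv each add an entry to collectives, and ncclColl holds the arguments of these operations. collectives is a circular queue, so collStart points to the start position and collCount is the number of operations in the queue. collFifoHead and collFifoTail coordinate the kernel producing data and the NET sending it, which is simply a producer-consumer arrangement. ncclPeer holds the communication-related information and will be covered in detail later.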


struct ncclRing {
  // Shortcuts for userRanks[1] and userRanks[n-1]
  int prev; // the rank before the current rank in the ring
  int next; // the rank after the current rank in the ring

  // Maps an internal nccl index to user-specified rank order. This is necessary
  // since we need to know how the user expects data to be ordered across
  // devices. Ordered from current device.
  int* userRanks;    // the whole ring, starting from the current rank
  int* devUserRanks; // device-side copy of userRanks
};

struct ncclChannel {
  union {
    struct {
      struct ncclRing ring;
      struct ncclTree treeUp;
      struct ncclTree treeDn;
      struct ncclTree collTreeUp;
      struct ncclTree collTreeDn;

      int id;

      // Communication structures
      struct ncclPeer* peers;
      struct ncclPeer* devPeers;

      // Operation list for aggregation
      struct ncclColl* collectives;
      int collStart;
      int collCount;
      int collFifoHead; // Only used by GPU
      int collFifoTail; // Only used by CPU
    };
    int data[0x80];
  };
};
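To illustrate how a start index and a count together describe a circular queue such as collectives, here is a generic fixed-capacity queue. It is a sketch of the general technique only, not NCCL's implementation: `OpQueue` and its members are hypothetical, and the real queue is shared between host and device, which this example does not model.

```cpp
#include <array>
#include <cassert>

// Hypothetical fixed-capacity circular queue described by (start, count),
// analogous in spirit to collStart and collCount.
template <typename T, int Capacity>
class OpQueue {
 public:
  // Appends an operation at slot (start + count) % Capacity.
  bool push(const T& op) {
    if (count_ == Capacity) return false;  // queue is full
    slots_[(start_ + count_) % Capacity] = op;
    count_++;
    return true;
  }

  // Removes the oldest operation and advances start, wrapping around.
  bool pop(T& op) {
    if (count_ == 0) return false;  // queue is empty
    op = slots_[start_];
    start_ = (start_ + 1) % Capacity;
    count_--;
    assert(count_ >= 0 && count_ <= Capacity);
    return true;
  }

  int start() const { return start_; }
  int count() const { return count_; }

 private:
  static_assert(Capacity > 0, "Capacity must be positive");
  std::array<T, Capacity> slots_{};
  int start_ = 0;
  int count_ = 0;
};
```

Because the write position is derived from start and count, no separate tail index is needed, and the occupied region may wrap around the end of the array.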


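Then channel initialization begins. initChannel is mostly buffer allocation: it allocates userRanks and devUserRanks, sets up ncclPeer, and allocates collectives. Because both host and device access collectives, it has to be allocated as page-locked host memory through cudaHostAlloc, with the cudaHostAllocMapped flag mapping it into the CUDA address space. On UVA systems, however, cudaMallocHost, cudaHostAlloc with cudaHostAllocDefault, and cudaHostAlloc with cudaHostAllocMapped make no real difference here: in all three cases both host and device can access the memory.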


ncclResult_t initChannel(struct ncclComm* comm, int channelid) {
  struct ncclChannel* channel = comm->channels+channelid;
  if (channel->id != -1) return ncclSuccess;
  channel->id = channelid;

  // Ring index to user rank table.
  NCCLCHECK(ncclCudaCalloc(&channel->ring.devUserRanks, comm->nRanks));
  NCCLCHECK(ncclCalloc(&channel->ring.userRanks, comm->nRanks));

  // Communication structures with peers.
  NCCLCHECK(ncclCudaCalloc(&channel->devPeers, comm->nRanks+1)); // The extra one rank is for collnet root (i.e. network)
  NCCLCHECK(ncclCalloc(&channel->peers, comm->nRanks+1));
  for (size_t i=0; i<comm->nRanks+1; ++i) {
    channel->peers[i].send.comm = comm;
    channel->peers[i].recv.comm = comm;
  }

  // Per-channel operation list.
  NCCLCHECK(ncclCudaHostCalloc(&channel->collectives, NCCL_MAX_OPS));
  return ncclSuccess;
}

template <typename T>
static ncclResult_t ncclCudaHostCalloc(T** ptr, size_t nelem) {
  CUDACHECK(cudaHostAlloc(ptr, nelem*sizeof(T), cudaHostAllocMapped));
  memset(*ptr, 0, nelem*sizeof(T));
  return ncclSuccess;
}


Then the ring is written into userRanks, starting from the current rank.


static ncclResult_t setupChannel(struct ncclComm* comm, int channelId, int rank, int nranks, int* ringRanks) {
  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
  NCCLCHECK(initChannel(comm, channelId));

  struct ncclRing* ring = &comm->channels[channelId].ring;
  // Reorganize ranks to start with rank.
  int shift;
  for (shift = 0; shift<nranks; shift++) {
    if (ringRanks[shift] == rank) {
      break;
    }
  }
  for (int i=0; i<nranks; i++) {
    ring->userRanks[i] = ringRanks[(i+shift)%nranks];
  }
  return ncclSuccess;
}
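The rotation itself is easy to test in isolation. The sketch below performs the same shift on a plain vector. `rotateToRank` is a hypothetical name, and unlike setupChannel, which copies the ring unchanged when the rank is not found, it asserts that the rank is present.

```cpp
#include <cassert>
#include <vector>

// Returns the ring rotated so that `rank` comes first, preserving order.
std::vector<int> rotateToRank(const std::vector<int>& ringRanks, int rank) {
  const int n = static_cast<int>(ringRanks.size());
  int shift = 0;
  while (shift < n && ringRanks[shift] != rank) shift++;
  assert(shift < n && "rank must be part of the ring");
  std::vector<int> userRanks(n);
  for (int i = 0; i < n; i++) userRanks[i] = ringRanks[(i + shift) % n];
  return userRanks;
}
```

Rotating `{0, 7, 6, 3, 2, 5, 4, 1}` to start at rank 6 gives `{6, 3, 2, 5, 4, 1, 0, 7}`. The second element, 3, is rank 6's next and the last element, 7, is its prev, which matches the comment in ncclRing that prev and next are shortcuts for userRanks[n-1] and userRanks[1].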


Then ncclTransportP2pSetup is called to establish the communication links between the current rank and its prev and next.


This completes the connection of channels between machines. The next section looks at how the communication links are established.


(This article is published by OneFlow with authorization. Original: https://blog.csdn.net/KIDGIN7439/article/details/128144057)

