GPU集群上的分布式
GPU 集群上的分布式
Slurm,是一个用于 Linux 系统的免费、开源的任务调度工具。它提供了三个关键功能。第一,为用户分配资源(计算机节点),以供用户执行工作。第二,它提供了一个框架,用于执行在节点上运行着的任务(通常是并行的任务),第三,为任务队列合理地分配资源。如果你还没有部署 Slurm 可以按照作者总结的部署教程 (opens new window)进行部署。
通过运行 slurm 的控制命令,slurm 会将写好的 python 程序在每个节点上分别执行,调用节点上定义的 GPU 资源进行运算。要编写能被 Slurm 在 GPU 集群上执行的 python 分布式训练程序,我们只需要对上文中多进程的 DistributedDataParallel 代码进行修改,告诉每一个执行的任务(每个节点上的 python 程序),要用哪些训练哪一部分数据,反向传播的结果如何合并就可以了。
我们首先需要获得每个任务(对应每个节点)的基本信息,以便针对任务的基本信息处理其应当负责的数据。在使用 slurm 执行 srun python 代码时,python 可以从环境变量 os.environ 中获取当前 python 进程的基本信息:
import os
local_rank = os.environ['SLURM_PROCID'] # 当前任务的编号(比如节点 1 执行 1 号任务,节点 2 执行 2 号任务)
world_size = os.environ['SLURM_NPROCS'] # 共开启的任务的总数(共有 2 个节点执行了 2 个任务)
job_id = os.environ['SLURM_JOBID'] # 当前作业的编号(这是第 1 次执行 srun,编号为 1)
2
3
4
在每个任务(节点)中,我们需要为节点中的每个 GPU 资源分配一个进程,管理该 GPU 应当处理的数据。
当前节点的 GPU 的数量可以由 torch.cuda 查询得到:
ngpus_per_node = torch.cuda.device_count()
接着,与上文相似,我们使用 torch.multiprocessing 创建 ngpus_per_node 个进程,其中,每个进程执行的函数为 main_worker ,该函数调用所需要的由 args 传入:
mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
在编写 main_worker 时,我们首先需要解决的问题是:不同节点、或者同一节点间的不同进程之间需要通信来实现数据的分割、参数的合并。我们可以使用 pytorch 的 dist 库在共享文件系统上创建一个文件进行通信:
import torch.distributed as dist
def main_worker(gpu, ngpus_per_node, args):
dist_url = "file://dist_file.{}".format(job_id)
rank = local_rank * ngpus_per_node + gpu
dist.init_process_group(backend='nccl', init_method=dist_url, world_size=world_size, rank=rank)
...
2
3
4
5
6
7
完成进程创建和通信后,下一步就是实现我们常用的 pipline 了,即加载模型、加载数据、正向传播、反向传播。与上文相似,这里,我们把模型加载进当前进程所对应的 GPU 中:
def main_worker(gpu, ngpus_per_node, args):
dist_url = "file://dist_file.{}".format(job_id)
rank = local_rank * ngpus_per_node + gpu
dist.init_process_group(backend='nccl', init_method=dist_url, world_size=world_size, rank=rank)
...
torch.cuda.set_device(gpu)
model.cuda(gpu)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
2
3
4
5
6
7
8
接着,把当前进程对应的数据段采样出来,也加载到对应的 GPU 中。同样可以使用 pytorch 的 dist 库实现这个采样过程:
def main_worker(gpu, ngpus_per_node, args):
dist_url = "file://dist_file.{}".format(job_id)
rank = local_rank * ngpus_per_node + gpu
dist.init_process_group(backend='nccl', init_method=dist_url, world_size=world_size, rank=rank)
...
torch.cuda.set_device(gpu)
model.cuda(gpu)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=args.batch_size,
num_workers=2,
pin_memory=True,
sampler=train_sampler)
for i, (images, target) in enumerate(train_loader):
images = images.cuda(gpu, non_blocking=True)
target = target.cuda(gpu, non_blocking=True)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
最后,进行正常的正向和反向传播:
def main_worker(gpu, ngpus_per_node, args):
dist_url = "file://dist_file.{}".format(job_id)
rank = local_rank * ngpus_per_node + gpu
dist.init_process_group(backend='nccl', init_method=dist_url, world_size=world_size, rank=rank)
...
torch.cuda.set_device(gpu)
model.cuda(gpu)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=args.batch_size,
num_workers=2,
pin_memory=True,
sampler=train_sampler)
for i, (images, target) in enumerate(train_loader):
images = images.cuda(gpu, non_blocking=True)
target = target.cuda(gpu, non_blocking=True)
...
output = model(images)
loss = criterion(output, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
在使用时,调用 srun 启动任务:
srun -N2 --gres gpu:1 python distributed_slurm_main.py --dist-file dist_file
在 ImageNet 上的完整训练代码,请点击Github (opens new window)。