0%

分布式训练

三天半的清明假期开始啦!先来总结下PyTorch分布式训练相关的内容。
GETTING STARTED WITH DISTRIBUTED DATA PARALLELDistributedDataParallel(DDP)可以跨多台机器运行,实现数据并行。DDP使用torch.distributed包中的集体通信来同步梯度和buffer。
研究生的时候一直使用的是DataParallel,但是DistributedDataParallel的效率更高,主要区别如下:

  • DataParallel是单进程、多线程,并且只在单机上工作,而DistributedDataParallel是多进程的,适用于单机和多机训练。
  • 由于跨线程的GIL争用、每次迭代的复制模型以及分散输入和收集输出引入的额外开销,即使在单台机器上,DataParallel通常也比DistributedDataParallel慢。
  • DistributedDataParallel可以和model parallel一起使用,而DataParallel不可以。

因此使用DistributedDataParallel是非常有必要的!

下面就来介绍一下使用方法(TCP初始化方式)

输入参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def myargs():
parser = argparse.ArgumentParser(description='training example')
parser.add_argument('--batch_size', default=256, type=int, help='this is the total batch size of all GPUs on the current node when using Distributed Data Parallel')
parser.add_argument('--workers', default=8, type=int, help='number of data loading workers')
parser.add_argument('--epochs', default=100, type=int, help='number of total epochs to run')
parser.add_argument('--seed', default=20, type=int, help='seed for initializing training.')
# 分布式训练相关
parser.add_argument('--dist_url', default='tcp://127.0.0.1:23456', type=str, help='url used to set up distributed training')
parser.add_argument('--dist_backend', default='nccl', type=str, help='distributed backend')
parser.add_argument('--world_size', default=1, type=int, help='number of nodes for distributed training')
parser.add_argument('--rank', default=0, type=int, help='node rank for distributed training')
parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.')
myargs = parser.parse_args()
return myargs

batch_size指一台机子上所有GPU的总batch_size;dist_url为主机的ip:port,port未被占用,当只有一台机子时,使用本机ip 127.0.0.1;dist_backend一般设置为nccl;world_size为使用的机器总数;rank为机器序号;gpu不需要指定,表示当前机子当前进程的局部rank号,torch.multiprocessing.spawn会自动将其传入main_worker函数。

运行样例

1
2
3
4
5
6
# 单机4GPU运行
CUDA_VISIBLE_DEVICES=0,1,2,3 python xxx.py --dist-url 'tcp://127.0.0.1:23456' --dist-backend 'nccl' --world-size 1 --rank 0

# 多机运行
CUDA_VISIBLE_DEVICES=0,1,2,3 python xxx.py --dist-url 'tcp://IP_OF_NODE0:FREEPORT' --dist-backend 'nccl' --world-size 2 --rank 0 # node 0
CUDA_VISIBLE_DEVICES=0,1,2,3 python xxx.py --dist-url 'tcp://IP_OF_NODE0:FREEPORT' --dist-backend 'nccl' --world-size 2 --rank 1 # node 1

训练代码

  1. 主函数,通过spawn开启多个进程,一个进程一个GPU,传入main_worker函数的参数为(当前局部rank号可以视为GPU号,args)。
1
2
3
4
5
6
if __name__ == '__main__':
args = myargs()

ngpus_per_node = torch.cuda.device_count() # 一个机子的GPU数
args.world_size = ngpus_per_node * args.world_size # 所有机子总的GPU数
torch.multiprocessing.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
  1. main函数,每个进程都会执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def main_worker(gpu, ngpus_per_node, args):
# 这边设定随机种子,benchmark设定为False,原因?
random.seed(myargs.random_seed)
np.random.seed(myargs.random_seed)
torch.manual_seed(myargs.random_seed)
torch.cuda.manual_seed_all(myargs.random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# spawn传进来的局部rank号,即GPU号
args.gpu = gpu
print("Use GPU: {} for distributed training".format(args.gpu))

# init_process_group需传入当前进程的全局rank号
args.rank = args.rank * ngpus_per_node + gpu # 全局rank
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank)

# 定义模型,修改batch_size,num_workers
model = resnet50()
torch.cuda.set_device(args.gpu) # 设置device为当前gpu
model.cuda(args.gpu)
args.batch_size = int(args.batch_size / ngpus_per_node) # 得到单个GPU上的batch_size
args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

# 定义损失、optimizer、scheduler
criterion = torch.nn.CrossEntropyLoss().cuda(args.gpu)
optimizer = torch.optim.SGD(model.parameters(), 0.1, momentum=0.9, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

# 需定义数据sampler
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=False, sampler=train_sampler, num_workers=args.workers, pin_memory=True, drop_last=True)

for epoch in range(args.epochs):
# 保证每个进程拿到的数据不一样
train_sampler.set_epoch(epoch)

model.train()
for i, (images, target) in enumerate(train_loader):
optimizer.zero_grad()

images = images.cuda(args.gpu, non_blocking=True)
target = target.cuda(args.gpu, non_blocking=True)
output = model(images)
loss = criterion(output, target)

loss.backward()
optimizer.step()

scheduler.step()
if args.rank % ngpus_per_node == 0:
torch.save(model.module.state_dict(), 'checkpoint.pth.tar')

torch.cuda.empty_cache()