DP
Single-Machine Multi-GPU (Data Parallel, DP)
Introduction
DataParallel (DP) lets us load the model and data onto multiple GPUs from a single controlling process, manage how data moves between the GPUs, and coordinate the per-GPU model copies for parallel training (the finer-grained primitives involved are scatter, gather, and so on).
DP targets the single-machine multi-GPU setting. Every device takes part in computing and training the network; in addition, device[0] (not the physical GPU index, but the first entry of the device_ids argument) is responsible for aggregating the gradients and updating the parameters. Figure 1 shows an example where GPU 0 acts as device[0]. As the figure shows, there are three main phases:
- Phase 1 (red in the figure): every card computes its own loss and gradients in parallel
- Phase 2 (blue in the figure): all gradients are aggregated onto device[0]
- Phase 3 (green in the figure): device[0] updates the parameters, and the other cards pull the updated parameters from device[0]

Although DP only supports single-machine training and is therefore not distributed training in the strict sense (multiple nodes), its principle is very close to the Parameter Server (PS) architecture used in distributed training, so we borrow the PS pseudocode to explain it.

The PS version of parallel gradient descent can be broken into the following parts (see the sketch below):
The Task Scheduler is responsible for loading the data, distributing it to the worker nodes, and driving the iterations.
In each iteration, a worker is responsible for:
- Initialization: loading its share of the data and pulling the full set of model parameters from the server node (green in Figure 1)
- Gradient computation: computing gradients on its own data (red in Figure 1) and pushing them to the server node (blue in Figure 1)
The server is responsible for:
- Aggregating the gradients
- Updating the parameters
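Borrowing that structure, here is a minimal Python-style sketch of one round of synchronous PS training (purely illustrative: server, workers, get_batch, and lr are hypothetical names, not a real API):

# illustrative pseudocode of PS-style synchronous SGD; not a real library API
for step in range(num_iterations):                    # driven by the task scheduler
    params = server.pull()                            # each worker pulls the latest parameters (green)
    grads = [w.compute_grad(params, get_batch(w))     # each worker computes gradients
             for w in workers]                        # on its own data shard (red)
    server.push(sum(grads))                           # gradients are pushed to and summed on the server (blue)
    server.update(lr)                                 # the server updates the parameters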
Usage
DataParallel is very easy to use: we simply wrap the model with DataParallel and set a couple of arguments. The main entry point is the nn.DataParallel wrapper, whose arguments include which GPUs take part in training, device_ids=gpus, and which GPU aggregates the gradients, output_device=gpus[0]. DataParallel then automatically splits the data and loads it onto the corresponding GPUs, replicates the model onto them, runs the forward pass, and computes and aggregates the gradients:
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])
There are generally two ways to specify which GPUs take part in parallel training.
The first is to pass the device_ids argument to nn.DataParallel, which selects the GPU indices to use:
model = nn.DataParallel(model, device_ids=[0, 1])  # train in parallel on cards 0 and 1
The second is to restrict which GPU devices are visible to the program before it starts:
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
Note that both the model and the data must already be loaded onto the GPU before the DataParallel module can process them, otherwise an error is raised:
# the model must be moved to the GPU first: model.cuda()
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        # the inputs must be moved to the GPU as well: images/target.cuda()
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
To summarize, the DataParallel part of parallel training mainly involves the following code:
# main.py
import torch
import torch.nn as nn
import torch.optim as optim

gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))

train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)

model = ...
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])

criterion = ...
optimizer = optim.SGD(model.parameters(), lr=...)

for epoch in range(100):
    for batch_idx, (images, target) in enumerate(train_loader):
        images = images.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        ...
        output = model(images)
        loss = criterion(output, target)
        ...
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
To launch training, simply run the script with Python:
python main.py
For the complete training code on ImageNet, see the linked Github repository.
Implementation
This section walks through the implementation of DP. Let us start with the source code (the inline comments are worth reading too):
class DataParallel(Module):
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
# check whether any GPU is available
device_type = _get_available_device_type()
if device_type is None:
self.module = module
self.device_ids = []
return
# by default, use all visible GPUs
if device_ids is None:
device_ids = _get_all_device_indices()
# by default, the "server" is the first entry of device_ids
if output_device is None:
output_device = device_ids[0]
self.dim = dim
self.module = module
self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
# check whether the devices are balanced; a warning is issued if the min/max ratio of memory or multiprocessor count across devices is below 0.75
_check_balance(self.device_ids)
# single-card case
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
def forward(self, *inputs, **kwargs):
# no GPU available
if not self.device_ids:
return self.module(*inputs, **kwargs)
# before running, GPU device_ids[0] (our "server") must already hold the parameters and buffers of the parallelized module,
# because DP guarantees that the replica on device_ids[0] shares storage with the base parallelized module:
# in-place updates made on device[0] are therefore preserved, while those on the other devices are not
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
# at this point device[0] holds both the module and the input, so the PS-style procedure described above can begin
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# if only one card is usable, compute on it directly, without parallelism
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
return self.gather(outputs, self.output_device)
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
As the forward function shows, the key functions are scatter, replicate, parallel_apply, and gather; we will look at them one by one.
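Before going through them individually, the following sketch shows roughly how these four primitives compose into a DP forward pass. It is a simplified, hand-written version rather than the real forward above, and it assumes two visible GPUs plus a model and a batch images that already live on GPU 0:

from torch.nn.parallel import scatter, replicate, parallel_apply, gather

device_ids = [0, 1]
# 1. split the input batch along dim 0 and copy each chunk to its GPU
chunks = scatter(images, device_ids)                  # tuple of per-GPU tensors
# 2. copy the module from device_ids[0] to every GPU involved
replicas = replicate(model, device_ids[:len(chunks)])
# 3. run each replica on its own chunk, one thread per GPU
outputs = parallel_apply(replicas, chunks)
# 4. concatenate the per-GPU outputs back on device_ids[0]
result = gather(outputs, device_ids[0])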
First, the scatter step, i.e. the scatter_kwargs function.
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
r"""Scatter with support for kwargs dictionary"""
# the main work happens here
inputs = scatter(inputs, target_gpus, dim) if inputs else []
kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
# pad with empty items so that inputs and kwargs end up with the same length
if len(inputs) < len(kwargs):
inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
elif len(kwargs) < len(inputs):
kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
# return tuples
inputs = tuple(inputs)
kwargs = tuple(kwargs)
return inputs, kwargs
The most important part of scatter_kwargs is the scatter function, which slices tensors into roughly equal chunks and distributes them across the given GPUs; objects of any other type are simply replicated to every GPU.
def scatter(inputs, target_gpus, dim=0):
r"""
Slices tensors into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not tensors.
"""
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
return Scatter.apply(target_gpus, None, dim, obj)
if is_namedtuple(obj):
return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
if isinstance(obj, tuple) and len(obj) > 0:
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list) and len(obj) > 0:
return [list(i) for i in zip(*map(scatter_map, obj))]
if isinstance(obj, dict) and len(obj) > 0:
return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
return [obj for targets in target_gpus]
# After scatter_map is called, a scatter_map cell will exist. This cell
# has a reference to the actual function scatter_map, which has references
# to a closure that has a reference to the scatter_map cell (because the
# fn is recursive). To avoid this reference cycle, we set the function to
# None, clearing the cell
try:
res = scatter_map(inputs)
finally:
scatter_map = None
return res
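A small usage sketch of this behavior (assuming at least two visible GPUs; x and meta are made-up inputs): the tensor is chunked, while the dict is duplicated as-is.

import torch
from torch.nn.parallel import scatter

x = torch.randn(8, 3).cuda(0)            # a batch of 8 samples on GPU 0
meta = {"flag": True}                    # a non-tensor object
chunks = scatter((x, meta), target_gpus=[0, 1])
# chunks[0] == (first 4 samples on cuda:0, {"flag": True})
# chunks[1] == (last 4 samples on cuda:1, {"flag": True})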
For tensors, the actual splitting is done by the Scatter autograd Function below; note that its backward is simply Gather, which is how gradients find their way back to the source device.
class Scatter(Function):
@staticmethod
def forward(ctx, target_gpus, chunk_sizes, dim, input):
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.dim = dim
ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
streams = None
if torch.cuda.is_available() and ctx.input_device == -1:
# Perform CPU to GPU copies in a background stream
# create background CUDA streams
streams = [_get_stream(device) for device in target_gpus]
# the actual scatter operation
outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
# Synchronize with the copy stream
if streams is not None:
for i, output in enumerate(outputs):
with torch.cuda.device(target_gpus[i]):
main_stream = torch.cuda.current_stream()
main_stream.wait_stream(streams[i])
output.record_stream(main_stream)
return outputs
@staticmethod
def backward(ctx, *grad_output):
return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
comm.scatter is implemented in C++, so we will not go into it here.
Looking back at the DP code block: the scatter step is done, i.e. one batch has been split into smaller, roughly equal batches. Next come the replicate, parallel_apply, and gather functions (assuming we have at least two cards).
# the call inside DP's forward
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# implementation
def replicate(network, devices, detach=False):
if not _replicatable_module(network):
raise RuntimeError("Cannot replicate network where python modules are "
"childrens of ScriptModule")
if not devices:
return []
# which GPUs to replicate to, and how many copies to make
devices = [_get_device_index(x, True) for x in devices]
num_replicas = len(devices)
# replicate the parameters
params = list(network.parameters())
param_indices = {param: idx for idx, param in enumerate(params)}
# scroll to the bottom of this code block for _broadcast_coalesced_reshape, then come back
param_copies = _broadcast_coalesced_reshape(params, devices, detach)
# replicate the buffers
buffers = list(network.buffers())
buffers_rg = []
buffers_not_rg = []
for buf in buffers:
if buf.requires_grad and not detach:
buffers_rg.append(buf)
else:
buffers_not_rg.append(buf)
# record the indices of the buffers that do and do not require grad
buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}
# copy each group separately, using the same broadcast helper as for the parameters
buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)
# now replicate the network itself
# preparation: turn network.modules() into a list,
# then set up empty lists and index maps for the copies that follow
modules = list(network.modules())
module_copies = [[] for device in devices]
module_indices = {}
scriptmodule_skip_attr = {"_parameters", "_buffers", "_modules", "forward", "_c"}
for i, module in enumerate(modules):
module_indices[module] = i
for j in range(num_replicas):
replica = module._replicate_for_data_parallel()
# This is a temporary fix for DDP. DDP needs to access the
# replicated model parameters. It used to do so through
# `mode.parameters()`. The fix added in #33907 for DP stops the
# `parameters()` API from exposing the replicated parameters.
# Hence, we add a `_former_parameters` dict here to support DDP.
replica._former_parameters = OrderedDict()
module_copies[j].append(replica)
# next, fill in each replica's modules, parameters, and buffers
for i, module in enumerate(modules):
for key, child in module._modules.items():
if child is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._modules[key] = None
else:
module_idx = module_indices[child]
for j in range(num_replicas):
replica = module_copies[j][i]
setattr(replica, key, module_copies[j][module_idx])
for key, param in module._parameters.items():
if param is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._parameters[key] = None
else:
param_idx = param_indices[param]
for j in range(num_replicas):
replica = module_copies[j][i]
param = param_copies[j][param_idx]
# parameters in replicas are no longer leaves,
# so setattr them as non-parameter attributes
setattr(replica, key, param)
# expose the parameter for DDP
replica._former_parameters[key] = param
for key, buf in module._buffers.items():
if buf is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._buffers[key] = None
else:
if buf.requires_grad and not detach:
buffer_copies = buffer_copies_rg
buffer_idx = buffer_indices_rg[buf]
else:
buffer_copies = buffer_copies_not_rg
buffer_idx = buffer_indices_not_rg[buf]
for j in range(num_replicas):
replica = module_copies[j][i]
setattr(replica, key, buffer_copies[j][buffer_idx])
return [module_copies[j][0] for j in range(num_replicas)]
# !!! referenced from replicate above
def _broadcast_coalesced_reshape(tensors, devices, detach=False):
from ._functions import Broadcast
# read the else branch first: the non-detach path ends up calling the same comm function
if detach:
return comm.broadcast_coalesced(tensors, devices)
else:
# Use the autograd function to broadcast if not detach
if len(tensors) > 0:
# see the source just below
tensor_copies = Broadcast.apply(devices, *tensors)
return [tensor_copies[i:i + len(tensors)]
for i in range(0, len(tensor_copies), len(tensors))]
else:
return []
# Broadcast.apply
class Broadcast(Function):
@staticmethod
def forward(ctx, target_gpus, *inputs):
assert all(i.device.type != 'cpu' for i in inputs), (
'Broadcast function not implemented for CPU tensors'
)
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.target_gpus = target_gpus
if len(inputs) == 0:
return tuple()
ctx.num_inputs = len(inputs)
# the inputs live on device[0]
ctx.input_device = inputs[0].get_device()
# same comm call as in the detach case
outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
# the code of comm.broadcast_coalesced:
# the tensors must all live on the same device (CPU or GPU); devices are the targets to copy to; buffer_size is the maximum buffer size
# small tensors are coalesced into buffers to reduce the number of synchronizations
# def broadcast_coalesced(tensors, devices, buffer_size=10485760):
# devices = [_get_device_index(d) for d in devices]
# return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
non_differentiables = []
for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
if not input_requires_grad:
for output in outputs:
non_differentiables.append(output[idx])
ctx.mark_non_differentiable(*non_differentiables)
return tuple([t for tensors in outputs for t in tensors])
@staticmethod
def backward(ctx, *grad_outputs):
return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
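To make the effect of replicate concrete, here is a small sketch (two visible GPUs assumed; the nn.Linear is just a toy model):

import torch
import torch.nn as nn
from torch.nn.parallel import replicate

model = nn.Linear(4, 2).cuda(0)          # the base module must live on device_ids[0]
replicas = replicate(model, [0, 1])      # one copy per device

print(replicas[0].weight.device, replicas[1].weight.device)   # cuda:0 cuda:1
# the copies are produced by Broadcast, whose backward is ReduceAddCoalesced,
# so gradients computed on any replica are summed back into the base parameters on device 0
print(replicas[1].weight.requires_grad)                       # True: replicas stay in the autograd graph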
Next, the parallel_apply part. ⚠️ DP and DDP share the parallel_apply code.
# the call inside DP's forward
outputs = self.parallel_apply(replicas, inputs, kwargs)
# threading-based implementation: take the replicas and inputs prepared above,
# then start one thread per device in a for loop
# source:
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
# every GPU gets a module and an input
assert len(modules) == len(inputs)
# make sure every GPU also gets its kwargs; pad with empty dicts if needed
if kwargs_tup is not None:
# (these were already padded in scatter_kwargs)
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = [_get_device_index(x, True) for x in devices]
# the multithreaded part
lock = threading.Lock()
results = {}
grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()
# define the worker
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device), autocast(enabled=autocast_enabled):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
# store this replica's output
results[i] = output
except Exception:
with lock:
results[i] = ExceptionWrapper(
where="in replica {} on device {}".format(i, device))
if len(modules) > 1:
# one process controlling several GPUs: start one thread per GPU
# note: although DDP recommends one process per card, i.e. passing a single card id (usually args.local_rank) as device_ids, passing several device ids makes DDP a single process driving multiple cards with multiple threads, just like DP; see the DDP write-up below
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
# only one module: run it directly in this process (as in the one-process-per-GPU setup that DDP recommends)
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs = []
for i in range(len(inputs)):
output = results[i]
# error handle
if isinstance(output, ExceptionWrapper):
output.reraise()
outputs.append(output)
# return the n per-replica results
return outputs
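A minimal sketch of calling parallel_apply by hand (again two visible GPUs assumed, with replicas produced by replicate as above):

import torch
import torch.nn as nn
from torch.nn.parallel import replicate, parallel_apply

model = nn.Linear(4, 2).cuda(0)
replicas = replicate(model, [0, 1])
chunks = (torch.randn(3, 4).cuda(0), torch.randn(3, 4).cuda(1))   # one chunk per GPU

# one Python thread per replica; the heavy lifting runs inside CUDA kernels,
# so the GIL mostly limits the Python-side dispatch rather than the math itself
outputs = parallel_apply(replicas, chunks)
print(outputs[0].device, outputs[1].device)                       # cuda:0 cuda:1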
Now that the parallel computation has produced its results, the next step is to collect them onto device[0].
# the call inside DP's forward
return self.gather(outputs, self.output_device)
# collect onto device_ids[0]
# source:
def gather(outputs, target_device, dim=0):
r"""
Gathers tensors from different GPUs on a specified device
(-1 means the CPU).
"""
def gather_map(outputs):
out = outputs[0]
if isinstance(out, torch.Tensor):
return Gather.apply(target_device, dim, *outputs)
if out is None:
return None
if isinstance(out, dict):
if not all((len(out) == len(d) for d in outputs)):
raise ValueError('All dicts must have the same number of keys')
return type(out)(((k, gather_map([d[k] for d in outputs]))
for k in out))
return type(out)(map(gather_map, zip(*outputs)))
# Recursive function calls like this create reference cycles.
# Setting the function to None clears the refcycle.
try:
res = gather_map(outputs)
finally:
gather_map = None
return res
# the Gather Function
class Gather(Function):
@staticmethod
def forward(ctx, target_device, dim, *inputs):
assert all(i.device.type != 'cpu' for i in inputs), (
'Gather function not implemented for CPU tensors'
)
target_device = _get_device_index(target_device, True)
ctx.target_device = target_device
ctx.dim = dim
ctx.input_gpus = tuple(i.get_device() for i in inputs)
if all(t.dim() == 0 for t in inputs) and dim == 0:
inputs = tuple(t.view(1) for t in inputs)
warnings.warn('Was asked to gather along dimension 0, but all '
'input tensors were scalars; will instead unsqueeze '
'and return a vector.')
ctx.unsqueezed_scalar = True
else:
ctx.unsqueezed_scalar = False
ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
return comm.gather(inputs, ctx.dim, ctx.target_device)
@staticmethod
def backward(ctx, grad_output):
scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
if ctx.unsqueezed_scalar:
scattered_grads = tuple(g[0] for g in scattered_grads)
return (None, None) + scattered_grads
# comm.gather goes down into C++, so we skip the details ;)
# Gathers tensors from multiple GPU devices.
def gather(tensors, dim=0, destination=None, *, out=None):
tensors = [_handle_complex(t) for t in tensors]
if out is None:
if destination == -1:
warnings.warn(
'Using -1 to represent CPU tensor is deprecated. Please use a '
'device object or string instead, e.g., "cpu".')
destination = _get_device_index(destination, allow_cpu=True, optional=True)
return torch._C._gather(tensors, dim, destination)
else:
if destination is not None:
raise RuntimeError(
"'destination' must not be specified when 'out' is specified, but "
"got destination={}".format(destination))
return torch._C._gather_out(tensors, out, dim)
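A small sketch of gather, including the dict case handled by gather_map (two GPUs and a made-up "logits" key assumed):

import torch
from torch.nn.parallel import gather

outputs = [
    {"logits": torch.randn(4, 10).cuda(0)},   # result from replica 0
    {"logits": torch.randn(4, 10).cuda(1)},   # result from replica 1
]
merged = gather(outputs, target_device=0)
print(merged["logits"].shape, merged["logits"].device)   # torch.Size([8, 10]) cuda:0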
Since a large part of the implementation lives in C++, this note stops here.
Finally, a figure gives an intuitive picture of how the DP module executes; focus on the first and third rows:
In the forward pass, the Scatter function first splits the data on device[0] and copies the chunks to the different cards, then the Replicate function copies the model from device[0] to each card; every card now holds the same model and a different slice of the data and runs its own forward pass, after which the outputs are gathered onto device[0], where the loss is computed.
In the backward pass, the gradients are reduced onto device[0], and the parameters are updated on device[0].
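The whole round trip can be checked with a tiny sketch (toy nn.Linear model, two GPUs assumed):

import torch
import torch.nn as nn

model = nn.DataParallel(nn.Linear(4, 2).cuda(), device_ids=[0, 1])
x = torch.randn(8, 4).cuda(0)

out = model(x)                    # scatter -> replicate -> parallel_apply -> gather
loss = out.sum()
loss.backward()                   # replica gradients are reduced onto device_ids[0]

p = next(model.module.parameters())
print(p.device, p.grad.device)    # cuda:0 cuda:0  (only device 0 holds the master parameters)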

Analysis
- Load imbalance
device[0] carries a noticeably heavier load than the other cards: it gathers the outputs, computes the loss, holds the master copy of the parameters, and aggregates the gradients
- Communication overhead
device[0] has to exchange gradients and parameters with every other card, so its communication volume grows linearly with the number of GPUs (a rough estimate is given at the end of this section)
- Single process
The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing, where a process is created for each GPU, while DataParallel uses multithreading. With multiprocessing, each GPU has its dedicated process, which avoids the performance overhead caused by the GIL of the Python interpreter.
- Global Interpreter Lock (GIL): in short, a Python process can only utilize one CPU core at a time, so with multiple threads on a single core only one thread actually executes. With multiple cores, multi-core multithreading can additionally suffer from thread thrashing and waste resources, so the best way for Python to exploit multiple cores is multiprocessing.
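Coming back to the rough communication estimate promised above: assume the model has $\Psi$ parameters and there are $N$ GPUs. Ignoring the comparatively small scatter/gather of activations, device[0] alone moves on the order of

$$
\underbrace{(N-1)\,\Psi}_{\text{reduce gradients}} + \underbrace{(N-1)\,\Psi}_{\text{broadcast parameters}} = 2\,(N-1)\,\Psi
$$

values per iteration, while every other card moves only about $2\Psi$; the traffic (and memory pressure) at device[0] therefore grows linearly with the number of GPUs.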