1. 管道并行
DeepSpeed v0.3增加了对管道并行的新支持。管道并行将模型的层划分为阶段,可以并行处理,从而提高深度学习训练的内存和计算效率。DeepSpeed的训练引擎提供了混合数据和管道并行,并可进一步与模型并行(如Megatron-LM)结合使用。下面展示了3D并行的示例。最新结果表明,这种三维并行使得训练具有万亿参数的模型成为可能。
DeepSpeed使用梯度累积来提取管道并行性。每个训练数据批次被划分为可以由管道阶段并行处理的微批次。一旦一个阶段完成微批次的正向传递,激活内存就会传递到管道中的下一个阶段。同样地,当下一个阶段在微批次上完成反向传递时,关于激活的梯度就会通过管道向后传递。每个反向传递都会本地累积梯度。接下来,所有数据并行组都会并行执行梯度的缩减操作。最后,优化器会更新模型权重。
下面是一个用混合双向数据并行和两个阶段的管道并行来训练8个微批次的批次的示例。GPU 0和2排成管道,交替进行前向(F)和后向(B)传递。然后,它们将与其数据并行的对应部分GPU 1和3一起执行梯度的全局归约操作(AR)。最后,两个管道阶段会更新模型权重。
2.Pipline 并行开始
DeepSpeed致力于加速和简化管道并行训练过程。本节提供了使用混合数据和管道并行训练torchvision的AlexNet模型的第一步。
2.1 表示管道模型
管道并行需要将模型表示为层序列。在正向传递中,每个层都使用前一层的输出。事实上,对于管道并行模型,不需要指定forward()!管道并行模型的正向传递隐式采用以下形式:
def forward(self, inputs):
x = inputs
for layer in self.layers:
x = layer(x)
return x
PyTorch的torch.nn.Sequential
是表达管道并行模型的方便容器,并且可以在无需修改的情况下由DeepSpeed并行化。
net = nn.Sequential(
nn.Linear(in_features, hidden_dim),
nn.ReLU(inplace=True),
nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)
PipelineModule
使用其layers
参数作为构成模型的层序列。初始化后,net被划分为两个管道阶段,其层被移动到相应的GPU上。如果存在多于两个GPU,则DeepSpeed还将使用混合数据并行。
Note:The total number of GPUs must be divisible by the number of pipeline stages.
2.2 AlexNet
让我们看一下torchvision
的AlexNet
的简化实现:
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super(AlexNet, self).__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
...
nn.MaxPool2d(kernel_size=3, stride=2),
)
self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
self.classifier = nn.Sequential(
nn.Dropout(),
...
nn.Linear(4096, num_classes),
)
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
AlexNet
主要是由几个Sequential
子模块组成。我们可以将其子模块展开成单个层序列,并将其转换为PipelineModule
:
class AlexNetPipe(AlexNet):
def to_layers(self):
layers = [
*self.features,
self.avgpool,
lambda x: torch.flatten(x, 1),
*self.classifier
]
return layers
from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)
2.3 输入和输出
在torch.nn.Sequential
之后,每个层的输入和输出必须是单个torch.Tensor
或一个元组的张量。实际上,某些模型可能需要修改它们的正向传递以打包和解包forward()
的参数。考虑一下Transformer block的简化实现:
class TransformerBlock(nn.Module)
...
def forward(self, hidden, mask):
output = self.compute(hidden, mask)
return output
...
stack = [ TransformerBlock() for _ in range(num_layers) ]
需要对TransformerBlock进行两个修改:
-
参数必须被收集到一个元组中。
-
还必须从
forward()
返回mask
以传递给下一层。
这些修改可以通过一个短的子类来完成:
class TransformerBlockPipe(TransformerBlock)
def forward(self, inputs):
hidden, mask = inputs
output = super().forward(hidden, mask)
return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
2.4 训练循环
管道并行交替执行前向传递和反向传递,因此训练循环不能分为forward()
、backward
和step()
等单独的阶段。相反,DeepSpeed的管道引擎提供了train_batch()
方法,该方法推进管道引擎直到消耗下一批训练数据并更新模型权重。
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
以上train_batch()
示例等效于以下传统数据并行DeepSpeed示例:
train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
batch = next(data_iter)
loss = engine(batch)
engine.backward(loss)
engine.step()
2.5 数据处理
通常,数据并行训练会使每个工作器在每个批次开始时独立执行IO。但是,在管道并行环境中,只有第一个阶段使用输入数据,只有最后一个阶段使用标签进行损失计算。
注意:管道引擎希望数据加载器返回两个项目的元组。第一个返回项是输入批次数据,第二个项目是用于计算损失的数据。与之前一样,输入和标签应为torch.Tensor类型或张量的元组。
出于方便起见,DeepSpeed管道引擎可以在向deepspeed.initialize()
提供数据集时构建分布式数据加载器。DeepSpeed处理了数据加载的其余复杂性,因此管道训练循环变为:
engine, _, _, _ = deepspeed.initialize(
args=args,
model=net,
model_parameters=[p for p in net.parameters() if p.requires_grad],
training_data=cifar_trainset())
for step in range(args.steps):
loss = engine.train_batch()
当然,DeepSpeed将与您希望使用的任何数据加载器一起工作。数据加载器应由管道中的第一个和最后一个阶段构建。每个工作器应加载engine.train_micro_batch_size_per_gpu()
大小的微批次,并且每个train_batch()
将查询engine.gradient_accumulation_steps()
次。
请注意!管道引擎从迭代器中提取数据而不是对其进行迭代。在训练批次中间,数据流不能为空。每次调用train_batch()将从数据迭代器中拉取engine.gradient_accumulation_steps()个微批次数据。
DeepSpeed提供了一个方便的类deepspeed.utils.RepeatingLoader
,它简单地包装可迭代对象(例如数据加载器),并在到达结尾时重新启动它:
train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
loss = engine.train_batch(data_iter=trainiter)
3. 进一步建议
3.1 管道模块的负载平衡
管道并行训练的性能强烈依赖于负载均衡。DeepSpeed提供了几种在GPU之间分配模型的机制。可以使用PipelineModule
的partition_method
关键字参数设置这些策略。以下是DeepSpeed当前提供的分区方法:
-
partition_method="parameters"
(默认)将每个管道阶段上可训练参数的数量平衡起来。这在内存受限的环境和当层的大小与计算时间成比例时特别有用。 -
partition_method="type:[regex]"
平衡类名与[regex]匹配的层。正则表达式不区分大小写。例如,partition_method="type:transformer"
将平衡每个阶段中transformer层的数量。 -
partition_method="uniform"
平衡每个阶段中的层数。
3.2 内存高效的模型构建
将一个 Sequential
容器构建并提供给 PipelineModule
是指定管道并行模型的一种方便的方式。然而,对于大型模型,这种方法会遇到可扩展性问题,因为每个 worker 都会在 CPU 内存中复制整个模型。例如,一个具有 16 个 GPU 的机器必须拥有与模型大小的 16 倍相同的本地 CPU 内存。
DeepSpeed 提供了一个LayerSpec
类,它延迟模块的构建,直到模型层次已经被分配给工作节点。然后每个 worker 只分配它所分配的层次。因此,与前一段中的示例进行比较,使用 LayerSpec
,具有 16 个 GPU 的机器将需要在其 CPU 内存上分配总共 1x 模型大小而不是 16 倍。
以下是 AlexNet 模型的简略表示方式,但仅使用 LayerSpec
s。请注意,语法几乎没有改变:nn.ReLU(inplace=True)
简单地变成了 LayerSpec(nn.ReLU, inplace=True)
。
from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
def __init__(self, num_classes=10, **kwargs):
self.num_classes = num_classes
specs = [
LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
LayerSpec(nn.ReLU, inplace=True),
...
LayerSpec(nn.ReLU, inplace=True),
LayerSpec(nn.Linear, 4096, self.num_classes),
]
super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)
某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。
3.3 Tied Layers
某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。
DeepSpeed 提供了TiedLayerSpec
,它是 LayerSpec
的扩展。TiedLayerSpec
需要一个额外的参数:key
。每次重用层都使用 TiedLayerSpec
进行指定,key
字段用于识别层何处被重用。
被捆绑的层会在每个拥有“重用”实例的管道阶段中复制。然后训练将像平常一样进行,但在所有反向传递完成后,会添加一个额外的捆绑梯度的全局归约操作。这个全局归约确保了捆绑层的权重在管道阶段之间保持同步。