searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

DeepSpeed-Pipeline并行

2023-07-14 08:23:17
503
0

1. 管道并行

DeepSpeed v0.3增加了对管道并行的新支持。管道并行将模型的层划分为阶段,可以并行处理,从而提高深度学习训练的内存和计算效率。DeepSpeed的训练引擎提供了混合数据和管道并行,并可进一步与模型并行(如Megatron-LM)结合使用。下面展示了3D并行的示例。最新结果表明,这种三维并行使得训练具有万亿参数的模型成为可能。

DeepSpeed使用梯度累积来提取管道并行性。每个训练数据批次被划分为可以由管道阶段并行处理的微批次。一旦一个阶段完成微批次的正向传递,激活内存就会传递到管道中的下一个阶段。同样地,当下一个阶段在微批次上完成反向传递时,关于激活的梯度就会通过管道向后传递。每个反向传递都会本地累积梯度。接下来,所有数据并行组都会并行执行梯度的缩减操作。最后,优化器会更新模型权重。

下面是一个用混合双向数据并行和两个阶段的管道并行来训练8个微批次的批次的示例。GPU 0和2排成管道,交替进行前向(F)和后向(B)传递。然后,它们将与其数据并行的对应部分GPU 1和3一起执行梯度的全局归约操作(AR)。最后,两个管道阶段会更新模型权重。

2.Pipline 并行开始

DeepSpeed致力于加速和简化管道并行训练过程。本节提供了使用混合数据和管道并行训练torchvision的AlexNet模型的第一步。

2.1 表示管道模型

管道并行需要将模型表示为层序列。在正向传递中,每个层都使用前一层的输出。事实上,对于管道并行模型,不需要指定forward()!管道并行模型的正向传递隐式采用以下形式:

def forward(self, inputs):
   x = inputs
   for layer in self.layers:
       x = layer(x)
   return x

PyTorch的torch.nn.Sequential是表达管道并行模型的方便容器,并且可以在无需修改的情况下由DeepSpeed并行化。

net = nn.Sequential(
   nn.Linear(in_features, hidden_dim),
   nn.ReLU(inplace=True),
   nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)

PipelineModule使用其layers参数作为构成模型的层序列。初始化后,net被划分为两个管道阶段,其层被移动到相应的GPU上。如果存在多于两个GPU,则DeepSpeed还将使用混合数据并行。

Note:The total number of GPUs must be divisible by the number of pipeline stages.

2.2 AlexNet

让我们看一下torchvisionAlexNet的简化实现:

class AlexNet(nn.Module):
   def __init__(self, num_classes=1000):
       super(AlexNet, self).__init__()
       self.features = nn.Sequential(
           nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
           ...
           nn.MaxPool2d(kernel_size=3, stride=2),
      )
       self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
       self.classifier = nn.Sequential(
           nn.Dropout(),
           ...
           nn.Linear(4096, num_classes),
      )

   def forward(self, x):
       x = self.features(x)
       x = self.avgpool(x)
       x = torch.flatten(x, 1)
       x = self.classifier(x)
       return x

AlexNet主要是由几个Sequential子模块组成。我们可以将其子模块展开成单个层序列,并将其转换为PipelineModule

class AlexNetPipe(AlexNet):
   def to_layers(self):
       layers = [
           *self.features,
           self.avgpool,
           lambda x: torch.flatten(x, 1),
           *self.classifier
      ]
       return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)

2.3 输入和输出

torch.nn.Sequential之后,每个层的输入和输出必须是单个torch.Tensor或一个元组的张量。实际上,某些模型可能需要修改它们的正向传递以打包和解包forward()的参数。考虑一下Transformer block的简化实现:

class TransformerBlock(nn.Module)
  ...
  def forward(self, hidden, mask):
      output = self.compute(hidden, mask)
      return output
  ...

stack = [ TransformerBlock() for _ in range(num_layers) ]

需要对TransformerBlock进行两个修改:

  1. 参数必须被收集到一个元组中。

  2. 还必须从forward()返回mask以传递给下一层。

这些修改可以通过一个短的子类来完成:

class TransformerBlockPipe(TransformerBlock)
   def forward(self, inputs):
       hidden, mask = inputs
       output = super().forward(hidden, mask)
       return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]

2.4 训练循环

管道并行交替执行前向传递和反向传递,因此训练循环不能分为forward()backwardstep()等单独的阶段。相反,DeepSpeed的管道引擎提供了train_batch()方法,该方法推进管道引擎直到消耗下一批训练数据并更新模型权重。

train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)

以上train_batch()示例等效于以下传统数据并行DeepSpeed示例:

train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
   batch = next(data_iter)
   loss = engine(batch)
   engine.backward(loss)
   engine.step()

2.5 数据处理

通常,数据并行训练会使每个工作器在每个批次开始时独立执行IO。但是,在管道并行环境中,只有第一个阶段使用输入数据,只有最后一个阶段使用标签进行损失计算。

注意:管道引擎希望数据加载器返回两个项目的元组。第一个返回项是输入批次数据,第二个项目是用于计算损失的数据。与之前一样,输入和标签应为torch.Tensor类型或张量的元组。

出于方便起见,DeepSpeed管道引擎可以在向deepspeed.initialize()提供数据集时构建分布式数据加载器。DeepSpeed处理了数据加载的其余复杂性,因此管道训练循环变为:

engine, _, _, _ = deepspeed.initialize(
  args=args,
  model=net,
  model_parameters=[p for p in net.parameters() if p.requires_grad],
  training_data=cifar_trainset())

for step in range(args.steps):
  loss = engine.train_batch()

当然,DeepSpeed将与您希望使用的任何数据加载器一起工作。数据加载器应由管道中的第一个和最后一个阶段构建。每个工作器应加载engine.train_micro_batch_size_per_gpu()大小的微批次,并且每个train_batch()将查询engine.gradient_accumulation_steps()次。

请注意!管道引擎从迭代器中提取数据而不是对其进行迭代。在训练批次中间,数据流不能为空。每次调用train_batch()将从数据迭代器中拉取engine.gradient_accumulation_steps()个微批次数据。

DeepSpeed提供了一个方便的类deepspeed.utils.RepeatingLoader,它简单地包装可迭代对象(例如数据加载器),并在到达结尾时重新启动它:

train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
   loss = engine.train_batch(data_iter=trainiter)

3. 进一步建议

3.1 管道模块的负载平衡

管道并行训练的性能强烈依赖于负载均衡。DeepSpeed提供了几种在GPU之间分配模型的机制。可以使用PipelineModulepartition_method关键字参数设置这些策略。以下是DeepSpeed当前提供的分区方法:

  • partition_method="parameters"(默认)将每个管道阶段上可训练参数的数量平衡起来。这在内存受限的环境和当层的大小与计算时间成比例时特别有用。

  • partition_method="type:[regex]"平衡类名与[regex]匹配的层。正则表达式不区分大小写。例如,partition_method="type:transformer"将平衡每个阶段中transformer层的数量。

  • partition_method="uniform"平衡每个阶段中的层数。

3.2 内存高效的模型构建

将一个 Sequential容器构建并提供给 PipelineModule 是指定管道并行模型的一种方便的方式。然而,对于大型模型,这种方法会遇到可扩展性问题,因为每个 worker 都会在 CPU 内存中复制整个模型。例如,一个具有 16 个 GPU 的机器必须拥有与模型大小的 16 倍相同的本地 CPU 内存。

DeepSpeed 提供了一个LayerSpec类,它延迟模块的构建,直到模型层次已经被分配给工作节点。然后每个 worker 只分配它所分配的层次。因此,与前一段中的示例进行比较,使用 LayerSpec,具有 16 个 GPU 的机器将需要在其 CPU 内存上分配总共 1x 模型大小而不是 16 倍。

以下是 AlexNet 模型的简略表示方式,但仅使用 LayerSpecs。请注意,语法几乎没有改变:nn.ReLU(inplace=True) 简单地变成了 LayerSpec(nn.ReLU, inplace=True)

from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
   def __init__(self, num_classes=10, **kwargs):
       self.num_classes = num_classes
       specs = [
           LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
           LayerSpec(nn.ReLU, inplace=True),
           ...
           LayerSpec(nn.ReLU, inplace=True),
           LayerSpec(nn.Linear, 4096, self.num_classes),
      ]
       super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

 

某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。

3.3 Tied Layers

某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。

DeepSpeed 提供了TiedLayerSpec,它是 LayerSpec 的扩展。TiedLayerSpec需要一个额外的参数:key。每次重用层都使用 TiedLayerSpec进行指定,key字段用于识别层何处被重用。

被捆绑的层会在每个拥有“重用”实例的管道阶段中复制。然后训练将像平常一样进行,但在所有反向传递完成后,会添加一个额外的捆绑梯度的全局归约操作。这个全局归约确保了捆绑层的权重在管道阶段之间保持同步。

0条评论
0 / 1000
赵****斌
4文章数
0粉丝数
赵****斌
4 文章 | 0 粉丝
赵****斌
4文章数
0粉丝数
赵****斌
4 文章 | 0 粉丝
原创

DeepSpeed-Pipeline并行

2023-07-14 08:23:17
503
0

1. 管道并行

DeepSpeed v0.3增加了对管道并行的新支持。管道并行将模型的层划分为阶段,可以并行处理,从而提高深度学习训练的内存和计算效率。DeepSpeed的训练引擎提供了混合数据和管道并行,并可进一步与模型并行(如Megatron-LM)结合使用。下面展示了3D并行的示例。最新结果表明,这种三维并行使得训练具有万亿参数的模型成为可能。

DeepSpeed使用梯度累积来提取管道并行性。每个训练数据批次被划分为可以由管道阶段并行处理的微批次。一旦一个阶段完成微批次的正向传递,激活内存就会传递到管道中的下一个阶段。同样地,当下一个阶段在微批次上完成反向传递时,关于激活的梯度就会通过管道向后传递。每个反向传递都会本地累积梯度。接下来,所有数据并行组都会并行执行梯度的缩减操作。最后,优化器会更新模型权重。

下面是一个用混合双向数据并行和两个阶段的管道并行来训练8个微批次的批次的示例。GPU 0和2排成管道,交替进行前向(F)和后向(B)传递。然后,它们将与其数据并行的对应部分GPU 1和3一起执行梯度的全局归约操作(AR)。最后,两个管道阶段会更新模型权重。

2.Pipline 并行开始

DeepSpeed致力于加速和简化管道并行训练过程。本节提供了使用混合数据和管道并行训练torchvision的AlexNet模型的第一步。

2.1 表示管道模型

管道并行需要将模型表示为层序列。在正向传递中,每个层都使用前一层的输出。事实上,对于管道并行模型,不需要指定forward()!管道并行模型的正向传递隐式采用以下形式:

def forward(self, inputs):
   x = inputs
   for layer in self.layers:
       x = layer(x)
   return x

PyTorch的torch.nn.Sequential是表达管道并行模型的方便容器,并且可以在无需修改的情况下由DeepSpeed并行化。

net = nn.Sequential(
   nn.Linear(in_features, hidden_dim),
   nn.ReLU(inplace=True),
   nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)

PipelineModule使用其layers参数作为构成模型的层序列。初始化后,net被划分为两个管道阶段,其层被移动到相应的GPU上。如果存在多于两个GPU,则DeepSpeed还将使用混合数据并行。

Note:The total number of GPUs must be divisible by the number of pipeline stages.

2.2 AlexNet

让我们看一下torchvisionAlexNet的简化实现:

class AlexNet(nn.Module):
   def __init__(self, num_classes=1000):
       super(AlexNet, self).__init__()
       self.features = nn.Sequential(
           nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
           ...
           nn.MaxPool2d(kernel_size=3, stride=2),
      )
       self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
       self.classifier = nn.Sequential(
           nn.Dropout(),
           ...
           nn.Linear(4096, num_classes),
      )

   def forward(self, x):
       x = self.features(x)
       x = self.avgpool(x)
       x = torch.flatten(x, 1)
       x = self.classifier(x)
       return x

AlexNet主要是由几个Sequential子模块组成。我们可以将其子模块展开成单个层序列,并将其转换为PipelineModule

class AlexNetPipe(AlexNet):
   def to_layers(self):
       layers = [
           *self.features,
           self.avgpool,
           lambda x: torch.flatten(x, 1),
           *self.classifier
      ]
       return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)

2.3 输入和输出

torch.nn.Sequential之后,每个层的输入和输出必须是单个torch.Tensor或一个元组的张量。实际上,某些模型可能需要修改它们的正向传递以打包和解包forward()的参数。考虑一下Transformer block的简化实现:

class TransformerBlock(nn.Module)
  ...
  def forward(self, hidden, mask):
      output = self.compute(hidden, mask)
      return output
  ...

stack = [ TransformerBlock() for _ in range(num_layers) ]

需要对TransformerBlock进行两个修改:

  1. 参数必须被收集到一个元组中。

  2. 还必须从forward()返回mask以传递给下一层。

这些修改可以通过一个短的子类来完成:

class TransformerBlockPipe(TransformerBlock)
   def forward(self, inputs):
       hidden, mask = inputs
       output = super().forward(hidden, mask)
       return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]

2.4 训练循环

管道并行交替执行前向传递和反向传递,因此训练循环不能分为forward()backwardstep()等单独的阶段。相反,DeepSpeed的管道引擎提供了train_batch()方法,该方法推进管道引擎直到消耗下一批训练数据并更新模型权重。

train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)

以上train_batch()示例等效于以下传统数据并行DeepSpeed示例:

train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
   batch = next(data_iter)
   loss = engine(batch)
   engine.backward(loss)
   engine.step()

2.5 数据处理

通常,数据并行训练会使每个工作器在每个批次开始时独立执行IO。但是,在管道并行环境中,只有第一个阶段使用输入数据,只有最后一个阶段使用标签进行损失计算。

注意:管道引擎希望数据加载器返回两个项目的元组。第一个返回项是输入批次数据,第二个项目是用于计算损失的数据。与之前一样,输入和标签应为torch.Tensor类型或张量的元组。

出于方便起见,DeepSpeed管道引擎可以在向deepspeed.initialize()提供数据集时构建分布式数据加载器。DeepSpeed处理了数据加载的其余复杂性,因此管道训练循环变为:

engine, _, _, _ = deepspeed.initialize(
  args=args,
  model=net,
  model_parameters=[p for p in net.parameters() if p.requires_grad],
  training_data=cifar_trainset())

for step in range(args.steps):
  loss = engine.train_batch()

当然,DeepSpeed将与您希望使用的任何数据加载器一起工作。数据加载器应由管道中的第一个和最后一个阶段构建。每个工作器应加载engine.train_micro_batch_size_per_gpu()大小的微批次,并且每个train_batch()将查询engine.gradient_accumulation_steps()次。

请注意!管道引擎从迭代器中提取数据而不是对其进行迭代。在训练批次中间,数据流不能为空。每次调用train_batch()将从数据迭代器中拉取engine.gradient_accumulation_steps()个微批次数据。

DeepSpeed提供了一个方便的类deepspeed.utils.RepeatingLoader,它简单地包装可迭代对象(例如数据加载器),并在到达结尾时重新启动它:

train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
   loss = engine.train_batch(data_iter=trainiter)

3. 进一步建议

3.1 管道模块的负载平衡

管道并行训练的性能强烈依赖于负载均衡。DeepSpeed提供了几种在GPU之间分配模型的机制。可以使用PipelineModulepartition_method关键字参数设置这些策略。以下是DeepSpeed当前提供的分区方法:

  • partition_method="parameters"(默认)将每个管道阶段上可训练参数的数量平衡起来。这在内存受限的环境和当层的大小与计算时间成比例时特别有用。

  • partition_method="type:[regex]"平衡类名与[regex]匹配的层。正则表达式不区分大小写。例如,partition_method="type:transformer"将平衡每个阶段中transformer层的数量。

  • partition_method="uniform"平衡每个阶段中的层数。

3.2 内存高效的模型构建

将一个 Sequential容器构建并提供给 PipelineModule 是指定管道并行模型的一种方便的方式。然而,对于大型模型,这种方法会遇到可扩展性问题,因为每个 worker 都会在 CPU 内存中复制整个模型。例如,一个具有 16 个 GPU 的机器必须拥有与模型大小的 16 倍相同的本地 CPU 内存。

DeepSpeed 提供了一个LayerSpec类,它延迟模块的构建,直到模型层次已经被分配给工作节点。然后每个 worker 只分配它所分配的层次。因此,与前一段中的示例进行比较,使用 LayerSpec,具有 16 个 GPU 的机器将需要在其 CPU 内存上分配总共 1x 模型大小而不是 16 倍。

以下是 AlexNet 模型的简略表示方式,但仅使用 LayerSpecs。请注意,语法几乎没有改变:nn.ReLU(inplace=True) 简单地变成了 LayerSpec(nn.ReLU, inplace=True)

from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
   def __init__(self, num_classes=10, **kwargs):
       self.num_classes = num_classes
       specs = [
           LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
           LayerSpec(nn.ReLU, inplace=True),
           ...
           LayerSpec(nn.ReLU, inplace=True),
           LayerSpec(nn.Linear, 4096, self.num_classes),
      ]
       super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

 

某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。

3.3 Tied Layers

某些模型不能完全表示为管道并行模型,因为一些层在管道内被重用。例如,基于 Transformer 的语言模型通常在管道早期使用嵌入层将词汇映射到隐藏状态,然后在管道末尾使用嵌入将隐藏状态映射回词汇。如果该模型受到纯管道并行性的限制,则这种嵌入重用将禁止管道并行性。

DeepSpeed 提供了TiedLayerSpec,它是 LayerSpec 的扩展。TiedLayerSpec需要一个额外的参数:key。每次重用层都使用 TiedLayerSpec进行指定,key字段用于识别层何处被重用。

被捆绑的层会在每个拥有“重用”实例的管道阶段中复制。然后训练将像平常一样进行,但在所有反向传递完成后,会添加一个额外的捆绑梯度的全局归约操作。这个全局归约确保了捆绑层的权重在管道阶段之间保持同步。

文章来自个人专栏
pipline 并行
1 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0