概念
Agent :Agent是运行在单节点上的独立后台进程,可以认为是 worker manager 或者 process supervisor,其负责启动worker,监控 worker 运行,捕获woker异常,通过 rendezvous
实现 worker 间的相互发现(比如把状态上报到KVStore),成员变动时候基于 rendezvous
进行变更同步等等。
Rendezvous :为了实现弹性训练,需要有一个节点/进程之间彼此发现的机制。Rendezvous就是这个发现机制或者说同步组件。当系统启动或者成员变更时候,所有worker会(重新)集合(rendezvous)以建立一个新的进程组。
PET
PyTorch 1.9 使用 torch/distributed/run.py 进行启动。torch.distributed.run
是之前torch.distributed.launch
的一个超集,提供如下新功能:
- 容错:通过重新启动所有workers,可以优雅地处理worker故障。
- 自动:Worker 的
RANK
和WORLD_SIZE
是自动分配的。 -
弹性:允许在最小值和最大值(弹性)之间更改节点数。
部署pytorchJob在被Reconcile时,
部署一般按照如下方式。
- (C10d后端不需要)启动 rendezvous 后端服务器,并获取端点(作为
--rdzv_endpoint
传递给启动程序脚本) - 单节点多 worker:在主机上启动 launcher 以启动代理进程,代理会创建并监视本地工作组。
- 多节点多 worker:在所有节点上使用相同的参数启动 launcher 参加训练。
当使用作业/群集管理器时,多节点作业的入口点命令应为 launcher。
单节点多worker启动
单节点多worker的启动方式如下,其实就是Standalone 模式,这是分布式模式的一种特例,具体就是针对单机多 Worker 提供了一些便利设置。
python - m torch.distributed.run - - standalone - - nnodes = 1 - - nproc_per_node = $NUM_TRAINERS YOUR_TRAINING_SCRIPT.py ( - - arg1 ... train script args...) |
容错方式启动
如下是容错方式启动,固定数目workers,没有弹性训练。 --nproc_per_node=$NUM_TRAINERS 一般是 单节点上GPU 个数。
python - m torch.distributed.run - - nnodes = $NUM_NODES - - nproc_per_node = $NUM_TRAINERS - - rdzv_id = $JOB_ID - - rdzv_backend = c10d - - rdzv_endpoint = $HOST_NODE_ADDR YOUR_TRAINING_SCRIPT.py ( - - arg1 ... train script args...) |
HOST_NODE_ADDR
, 的格式是: [:] ,指定了 C10d rendezvous 后端所运行的节点地址和端口,这个节点可以是训练集群中任意节点,但是最好找一个高带宽的节点。
弹性方式启动
下面是弹性训练,弹性区间为 (min=1
, max=4
)。通过指定rdzv参数,可以实现多机训练,具备容错与弹性能力。
在多台机器上分别执行以下命令启动:最小节点数为MIN_SIZE,最大为MAX_SIZE,利用etcd服务实现一致性和信息同步。
python - m torch.distributed.run - - nnodes = 1 : 4 - - nproc_per_node = $NUM_TRAINERS - - rdzv_id = $JOB_ID - - rdzv_backend = c10d - - rdzv_endpoint = $HOST_NODE_ADDR YOUR_TRAINING_SCRIPT.py ( - - arg1 ... train script args...) |
HOST_NODE_ADDR
, 的格式是: [:] ,指定了 C10d rendezvous 后端所运行的节点地址和端口,这个节点可以是训练集群中任意节点,但是最好找一个高带宽的节点。
关于 rendezvous backend,有几点说明:
对于多节点训练,需要指定:
--rdzv_id
: 一个唯一的 job id,在参与job的所有节点之间共享。--rdzv_backend
:torch.distributed.elastic.rendezvous.RendezvousHandler
的一个实现。 (--rdzv_backend
默认是static模式,不支持容错和弹性伸缩)--rdzv_endpoint
: rendezvous backend 所运行的 endpoint,通常格式为:host:port
。就是取代了之前的 master address / port 设置。
目前,以下几种后端可以直接使用,c10d
(推荐), etcd-v2
, and etcd
(legacy) 。为了使用 etcd-v2
或者 etcd
,需要搭建一个 v2
api开启的 etcd server (即. --enable-v2
)。
PyTorchJob支持
示例YAML
apiVersion: "ctyun.cn/v1" kind: PyTorchJob metadata: name: elastic-example spec: elasticPolicy: rdzvBackend: c10d minReplicas: 1 maxReplicas: 3 maxRestarts: 100 metrics: - type : Resource resource: name: cpu target: type: Utilization averageUtilization: 80 pytorchReplicaSpecs: Worker: replicas: 2 restartPolicy: OnFailure template: spec: containers: - name : pytorch image: pytorch-elastic-example-imagenet : latest imagePullPolicy: IfNotPresent resources: requests: cpu: 4 env: - name : LOGLEVEL value: DEBUG command: - python - -m - torch.distributed.run - /workspace/examples/imagenet.py - "--arch=resnet18" - "--epochs=1" - "--batch-size=32" - "--workers=0" - "/workspace/data/tiny-imagenet-200" |
`minReplicas` and `maxReplicas` are used to generate `-nnodes` cpnfiguration when `ElasticPolicy` is specified. --nnodes NNODES Number of nodes, or the range of nodes in form <minimum_nodes>:<maximum_nodes>.
PET支持
环境变量
根据elasticPolicy的设置,为Pod创建对应的环境变量。
There is no need to set `RANK`, `WORLD_SIZE`, `MASTER_ADDR`, `MASTER_PORT` if TorchElastic is used.
// EnvRDZVBackend is the environment variable name for the rdzv backend. EnvRDZVBackend = "PET_RDZV_BACKEND" // EnvRDZVID is the environment variable name for the rdzv id. EnvRDZVID = "PET_RDZV_ID" // ENVRDZVConf is the environment variable name for the rdzv conf. EnvRDZVConf = "PET_RDZV_CONF" // EnvRDZVEndpoint is the environment variable name for the rdzv endpoint. EnvRDZVEndpoint = "PET_RDZV_ENDPOINT" // EnvRDZVStandalone is the environment variable name for the standalone mode. EnvStandalone = "PET_STANDALONE" // User-code launch related arguments. // EnvMaxRestarts is the environment variable name for the maximum number of worker group restarts before failing. EnvMaxRestarts = "PET_MAX_RESTARTS" // EnvMonitorInterval is the environment variable name for the interval, in seconds, to monitor the state of workers. EnvMonitorInterval = "PET_MONITOR_INTERVAL" // EnvStartMethod is the environment variable name for the multiprocessing start method to use when creating workers, which could be fork, spawn and forkserver. EnvStartMethod = "PET_START_METHOD" // Worker/node size related arguments. // EnvNProcPerNode is the environment variable name for the number of processes per node. EnvNProcPerNode = "PET_N_PROC_PER_NODE" // EnvNNodes is the environment variable name for the number of nodes. EnvNNodes = "PET_NNODES" |
Pod示例
apiVersion: v1 kind: Pod metadata: name: $ { pytorchjob.metadata.name } -worker-1 spec: containers: - command : - python - -m - torch.distributed.run - /workspace/examples/imagenet.py - --arch=resnet18 - --epochs=20 - --batch-size=32 - --workers=0 - /workspace/data/tiny-imagenet-200 env: - name : LOGLEVEL value: DEBUG - name : PYTHONUNBUFFERED value: "0" - name : PET_RDZV_ENDPOINT value: elastic-example-worker-0 : 29400 - name : PET_RDZV_BACKEND value: c10d - name : PET_NODES value: "1:2" - name : PET_MAX_RESTARTS value: "100" image: pytorch-elastic-example : 1.0.0 imagePullPolicy: IfNotPresent name: pytorch ports: - containerPort : 29400 name: pytorchjob-port protocol: TCP |
HPA支持
pytorchJob在被Reconcile时,会根据elasticPolicy生成对应的HPA资源,并提交给k8s,转换关系如下:
func desiredHPA(pytorchJob *kubeflowv1.PyTorchJob, scheme *runtime.Scheme) ( *autoscalingv2.HorizontalPodAutoscaler, error) { hpa := &autoscalingv2.HorizontalPodAutoscaler{ ObjectMeta: metav1.ObjectMeta{ Name: pytorchJob.Name, Namespace: pytorchJob.Namespace, }, Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ Kind: pytorchJob.Kind, Name: pytorchJob.Name, APIVersion: pytorchJob.APIVersion, }, MinReplicas: pytorchJob.Spec.ElasticPolicy.MinReplicas, MaxReplicas: *pytorchJob.Spec.ElasticPolicy.MaxReplicas, Metrics: pytorchJob.Spec.ElasticPolicy.Metrics, }, } if err := controllerruntime.SetControllerReference(pytorchJob, hpa, scheme); err != nil { return nil, err } return hpa, nil } |