From eb43948636b99c27e573d99b0b0ebed05a8eb35b Mon Sep 17 00:00:00 2001 From: x54-729 <17307130121@fudan.edu.cn> Date: Thu, 30 Jun 2022 07:23:37 +0000 Subject: [PATCH] =?UTF-8?q?deepspeed=20=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastNLP/core/drivers/torch_driver/deepspeed.py | 98 +++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 11 deletions(-) diff --git a/fastNLP/core/drivers/torch_driver/deepspeed.py b/fastNLP/core/drivers/torch_driver/deepspeed.py index a99a42f8..aedff1e9 100644 --- a/fastNLP/core/drivers/torch_driver/deepspeed.py +++ b/fastNLP/core/drivers/torch_driver/deepspeed.py @@ -29,6 +29,81 @@ __all__ = [ ] class DeepSpeedDriver(TorchDDPDriver): + """ + 实现 ``deepspeed`` 分布式训练的 ``Driver``。 + + .. note:: + + 您在绝大多数情况下不需要自己使用到该类,通过向 ``Trainer`` 传入正确的参数,您可以方便快速地部署您的分布式训练; + + ``DeepSpeedDriver`` 目前支持的三种启动方式: + + 1. 用户自己不进行任何操作,直接使用我们的 ``Trainer``,这时是由我们自己使用 ``open_subprocesses`` 拉起多个进程, + 然后 ``DeepSpeedDriver`` 自己通过调用 ``deepspeed.initialize`` 来初始化模型和同心组;(情况 A) + + .. code-block:: + + trainer = Trainer( + ... + driver='deepspeed', + device=[0, 1] + ) + trainer.run() + + 通过运行 ``python train.py`` 启动; + + 2. 用户同样不在 ``Trainer`` 之外初始化 ``deepspeed``,但是用户自己使用 ``python -m torch.distributed.launch`` 拉起来创建多个进程,这时我们仍旧 + 会通过调用 ``model.initialize`` 来初始化 ``ddp`` 的通信组;(情况 B) + + .. code-block:: + + trainer = Trainer( + ... + driver='deepspeed', + device=None + ) + trainer.run() + + 通过运行 ``deepspeed train.py`` 启动; + + 3. 用户自己在外面初始化 ``deepspeed``,并且通过 ``deepspeed train.py`` 拉起,这时无论是多个进程的拉起和通信组的建立 + 都由用户自己操作,我们只会在 ``driver.setup`` 的时候对 ``DeepSpeedDriver`` 设置一些必要的属性值;(情况 C) + + .. code-block:: + + import deepspeed + + # 初始化 + model, _, _, _ = deepspeed.initialize(model, ...) + + trainer = Trainer( + ... + driver='deepspeed', + device=None + ) + trainer.run() + + 通过运行 ``deepspeed train.py`` 启动; + + :param model: 传入给 ``Trainer`` 的 ``model`` 参数; + :param parallel_device: 用于分布式训练的 ``gpu`` 设备; + :param is_pull_by_torch_run: 标志当前的脚本的启动是否由 ``python -m torch.distributed.launch`` 启动的; + :param fp16: 是否开启 fp16 训练; + :param deepspeed_kwargs: + * *strategy* -- 使用 ZeRO 优化的策略,默认为 ``deepspeed``;目前仅支持以下值: + + * ``deepspeed`` -- 使用 ZeRO 的第二阶段,等同于 ``deepspeed_stage_2``; + * ``deepspeed_stage_1`` -- 使用 ZeRO 的第一阶段,仅将 ``optimizer`` 的状态分散到不同设备上; + * ``deepspeed_stage_2`` -- 使用 ZeRO 的第二阶段,将 ``optimizer`` 和**梯度**分散到不同设备上; + * ``deepspeed_stage_2_offload`` -- 使用 ZeRO 的第二阶段,并且借助 cpu 的内存来进一步节约显存; + * ``deepspeed_stage_3`` -- 使用 ZeRO 的第三阶段,将 ``optimizer`` 、**梯度**和**模型**分散到不同设备上; + * ``deepspeed_stage_3_offload`` -- 使用 ZeRO 的第三阶段,并且借助 cpu 的内存来进一步节约显存; + * ``deepspeed_stage_3_offload_nvme`` -- 使用 ZeRO 的第三阶段,并且借助 NVMe 硬盘来进一步节约显存; + * *logging_level* -- ``deepspeed`` 库的日志等级,默认为 **logging.ERROR**; + * *config* -- ``deepspeed`` 的各项设置;**FastNLP** 允许用户传入自己的设置以增强灵活性,但这会使参数 + 中的 ``optimizer`` 、``strategy`` 、 ``fp16`` 等失效,即当这个参数存在时,**FastNLP** 会用该参数覆盖 + 其它的设置; + """ # TODO fp16 load_config def __init__( self, @@ -36,11 +111,13 @@ class DeepSpeedDriver(TorchDDPDriver): parallel_device: Union[List["torch.device"], "torch.device"], is_pull_by_torch_run = False, fp16: bool = False, + deepspeed_kwargs: Dict = {}, **kwargs ): assert _NEED_IMPORT_DEEPSPEED, "Deepspeed is not imported." - # assert not dist.is_initialized(), "DeepSpeedDriver does not support initialize distributed by user." - TorchDriver.__init__(self, model=model, fp16=False, **kwargs) + kwargs.pop("torch_kwargs", None) + self._ds_kwargs = deepspeed_kwargs + TorchDriver.__init__(self, model=model, fp16=False, torch_kwargs=deepspeed_kwargs, **kwargs) self.fp16 = fp16 # 如果用户自己在外面初始化 DDP,那么其一定是通过 python -m torch.distributed.launch 拉起的; @@ -108,7 +185,6 @@ class DeepSpeedDriver(TorchDDPDriver): "to 1 for deepspeed configuration.") self.train_micro_batch_size = 1 - self._ds_kwargs = kwargs.get("deepspeed_kwargs", {}) self.strategy = self._ds_kwargs.get("strategy", "deepspeed") deepspeed_logging_level = self._ds_kwargs.get("logging_level", logging.ERROR) deepspeed.utils.logging.logger.setLevel(deepspeed_logging_level) @@ -125,7 +201,7 @@ class DeepSpeedDriver(TorchDDPDriver): 准备分布式环境,该函数主要做以下两件事情: 1. 开启多进程,每个 gpu 设备对应单独的一个进程; - 2. 每个进程将模型迁移到自己对应的 ``gpu`` 设备上;然后使用 ``DistributedDataParallel`` 包裹模型; + 2. 使用 ``deepspeed.initialize`` 包裹模型; """ if len(self.optimizers) != 1: raise ValueError("Multi optimizers is not supported for `DeepSpeedDriver` right now.") @@ -160,15 +236,15 @@ class DeepSpeedDriver(TorchDDPDriver): self.open_subprocess() self.global_rank = self.local_rank # rank 一定是通过环境变量去获取的; deepspeed.init_distributed("nccl", distributed_port=self.master_port) - # 用户在这个 trainer 前面又初始化了一个 trainer,并且使用的是 TorchDDPDriver; + # 用户在这个 trainer 前面又初始化了一个 trainer,并且使用的是 DeepSpeedDriver; else: - # 如果 `dist.is_initialized() == True`,那么说明 TorchDDPDriver 在之前已经初始化并且已经 setup 过一次,那么我们需要保证现在 - # 使用的(即之后的)TorchDDPDriver 的设置和第一个 TorchDDPDriver 是完全一样的; + # 如果 `dist.is_initialized() == True`,那么说明 DeepSpeedDriver 在之前已经初始化并且已经 setup 过一次,那么我们需要保证现在 + # 使用的(即之后的)DeepSpeedDriver 的设置和第一个 DeepSpeedDriver 是完全一样的; pre_num_processes = int(os.environ[FASTNLP_DISTRIBUTED_CHECK]) if pre_num_processes != len(self.parallel_device): raise RuntimeError( - "Notice you are using `TorchDDPDriver` after one instantiated `TorchDDPDriver`, it is not" - "allowed that your second `TorchDDPDriver` has a new setting of parameters " + "Notice you are using `DeepSpeedDriver` after one instantiated `DeepSpeedDriver`, it is not" + "allowed that your second `DeepSpeedDriver` has a new setting of parameters " "`num_nodes` and `num_processes`.") self.world_size = dist.get_world_size() self.global_rank = dist.get_rank() @@ -302,7 +378,7 @@ class DeepSpeedDriver(TorchDDPDriver): 保存当前 driver 的模型到 folder 下。 :param filepath: 保存到哪个文件夹; - :param only_state_dict: 是否只保存权重; + :param only_state_dict: 是否只保存权重;在 ``DeepSpeedDriver`` 中该参数无效; :return: """ # deepspeed engine 要求在每个 rank 都调用 save_checkpoint,故去掉了 rank_zero_call 装饰器 @@ -325,7 +401,7 @@ class DeepSpeedDriver(TorchDDPDriver): 从 folder 中加载权重并赋值到当前 driver 的模型上。 :param filepath: 加载权重或模型的路径 - :param load_state_dict: 保存的内容是否只是权重。 + :param load_state_dict: 保存的内容是否只是权重;在 ``DeepSpeedDriver`` 中该参数无效; :param kwargs: :return: """