From efc2675741da3740d255a5d12aed2413d31e86dc Mon Sep 17 00:00:00 2001 From: x54-729 <17307130121@fudan.edu.cn> Date: Wed, 25 May 2022 17:03:12 +0000 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9Driver.py=E7=9A=84=E5=B0=8F?= =?UTF-8?q?=E9=83=A8=E5=88=86=E6=96=87=E6=A1=A3=EF=BC=9B2.=E5=AE=8C?= =?UTF-8?q?=E5=96=84=20JittorDriver=20JittorSingleDriver=20=E7=9A=84?= =?UTF-8?q?=E9=83=A8=E5=88=86=E5=9F=BA=E7=A1=80=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastNLP/core/drivers/driver.py | 3 +- .../core/drivers/jittor_driver/jittor_driver.py | 168 +++++++++++++++------ fastNLP/core/drivers/jittor_driver/mpi.py | 15 +- .../core/drivers/jittor_driver/single_device.py | 54 ++++--- fastNLP/core/drivers/jittor_driver/utils.py | 50 ------ .../core/drivers/paddle_driver/paddle_driver.py | 9 +- 6 files changed, 178 insertions(+), 121 deletions(-) diff --git a/fastNLP/core/drivers/driver.py b/fastNLP/core/drivers/driver.py index bd06e705..fda1e6b1 100644 --- a/fastNLP/core/drivers/driver.py +++ b/fastNLP/core/drivers/driver.py @@ -41,7 +41,7 @@ class Driver(ABC): r""" 根据输入的 ``dataloader`` 得到一个 支持分布式 (``distributed``) 与 可复现的 (``reproducible``) 的 dataloader。 - :param dataloader: 根据 ``dataloade``r 设置其对应的分布式版本以及可复现版本; + :param dataloader: 根据 ``dataloader`` 设置其对应的分布式版本以及可复现版本; :param dist: 应当为一个字符串,其值应当为以下之一:``[None, "dist", "unrepeatdist"]``;为 ``None`` 时,表示不需要考虑当前 dataloader 切换为分布式状态;为 ``dist`` 时,表示该 dataloader 应该保证每个 gpu 上返回的 batch 的数量是一样多的,允许出现少量 sample ,在 不同 gpu 上出现重复;为 ``unrepeatdist`` 时,表示该 dataloader 应该保证所有 gpu 上迭代出来的数据合并起来应该刚好等于原始的 @@ -263,7 +263,6 @@ class Driver(ABC): :param filepath: 需要被加载的对象的文件位置(需要包括文件名)或一个 ``BytesIO`` 对象; :param load_state_dict: 保存的文件是否只是模型的权重,还是完整的模型。即便是保存的完整的模型,此处也只能使用尝试加载filepath 模型中的权重到自身模型,而不会直接替代当前 Driver 中的模型。 - :return: 返回加载指定文件后的结果; """ raise NotImplementedError("Each specific driver should implemented its own `load_model` function.") diff --git a/fastNLP/core/drivers/jittor_driver/jittor_driver.py b/fastNLP/core/drivers/jittor_driver/jittor_driver.py index 4f7f23bd..0dd6d0fb 100644 --- a/fastNLP/core/drivers/jittor_driver/jittor_driver.py +++ b/fastNLP/core/drivers/jittor_driver/jittor_driver.py @@ -1,13 +1,17 @@ import os -import warnings -from typing import Optional, Callable, Dict +import random +from pathlib import Path +from typing import Union, Optional +from functools import partial + +import numpy as np -from .utils import _build_fp16_env from fastNLP.envs.imports import _NEED_IMPORT_JITTOR from fastNLP.core.drivers.driver import Driver from fastNLP.core.dataloaders import JittorDataLoader from fastNLP.core.log import logger from fastNLP.core.utils import apply_to_collection +from fastNLP.envs import FASTNLP_GLOBAL_RANK, FASTNLP_SEED_WORKERS if _NEED_IMPORT_JITTOR: import jittor as jt @@ -47,17 +51,18 @@ class JittorDriver(Driver): f"`jittor.Module` type.") super(JittorDriver, self).__init__(model) - self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16) - self.grad_scaler = _grad_scaler() + if fp16: + jt.flags.auto_mixed_precision_level = 6 + else: + jt.flags.auto_mixed_precision_level = 0 + self.fp16 = fp16 # 用来设置是否关闭 auto_param_call 中的参数匹配问题; self.wo_auto_param_call = kwargs.get("model_wo_auto_param_call", False) def check_dataloader_legality(self, dataloader): - # 在fastnlp中实现了JittorDataLoader - if not isinstance(dataloader, Dataset): - raise TypeError(f"{Dataset} is expected, instead of `{type(dataloader)}`") - + if not isinstance(dataloader, (Dataset, JittorDataLoader)): + raise 
TypeError(f"{Dataset} or {JittorDataLoader} is expected, instead of `{type(dataloader)}`")
 
     @staticmethod
     def _check_optimizer_legality(optimizers):
@@ -66,54 +71,102 @@ class JittorDriver(Driver):
                 raise ValueError(f"Each optimizer of parameter `optimizers` should be 'jittor.optim.Optimizer' type, "
                                  f"not {type(each_optimizer)}.")
 
-    def check_evaluator_mode(self, mode: str):
+    def step(self):
+        for optimizer in self.optimizers:
+            optimizer.step()
+
+    def backward(self, loss):
+        for optimizer in self.optimizers:
+            optimizer.backward(loss)
+
+    def zero_grad(self):
+        for optimizer in self.optimizers:
+            optimizer.zero_grad()
+
+    def save_model(self, filepath: Union[str, Path], only_state_dict: bool = True, **kwargs):
+        r"""
+        将模型保存到 ``filepath`` 中。
+
+        :param filepath: 保存文件的文件位置(需要包括文件名);
+        :param only_state_dict: 在 **Jittor** 中,该参数无效,**Jittor** 仅支持保存模型的 ``state_dict``。
+        """
+        if not only_state_dict:
+            logger.rank_zero_warning(
+                "Jittor only supports saving state_dict, and we will also save state_dict for you.",
+                once=True
+            )
+        if isinstance(filepath, Path):
+            filepath = str(filepath)
         model = self.unwrap_model()
-        if mode == "evaluate":
-            if not hasattr(model, "evaluate_step"):
-                if hasattr(model, "test_step"):
-                    logger.warning_once(
-                        "Your model does not have 'evaluate_step' method but has 'test_step' method, but you"
-                        "are using 'evaluate_fn=validate', we are going to use 'test_step' to substitute for"
-                        "'evaluate_step'.")
+        model.save(filepath)
 
-        else:
-            if not hasattr(model, "test_step"):
-                if hasattr(model, "evaluate_step"):
-                    logger.warning_once("Your model does not have 'test_step' method but has 'validate' method, but you"
-                                        "are using 'evaluate_fn=test', we are going to use 'evaluate_step' to substitute for"
-                                        "'test_step'.")
-
-    def save_model(self, filepath: str, only_state_dict: bool = False, model_save_fn: Optional[Callable]=None):
-        if model_save_fn is not None:
-            outputs = model_save_fn(filepath)
-            if outputs is not None:
-                jt.save(outputs, filepath)
-        else:
-            if only_state_dict:
-                states = self.model.state_dict()
-            else:
-                warnings.warn("Saving the whole model is not supported now in Jittor. Save state dict instead.")
-            jt.save(states, filepath)
+    def load_model(self, filepath: Union[Path, str], only_state_dict: bool = True, **kwargs):
+        r"""
+        加载模型的函数;将 ``filepath`` 中的模型加载并赋值给当前 ``model`` 。
 
-    def load_model(self, filepath: str):
-        if not os.path.exists(filepath):
-            raise FileNotFoundError("Checkpoint at {} not found.".format(filepath))
-        return jt.load(filepath)
+        :param filepath: 保存文件的文件位置(需要包括文件名);
+        :param only_state_dict: 在 **Jittor** 中,该参数无效,**Jittor** 仅支持加载模型的 ``state_dict``。
+        """
+        if not only_state_dict:
+            logger.rank_zero_warning(
+                "Jittor only supports loading state_dict, and we will also load state_dict for you.",
+                once=True
+            )
+        if isinstance(filepath, Path):
+            filepath = str(filepath)
+        model = self.unwrap_model()
+        model.load(filepath)
 
     def save_checkpoint(self):
         ...
 
+    def get_optimizer_state(self):
+        # optimizers_state_dict = {}
+        # for i in range(len(self.optimizers)):
+        #     optimizer: torch.optim.Optimizer = self.optimizers[i]
+        #     optimizer_state = optimizer.state_dict()
+        #     optimizer_state["state"] = optimizer_state_to_device(optimizer_state["state"], torch.device("cpu"))
+        #     optimizers_state_dict[f"optimizer{i}"] = optimizer_state  # 注意这里没有使用 deepcopy,测试是不需要的;
+        # return optimizers_state_dict
+        ...
+
+    def load_optimizer_state(self, states):
+        # assert len(states) == len(self.optimizers), f"The number of optimizers is:{len(self.optimizers)}, while in " \
+        #                                             f"checkpoint it is:{len(states)}"
+        # for i in range(len(self.optimizers)):
+        #     optimizer: torch.optim.Optimizer = self.optimizers[i]
+        #     optimizer.load_state_dict(states[f"optimizer{i}"])
+        # logger.debug("Load optimizer state dict.")
+        ...
+
     def load_checkpoint(self):
         ...
 
     def get_evaluate_context(self):
         return jt.no_grad
 
-    def get_model_device(self):
-        return self.model_device
+    @staticmethod
+    def move_model_to_device(model: "jt.Module", device):
+        r"""
+        将模型转移到指定的设备上。由于 **Jittor** 会自动为数据分配设备,因此该函数实际上无效。
+        """
+        ...
+
+    def move_data_to_device(self, batch):
+        """
+        将数据 ``batch`` 转移到指定的设备上。由于 **Jittor** 会自动为数据分配设备,因此该函数实际上无效。
+        """
+        return batch
 
     @staticmethod
     def tensor_to_numeric(tensor, reduce=None):
+        r"""
+        将一个 :class:`jittor.Var` 对象转换为 python 中的数值类型;
+
+        :param tensor: :class:`jittor.Var` 类型的对象;
+        :param reduce: 当 tensor 是一个多数值的张量时,应当使用何种归约操作来转换成单一数值,应当为以下类型之一:``['max', 'min', 'sum', 'mean']``;
+        :return: 返回一个单一数值,其数值类型是 python 中的基本的数值类型,例如 ``int,float`` 等;
+        """
         if tensor is None:
             return None
 
@@ -145,7 +198,32 @@ class JittorDriver(Driver):
         """
         return batch
 
-    # def set_sampler_epoch(self, dataloader: JittorDataLoader, cur_epoch_idx):
-    #     # 保证 ddp 训练时的 shuffle=True 时的正确性,因为需要保证每一个进程上的 sampler 的shuffle 的随机数种子是一样的;
-    #     if callable(getattr(dataloader.batch_sampler, "set_epoch", None)):
-    #         dataloader.batch_sampler.set_epoch(cur_epoch_idx)
+    @staticmethod
+    def worker_init_function(worker_id: int, rank: Optional[int] = None) -> None:  # pragma: no cover
+        global_rank = rank if rank is not None else int(os.environ.get(FASTNLP_GLOBAL_RANK, 0))
+        process_seed = jt.get_seed()
+        # back out the base seed so we can use all the bits
+        base_seed = process_seed - worker_id
+        ss = np.random.SeedSequence([base_seed, worker_id, global_rank])
+        # use 128 bits (4 x 32-bit words)
+        np.random.seed(ss.generate_state(4))
+        # Spawn distinct SeedSequences for the Jittor PRNG and the stdlib random module
+        jittor_ss, stdlib_ss = ss.spawn(2)
+        jt.set_global_seed(jittor_ss.generate_state(1, dtype=np.uint64)[0])
+        # use 128 bits expressed as an integer
+        stdlib_seed = (stdlib_ss.generate_state(2, dtype=np.uint64).astype(object) * [1 << 64, 1]).sum()
+        random.seed(stdlib_seed)
+
+    def set_deterministic_dataloader(self, dataloader: Union["JittorDataLoader", "Dataset"]):
+        if int(os.environ.get(FASTNLP_SEED_WORKERS, 0)) and dataloader.worker_init_fn is None:
+            dataloader.worker_init_fn = partial(self.worker_init_function,
+                                                rank=int(os.environ.get(FASTNLP_GLOBAL_RANK, 0)))
+
+    def set_sampler_epoch(self, dataloader: Union["JittorDataLoader", "Dataset"], cur_epoch_idx: int):
+        # 保证 ddp 训练时 shuffle=True 的正确性,因为需要保证每一个进程上的 sampler 的 shuffle 随机数种子是一样的;
+        if callable(getattr(dataloader.sampler, "set_epoch", None)):
+            dataloader.sampler.set_epoch(cur_epoch_idx)
+
+    @staticmethod
+    def get_dataloader_args(dataloader: Union["JittorDataLoader", "Dataset"]):
+        pass
diff --git a/fastNLP/core/drivers/jittor_driver/mpi.py b/fastNLP/core/drivers/jittor_driver/mpi.py
index ee2514e9..93187ede 100644
--- a/fastNLP/core/drivers/jittor_driver/mpi.py
+++ b/fastNLP/core/drivers/jittor_driver/mpi.py
@@ -146,7 +146,10 @@ class JittorMPIDriver(JittorDriver):
         return self.model.no_sync
 
     def unwrap_model(self):
-        pass
+        """
+        返回训练使用的模型。
+        """
+        return self.model
 
     def get_local_rank(self) -> int:
         return self.local_rank
@@ -155,4 +158,14 @@ class JittorMPIDriver(JittorDriver):
         pass
 
     def is_distributed(self):
+        """
+        判断是否为分布式的 **Driver** ,在 ``JittorMPIDriver`` 中,返回 ``True``。
+        """
         return True
+
+    @property
+    def data_device(self) -> str:
+        """
+        :return: 数据所在的设备;
+        """
+        return self.model_device
\ No newline at end of file
diff --git a/fastNLP/core/drivers/jittor_driver/single_device.py b/fastNLP/core/drivers/jittor_driver/single_device.py
index 19c4b4c2..b559fd92 100644
--- a/fastNLP/core/drivers/jittor_driver/single_device.py
+++ b/fastNLP/core/drivers/jittor_driver/single_device.py
@@ -27,28 +27,36 @@ class JittorSingleDriver(JittorDriver):
     支持 cpu 和 gpu 的切换;
     实现断点重训中替换 dataloader 的 set_dist_repro_dataloader 函数
 
+    :param model: 传入给 ``Trainer`` 的 ``model`` 参数;
+    :param device: 训练和模型所在的设备,在 **Jittor** 中,应当为以下值之一:``[None, 'cpu', 'gpu', 'cuda']``;
+
+        * 为 ``None`` 或 ``cpu`` 时
+          表示在 ``cpu`` 上进行训练;
+        * 为 ``gpu`` 或 ``cuda`` 时
+          表示在显卡设备上进行训练;
+
+    :param fp16: 是否开启 fp16;
     """
 
     def __init__(self, model, device=None, fp16: bool = False, **kwargs):
+        if device not in [None, "cpu", "gpu", "cuda"]:
+            raise RuntimeError("Parameter `device` should be one of [None, 'cpu', 'gpu', 'cuda'].")
         super(JittorSingleDriver, self).__init__(model, fp16)
 
-        self.model_device = device
+        self.model_device = device if device is not None else "cpu"
 
         self.local_rank = 0
         self.global_rank = 0
         self.world_size = 1
 
-    def step(self):
-        for optimizer in self.optimizers:
-            optimizer.step()
-
-    def backward(self, loss):
-        for optimizer in self.optimizers:
-            optimizer.backward(loss)
-
-    def zero_grad(self):
-        for optimizer in self.optimizers:
-            optimizer.zero_grad()
+    def setup(self):
+        r"""
+        初始化训练环境;根据传入的 ``device`` 值设置模型的训练场景为 ``cpu`` 或 ``gpu``;
+        """
+        if self.model_device in ["cpu", None]:
+            jt.flags.use_cuda = 0   # 使用 cpu
+        else:
+            jt.flags.use_cuda = 1   # 使用 cuda
 
     def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict:
         if isinstance(batch, Dict) and not self.wo_auto_param_call:
@@ -70,9 +78,15 @@ class JittorSingleDriver(JittorDriver):
         raise RuntimeError(f"There is no `{fn}` method in your {type(self.model)}.")
 
     def unwrap_model(self):
+        """
+        返回训练使用的模型。
+        """
         return self.model
 
     def is_distributed(self):
+        """
+        判断是否为分布式的 **Driver** ,在 ``JittorSingleDriver`` 中,返回 ``False``。
+        """
         return False
 
     def set_dist_repro_dataloader(self, dataloader, dist: Union[str, ReproducibleBatchSampler, ReproducibleSampler],
@@ -103,11 +117,9 @@ class JittorSingleDriver(JittorDriver):
         else:
             return dataloader
 
-    def setup(self):
-        """
-        支持 cpu 和 gpu 的切换
-        """
-        if self.model_device in ["cpu", None]:
-            jt.flags.use_cuda = 0   # 使用 cpu
-        else:
-            jt.flags.use_cuda = 1   # 使用 cuda
+    @property
+    def data_device(self) -> str:
+        """
+        :return: 数据和模型所在的设备;
+        """
+        return self.model_device
diff --git a/fastNLP/core/drivers/jittor_driver/utils.py b/fastNLP/core/drivers/jittor_driver/utils.py
index c6d44cfc..50eed7e3 100644
--- a/fastNLP/core/drivers/jittor_driver/utils.py
+++ b/fastNLP/core/drivers/jittor_driver/utils.py
@@ -1,56 +1,6 @@
-from contextlib import ExitStack
-
 from fastNLP.envs.imports import _NEED_IMPORT_JITTOR
 
 if _NEED_IMPORT_JITTOR:
     import jittor
 
 __all__ = []
-
-class DummyGradScaler:
-    """
-    用于仿造的 **GradScaler** 对象,防止重复写大量的if判断
-    """
-    def __init__(self, *args, **kwargs):
-        pass
-
-    def get_scale(self):
-        return 1.0
-
-    def is_enabled(self):
-        return False
-
-    def scale(self, outputs):
-        return outputs
-
-    def step(self, optimizer, *args, **kwargs):
-        optimizer.step(*args, **kwargs)
-
-    def update(self, new_scale=None):
-        pass
-
-    def unscale_(self, optimizer):
-        pass
-
-    def load_state_dict(self, state_dict):
-        pass
-
-    def state_dict(self):
-        return {}
-
-
-def _build_fp16_env(dummy=False):
-    if dummy:
-        auto_cast = ExitStack
-        GradScaler = DummyGradScaler
-    else:
-        raise NotImplementedError("JittorDriver does not support fp16 now.")
-        # if not jt.flags.use_cuda:
-        #     raise RuntimeError("No cuda")
-        # if paddle.device.cuda.get_device_capability(0)[0] < 7:
-        #     log.warning(
-        #         "NOTE: your device does NOT support faster training with fp16, "
-        #         "please switch to FP32 which is likely to be faster"
-        #     )
-        # from paddle.amp import auto_cast, GradScaler
-    return auto_cast, GradScaler
\ No newline at end of file
diff --git a/fastNLP/core/drivers/paddle_driver/paddle_driver.py b/fastNLP/core/drivers/paddle_driver/paddle_driver.py
index e879dd90..090bf567 100644
--- a/fastNLP/core/drivers/paddle_driver/paddle_driver.py
+++ b/fastNLP/core/drivers/paddle_driver/paddle_driver.py
@@ -113,12 +113,11 @@ class PaddleDriver(Driver):
     @staticmethod
     def tensor_to_numeric(tensor, reduce=None):
         r"""
-        将一个 `tensor` 对象(类型为 `paddle.Tensor` )转换为 python 的 `numeric` 对象;如果 tensor 只包含一个元素则返回 float 或 int 。
+        将一个 :class:`paddle.Tensor` 对象转换为 python 中的数值类型;
 
-        :param tensor: 需要被转换的 `tensor` 对象
-        :param reduce: 可选 ['sum', 'max', 'mea', 'min'],如果不为 None 将使用该 reduce 方法来处理当前 tensor 再返回
-            float 或 int 对象。
-        :return: 转换后返回的结果
+        :param tensor: :class:`paddle.Tensor` 类型的对象;
+        :param reduce: 当 tensor 是一个多数值的张量时,应当使用何种归约操作来转换成单一数值,应当为以下类型之一:``['max', 'min', 'sum', 'mean']``;
+        :return: 返回一个单一数值,其数值类型是 python 中的基本的数值类型,例如 ``int,float`` 等;
         """
         if tensor is None:
             return None
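
A note for reviewers on the two flag assignments this patch introduces: `JittorDriver.__init__` now maps fp16=True to `jt.flags.auto_mixed_precision_level = 6` (and False to 0), and the relocated `JittorSingleDriver.setup` toggles `jt.flags.use_cuda` from the validated `device` value. A minimal sketch of both mappings, assuming a local Jittor install (assigning `use_cuda = 1` requires an actual CUDA device):

    import jittor as jt

    def configure_fp16(fp16: bool) -> None:
        # fp16=True -> AMP level 6, fp16=False -> level 0, mirroring JittorDriver.__init__
        jt.flags.auto_mixed_precision_level = 6 if fp16 else 0

    def configure_device(device) -> None:
        # None/'cpu' stay on the CPU; 'gpu'/'cuda' enable CUDA, mirroring JittorSingleDriver.setup
        jt.flags.use_cuda = 0 if device in (None, "cpu") else 1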
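
The rewritten `save_model`/`load_model` delegate to Jittor's own `Module.save` and `Module.load`, which persist parameters only; that is why `only_state_dict` is documented as having no effect. A sketch of the round trip the driver performs (the toy module and file name are illustrative):

    import jittor as jt
    from jittor import nn

    model = nn.Linear(4, 2)        # stands in for the unwrapped fastNLP model
    model.save("checkpoint.pkl")   # what save_model() does after unwrap_model()
    model.load("checkpoint.pkl")   # what load_model() does; state dict only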
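
`worker_init_function` follows the usual dataloader-worker seeding recipe (it appears to be adapted from PyTorch Lightning's worker init function): recover the base seed, build a `SeedSequence` keyed by worker id and global rank, then hand independent child streams to NumPy, the framework PRNG, and the stdlib. A self-contained sketch with hypothetical stand-ins for `jt.get_seed()` and the real worker id/rank:

    import random
    import numpy as np

    process_seed, worker_id, global_rank = 1234, 3, 0   # hypothetical stand-ins

    base_seed = process_seed - worker_id                # back out the base seed
    ss = np.random.SeedSequence([base_seed, worker_id, global_rank])
    np.random.seed(ss.generate_state(4))                # 128 bits for NumPy

    framework_ss, stdlib_ss = ss.spawn(2)               # independent child sequences
    framework_seed = framework_ss.generate_state(1, dtype=np.uint64)[0]
    # in the driver, framework_seed is what gets passed to jt.set_global_seed()

    words = stdlib_ss.generate_state(2, dtype=np.uint64).astype(object)
    random.seed(int((words * [1 << 64, 1]).sum()))      # 128 bits folded into one int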
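
And a usage sketch of the `tensor_to_numeric` contract the new docstrings describe (the explicit module path and the exact return values are assumptions based on the documented reduce semantics):

    import jittor as jt
    from fastNLP.core.drivers.jittor_driver.jittor_driver import JittorDriver

    var = jt.array([1.0, 2.0, 3.0])
    JittorDriver.tensor_to_numeric(var, reduce="sum")   # expected: 6.0, a plain Python float
    JittorDriver.tensor_to_numeric(None)                # None passes through unchanged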