@@ -40,20 +40,22 @@ __all__ = [
class JittorDriver(Driver):
    r"""
    ``Driver`` for the ``Jittor`` framework
    ``Driver`` for the ``Jittor`` framework; it is the parent class of ``JittorSingleDriver`` and ``JittorMPIDriver``.

    .. note::
    .. warning::

        This is a feature under development; stay tuned.
        You should not initialize this class directly and pass it to ``Trainer``; in other words, you should use its
        subclasses ``JittorSingleDriver`` and ``JittorMPIDriver`` instead of this class itself;

    .. todo::
    .. note::

        Implement the fp16 settings and support switching between cpu and gpu;
        Implement the save and load functions used for resuming training from checkpoints;
        You can use the interfaces provided by ``JittorDriver`` while working with ``JittorSingleDriver`` and ``JittorMPIDriver``;

    :param model: the **jittor** model used during training;
    :param fp16: whether to enable mixed-precision training;
    :param jittor_kwargs: extra framework-specific keyword arguments for the **Jittor** driver;
    """

    def __init__(self, model, fp16: bool = False, **kwargs):
    def __init__(self, model, fp16: bool = False, jittor_kwargs: Dict = {}, **kwargs):
        if not isinstance(model, Module):
            raise ValueError(f"Parameter `model` can not be `{type(model)}` in `JittorDriver`, it should be exactly "
                             f"`jittor.Module` type.")
@@ -65,6 +67,7 @@ class JittorDriver(Driver):
        jt.flags.auto_mixed_precision_level = 0
        self.fp16 = fp16
        self._auto_cast = nullcontext
        self._jittor_kwargs = jittor_kwargs
        # Controls whether to disable the automatic argument matching performed by auto_param_call;
        self.wo_auto_param_call = kwargs.get("model_wo_auto_param_call", False)
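For illustration only (this block is not part of the diff): a minimal sketch of how the new ``jittor_kwargs`` dict reaches the base driver, assuming ``JittorSingleDriver`` can be imported from ``fastNLP.core.drivers``; the toy model and the dict contents are hypothetical placeholders.

from jittor import Module, nn
from fastNLP.core.drivers import JittorSingleDriver  # assumed import path

class ToyModel(Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)

    def execute(self, x):  # Jittor models implement execute() instead of forward()
        return self.fc(x)

# The subclass forwards jittor_kwargs to JittorDriver.__init__, which stores it as self._jittor_kwargs.
driver = JittorSingleDriver(ToyModel(), device="cpu", fp16=False,
                            jittor_kwargs={"dummy_option": True})  # hypothetical key
print(driver._jittor_kwargs)  # {'dummy_option': True}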
@@ -34,10 +34,11 @@ class JittorMPIDriver(JittorDriver):
        parallel_device: None,
        is_pull_by_jittor_run: bool = False,
        fp16: bool = False,
        jittor_kwargs: Dict = {},
        **kwargs
    ):
        super(JittorMPIDriver, self).__init__(model, fp16=fp16, **kwargs)
        super(JittorMPIDriver, self).__init__(model, fp16=fp16, jittor_kwargs=jittor_kwargs, **kwargs)
        raise NotImplementedError("MPI for Jittor is not supported right now.")

        self.is_pull_by_jittor_run = is_pull_by_jittor_run
@@ -25,15 +25,6 @@ class JittorSingleDriver(JittorDriver):
    r"""
    The ``Driver`` used for computation on ``cpu`` and single-card ``gpu`` under the ``Jittor`` framework.

    .. note::

        This is a feature under development; stay tuned.

    .. todo::

        Support switching between cpu and gpu;
        Implement the set_dist_repro_dataloader function used to replace the dataloader when resuming from checkpoints

    :param model: the ``model`` argument passed to ``Trainer``;
    :param device: the device on which training and the model take place; in **Jittor** it should be one of ``[None, 'cpu', 'gpu', 'cuda']``;
@@ -43,12 +34,13 @@ class JittorSingleDriver(JittorDriver):
        training takes place on the GPU device;
    :param fp16: whether to enable fp16;
    :param jittor_kwargs: extra framework-specific keyword arguments for the **Jittor** driver;
    """

    def __init__(self, model, device=None, fp16: bool = False, **kwargs):
    def __init__(self, model, device=None, fp16: bool = False, jittor_kwargs: Dict = {}, **kwargs):
        if device not in [None, "cpu", "gpu", "cuda"]:
            raise RuntimeError("Parameter `device` should be one of [None, 'cpu', 'gpu', 'cuda'] .")
        super(JittorSingleDriver, self).__init__(model, fp16)
        super(JittorSingleDriver, self).__init__(model, fp16, jittor_kwargs=jittor_kwargs)

        self.model_device = device if device is not None else "cpu"
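A quick illustration (not part of the diff, reusing the hypothetical ToyModel from the sketch above): any device value outside ``[None, 'cpu', 'gpu', 'cuda']`` is rejected immediately, while ``None`` falls back to ``"cpu"`` as the stored ``model_device``.

try:
    JittorSingleDriver(ToyModel(), device="mps")  # not an accepted value for Jittor
except RuntimeError as e:
    print(e)  # Parameter `device` should be one of [None, 'cpu', 'gpu', 'cuda'] .

driver = JittorSingleDriver(ToyModel(), device=None)
print(driver.model_device)  # 'cpu'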
@@ -157,7 +157,7 @@ class PaddleFleetDriver(PaddleDriver):
    ):
        if USER_CUDA_VISIBLE_DEVICES not in os.environ:
            raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.")
        super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs)
        super(PaddleFleetDriver, self).__init__(model, fp16=fp16, paddle_kwargs=paddle_kwargs, **kwargs)

        # If training is not started via launch, the user is required to pass in parallel_device
        if not is_pull_by_paddle_run:
@@ -68,12 +68,6 @@ class PaddleDriver(Driver):
    :param model: the **PaddlePaddle** model used during training;
    :param fp16: whether to enable mixed-precision training;
    :param paddle_kwargs: extra framework-specific keyword arguments for the **PaddlePaddle** driver;
    :kwargs:
        * wo_auto_param_call (``bool``) -- whether to disable calling our ``auto_param_call`` function during training to automatically match the batch against the parameters of the forward function;

        .. note::

            For a detailed description of this argument, see :class:`~fastNLP.core.controllers.Trainer`; for the ``auto_param_call`` function itself, see :func:`fastNLP.core.utils.auto_param_call`.
    """

    def __init__(self, model: "paddle.nn.Layer", fp16: Optional[bool] = False, paddle_kwargs: Dict = {}, **kwargs):
@@ -1,78 +0,0 @@
import oneflow
from oneflow import nn
from oneflow.utils.data import DataLoader, Dataset
from oneflow.nn.parallel import DistributedDataParallel as ddp

# Optional global-tensor (SBP) setup, kept commented out: split along dim 0 across two GPUs.
# PLACEMENT = oneflow.placement("cuda", [0,1])
# S0 = oneflow.sbp.split(0)
# B = oneflow.sbp.broadcast
class OneflowArgMaxDataset(Dataset):
    """Synthetic dataset whose label is the argmax of each random feature vector."""

    def __init__(self, feature_dimension=10, data_num=1000, seed=0):
        self.num_labels = feature_dimension
        self.feature_dimension = feature_dimension
        self.data_num = data_num
        self.seed = seed

        g = oneflow.Generator()
        g.manual_seed(seed)
        self.x = oneflow.randint(low=-100, high=100, size=[data_num, feature_dimension], generator=g).float()
        self.y = oneflow.max(self.x, dim=-1)[1]

    def __len__(self):
        return self.data_num

    def __getitem__(self, item):
        return self.x[item], self.y[item]
class Model(nn.Module):
    def __init__(self, num_labels, feature_dimension):
        super(Model, self).__init__()
        self.num_labels = num_labels
        self.linear1 = nn.Linear(in_features=feature_dimension, out_features=10)
        self.ac1 = nn.ReLU()
        self.linear2 = nn.Linear(in_features=10, out_features=10)
        self.ac2 = nn.ReLU()
        self.output = nn.Linear(in_features=10, out_features=num_labels)

    def forward(self, x):
        x = self.ac1(self.linear1(x))
        x = self.ac2(self.linear2(x))
        x = self.output(x)
        return x
dataset = OneflowArgMaxDataset(10, 100)
model = Model(10, 10)
loss_func = nn.CrossEntropyLoss()
optimizer = oneflow.optim.Adam(model.parameters(), 0.001)
dataloader = DataLoader(dataset, batch_size=32)

device = "cuda"
model.to(device)
# model = ddp(model)                     # wrap the model for DistributedDataParallel training
loss_func.to(device)
# model = model.to_global(PLACEMENT, B)  # alternative: global-tensor (SBP) parallelism

for epoch in range(2):
    for i, (x, y) in enumerate(dataloader):
        # Naive data sharding across two ranks: each rank only handles batches whose index matches its rank.
        if i % 2 != oneflow.env.get_rank():
            continue
        x = x.to(device)
        y = y.to(device)
        # x = x.to_global(PLACEMENT, S0)
        # y = y.to_global(PLACEMENT, S0)
        output = model(x)
        loss = loss_func(output, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

oneflow.save(model, "ttt")
print("end.")

# python -m oneflow.distributed.launch --nproc_per_node 2 dist.py