"""Distributed-training smoke tests for fastNLP's ``DistTrainer``.

The ``test_*`` methods re-launch this very file through
``torch.distributed.launch``; each spawned worker then executes the matching
``runN`` method selected by the ``--test N`` command-line flag.
"""
import os
import shutil
import subprocess
import sys
import unittest
from argparse import ArgumentParser

import numpy as np
import torch.cuda
from fastNLP import AccuracyMetric
from fastNLP import CrossEntropyLoss, BCELoss
from fastNLP import DataSet
from fastNLP import Instance
from fastNLP import SGD
from fastNLP.core.callback import EchoCallback
from fastNLP.core.dist_trainer import DistTrainer, get_local_rank
from fastNLP.models.base_model import NaiveClassifier


def _sample_two_clusters():
    """Draw two 2-D Gaussian clusters of 1000 points each.

    The first cluster is centred on (-3, -3) and the second on (3, 3); both
    use an identity covariance. The first cluster is always drawn first so
    the sequence of NumPy RNG calls is stable for a given seed.

    :return: tuple ``(class_a, class_b)`` of arrays with shape ``(1000, 2)``
    """
    cov = np.array([[1, 0], [0, 1]])
    class_a = np.random.multivariate_normal(np.array([-3, -3]), cov, size=(1000,))
    class_b = np.random.multivariate_normal(np.array([3, 3]), cov, size=(1000,))
    return class_a, class_b


def prepare_fake_dataset():
    """Build a two-class dataset with integer labels.

    Points of the first cluster get ``y=0`` and points of the second cluster
    get ``y=1``; each instance stores its coordinates as a two-float list
    under ``x``.

    :return: a ``DataSet`` with 2000 instances
    """
    class_a, class_b = _sample_two_clusters()
    instances = [Instance(x=[float(item[0]), float(item[1])], y=0) for item in class_a]
    instances += [Instance(x=[float(item[0]), float(item[1])], y=1) for item in class_b]
    return DataSet(instances)


def prepare_fake_dataset2(*args, size=100):
    """Build a dataset of random features with a random 4-way label.

    :param args: names of the feature fields to create; each one is filled
        with a ``(size, 5)`` array of standard-normal samples
    :param size: number of rows in the dataset
    :return: a ``DataSet`` with a ``y`` field plus one field per name in *args*
    """
    # The label column must have the same length as the feature columns, so
    # it has to follow ``size`` rather than a fixed row count.
    ys = np.random.randint(4, size=size, dtype=np.int64)
    data = {'y': ys}
    for arg in args:
        data[arg] = np.random.randn(size, 5)
    return DataSet(data=data)


def set_rng_seed(seed):
    """Seed NumPy's global random generator (other RNGs are left untouched)."""
    np.random.seed(seed)


def prepare_env():
    """Create the dataset/model pair used by the binary-classification runs.

    Labels are stored as one-element float lists (``[0.0]`` / ``[1.0]``),
    ``x`` is marked as input and ``y`` as target, and the model is a
    ``NaiveClassifier(2, 1)``.

    :return: tuple ``(data_set, model)``
    """
    class_a, class_b = _sample_two_clusters()
    # A fresh label list is built per instance so no two instances share one.
    instances = [Instance(x=[float(item[0]), float(item[1])], y=[0.0]) for item in class_a]
    instances += [Instance(x=[float(item[0]), float(item[1])], y=[1.0]) for item in class_b]
    data_set = DataSet(instances)
    data_set.set_input("x")
    data_set.set_target("y")
    model = NaiveClassifier(2, 1)
    return data_set, model


class TestDistTrainer(unittest.TestCase):
    """Launches ``DistTrainer`` runs in subprocesses and checks they exit cleanly."""

    # Directory handed to the trainer as ``save_path``; removed after each run.
    save_path = './save_cp'

    def _remove_checkpoints(self, trainer):
        """Delete ``save_path`` on the master process if it exists."""
        if trainer.is_master and os.path.exists(self.save_path):
            shutil.rmtree(self.save_path)

    def run1(self):
        """Worker body: plain distributed training with cross-entropy loss."""
        # test distributed training
        print('local rank', get_local_rank())
        set_rng_seed(100)
        data_set = prepare_fake_dataset()
        data_set.set_input("x", flag=True)
        data_set.set_target("y", flag=True)

        model = NaiveClassifier(2, 2)

        trainer = DistTrainer(
            model=model, train_data=data_set, optimizer=SGD(lr=0.1),
            loss=CrossEntropyLoss(pred="predict", target="y"),
            batch_size_per_gpu=8, n_epochs=3, print_every=50, save_path=self.save_path,
        )
        try:
            # should run without raising
            trainer.train()
        finally:
            # clean up even when training fails so stale files do not leak
            self._remove_checkpoints(trainer)

    def run2(self):
        """Worker body: distributed training with fp16 (``'O1'``) enabled."""
        # test fp16 with distributed training
        print('local rank', get_local_rank())
        set_rng_seed(100)
        data_set = prepare_fake_dataset()
        data_set.set_input("x", flag=True)
        data_set.set_target("y", flag=True)

        model = NaiveClassifier(2, 2)

        trainer = DistTrainer(
            model=model, train_data=data_set, optimizer=SGD(lr=0.1),
            loss=CrossEntropyLoss(pred="predict", target="y"),
            batch_size_per_gpu=8, n_epochs=3, print_every=50, save_path=self.save_path,
            fp16='O1'
        )
        try:
            # should run without raising
            trainer.train()
        finally:
            self._remove_checkpoints(trainer)

    def run3(self):
        """Worker body: training with ``callbacks_all`` and ``callbacks_master`` set."""
        set_rng_seed(100)
        data_set, model = prepare_env()
        trainer = DistTrainer(
            data_set, model, optimizer=None,
            loss=BCELoss(pred="predict", target="y"),
            n_epochs=3, print_every=50,
            callbacks_all=[EchoCallback('callbacks_all')],
            callbacks_master=[EchoCallback('callbacks_master')]
        )
        trainer.train()

    def run4(self):
        """Worker body: training with a dev set and an accuracy metric."""
        set_rng_seed(100)
        # Only the dataset from prepare_env() is used; a fresh model is built below.
        data_set, _ = prepare_env()

        train_set, dev_set = data_set.split(0.3)

        model = NaiveClassifier(2, 1)

        trainer = DistTrainer(
            train_set, model, optimizer=SGD(lr=0.1),
            loss=BCELoss(pred="predict", target="y"),
            batch_size_per_gpu=32, n_epochs=3, print_every=50, dev_data=dev_set,
            metrics=AccuracyMetric(pred="predict", target="y"), validate_every=-1, save_path=self.save_path,
        )
        try:
            # should run without raising
            trainer.train()
        finally:
            self._remove_checkpoints(trainer)

    def run_dist(self, run_id):
        """Launch ``run<run_id>`` on up to two GPUs via ``torch.distributed.launch``.

        The test is reported as skipped when CUDA is not available.

        :param run_id: number selecting which ``runN`` method the workers execute
        :raises subprocess.CalledProcessError: if the launcher exits non-zero
        """
        if not torch.cuda.is_available():
            self.skipTest('CUDA is not available; distributed training was not run')
        ngpu = min(2, torch.cuda.device_count())
        path = __file__
        # sys.executable keeps the workers on the same interpreter (and
        # therefore the same installed packages) as the test process.
        cmd = [sys.executable, '-m', 'torch.distributed.launch',
               '--nproc_per_node', str(ngpu), path, '--test', str(run_id)]
        print(' '.join(cmd))
        subprocess.check_call(cmd)

    def test_normal_run(self):
        self.run_dist(1)

    def no_test_fp16(self):
        # The name lacks the ``test`` prefix, so unittest does not collect it.
        self.run_dist(2)

    def test_callback(self):
        self.run_dist(3)

    def test_dev_data(self):
        self.run_dist(4)


if __name__ == '__main__':
    # Worker entry point: run the ``runN`` method selected by ``--test N``.
    runner = TestDistTrainer()
    parser = ArgumentParser()
    parser.add_argument('--test', type=int)
    args, _ = parser.parse_known_args()
    if args.test and hasattr(runner, 'run%s' % args.test):
        getattr(runner, 'run%s' % args.test)()