diff --git a/fastNLP/core/dataloaders/paddle_dataloader/fdl.py b/fastNLP/core/dataloaders/paddle_dataloader/fdl.py index 37130e3e..c84c1aaf 100644 --- a/fastNLP/core/dataloaders/paddle_dataloader/fdl.py +++ b/fastNLP/core/dataloaders/paddle_dataloader/fdl.py @@ -137,9 +137,11 @@ class PaddleDataLoader(DataLoader): if batch_sampler is None: batch_sampler = RandomBatchSampler(dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last) - batch_size = 1 - shuffle = False - drop_last = False + # 因为无论如何传给 DataLoader 的 batch_sampler 都不是 None + # 所以要恢复默认值防止报错 + batch_size = 1 + shuffle = False + drop_last = False if isinstance(collate_fn, str): if collate_fn == 'auto': diff --git a/tests/core/drivers/paddle_driver/test_fleet.py b/tests/core/drivers/paddle_driver/test_fleet.py index 4421e4b1..b303249c 100644 --- a/tests/core/drivers/paddle_driver/test_fleet.py +++ b/tests/core/drivers/paddle_driver/test_fleet.py @@ -522,6 +522,103 @@ class TestSetDistReproDataloader: assert len(left_idxes) + len(already_seen_idx) == len(self.dataset) / num_replicas assert len(left_idxes | already_seen_idx) == len(self.dataset) / num_replicas + @magic_argv_env_context + @pytest.mark.parametrize("shuffle", ([True, False])) + @pytest.mark.parametrize("batch_size", ([1, 3, 16, 17])) + @pytest.mark.parametrize("drop_last", ([True, False])) + def test_shuffle_dataloader(self, shuffle, batch_size, drop_last, reproducible=True): + try: + # 需要检验一下 set_dist_repro_dataloader 没有修改参数 + num_samples = 200 + dataset = PaddleNormalXYDataset(num_samples) + dl = prepare_paddle_dataloader(dataset, shuffle=shuffle, batch_size=batch_size, drop_last=drop_last) + model = PaddleNormalModel_Classification_1(10, 32) + self.driver.setup() + dl = self.driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible) + + data = [] + flags = [] + for batch in dl: + flags.append(batch['x'].shape[0] == batch_size) + data.extend(batch['x'].reshape((-1, )).tolist()) + + _num_samples = num_samples//2 + + if drop_last and _num_samples%batch_size != 0: + assert len(data)!=_num_samples + assert all(flags) == True + elif _num_samples%batch_size!=0: + assert flags[-1] is False + else: + assert len(data) == _num_samples + + if not shuffle: + for i in range(1, len(data)-1): + assert data[i]>data[i-1] + else: + flags = [] + for i in range(1, len(data)-1): + flags.append(data[i]>data[i-1]) + assert all(flags) is False + datas = fastnlp_paddle_all_gather(data) + if drop_last: + assert len(set(datas[0] + datas[1])) == num_samples-_num_samples%batch_size*2 + else: + assert len(set(datas[0] + datas[1])) == num_samples + finally: + dist.barrier() + + @magic_argv_env_context + @pytest.mark.parametrize("shuffle", ([True, False])) + @pytest.mark.parametrize("batch_size", ([1, 3, 16, 17])) + @pytest.mark.parametrize("drop_last", ([True, False])) + def test_batch_sampler_dataloader(self, shuffle, batch_size, drop_last, reproducible=True): + try: + # 需要检验一下 set_dist_repro_dataloader 没有修改参数 + num_samples = 200 + num_device = 2 + dataset = PaddleNormalXYDataset(num_samples) + sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last, + shuffle=shuffle, num_batch_per_bucket=2) + dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler) + model = PaddleNormalModel_Classification_1(10, 32) + device = [0, 1] + self.driver.setup() + dl = self.driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible) + + data = [] + flags = [] + for batch in dl: + d = batch['x'].reshape((-1, )).tolist() + diff = max(d) - min(d) + assert diffdata[i-1] - else: - flags = [] - for i in range(1, len(data)-1): - flags.append(data[i]>data[i-1]) - assert all(flags) is False - datas = fastnlp_paddle_all_gather(data) - if drop_last: - assert len(set(datas[0] + datas[1])) == num_samples-_num_samples%batch_size*2 - else: - assert len(set(datas[0] + datas[1])) == num_samples - finally: - if dist.is_initialized(): - dist.barrier() - dist.destroy_process_group() - - -@pytest.mark.torch -@magic_argv_env_context -@pytest.mark.parametrize("shuffle", ([True, False])) -@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17])) -@pytest.mark.parametrize("drop_last", ([True, False])) -def test_batch_sampler_dataloader(shuffle, batch_size, drop_last, reproducible=True): - try: - # 需要检验一下 set_dist_repro_dataloader 没有修改参数 - num_samples = 200 - num_device = 2 - dataset = PaddleNormalXYDataset(num_samples) - sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last, - shuffle=shuffle, num_batch_per_bucket=2) - dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler) - model = PaddleNormalModel_Classification_1(10, 32) - device = [0, 1] - driver = PaddleFleetDriver(model, parallel_device=device) - driver.setup() - dl = driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible) - - data = [] - flags = [] - for batch in dl: - d = batch['x'].reshape(-1).tolist() - diff = max(d) - min(d) - assert diffdata[i-1] + else: + flags = [] + for i in range(1, len(data)): + flags.append(data[i]>data[i-1]) + assert all(flags) is False + + + @pytest.mark.paddle + @pytest.mark.parametrize("shuffle", ([True, False])) + @pytest.mark.parametrize("batch_size", ([1, 3, 16, 17])) + @pytest.mark.parametrize("drop_last", ([True, False])) + @pytest.mark.parametrize("reproducible", ([True, False])) + def test_batch_sampler_dataloader(self, shuffle, batch_size, drop_last, reproducible): + # 需要检验一下 set_dist_repro_dataloader 没有修改参数 + num_samples = 200 + dataset = PaddleNormalXYDataset(num_samples) + sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last, + shuffle=shuffle, num_batch_per_bucket=2) + dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler) + model = PaddleNormalModel_Classification_1(1, 2) + dl = self.driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible) + + data = [] + flags = [] + for batch in dl: + d = batch['x'].reshape((-1, )).tolist() + diff = max(d) - min(d) + assert diffdata[i-1] - else: - flags = [] - for i in range(1, len(data)): - flags.append(data[i]>data[i-1]) - assert all(flags) is False - - -@pytest.mark.torch -@pytest.mark.parametrize("shuffle", ([True, False])) -@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17])) -@pytest.mark.parametrize("drop_last", ([True, False])) -@pytest.mark.parametrize("reproducible", ([True, False])) -def test_batch_sampler_dataloader(shuffle, batch_size, drop_last, reproducible): - # 需要检验一下 set_dist_repro_dataloader 没有修改参数 - num_samples = 200 - dataset = PaddleNormalXYDataset(num_samples) - sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last, - shuffle=shuffle, num_batch_per_bucket=2) - dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler) - model = PaddleNormalModel_Classification_1(1, 2) - driver = PaddleSingleDriver(model, device="cpu") - dl = driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible) - - data = [] - flags = [] - for batch in dl: - d = batch['x'].reshape(-1).tolist() - diff = max(d) - min(d) - assert diff