Browse Source

修复新增的set_dist_repro_dataloader函数测试例在paddle情况下的问题

tags/v1.0.0alpha
x54-729 3 years ago
parent
commit
c99315f79e
3 changed files with 180 additions and 193 deletions
  1. +5
    -3
      fastNLP/core/dataloaders/paddle_dataloader/fdl.py
  2. +97
    -108
      tests/core/drivers/paddle_driver/test_fleet.py
  3. +78
    -82
      tests/core/drivers/paddle_driver/test_single_device.py

+ 5
- 3
fastNLP/core/dataloaders/paddle_dataloader/fdl.py View File

@@ -137,9 +137,11 @@ class PaddleDataLoader(DataLoader):
if batch_sampler is None:
batch_sampler = RandomBatchSampler(dataset, batch_size=batch_size, shuffle=shuffle,
drop_last=drop_last)
batch_size = 1
shuffle = False
drop_last = False
# 因为无论如何传给 DataLoader 的 batch_sampler 都不是 None
# 所以要恢复默认值防止报错
batch_size = 1
shuffle = False
drop_last = False

if isinstance(collate_fn, str):
if collate_fn == 'auto':


+ 97
- 108
tests/core/drivers/paddle_driver/test_fleet.py View File

@@ -522,6 +522,103 @@ class TestSetDistReproDataloader:
assert len(left_idxes) + len(already_seen_idx) == len(self.dataset) / num_replicas
assert len(left_idxes | already_seen_idx) == len(self.dataset) / num_replicas

@magic_argv_env_context
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
def test_shuffle_dataloader(self, shuffle, batch_size, drop_last, reproducible=True):
    """Verify that set_dist_repro_dataloader keeps shuffle / batch_size / drop_last intact
    on a fleet (2-replica) driver, and that the two replicas together cover the dataset."""
    try:
        num_samples = 200
        dataset = PaddleNormalXYDataset(num_samples)
        dl = prepare_paddle_dataloader(dataset, shuffle=shuffle, batch_size=batch_size, drop_last=drop_last)
        self.driver.setup()
        dl = self.driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible)

        data = []
        flags = []
        for batch in dl:
            flags.append(batch['x'].shape[0] == batch_size)
            data.extend(batch['x'].reshape((-1, )).tolist())

        # each of the two replicas sees half of the samples
        _num_samples = num_samples // 2

        if drop_last and _num_samples % batch_size != 0:
            # trailing partial batch is dropped, so fewer samples and all batches full
            assert len(data) != _num_samples
            assert all(flags)
        elif _num_samples % batch_size != 0:
            # last batch is the (smaller) remainder batch
            assert not flags[-1]
        else:
            assert len(data) == _num_samples

        if not shuffle:
            # non-shuffled order must be strictly increasing
            # (fixed off-by-one: compare through the last element, was range(1, len(data)-1))
            for i in range(1, len(data)):
                assert data[i] > data[i - 1]
        else:
            increasing = [data[i] > data[i - 1] for i in range(1, len(data))]
            assert not all(increasing)
        # gather both replicas' data and check full coverage without duplicates
        datas = fastnlp_paddle_all_gather(data)
        if drop_last:
            assert len(set(datas[0] + datas[1])) == num_samples - _num_samples % batch_size * 2
        else:
            assert len(set(datas[0] + datas[1])) == num_samples
    finally:
        dist.barrier()

@magic_argv_env_context
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
def test_batch_sampler_dataloader(self, shuffle, batch_size, drop_last, reproducible=True):
    """Verify that set_dist_repro_dataloader preserves a user-supplied BucketedBatchSampler's
    parameters on a fleet (2-replica) driver, and that batches stay bucketed."""
    try:
        num_samples = 200
        num_device = 2
        dataset = PaddleNormalXYDataset(num_samples)
        sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last,
                                       shuffle=shuffle, num_batch_per_bucket=2)
        dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler)
        self.driver.setup()
        dl = self.driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible)

        data = []
        flags = []
        for batch in dl:
            d = batch['x'].reshape((-1, )).tolist()
            # values within one batch must stay inside one bucket's span
            diff = max(d) - min(d)
            assert diff < batch_size * 2 * 2 * 2
            data.extend(d)
            flags.append(len(d) == batch_size)
        # each replica sees 1/num_device of the samples
        _num_samples = num_samples // num_device
        if drop_last and _num_samples % batch_size != 0:
            assert len(data) != _num_samples
            assert all(flags)
        elif _num_samples % batch_size != 0:
            # last batch is the (smaller) remainder batch
            assert not flags[-1]
        else:
            assert len(data) == _num_samples

        if not shuffle:
            # bucketed, non-shuffled order is strictly decreasing
            # (fixed off-by-one: compare through the last element, was range(1, len(data)-1))
            for i in range(1, len(data)):
                assert data[i] < data[i - 1]
        else:
            decreasing = [data[i] < data[i - 1] for i in range(1, len(data))]
            assert not all(decreasing)
        # gather both replicas' data and check full coverage without duplicates
        datas = fastnlp_paddle_all_gather(data)
        if drop_last:
            assert len(set(datas[0] + datas[1])) == num_samples - _num_samples % batch_size * 2
        else:
            assert len(set(datas[0] + datas[1])) == num_samples
    finally:
        dist.barrier()


############################################################################
#
@@ -817,111 +914,3 @@ class TestSaveLoad:

finally:
rank_zero_rm(path)


# fixed marker: this is a paddle test, was wrongly marked @pytest.mark.torch
@pytest.mark.paddle
@magic_argv_env_context
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
def test_shuffle_dataloader(shuffle, batch_size, drop_last, reproducible=True):
    """Verify that PaddleFleetDriver.set_dist_repro_dataloader keeps
    shuffle / batch_size / drop_last intact across 2 replicas."""
    try:
        num_samples = 200
        dataset = PaddleNormalXYDataset(num_samples)
        dl = prepare_paddle_dataloader(dataset, shuffle=shuffle, batch_size=batch_size, drop_last=drop_last)
        model = PaddleNormalModel_Classification_1(10, 32)
        device = [0, 1]
        driver = PaddleFleetDriver(model, parallel_device=device)
        driver.setup()
        dl = driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible)

        data = []
        flags = []
        for batch in dl:
            flags.append(batch['x'].shape[0] == batch_size)
            data.extend(batch['x'].reshape(-1).tolist())

        # each of the two replicas sees half of the samples
        _num_samples = num_samples // 2

        if drop_last and _num_samples % batch_size != 0:
            assert len(data) != _num_samples
            assert all(flags)
        elif _num_samples % batch_size != 0:
            # last batch is the (smaller) remainder batch
            assert not flags[-1]
        else:
            assert len(data) == _num_samples

        if not shuffle:
            # non-shuffled order must be strictly increasing
            # (fixed off-by-one: compare through the last element, was range(1, len(data)-1))
            for i in range(1, len(data)):
                assert data[i] > data[i - 1]
        else:
            increasing = [data[i] > data[i - 1] for i in range(1, len(data))]
            assert not all(increasing)
        datas = fastnlp_paddle_all_gather(data)
        if drop_last:
            assert len(set(datas[0] + datas[1])) == num_samples - _num_samples % batch_size * 2
        else:
            assert len(set(datas[0] + datas[1])) == num_samples
    finally:
        if dist.is_initialized():
            dist.barrier()
            dist.destroy_process_group()


# fixed marker: this is a paddle test, was wrongly marked @pytest.mark.torch
@pytest.mark.paddle
@magic_argv_env_context
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
def test_batch_sampler_dataloader(shuffle, batch_size, drop_last, reproducible=True):
    """Verify that PaddleFleetDriver.set_dist_repro_dataloader preserves a
    user-supplied BucketedBatchSampler's parameters across 2 replicas."""
    try:
        num_samples = 200
        num_device = 2
        dataset = PaddleNormalXYDataset(num_samples)
        sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last,
                                       shuffle=shuffle, num_batch_per_bucket=2)
        dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler)
        model = PaddleNormalModel_Classification_1(10, 32)
        device = [0, 1]
        driver = PaddleFleetDriver(model, parallel_device=device)
        driver.setup()
        dl = driver.set_dist_repro_dataloader(dataloader=dl, dist='dist', reproducible=reproducible)

        data = []
        flags = []
        for batch in dl:
            d = batch['x'].reshape(-1).tolist()
            # values within one batch must stay inside one bucket's span
            diff = max(d) - min(d)
            assert diff < batch_size * 2 * 2 * 2
            data.extend(d)
            flags.append(len(d) == batch_size)
        # each replica sees 1/num_device of the samples
        _num_samples = num_samples // num_device
        if drop_last and _num_samples % batch_size != 0:
            assert len(data) != _num_samples
            assert all(flags)
        elif _num_samples % batch_size != 0:
            # last batch is the (smaller) remainder batch
            assert not flags[-1]
        else:
            assert len(data) == _num_samples

        if not shuffle:
            # bucketed, non-shuffled order is strictly decreasing
            # (fixed off-by-one: compare through the last element, was range(1, len(data)-1))
            for i in range(1, len(data)):
                assert data[i] < data[i - 1]
        else:
            decreasing = [data[i] < data[i - 1] for i in range(1, len(data))]
            assert not all(decreasing)
        datas = fastnlp_paddle_all_gather(data)
        if drop_last:
            assert len(set(datas[0] + datas[1])) == num_samples - _num_samples % batch_size * 2
        else:
            assert len(set(datas[0] + datas[1])) == num_samples
    finally:
        if dist.is_initialized():
            dist.barrier()
            dist.destroy_process_group()

+ 78
- 82
tests/core/drivers/paddle_driver/test_single_device.py View File

@@ -519,6 +519,84 @@ class TestSetDistReproDataloader:

assert len(left_idxes) + len(already_seen_idx) == len(self.dataset)
assert len(left_idxes | already_seen_idx) == len(self.dataset)
@pytest.mark.paddle
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
@pytest.mark.parametrize("reproducible", ([True, False]))
def test_shuffle_dataloader(self, shuffle, batch_size, drop_last, reproducible):
    """Verify that set_dist_repro_dataloader keeps shuffle / batch_size / drop_last
    intact on a single-device driver."""
    num_samples = 200
    dataset = PaddleNormalXYDataset(num_samples)
    dl = prepare_paddle_dataloader(dataset, shuffle=shuffle, batch_size=batch_size, drop_last=drop_last)
    # removed unused local `model`; the driver under test is self.driver
    dl = self.driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible)

    data = []
    flags = []
    for batch in dl:
        flags.append(batch['x'].shape[0] == batch_size)
        data.extend(batch['x'].reshape((-1, )).tolist())

    if drop_last and num_samples % batch_size != 0:
        # trailing partial batch is dropped, so fewer samples and all batches full
        assert len(data) != num_samples
        assert all(flags)
    elif num_samples % batch_size != 0:
        # last batch is the (smaller) remainder batch
        assert not flags[-1]
    else:
        assert len(data) == num_samples

    if not shuffle:
        # non-shuffled order must be strictly increasing
        for i in range(1, len(data)):
            assert data[i] > data[i - 1]
    else:
        increasing = [data[i] > data[i - 1] for i in range(1, len(data))]
        assert not all(increasing)


@pytest.mark.paddle
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
@pytest.mark.parametrize("reproducible", ([True, False]))
def test_batch_sampler_dataloader(self, shuffle, batch_size, drop_last, reproducible):
    """Verify that set_dist_repro_dataloader preserves a user-supplied
    BucketedBatchSampler's parameters on a single-device driver."""
    num_samples = 200
    dataset = PaddleNormalXYDataset(num_samples)
    sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last,
                                   shuffle=shuffle, num_batch_per_bucket=2)
    dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler)
    # removed unused local `model`; the driver under test is self.driver
    dl = self.driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible)

    data = []
    flags = []
    for batch in dl:
        d = batch['x'].reshape((-1, )).tolist()
        # values within one batch must stay inside one bucket's span
        diff = max(d) - min(d)
        assert diff < batch_size * 2
        data.extend(d)
        flags.append(len(d) == batch_size)

    if drop_last and num_samples % batch_size != 0:
        assert len(data) != num_samples
        assert all(flags)
    elif num_samples % batch_size != 0:
        # last batch is the (smaller) remainder batch
        assert not flags[-1]
    else:
        assert len(data) == num_samples

    if not shuffle:
        # bucketed, non-shuffled order is strictly decreasing
        for i in range(1, len(data)):
            assert data[i] < data[i - 1]
    else:
        decreasing = [data[i] < data[i - 1] for i in range(1, len(data))]
        assert not all(decreasing)

############################################################################
#
@@ -739,85 +817,3 @@ def test_save_and_load_with_randomsampler(only_state_dict, fp16):
assert len(left_y_batches | already_seen_y_set) == len(dataset)
finally:
rank_zero_rm(path)


# fixed marker: this is a paddle test, was wrongly marked @pytest.mark.torch
@pytest.mark.paddle
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
@pytest.mark.parametrize("reproducible", ([True, False]))
def test_shuffle_dataloader(shuffle, batch_size, drop_last, reproducible):
    """Verify that PaddleSingleDriver.set_dist_repro_dataloader keeps
    shuffle / batch_size / drop_last intact."""
    num_samples = 200
    dataset = PaddleNormalXYDataset(num_samples)
    dl = prepare_paddle_dataloader(dataset, shuffle=shuffle, batch_size=batch_size, drop_last=drop_last)
    model = PaddleNormalModel_Classification_1(1, 2)
    driver = PaddleSingleDriver(model, device="cpu")
    dl = driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible)

    data = []
    flags = []
    for batch in dl:
        flags.append(batch['x'].shape[0] == batch_size)
        data.extend(batch['x'].reshape(-1).tolist())

    if drop_last and num_samples % batch_size != 0:
        # trailing partial batch is dropped, so fewer samples and all batches full
        assert len(data) != num_samples
        assert all(flags)
    elif num_samples % batch_size != 0:
        # last batch is the (smaller) remainder batch
        assert not flags[-1]
    else:
        assert len(data) == num_samples

    if not shuffle:
        # non-shuffled order must be strictly increasing
        for i in range(1, len(data)):
            assert data[i] > data[i - 1]
    else:
        increasing = [data[i] > data[i - 1] for i in range(1, len(data))]
        assert not all(increasing)


# fixed marker: this is a paddle test, was wrongly marked @pytest.mark.torch
@pytest.mark.paddle
@pytest.mark.parametrize("shuffle", ([True, False]))
@pytest.mark.parametrize("batch_size", ([1, 3, 16, 17]))
@pytest.mark.parametrize("drop_last", ([True, False]))
@pytest.mark.parametrize("reproducible", ([True, False]))
def test_batch_sampler_dataloader(shuffle, batch_size, drop_last, reproducible):
    """Verify that PaddleSingleDriver.set_dist_repro_dataloader preserves a
    user-supplied BucketedBatchSampler's parameters."""
    num_samples = 200
    dataset = PaddleNormalXYDataset(num_samples)
    sampler = BucketedBatchSampler(dataset, length=dataset._data, batch_size=batch_size, drop_last=drop_last,
                                   shuffle=shuffle, num_batch_per_bucket=2)
    dl = prepare_paddle_dataloader(dataset, batch_sampler=sampler)
    model = PaddleNormalModel_Classification_1(1, 2)
    driver = PaddleSingleDriver(model, device="cpu")
    dl = driver.set_dist_repro_dataloader(dataloader=dl, reproducible=reproducible)

    data = []
    flags = []
    for batch in dl:
        d = batch['x'].reshape(-1).tolist()
        # values within one batch must stay inside one bucket's span
        diff = max(d) - min(d)
        assert diff < batch_size * 2
        data.extend(d)
        flags.append(len(d) == batch_size)

    if drop_last and num_samples % batch_size != 0:
        assert len(data) != num_samples
        assert all(flags)
    elif num_samples % batch_size != 0:
        # last batch is the (smaller) remainder batch
        assert not flags[-1]
    else:
        assert len(data) == num_samples

    if not shuffle:
        # bucketed, non-shuffled order is strictly decreasing
        for i in range(1, len(data)):
            assert data[i] < data[i - 1]
    else:
        decreasing = [data[i] < data[i - 1] for i in range(1, len(data))]
        assert not all(decreasing)


Loading…
Cancel
Save