import os
import random
import time

import pytest
import numpy as np

from fastNLP.core.dataset import DataSet, FieldArray, Instance, ApplyResultException
from fastNLP import logger


class TestDataSetInit:
    """Ways to initialize a DataSet:

    1) From a dict:
        1.1) 2-D list     DataSet({"x": [[1, 2], [3, 4]]})
        1.2) 2-D array    DataSet({"x": np.array([[1, 2], [3, 4]])})
        1.3) 3-D list     DataSet({"x": [[[1, 2], [3, 4]], [[1, 2], [3, 4]]]})
    2) From a list of Instance:
        2.1) 1-D list     DataSet([Instance(x=[1, 2, 3, 4])])
        2.2) 1-D array    DataSet([Instance(x=np.array([1, 2, 3, 4]))])
        2.3) 2-D list     DataSet([Instance(x=[[1, 2], [3, 4]])])
        2.4) 2-D array    DataSet([Instance(x=np.array([[1, 2], [3, 4]]))])

    Only pure lists, or an ndarray at the outermost level, are accepted.
    """

    def test_init_v1(self):
        """Build a DataSet from a list of Instance holding 1-D lists."""
        ds = DataSet([Instance(x=[1, 2, 3, 4], y=[5, 6])] * 40)
        assert "x" in ds.field_arrays and "y" in ds.field_arrays
        assert ds.field_arrays["x"].content == [[1, 2, 3, 4]] * 40
        assert ds.field_arrays["y"].content == [[5, 6]] * 40

    def test_init_v2(self):
        """Build a DataSet from a dict of field name -> list of values."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        assert "x" in ds.field_arrays and "y" in ds.field_arrays
        assert ds.field_arrays["x"].content == [[1, 2, 3, 4]] * 40
        assert ds.field_arrays["y"].content == [[5, 6]] * 40

    def test_init_assert(self):
        """Invalid constructor arguments are rejected."""
        # Fields of different lengths (40 vs 100).
        with pytest.raises(AssertionError):
            _ = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 100})
        # A list whose items are plain lists rather than Instance objects.
        with pytest.raises(AssertionError):
            _ = DataSet([[1, 2, 3, 4]] * 10)
        # Neither a dict nor a list.
        with pytest.raises(ValueError):
            _ = DataSet(0.00001)


class TestDataSetMethods:
    """Tests for the public methods of DataSet."""

    def test_append(self):
        """Appending Instances grows every field by one entry each time."""
        dd = DataSet()
        for _ in range(3):
            dd.append(Instance(x=[1, 2, 3, 4], y=[5, 6]))
        assert len(dd) == 3
        assert dd.field_arrays["x"].content == [[1, 2, 3, 4]] * 3
        assert dd.field_arrays["y"].content == [[5, 6]] * 3

    def test_add_field(self):
        """Fields can be added one by one; a length mismatch raises."""
        dd = DataSet()
        dd.add_field("x", [[1, 2, 3]] * 10)
        dd.add_field("y", [[1, 2, 3, 4]] * 10)
        dd.add_field("z", [[5, 6]] * 10)
        assert len(dd) == 10
        assert dd.field_arrays["x"].content == [[1, 2, 3]] * 10
        assert dd.field_arrays["y"].content == [[1, 2, 3, 4]] * 10
        assert dd.field_arrays["z"].content == [[5, 6]] * 10

        # 40 values do not fit a dataset that already holds 10 samples.
        with pytest.raises(RuntimeError):
            dd.add_field("??", [[1, 2]] * 40)

    def test_delete_field(self):
        """Deleting one field leaves the others in place."""
        dd = DataSet()
        dd.add_field("x", [[1, 2, 3]] * 10)
        dd.add_field("y", [[1, 2, 3, 4]] * 10)
        dd.delete_field("x")
        assert "x" not in dd.field_arrays
        assert "y" in dd.field_arrays

    def test_delete_instance(self):
        """Each delete_instance call shrinks the dataset by one sample."""
        dd = DataSet()
        old_length = 2
        dd.add_field("x", [[1, 2, 3]] * old_length)
        dd.add_field("y", [[1, 2, 3, 4]] * old_length)
        dd.delete_instance(0)
        assert len(dd) == old_length - 1
        dd.delete_instance(0)
        assert len(dd) == old_length - 2

    def test_getitem(self):
        """Indexing by int, slice, list of ints and field name."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})

        # int index -> Instance
        ins_0, ins_1 = ds[0], ds[1]
        assert isinstance(ins_0, Instance) and isinstance(ins_1, Instance)
        assert ins_0["x"] == [1, 2, 3, 4]
        assert ins_0["y"] == [5, 6]
        assert ins_1["x"] == [1, 2, 3, 4]
        assert ins_1["y"] == [5, 6]

        # slice -> DataSet
        sub_ds = ds[:10]
        assert isinstance(sub_ds, DataSet)
        assert len(sub_ds) == 10

        # list of ints -> DataSet
        sub_ds_1 = ds[[10, 0, 2, 3]]
        assert isinstance(sub_ds_1, DataSet)
        assert len(sub_ds_1) == 4

        # field name -> FieldArray
        field_array = ds['x']
        assert isinstance(field_array, FieldArray)
        assert len(field_array) == 40

    def test_setitem(self):
        """Item assignment works, including through the shuffle helpers."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        ds.add_field('i', list(range(len(ds))))
        assert ds.get_field('i').content == list(range(len(ds)))

        # Both shuffles operate on the DataSet through item access.
        random.shuffle(ds)
        np.random.shuffle(ds)
        assert ds.get_field('i').content != list(range(len(ds)))

        ins1 = ds[1]
        ds[2] = ds[1]
        assert ds[2]['x'] == ins1['x'] and ds[2]['y'] == ins1['y']

    def test_get_item_error(self):
        """Out-of-range slices and unknown field names raise."""
        # Build the dataset outside ``pytest.raises`` so that only the indexing
        # expression can satisfy the expected exception.
        ds = DataSet({"x": [[1, 2, 3, 4]] * 10, "y": [[5, 6]] * 10})
        with pytest.raises(RuntimeError):
            _ = ds[40:]

        with pytest.raises(KeyError):
            _ = ds["kom"]

    def test_len_(self):
        """len() reports the number of samples; an empty DataSet has 0."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        assert len(ds) == 40

        ds = DataSet()
        assert len(ds) == 0

    def test_add_fieldarray(self):
        """add_fieldarray accepts a matching FieldArray and rejects bad input."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        ds.add_fieldarray('z', FieldArray('z', [[7, 8]] * 40))
        assert ds['z'].content == [[7, 8]] * 40

        # Length 10 does not match the 40 samples in the dataset.
        with pytest.raises(RuntimeError):
            ds.add_fieldarray('z', FieldArray('z', [[7, 8]] * 10))

        # A plain list is not a FieldArray.
        with pytest.raises(TypeError):
            ds.add_fieldarray('z', [1, 2, 4])

    def test_copy_field(self):
        """A copied field has the same content as its source."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        ds.copy_field('x', 'z')
        assert ds['x'].content == ds['z'].content

    def test_has_field(self):
        """has_field is true for existing fields and false otherwise."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        assert ds.has_field('x')
        assert not ds.has_field('z')

    def test_get_field(self):
        """get_field returns the FieldArray or raises KeyError if missing."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        with pytest.raises(KeyError):
            ds.get_field('z')
        x_array = ds.get_field('x')
        assert x_array.content == [[1, 2, 3, 4]] * 40

    def test_get_all_fields(self):
        """get_all_fields exposes every field by name."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        field_arrays = ds.get_all_fields()
        assert field_arrays["x"].content == [[1, 2, 3, 4]] * 40
        assert field_arrays['y'].content == [[5, 6]] * 40

    def test_get_field_names(self):
        """get_field_names lists every field name."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        field_names = ds.get_field_names()
        assert 'x' in field_names
        assert 'y' in field_names

    def test_apply(self):
        """apply can create a field, overwrite a field, or just return results."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 4000, "y": [[5, 6]] * 4000})

        # Write the result into a new field.
        ds.apply(lambda ins: ins["x"][::-1], new_field_name="rx", progress_desc='rx')
        assert "rx" in ds.field_arrays
        assert ds.field_arrays["rx"].content[0] == [4, 3, 2, 1]

        # Overwrite an existing field.
        ds.apply(lambda ins: len(ins["y"]), new_field_name="y", progress_bar=None)
        assert ds.field_arrays["y"].content[0] == 2

        # Without new_field_name the results are returned as a list.
        res = ds.apply(lambda ins: len(ins["x"]), num_proc=0, progress_desc="len")
        assert isinstance(res, list) and len(res) > 0
        assert res[0] == 4

        # A tuple result is accepted; expect no exception raised.
        ds.apply(lambda ins: (len(ins["x"]), "hahaha"), new_field_name="k")

    def test_apply_progress_bar(self):
        """apply / apply_field run with the 'rich' progress bar."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 400, "y": [[5, 6]] * 400})

        def do_nothing(ins):
            # Sleep briefly so the progress bar has something to show.
            time.sleep(0.01)

        ds.apply(do_nothing, progress_bar='rich', num_proc=0)
        ds.apply_field(do_nothing, field_name='x', progress_bar='rich')

    def test_apply_cannot_modify_instance(self):
        """apply tolerates a function that writes to the instance it receives."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})

        def modify_inplace(instance):
            instance['words'] = 1

        # Only checks that no exception is raised; nothing is asserted about
        # whether the write reaches the dataset.
        ds.apply(modify_inplace)

    def test_apply_more(self):
        """apply_more / apply_field_more accept dict results with stable keys."""
        T = DataSet({"a": [1, 2, 3], "b": [2, 4, 5]})

        def func_1(x):
            return {"c": x["a"] * 2, "d": x["a"] ** 2}

        def func_2(x):
            return {"c": x * 3, "d": x ** 3}

        def func_err_1(x):
            # Returns a different key set for the first sample.
            if x["a"] == 1:
                return {"e": x["a"] * 2, "f": x["a"] ** 2}
            else:
                return {"e": x["a"] * 2}

        def func_err_2(x):
            # Returns a different key set for the first sample.
            if x == 1:
                return {"e": x * 2, "f": x ** 2}
            else:
                return {"e": x * 2}

        T.apply_more(func_1)
        assert list(T["c"].content) == [2, 4, 6]
        assert list(T["d"].content) == [1, 4, 9]

        # With modify_fields=False the dataset is untouched and results returned.
        res = T.apply_field_more(func_2, "a", modify_fields=False)
        assert list(T["c"].content) == [2, 4, 6]
        assert list(T["d"].content) == [1, 4, 9]
        assert list(res["c"]) == [3, 6, 9]
        assert list(res["d"]) == [1, 8, 27]

        # Print after the ``with`` block: statements following the raising call
        # inside the block would never run.
        with pytest.raises(ApplyResultException) as e:
            T.apply_more(func_err_1)
        print(e)

        with pytest.raises(ApplyResultException) as e:
            T.apply_field_more(func_err_2, "a")
        print(e)

    def test_drop(self):
        """drop removes the samples for which the predicate is true."""
        # "y" alternates between length 2 and length 4, so half are dropped.
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6], [7, 8, 9, 0]] * 20})
        ds.drop(lambda ins: len(ins["y"]) < 3, inplace=True)
        assert len(ds) == 20

    def test_contains(self):
        """``in`` checks field names."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 40, "y": [[5, 6]] * 40})
        assert "x" in ds
        assert "y" in ds
        assert "z" not in ds

    def test_rename_field(self):
        """rename_field moves a field; renaming a missing field raises KeyError."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 10, "y": [[5, 6]] * 10})
        ds.rename_field("x", "xx")
        assert "xx" in ds
        assert "x" not in ds

        with pytest.raises(KeyError):
            ds.rename_field("yyy", "oo")

    def test_split(self):
        """split(0.1) yields a 10% part and a 90% part."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 10, "y": [[5, 6]] * 10})
        d1, d2 = ds.split(0.1)
        assert len(d2) == (len(ds) * 0.9)
        assert len(d1) == (len(ds) * 0.1)

    def test_add_field_v2(self):
        """A field of variable-length string lists can be added and printed."""
        ds = DataSet({"x": [3, 4]})
        ds.add_field('y', [['hello', 'world'], ['this', 'is', 'a', 'test']])
        print(ds)

    def test_save_load(self):
        """A saved DataSet can be loaded back from disk."""
        ds = DataSet({"x": [[1, 2, 3, 4]] * 10, "y": [[5, 6]] * 10})
        path = "./my_ds.pkl"
        try:
            ds.save(path)
            assert os.path.exists(path)
            ds_1 = DataSet.load(path)
            assert len(ds_1) == len(ds)
        finally:
            # Always clean up, even when an assertion above fails.
            if os.path.exists(path):
                os.remove(path)

    def test_add_null(self):
        """Adding an empty field raises RuntimeError."""
        ds = DataSet()
        with pytest.raises(RuntimeError):
            ds.add_field('test', [])

    def test_concat(self):
        """Check that two datasets are concatenated correctly."""
        ds1 = DataSet({"x": [[1, 2, 3, 4] for _ in range(10)], "y": [[5, 6] for _ in range(10)]})
        ds2 = DataSet({"x": [[4, 3, 2, 1] for _ in range(10)], "y": [[6, 5] for _ in range(10)]})
        ds3 = ds1.concat(ds2)

        assert len(ds3) == 20

        assert ds1[9]['x'] == [1, 2, 3, 4]
        assert ds1[10]['x'] == [4, 3, 2, 1]

        ds2[0]['x'][0] = 100
        assert ds3[10]['x'][0] == 4  # the concatenated copy is not affected

        ds3[10]['x'][0] = -100
        assert ds2[0]['x'][0] == 100  # the source dataset is not affected

        # Explicit inplace=True.
        ds1 = DataSet({"x": [[1, 2, 3, 4] for _ in range(10)], "y": [[5, 6] for _ in range(10)]})
        ds2 = DataSet({"x": [[4, 3, 2, 1] for _ in range(10)], "y": [[6, 5] for _ in range(10)]})
        ds3 = ds1.concat(ds2, inplace=True)

        ds2[0]['x'][0] = 100
        assert ds3[10]['x'][0] == 4  # the concatenated copy is not affected

        ds3[10]['x'][0] = -100
        assert ds2[0]['x'][0] == 100  # the source dataset is not affected

        ds3[0]['x'][0] = 100
        assert ds1[0]['x'][0] == 100  # writes through ds3 are visible in ds1

        # field_mapping renames ds2's fields onto ds1's.
        ds1 = DataSet({"x": [[1, 2, 3, 4] for _ in range(10)], "y": [[5, 6] for _ in range(10)]})
        ds2 = DataSet({"X": [[4, 3, 2, 1] for _ in range(10)], "Y": [[6, 5] for _ in range(10)]})
        ds3 = ds1.concat(ds2, field_mapping={'X': 'x', 'Y': 'y'})
        assert len(ds3) == 20

        # Extra fields in ds2 ('Z') are ignored.
        ds1 = DataSet({"x": [[1, 2, 3, 4] for _ in range(10)], "y": [[5, 6] for _ in range(10)]})
        ds2 = DataSet({"X": [[4, 3, 2, 1] for _ in range(10)], "Y": [[6, 5] for _ in range(10)], 'Z': [0] * 10})
        ds3 = ds1.concat(ds2, field_mapping={'X': 'x', 'Y': 'y'})

        # A field of ds1 that is missing from ds2 ('y') raises.
        ds1 = DataSet({"x": [[1, 2, 3, 4] for _ in range(10)], "y": [[5, 6] for _ in range(10)]})
        ds2 = DataSet({"X": [[4, 3, 2, 1] for _ in range(10)]})
        with pytest.raises(RuntimeError):
            ds3 = ds1.concat(ds2, field_mapping={'X': 'x'})

    def test_instance_field_disappear_bug(self):
        """Regression: a sliced DataSet keeps every field, including copies."""
        data = DataSet({'raw_chars': [[0, 1], [2]], 'target': [0, 1]})
        data.copy_field(field_name='raw_chars', new_field_name='chars')
        _data = data[:1]
        for field_name in ['raw_chars', 'target', 'chars']:
            assert _data.has_field(field_name)

    def test_from_pandas(self):
        """A DataSet can be built from a pandas DataFrame."""
        # Imported locally so the rest of the module does not require pandas.
        import pandas as pd

        df = pd.DataFrame({'x': [1, 2, 3], 'y': [4, 5, 6]})
        ds = DataSet.from_pandas(df)
        print(ds)
        assert ds['x'].content == [1, 2, 3]
        assert ds['y'].content == [4, 5, 6]

    def test_to_pandas(self):
        """Smoke test: to_pandas runs without raising."""
        ds = DataSet({'x': [1, 2, 3], 'y': [4, 5, 6]})
        ds.to_pandas()

    def test_to_csv(self):
        """to_csv writes a file at the given path."""
        ds = DataSet({'x': [1, 2, 3], 'y': [4, 5, 6]})
        path = "1.csv"
        try:
            ds.to_csv(path)
            assert os.path.exists(path)
        finally:
            # Always clean up, even when the assertion above fails.
            if os.path.exists(path):
                os.remove(path)

    def test_add_seq_len(self):
        """Smoke test: add_seq_len runs on a variable-length field."""
        ds = DataSet({'x': [[1, 2], [2, 3, 4], [3]], 'y': [4, 5, 6]})
        ds.add_seq_len('x')
        print(ds)

    def test_apply_proc(self):
        """apply_field runs with num_proc=0."""
        data = DataSet({'x': ['xxxxas1w xw zxw xz', 'xxxxas1w xw zxw xz'] * 100, 'y': [0, 1] * 100})
        data.apply_field(lambda x: len(x), field_name='x', new_field_name='len_x', num_proc=0)

    def test_apply_more_proc(self):
        """apply_field runs with num_proc=2 and a function that prints and logs."""

        def func(x):
            print("x")
            logger.info("demo")
            return len(x)

        data = DataSet({'x': ['xxxxas1w xw zxw xz', 'xxxxas1w xw zxw xz'] * 100, 'y': [0, 1] * 100})
        data.apply_field(func, field_name='x', new_field_name='len_x', num_proc=2)


class TestFieldArrayInit:
    """How FieldArray gets constructed:

    1) If the DataSet is initialized from a dict, a FieldArray is built in
       add_field:
        1.1) 2-D list     DataSet({"x": [[1, 2], [3, 4]]})
        1.2) 2-D array    DataSet({"x": np.array([[1, 2], [3, 4]])})
        1.3) 3-D list     DataSet({"x": [[[1, 2], [3, 4]], [[1, 2], [3, 4]]]})
    2) If the DataSet is initialized from a list of Instance, append first
       initializes the FieldArray from the first sample; later samples are
       added with FieldArray.append:
        2.1) 1-D list     DataSet([Instance(x=[1, 2, 3, 4])])
        2.2) 1-D array    DataSet([Instance(x=np.array([1, 2, 3, 4]))])
        2.3) 2-D list     DataSet([Instance(x=[[1, 2], [3, 4]])])
        2.4) 2-D array    DataSet([Instance(x=np.array([[1, 2], [3, 4]]))])
    """

    def test_init_v1(self):
        # 2-D list
        FieldArray("x", [[1, 2], [3, 4]] * 5)

    def test_init_v2(self):
        # 2-D array
        FieldArray("x", np.array([[1, 2], [3, 4]] * 5))

    def test_init_v3(self):
        # 3-D list
        FieldArray("x", [[[1, 2], [3, 4]], [[1, 2], [3, 4]]])

    def test_init_v4(self):
        # 1-D list
        val = [1, 2, 3, 4]
        fa = FieldArray("x", [val])
        fa.append(val)

    def test_init_v5(self):
        # 1-D array
        val = np.array([1, 2, 3, 4])
        fa = FieldArray("x", [val])
        fa.append(val)

    def test_init_v6(self):
        # 2-D list
        val = [[1, 2], [3, 4]]
        fa = FieldArray("x", [val])
        fa.append(val)

    def test_init_v7(self):
        # list of array
        FieldArray("x", [np.array([[1, 2], [3, 4]]), np.array([[1, 2], [3, 4]])])

    def test_init_v8(self):
        # 2-D array
        val = np.array([[1, 2], [3, 4]])
        fa = FieldArray("x", [val])
        fa.append(val)


class TestFieldArray:
    """Tests for FieldArray element access and mutation."""

    def test_main(self):
        """len, append, int indexing, item assignment and get()."""
        fa = FieldArray("x", [1, 2, 3, 4, 5])
        assert len(fa) == 5
        fa.append(6)
        assert len(fa) == 6

        assert fa[-1] == 6
        assert fa[0] == 1
        fa[-1] = 60
        assert fa[-1] == 60

        assert fa.get(0) == 1
        # A list of indices yields an ndarray.
        assert isinstance(fa.get([0, 1, 2]), np.ndarray)
        assert list(fa.get([0, 1, 2])) == [1, 2, 3]

    def test_getitem_v1(self):
        """Int index returns the raw item; a list index returns a float ndarray."""
        fa = FieldArray("y", [[1.1, 2.2, 3.3, 4.4, 5.5], [1.0, 2.0, 3.0, 4.0, 5.0]])
        assert fa[0] == [1.1, 2.2, 3.3, 4.4, 5.5]
        ans = fa[[0, 1]]
        assert isinstance(ans, np.ndarray)
        assert isinstance(ans[0], np.ndarray)
        assert ans[0].tolist() == [1.1, 2.2, 3.3, 4.4, 5.5]
        assert ans[1].tolist() == [1, 2, 3, 4, 5]
        assert ans.dtype == np.float64

    def test_getitem_v2(self):
        """List indexing matches numpy fancy indexing on the source array."""
        x = np.random.rand(10, 5)
        fa = FieldArray("my_field", x)
        indices = [0, 1, 3, 4, 6]
        for a, b in zip(fa[indices], x[indices]):
            assert a.tolist() == b.tolist()

    def test_append(self):
        """append adds one item at the end."""
        fa = FieldArray("y", [[1.1, 2.2, 3.3, 4.4, 5.5], [1.0, 2.0, 3.0, 4.0, 5.0]])
        fa.append([1.2, 2.3, 3.4, 4.5, 5.6])
        assert len(fa) == 3
        assert fa[2] == [1.2, 2.3, 3.4, 4.5, 5.6]

    def test_pop(self):
        """pop removes the item at the index; the remaining item is assignable."""
        fa = FieldArray("y", [[1.1, 2.2, 3.3, 4.4, 5.5], [1.0, 2.0, 3.0, 4.0, 5.0]])
        fa.pop(0)
        assert len(fa) == 1
        assert fa[0] == [1.0, 2.0, 3.0, 4.0, 5.0]
        fa[0] = [1.1, 2.2, 3.3, 4.4, 5.5]
        assert fa[0] == [1.1, 2.2, 3.3, 4.4, 5.5]


class TestCase:
    """Tests for Instance and for building a DataSet from a HuggingFace dataset."""

    def test_init(self):
        """Instance stores keyword arguments in its ``fields`` dict."""
        fields = {"x": [1, 2, 3], "y": [4, 5, 6]}
        ins = Instance(x=[1, 2, 3], y=[4, 5, 6])
        assert isinstance(ins.fields, dict)
        assert ins.fields == fields

        ins = Instance(**fields)
        assert ins.fields == fields

    def test_add_field(self):
        """add_field inserts a new entry into ``fields``."""
        fields = {"x": [1, 2, 3], "y": [4, 5, 6]}
        ins = Instance(**fields)
        ins.add_field("z", [1, 1, 1])
        fields.update({"z": [1, 1, 1]})
        assert ins.fields == fields

    def test_get_item(self):
        """Fields are readable through item access."""
        fields = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [1, 1, 1]}
        ins = Instance(**fields)
        assert ins["x"] == [1, 2, 3]
        assert ins["y"] == [4, 5, 6]
        assert ins["z"] == [1, 1, 1]

    def test_repr(self):
        """Smoke test: an Instance can be printed."""
        fields = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [1, 1, 1]}
        ins = Instance(**fields)
        # simple print, that is enough.
        print(ins)

    def test_dataset(self):
        """Smoke test: DataSet.from_datasets accepts a HuggingFace Dataset."""
        # Imported locally so the rest of the module does not require `datasets`.
        from datasets import Dataset as HuggingfaceDataset

        ds = HuggingfaceDataset.from_dict({"x": ["11sxa", "1sasz"]*100, "y": [0, 1]*100})
        print(DataSet.from_datasets(ds))