From 6a4fa83df3e458f2457280aabf96f7ee96d71ea7 Mon Sep 17 00:00:00 2001
From: lhenry15
Date: Tue, 9 Feb 2021 14:15:07 -0600
Subject: [PATCH] add csv reader

---
 tods/common/CSVReader.py             | 139 +++++++++++++++++++++++++++++++++++
 tods/resources/.entry_points.ini     |   2 +
 tods/tests/common/test_csv_reader.py |  34 +++++++++
 3 files changed, 175 insertions(+)
 create mode 100644 tods/common/CSVReader.py
 create mode 100644 tods/tests/common/test_csv_reader.py

diff --git a/tods/common/CSVReader.py b/tods/common/CSVReader.py
new file mode 100644
index 0000000..662d62d
--- /dev/null
+++ b/tods/common/CSVReader.py
@@ -0,0 +1,139 @@
+import typing
+import os
+from urllib import parse as url_parse
+
+import frozendict
+import pandas
+
+from d3m import container, utils as d3m_utils
+from d3m.metadata import base as metadata_base
+from d3m.base import primitives
+
+
+
+class CSVReaderPrimitive(primitives.FileReaderPrimitiveBase):
+    """
+    A primitive which reads columns referencing CSV files.
+
+    Each column which has ``https://metadata.datadrivendiscovery.org/types/FileName`` semantic type
+    and a valid media type (``text/csv``) has every filename read as a pandas DataFrame. By default
+    the resulting column with read pandas DataFrames is appended to existing columns.
+    """
+
+    _supported_media_types = (
+        'text/csv',
+    )
+    _file_structural_type = container.DataFrame
+    _file_semantic_types = ('https://metadata.datadrivendiscovery.org/types/Table', 'https://metadata.datadrivendiscovery.org/types/Timeseries')
+
+    metadata = metadata_base.PrimitiveMetadata(
+        {
+            'id': '989562ac-b50f-4462-99cb-abef80d765b2',
+            'version': '0.1.0',
+            'name': 'Columns CSV reader',
+            'python_path': 'd3m.primitives.tods.common.csv_reader',
+            'keywords': ['CSV', 'reader'],
+            'source': {
+                'name': "DATALab@Texas A&M University",
+                'contact': 'mailto:mitar.commonprimitives@tnode.com',
+            },
+            'algorithm_types': [
+                metadata_base.PrimitiveAlgorithmType.FILE_MANIPULATION,
+            ],
+            'supported_media_types': _supported_media_types,
+            'primitive_family': metadata_base.PrimitiveFamily.DATA_TRANSFORMATION,
+        }
+    )
+
+    def _read_fileuri(self, metadata: frozendict.FrozenOrderedDict, fileuri: str) -> container.DataFrame:
+        # This is the same logic as used in D3M core package.
+        # TODO: Deduplicate.
+
+        expected_names = None
+        if metadata.get('file_columns', None) is not None:
+            expected_names = []
+            for column in metadata['file_columns']:
+                expected_names.append(column['column_name'])
+
+        # Pandas requires a host for "file" URIs.
+        parsed_uri = url_parse.urlparse(fileuri, allow_fragments=False)
+        if parsed_uri.scheme == 'file' and parsed_uri.netloc == '':
+            parsed_uri = parsed_uri._replace(netloc='localhost')
+            fileuri = url_parse.urlunparse(parsed_uri)
+
+        data = pandas.read_csv(
+            fileuri,
+            usecols=expected_names,
+            # We do not want to do any conversion of values at this point.
+            # This should be done by primitives later on.
+            dtype=str,
+            # We always expect one row header.
+            header=0,
+            # We want empty strings and not NaNs.
+            na_filter=False,
+            encoding='utf8',
+            low_memory=False,
+            memory_map=True,
+        )
+
+        column_names = list(data.columns)
+
+        if expected_names is not None and expected_names != column_names:
+            raise ValueError("Mismatch between column names in data {column_names} and expected names {expected_names}.".format(
+                column_names=column_names,
+                expected_names=expected_names,
+            ))
+
+        if data is None:
+            raise FileNotFoundError("Data file for table '{file_path}' cannot be found.".format(
+                file_path=fileuri,
+            ))
+
+        data = container.DataFrame(data, {
+            'schema': metadata_base.CONTAINER_SCHEMA_VERSION,
+            'structural_type': container.DataFrame,
+        }, generate_metadata=False)
+
+        assert column_names is not None
+
+        for i, column_name in enumerate(column_names):
+            data.metadata = data.metadata.update((metadata_base.ALL_ELEMENTS, i), {
+                'name': column_name,
+                'structural_type': str,
+            })
+        return data
+
+    def _produce_column_metadata(self, inputs_metadata: metadata_base.DataMetadata, column_index: int,
+                                 read_files: typing.Sequence[typing.Any]) -> metadata_base.DataMetadata:
+        # We do not pass "read_files" to parent method but we apply it at the end of this method ourselves.
+        column_metadata = super()._produce_column_metadata(inputs_metadata, column_index, [])
+        column_metadata = column_metadata.update_column(0, {
+            # Clear metadata useful for filename columns.
+            'file_columns': metadata_base.NO_VALUE,
+        })
+
+        # We might have metadata about columns, apply it here.
+        column_meta = inputs_metadata.query_column(column_index)
+        if column_meta.get('file_columns', None):
+            for i, column in enumerate(column_meta['file_columns']):
+                column_metadata = column_metadata.update((metadata_base.ALL_ELEMENTS, 0, metadata_base.ALL_ELEMENTS, i), column)

+                # We know which columns are there, but also we know that we are reading everything as strings, so we can set that as well.
+                column_metadata = column_metadata.update(
+                    (metadata_base.ALL_ELEMENTS, 0, metadata_base.ALL_ELEMENTS, i),
+                    {
+                        'structural_type': str,
+                        'column_name': metadata_base.NO_VALUE,
+                        'column_index': metadata_base.NO_VALUE,
+                    }
+                )
+
+        # A DataFrame is always a table as well.
+        column_metadata = column_metadata.add_semantic_type((metadata_base.ALL_ELEMENTS, 0), 'https://metadata.datadrivendiscovery.org/types/Table')
+
+        # We do not pass "read_files" to parent method but we apply it here ourselves.
+        # This makes sure that metadata read from data override any metadata from metadata.
+        for row_index, file in enumerate(read_files):
+            column_metadata = file.metadata.copy_to(column_metadata, (), (row_index, 0))
+
+        return column_metadata
diff --git a/tods/resources/.entry_points.ini b/tods/resources/.entry_points.ini
index a5fa52e..cc194c9 100644
--- a/tods/resources/.entry_points.ini
+++ b/tods/resources/.entry_points.ini
@@ -88,3 +88,5 @@ tods.evaluation.kfold_time_series_split = tods.common.KFoldSplitTimeseries:KFold
 tods.evaluation.no_split_dataset_split = tods.common.NoSplit:NoSplitDatasetSplitPrimitive
 tods.evaluation.train_score_dataset_split = tods.common.TrainScoreSplit:TrainScoreDatasetSplitPrimitive
 tods.evaluation.redact_columns = tods.common.RedactColumns:RedactColumnsPrimitive
+
+tods.common.csv_reader = tods.common.CSVReader:CSVReaderPrimitive
diff --git a/tods/tests/common/test_csv_reader.py b/tods/tests/common/test_csv_reader.py
new file mode 100644
index 0000000..60d8ae0
--- /dev/null
+++ b/tods/tests/common/test_csv_reader.py
@@ -0,0 +1,34 @@
+import unittest
+import os
+
+from d3m import container
+
+from tods.common import CSVReader
+from tods.data_processing import DatasetToDataframe
+
+
+class CSVReaderPrimitiveTestCase(unittest.TestCase):
+
+    def _get_yahoo_dataset(self):
+        dataset_doc_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'datasets', 'anomaly', 'yahoo_sub_5', 'TRAIN', 'dataset_TRAIN', 'datasetDoc.json'))
+
+        dataset = container.Dataset.load('file://{dataset_doc_path}'.format(dataset_doc_path=dataset_doc_path))
+
+        return dataset
+    def test_basic(self):
+        dataset_doc_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'datasets', 'anomaly', 'yahoo_sub_5', 'TRAIN', 'dataset_TRAIN', 'datasetDoc.json'))
+        dataset = container.Dataset.load('file://{dataset_doc_path}'.format(dataset_doc_path=dataset_doc_path))
+
+        dataframe_hyperparams_class = DatasetToDataframe.DatasetToDataFramePrimitive.metadata.get_hyperparams()
+        dataframe_primitive = DatasetToDataframe.DatasetToDataFramePrimitive(hyperparams=dataframe_hyperparams_class.defaults())
+        dataframe = dataframe_primitive.produce(inputs=dataset).value
+
+        csv_hyperparams_class = CSVReader.CSVReaderPrimitive.metadata.get_hyperparams()
+        csv_primitive = CSVReader.CSVReaderPrimitive(hyperparams=csv_hyperparams_class.defaults().replace({'return_result': 'replace'}))
+        tables = csv_primitive.produce(inputs=dataframe).value
+
+        self.assertEqual(tables.shape, (1260, 8))
+
+
+if __name__ == '__main__':
+    unittest.main()
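
Usage note (not part of the patch): the sketch below mirrors the unit test above and shows how the new csv_reader primitive is expected to be wired after DatasetToDataFramePrimitive. It assumes the repository's datasets/anomaly/yahoo_sub_5 layout; the paths, hyperparameter value, and expected shape are taken directly from test_csv_reader.py rather than from any other documentation.

import os

from d3m import container

from tods.common import CSVReader
from tods.data_processing import DatasetToDataframe

# Load a D3M dataset whose table column references CSV files by filename
# (path assumed relative to the repository root, as in the test).
dataset_doc_path = os.path.abspath(os.path.join(
    'datasets', 'anomaly', 'yahoo_sub_5', 'TRAIN', 'dataset_TRAIN', 'datasetDoc.json'))
dataset = container.Dataset.load('file://{path}'.format(path=dataset_doc_path))

# Convert the dataset entry point into a DataFrame of filename references.
dataframe_hyperparams = DatasetToDataframe.DatasetToDataFramePrimitive.metadata.get_hyperparams()
dataframe_primitive = DatasetToDataframe.DatasetToDataFramePrimitive(hyperparams=dataframe_hyperparams.defaults())
dataframe = dataframe_primitive.produce(inputs=dataset).value

# Read every referenced CSV file; 'replace' swaps the filename column for the parsed tables.
csv_hyperparams = CSVReader.CSVReaderPrimitive.metadata.get_hyperparams()
csv_primitive = CSVReader.CSVReaderPrimitive(hyperparams=csv_hyperparams.defaults().replace({'return_result': 'replace'}))
tables = csv_primitive.produce(inputs=dataframe).value

print(tables.shape)  # (1260, 8) for this TRAIN split, per the unit test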