diff --git a/tods/detection_algorithm/MatrixProfile.py b/tods/detection_algorithm/MatrixProfile.py
index 9b2bfa5..669e3e4 100644
--- a/tods/detection_algorithm/MatrixProfile.py
+++ b/tods/detection_algorithm/MatrixProfile.py
@@ -37,6 +37,8 @@ from d3m import container, utils as d3m_utils
 from .UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
 import stumpy
+
+from sklearn.preprocessing import MinMaxScaler
 # from typing import Union
 
 Inputs = d3m_dataframe
@@ -51,14 +53,20 @@ class Params(Params_ODBase):
 
 class Hyperparams(Hyperparams_ODBase):
     ######## Add more Attributes #######
-    pass
+    window_size = hyperparams.Hyperparameter[int](
+        default=3,
+        description='The moving window size.',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
+    )
 
 class MP:
     """
     This is the class for matrix profile function
     """
-    def __init__(self, window_size):
+    def __init__(self, window_size, step_size):
         self._window_size = window_size
+        self._step_size = step_size
         return
 
     def fit(self, X, y=None):
@@ -102,7 +110,7 @@ class MP:
             nparray
 
         """
-
+        """
         #only keep first two columns of MP results, the second column is left index, use windowsize to get right index
         transformed_columns=utils.pandas.DataFrame()
         for col in data.transpose(): #data.reshape(1,len(data)):
             output = stumpy.stump(col, m = self._window_size)
             output = pd.DataFrame(output)
             output=output.drop(columns=[2,3])
@@ -112,6 +120,33 @@ class MP:
             output = self._get_right_inds(output)
             transformed_columns=pd.concat([transformed_columns,output], axis=1)
         return transformed_columns
+        """
+        matrix_profile, matrix_profile_indices = stumpy.mstump(data.transpose(), m = self._window_size)
+        # orient the profile so rows correspond to subsequence windows and columns to dimensions
+        matrix_profile = matrix_profile.transpose()
+
+        left_inds_ = np.arange(0, len(matrix_profile), self._step_size)
+        right_inds_ = left_inds_ + self._window_size
+        right_inds_[right_inds_ > len(matrix_profile)] = len(matrix_profile)
+        left_inds_ = np.array([left_inds_]).transpose()
+        right_inds_ = np.array([right_inds_]).transpose()
+
+        # apply min-max scaling
+        scaler = MinMaxScaler()
+        scaler = scaler.fit(matrix_profile)
+        matrix_profile = scaler.transform(matrix_profile)
+
+        # aggregate the scaled profile across dimensions into one score per window
+        output = []
+        for timestamp in matrix_profile:
+            output.append([sum(timestamp)])
+
+        output = np.concatenate((output, left_inds_, right_inds_), axis=1)
+
+        return output
 
     def predict(self, data):
         return self.produce(data)
@@ -168,7 +203,7 @@ class MatrixProfilePrimitive(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Pa
                  docker_containers: Dict[str, DockerContainer] = None) -> None:
         super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
 
-        self._clf = MP(window_size=hyperparams['window_size'])
+        self._clf = MP(window_size=hyperparams['window_size'], step_size=hyperparams['step_size'])
 
     def set_training_data(self, *, inputs: Inputs) -> None:
         """
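A standalone sketch (outside the patch) of the window-index bookkeeping in the new produce() above; the toy sizes are assumptions:

    import numpy as np

    n_windows, window_size, step_size = 10, 3, 1      # toy values
    left_inds = np.arange(0, n_windows, step_size)    # window start positions
    right_inds = left_inds + window_size
    right_inds[right_inds > n_windows] = n_windows    # clamp windows that overrun the profile
    print(np.column_stack([left_inds, right_inds]))   # one [start, end] pair per window
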
diff --git a/tods/feature_analysis/MatrixProfile.py b/tods/feature_analysis/MatrixProfile.py
new file mode 100644
index 0000000..87443f2
--- /dev/null
+++ b/tods/feature_analysis/MatrixProfile.py
@@ -0,0 +1,415 @@
+from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
+from numpy import ndarray
+from collections import OrderedDict
+from scipy import sparse
+import os
+import sklearn
+import numpy
+import typing
+import pandas as pd
+
+# Custom import commands if any
+import warnings
+import numpy as np
+from sklearn.utils import check_array
+from sklearn.exceptions import NotFittedError
+# from numba import njit
+from pyod.utils.utility import argmaxn
+
+from d3m.container.numpy import ndarray as d3m_ndarray
+from d3m.container import DataFrame as d3m_dataframe
+from d3m.metadata import hyperparams, params, base as metadata_base
+from d3m import utils
+from d3m.base import utils as base_utils
+from d3m.exceptions import PrimitiveNotFittedError
+from d3m.primitive_interfaces.base import CallResult, DockerContainer
+
+from d3m.primitive_interfaces.transformer import TransformerPrimitiveBase
+from d3m.primitive_interfaces import base, transformer
+from d3m.primitive_interfaces.base import ProbabilisticCompositionalityMixin, ContinueFitMixin
+from d3m import exceptions
+import pandas
+import uuid
+
+from d3m import container, utils as d3m_utils
+
+import stumpy
+
+from sklearn.preprocessing import MinMaxScaler
+# from typing import Union
+
+Inputs = d3m_dataframe
+Outputs = d3m_dataframe
+
+
+class Hyperparams(hyperparams.Hyperparams):
+    ######## Add more Attributes #######
+    window_size = hyperparams.Hyperparameter[int](
+        default=3,
+        description='The moving window size.',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
+    )
+
+    # Keep previous
+    dataframe_resource = hyperparams.Hyperparameter[typing.Union[str, None]](
+        default=None,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Resource ID of a DataFrame to extract if there are multiple tabular resources inside a Dataset and none is a dataset entry point.",
+    )
+    use_columns = hyperparams.Set(
+        elements=hyperparams.Hyperparameter[int](-1),
+        default=(2,),
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
+    )
+    exclude_columns = hyperparams.Set(
+        elements=hyperparams.Hyperparameter[int](-1),
+        default=(0,1,3,),
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
+    )
+    return_result = hyperparams.Enumeration(
+        values=['append', 'replace', 'new'],
+        default='new',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
+    )
+    use_semantic_types = hyperparams.UniformBool(
+        default=False,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.",
+    )
+    add_index_columns = hyperparams.UniformBool(
+        default=False,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
+    )
+    error_on_no_input = hyperparams.UniformBool(
+        default=True,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
+    )
+    return_semantic_type = hyperparams.Enumeration[str](
+        values=['https://metadata.datadrivendiscovery.org/types/Attribute',
+                'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
+        default='https://metadata.datadrivendiscovery.org/types/Attribute',
+        description='Decides what semantic type to attach to generated attributes',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
+    )
+
+class MP:
+    """
+    This is the class for the matrix profile function.
+    """
+    def __init__(self, window_size):
+        self._window_size = window_size
+        return
+
+    def produce(self, data):
+        """
+        Args:
+            data: DataFrame containing the selected time series columns
+        Returns:
+            numpy array of shape (n_timestamps - window_size + 1, n_dimensions)
+        """
+        # convert to a 2-D numpy array with rows as timestamps and columns as dimensions
+        data = check_array(data)
+        # stumpy.mstump expects shape (n_dimensions, n_timestamps)
+        matrix_profile, matrix_profile_indices = stumpy.mstump(data.transpose(), m = self._window_size)
+        # transpose back so each row is one subsequence window
+        return matrix_profile.transpose()
+
+class MatrixProfilePrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
+    """
+    A primitive that computes the matrix profile of a DataFrame using the stumpy package.
+    Stumpy documentation: https://stumpy.readthedocs.io/en/latest/index.html
+
+    Parameters
+    ----------
+    T_A : ndarray
+        The time series or sequence for which to compute the matrix profile
+    m : int
+        Window size
+    T_B : ndarray
+        The time series or sequence that contains your query subsequences
+        of interest. Default is `None`, which corresponds to a self-join.
+    ignore_trivial : bool
+        Set to `True` if this is a self-join. Otherwise, for AB-join, set this
+        to `False`. Default is `True`.
+
+    Returns
+    -------
+    out : ndarray
+        The first column consists of the matrix profile, the second column
+        consists of the matrix profile indices, the third column consists of
+        the left matrix profile indices, and the fourth column consists of
+        the right matrix profile indices.
+    """
+
+    metadata = metadata_base.PrimitiveMetadata({
+        '__author__': "DATA Lab @Texas A&M University",
+        'name': "Matrix Profile",
+        'python_path': 'd3m.primitives.tods.feature_analysis.matrix_profile',
+        'source': {
+            'name': "DATA Lab @Texas A&M University",
+            'contact': 'mailto:khlai037@tamu.edu',
+        },
+        'hyperparams_to_tune': ['window_size'],
+        'version': '0.0.2',
+        'algorithm_types': [
+            metadata_base.PrimitiveAlgorithmType.TODS_PRIMITIVE,
+        ],
+        'primitive_family': metadata_base.PrimitiveFamily.FEATURE_CONSTRUCTION,
+        'id': str(uuid.uuid3(uuid.NAMESPACE_DNS, 'MatrixProfilePrimitive')),
+    })
+
+    def __init__(self, *,
+                 hyperparams: Hyperparams,
+                 random_seed: int = 0,
+                 docker_containers: Dict[str, DockerContainer] = None) -> None:
+        super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
+
+        self._clf = MP(window_size=hyperparams['window_size'])
+
+    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
+        """
+        Process the testing data.
+        Args:
+            inputs: Container DataFrame. Time series data on which to compute the matrix profile.
+
+        Returns:
+            Container DataFrame holding the matrix profile of the selected columns,
+            one row per length-window_size subsequence window.
+        """
+        assert isinstance(inputs, container.DataFrame), type(inputs)
+
+        _, self._columns_to_produce = self._get_columns_to_fit(inputs, self.hyperparams)
+        self._input_column_names = inputs.columns
+
+        sk_inputs = inputs
+        if self.hyperparams['use_semantic_types']: # pragma: no cover
+            sk_inputs = inputs.iloc[:, self._columns_to_produce]
+        output_columns = []
+        if len(self._columns_to_produce) > 0:
+            sk_output = self._clf.produce(sk_inputs)
+            if sparse.issparse(sk_output): # pragma: no cover
+                sk_output = sk_output.toarray()
+            outputs = self._wrap_predictions(inputs, sk_output)
+            if len(outputs.columns) == len(self._input_column_names):
+                outputs.columns = self._input_column_names
+            output_columns = [outputs]
+        else: # pragma: no cover
+            if self.hyperparams['error_on_no_input']:
+                raise RuntimeError("No input columns were selected")
+            self.logger.warning("No input columns were selected")
+        outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
+                                             add_index_columns=self.hyperparams['add_index_columns'],
+                                             inputs=inputs, column_indices=self._columns_to_produce,
+                                             columns_list=output_columns)
+
+        return base.CallResult(outputs)
+ """ + + metadata = metadata_base.PrimitiveMetadata({ + '__author__': "DATA Lab @Texas A&M University", + 'name': "Matrix Profile", + 'python_path': 'd3m.primitives.tods.feature_analysis.matrix_profile', + 'source': { + 'name': "DATA Lab @Taxes A&M University", + 'contact': 'mailto:khlai037@tamu.edu', + }, + 'hyperparams_to_tune': ['window_size'], + 'version': '0.0.2', + 'algorithm_types': [ + metadata_base.PrimitiveAlgorithmType.TODS_PRIMITIVE, + ], + 'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION, + 'id': str(uuid.uuid3(uuid.NAMESPACE_DNS, 'MatrixProfilePrimitive')), + }) + + + def __init__(self, *, + hyperparams: Hyperparams, # + random_seed: int = 0, + docker_containers: Dict[str, DockerContainer] = None) -> None: + super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers) + + self._clf = MP(window_size=hyperparams['window_size']) #, step_size=hyperparams['step_size']) + + + def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]: + """ + Process the testing data. + Args: + inputs: Container DataFrame. Time series data up to Wavelet transform. + + Returns: + [cA_n, cD_n, cD_n-1, …, cD2, cD1]: Container DataFrame after Wavelet Transformation. + Ordered frame of coefficients arrays where n denotes the level of decomposition. The first element (cA_n) of the result is approximation coefficients array and the following elements (cD_n - cD_1) are details coefficients arrays. + """ + assert isinstance(inputs, container.DataFrame), type(container.DataFrame) + + _, self._columns_to_produce = self._get_columns_to_fit(inputs, self.hyperparams) + self._input_column_names = inputs.columns + + # print('columns_to_produce=', self._columns_to_produce) + + + sk_inputs = inputs + if self.hyperparams['use_semantic_types']: # pragma: no cover + sk_inputs = inputs.iloc[:, self._columns_to_produce] + output_columns = [] + if len(self._columns_to_produce) > 0: + sk_output = self._clf.produce(sk_inputs) + if sparse.issparse(sk_output): # pragma: no cover + sk_output = sk_output.toarray() + outputs = self._wrap_predictions(inputs, sk_output) + if len(outputs.columns) == len(self._input_column_names): + outputs.columns = self._input_column_names + output_columns = [outputs] + else: # pragma: no cover + if self.hyperparams['error_on_no_input']: + raise RuntimeError("No input columns were selected") + self.logger.warn("No input columns were selected") + outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'], + add_index_columns=self.hyperparams['add_index_columns'], + inputs=inputs, column_indices=self._columns_to_produce, + columns_list=output_columns) + + # outputs = inputs + return base.CallResult(outputs) + + @classmethod + def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams): # pragma: no cover + """ + Select columns to fit. 
+
+    @classmethod
+    def _get_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams) -> List[OrderedDict]: # pragma: no cover
+        """
+        Output metadata of selected columns.
+        Args:
+            outputs_metadata: metadata_base.DataMetadata
+            hyperparams: d3m.metadata.hyperparams.Hyperparams
+
+        Returns:
+            list of OrderedDict, one per output column
+        """
+        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
+
+        target_columns_metadata: List[OrderedDict] = []
+        for column_index in range(outputs_length):
+            column_metadata = OrderedDict(outputs_metadata.query_column(column_index))
+
+            # Update semantic types and prepare it for predicted targets.
+            semantic_types = set(column_metadata.get('semantic_types', []))
+            semantic_types_to_remove = set([])
+            add_semantic_types = set()
+            add_semantic_types.add(hyperparams["return_semantic_type"])
+            semantic_types = semantic_types - semantic_types_to_remove
+            semantic_types = semantic_types.union(add_semantic_types)
+            column_metadata['semantic_types'] = list(semantic_types)
+
+            target_columns_metadata.append(column_metadata)
+
+        return target_columns_metadata
+
+    @classmethod
+    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
+                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata: # pragma: no cover
+        """
+        Update metadata for selected columns.
+        Args:
+            inputs_metadata: metadata_base.DataMetadata
+            outputs: Container DataFrame
+            target_columns_metadata: list
+
+        Returns:
+            d3m.metadata.base.DataMetadata
+        """
+        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)
+
+        for column_index, column_metadata in enumerate(target_columns_metadata):
+            column_metadata.pop("structural_type", None)
+            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)
+
+        return outputs_metadata
+
+    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs: # pragma: no cover
+        """
+        Wrap predictions into a dataframe.
+        Args:
+            inputs: Container DataFrame
+            predictions: array-like data (n_samples, n_features)
+
+        Returns:
+            DataFrame
+        """
+        outputs = container.DataFrame(predictions, generate_metadata=True)
+        target_columns_metadata = self._copy_inputs_metadata(inputs.metadata, self._columns_to_produce,
+                                                             outputs.metadata, self.hyperparams)
+        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
+        return outputs
+
+    @classmethod
+    def _copy_inputs_metadata(cls, inputs_metadata: metadata_base.DataMetadata, input_indices: List[int],
+                              outputs_metadata: metadata_base.DataMetadata, hyperparams): # pragma: no cover
+        """
+        Copy metadata from the selected input columns to the output columns.
+        Args:
+            inputs_metadata: metadata.base.DataMetadata
+            input_indices: list
+            outputs_metadata: metadata.base.DataMetadata
+            hyperparams: d3m.metadata.hyperparams.Hyperparams
+
+        Returns:
+            list of OrderedDict, one per output column
+        """
+        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
+        target_columns_metadata: List[OrderedDict] = []
+        for column_index in input_indices:
+            column_name = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index)).get("name")
+            if column_name is None:
+                column_name = "output_{}".format(column_index)
+
+            column_metadata = OrderedDict(inputs_metadata.query_column(column_index))
+            semantic_types = set(column_metadata.get('semantic_types', []))
+            semantic_types_to_remove = set([])
+            add_semantic_types = set()
+            add_semantic_types.add(hyperparams["return_semantic_type"])
+            semantic_types = semantic_types - semantic_types_to_remove
+            semantic_types = semantic_types.union(add_semantic_types)
+            column_metadata['semantic_types'] = list(semantic_types)
+
+            column_metadata["name"] = str(column_name)
+            target_columns_metadata.append(column_metadata)
+
+        # If outputs has more columns than inputs, give the remaining columns generated names
+        # and the configured return semantic type
+        if outputs_length > len(input_indices):
+            for column_index in range(len(input_indices), outputs_length):
+                column_metadata = OrderedDict()
+                semantic_types = set()
+                semantic_types.add(hyperparams["return_semantic_type"])
+                column_name = "output_{}".format(column_index)
+                column_metadata["semantic_types"] = list(semantic_types)
+                column_metadata["name"] = str(column_name)
+                target_columns_metadata.append(column_metadata)
+
+        return target_columns_metadata
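For reference, a standalone sketch (outside the patch) of the stumpy.mstump shape contract that both new produce() implementations rely on; the toy sizes, 3 series of length 50, are assumptions:

    import numpy as np
    import stumpy

    T = np.random.rand(3, 50)      # mstump expects (n_dimensions, n_timestamps)
    P, I = stumpy.mstump(T, m=5)   # multidimensional matrix profile and profile indices
    print(P.shape, I.shape)        # expected: (3, 46) each, i.e. (d, n - m + 1)

This is why both implementations transpose a row-per-timestamp frame before the call and transpose the profile back afterwards.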
diff --git a/tods/tests/feature_analysis/test_MatrixProfile.py b/tods/tests/feature_analysis/test_MatrixProfile.py
new file mode 100644
index 0000000..3958132
--- /dev/null
+++ b/tods/tests/feature_analysis/test_MatrixProfile.py
@@ -0,0 +1,102 @@
+import unittest
+
+from d3m import container, utils
+from d3m.metadata import base as metadata_base
+from tods.feature_analysis.MatrixProfile import MatrixProfilePrimitive
+
+
+class MatrixProfileTest(unittest.TestCase):
+    def test_basic(self):
+        self.maxDiff = None
+        main = container.DataFrame({'a': [1., 2., 3., 4., 5., 6., 7., 8., 9.],
+                                    'b': [2., 3., 4., 5., 6., 7., 8., 9., 10.],
+                                    'c': [3., 4., 5., 6., 7., 8., 9., 10., 11.]},
+                                   columns=['a', 'b', 'c'],
+                                   generate_metadata=True)
+
+        self.assertEqual(utils.to_json_structure(main.metadata.to_internal_simple_structure()), [{
+            'selector': [],
+            'metadata': {
+                'schema': metadata_base.CONTAINER_SCHEMA_VERSION,
+                'structural_type': 'd3m.container.pandas.DataFrame',
+                'semantic_types': ['https://metadata.datadrivendiscovery.org/types/Table'],
+                'dimension': {
+                    'name': 'rows',
+                    'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularRow'],
+                    'length': 9,
+                },
+            },
+        }, {
+            'selector': ['__ALL_ELEMENTS__'],
+            'metadata': {
+                'dimension': {
+                    'name': 'columns',
+                    'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularColumn'],
+                    'length': 3,
+                },
+            },
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 0],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'a'},
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 1],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'b'},
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 2],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'c'},
+        }])
+
+        self.assertIsInstance(main, container.DataFrame)
+
+        hyperparams_class = MatrixProfilePrimitive.metadata.get_hyperparams()
+        hyperparams = hyperparams_class.defaults()
+        hyperparams = hyperparams.replace({'window_size': 3})
+        primitive = MatrixProfilePrimitive(hyperparams=hyperparams)
+        new_main = primitive.produce(inputs=main).value
+        print(new_main)
+
+        # The input frame's metadata is unchanged by produce.
+        self.assertEqual(utils.to_json_structure(main.metadata.to_internal_simple_structure()), [{
+            'selector': [],
+            'metadata': {
+                'schema': metadata_base.CONTAINER_SCHEMA_VERSION,
+                'structural_type': 'd3m.container.pandas.DataFrame',
+                'semantic_types': ['https://metadata.datadrivendiscovery.org/types/Table'],
+                'dimension': {
+                    'name': 'rows',
+                    'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularRow'],
+                    'length': 9,
+                },
+            },
+        }, {
+            'selector': ['__ALL_ELEMENTS__'],
+            'metadata': {
+                'dimension': {
+                    'name': 'columns',
+                    'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularColumn'],
+                    'length': 3,
+                },
+            },
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 0],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'a'},
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 1],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'b'},
+        }, {
+            'selector': ['__ALL_ELEMENTS__', 2],
+            'metadata': {'structural_type': 'numpy.float64', 'name': 'c'},
+        }])
+
+
+if __name__ == '__main__':
+    unittest.main()
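A quick sanity check (outside the patch) for the toy frame above: with 9 timestamps and window_size=3, the matrix profile should have one row per subsequence window:

    n_timestamps, window_size = 9, 3
    print(n_timestamps - window_size + 1)   # 7 subsequence windows, one profile row each
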
diff --git a/tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py b/tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py
index df5e84d..13997a7 100644
--- a/tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py
+++ b/tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py
@@ -40,6 +40,7 @@ class MatrixProfileSKI_TestCase(unittest.TestCase):
     def test_prediction_score(self):
         pred_scores = self.transformer.predict_score(self.X_test)
         assert_equal(pred_scores.shape[0], self.y_test.shape[0])
+        pred_scores = pred_scores[:, 0]  # keep only the anomaly-score column; the other columns hold window indices
         assert_greater_equal(roc_auc_score(self.y_test, pred_scores), self.roc_floor)
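Why test_prediction_score now slices column 0: the detection primitive's produce() (first file in this patch) returns one row per window of the form [score, left index, right index], so only the first column is an anomaly score. A sketch with made-up values:

    import numpy as np

    pred_scores = np.array([[0.7, 0., 3.],
                            [0.2, 1., 4.]])   # columns: score, left index, right index
    print(pred_scores[:, 0])                  # [0.7 0.2], the values fed to roc_auc_score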