diff --git a/tods/timeseries_processing/SubsequenceSegmentation.py b/tods/timeseries_processing/SubsequenceSegmentation.py
deleted file mode 100644
index 0f38594..0000000
--- a/tods/timeseries_processing/SubsequenceSegmentation.py
+++ /dev/null
@@ -1,477 +0,0 @@
-from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
-from numpy import ndarray
-from collections import OrderedDict
-from scipy import sparse
-from sklearn.utils import check_array
-import numpy as np
-import typing
-import time
-import pandas as pd
-import uuid
-
-from d3m import container
-from d3m.primitive_interfaces import base, transformer
-from d3m.metadata import hyperparams, params, base as metadata_base
-
-from d3m.container.numpy import ndarray as d3m_ndarray
-from d3m.container import DataFrame as d3m_dataframe
-from d3m import utils
-from d3m.base import utils as base_utils
-from d3m.exceptions import PrimitiveNotFittedError
-from d3m.primitive_interfaces.base import CallResult, DockerContainer
-
-
-__all__ = ('SubsequenceSegmentationPrimitive',)
-
-Inputs = container.DataFrame
-Outputs = container.DataFrame
-
-
-class Hyperparams(hyperparams.Hyperparams):
-    # Tuning
-    window_size = hyperparams.Hyperparameter[int](
-        default=1,
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
-        description="The moving window size.",
-    )
-    step = hyperparams.Hyperparameter[int](
-        default=1,
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
-        description="The displacement for moving window.",
-    )
-    # return_numpy = hyperparams.UniformBool(
-    #     default=True,
-    #     semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-    #     description="If True, return the data format in 3d numpy array."
-    # )
-    # flatten = hyperparams.UniformBool(
-    #     default=True,
-    #     semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-    #     description="If True, flatten the returned array in 2d."
-    # )
-    flatten_order = hyperparams.Enumeration(
-        values=['C', 'F', 'A'],
-        default='F',
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Decide the flattening order for multivariate sequences.",
-    )
-
-    # Control
-    columns_using_method = hyperparams.Enumeration(
-        values=['name', 'index'],
-        default='index',
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Choose to select columns by names or indices. If 'name', \"use_columns_name\" or \"exclude_columns_name\" is used. If 'index', \"use_columns\" or \"exclude_columns\" is used.",
-    )
-    use_columns_name = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[str](''),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column names to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
-    )
-    exclude_columns_name = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[str](''),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column names to not operate on. Applicable only if \"use_columns_name\" is not provided.",
-    )
-    use_columns = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[int](-1),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
-    )
-    exclude_columns = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[int](-1),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
-    )
-    return_result = hyperparams.Enumeration(
-        values=['append', 'replace', 'new'],
-        default='replace',
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
-    )
-    use_semantic_types = hyperparams.UniformBool(
-        default=False,
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.",
-    )
-    add_index_columns = hyperparams.UniformBool(
-        default=False,
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
-    )
-    error_on_no_input = hyperparams.UniformBool(
-        default=True,
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking set this to False.",
-    )
-
-    return_semantic_type = hyperparams.Enumeration[str](
-        values=['https://metadata.datadrivendiscovery.org/types/Attribute', 'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
-        default='https://metadata.datadrivendiscovery.org/types/Attribute',
-        description='Decides what semantic type to attach to generated attributes.',
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-    )
-
-
-class SubsequenceSegmentationPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
-    """
-    Subsequence Time Series Segmentation.
-
-    Parameters
-    ----------
-    window_size : int
-        The moving window size.
-
-    step : int, optional (default=1)
-        The displacement for moving window.
-
-    # return_numpy : bool, optional (default=True)
-    #     If True, return the data format in 3d numpy array.
-
-    # flatten : bool, optional (default=True)
-    #     If True, flatten the returned array in 2d.
-
-    flatten_order : str, optional (default='F')
-        Decide the flattening order for multivariate sequences.
-        'C' flattens in row-major (C-style) order.
-        'F' flattens in column-major (Fortran-style) order.
-        'A' flattens in column-major order if the window is Fortran contiguous
-        in memory, row-major order otherwise.
-        The default is 'F'.
-
-    use_columns : Set
-        A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.
-
-    exclude_columns : Set
-        A set of column indices to not operate on. Applicable only if "use_columns" is not provided.
-
-    return_result : Enumeration
-        Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.
-
-    use_semantic_types : Bool
-        Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.
-
-    add_index_columns : Bool
-        Also include primary index columns if input data has them. Applicable only if "return_result" is set to "new".
-
-    error_on_no_input : Bool
-        Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking set this to False.
-
-    return_semantic_type : Enumeration[str]
-        Decides what semantic type to attach to generated attributes.
-    """
-
-    metadata = metadata_base.PrimitiveMetadata({
-        "__author__": "DATA Lab @ Texas A&M University",
-        "name": "Subsequence Segmentation Primitive",
-        "python_path": "d3m.primitives.tods.timeseries_processing.subsequence_segmentation",
-        "source": {
-            'name': 'DATA Lab @ Texas A&M University',
-            'contact': 'mailto:khlai037@tamu.edu',
-        },
-        "algorithm_types": [
-            metadata_base.PrimitiveAlgorithmType.TODS_PRIMITIVE,
-        ],
-        "primitive_family": metadata_base.PrimitiveFamily.DATA_PREPROCESSING,
-        'id': str(uuid.uuid3(uuid.NAMESPACE_DNS, 'SubsequenceSegmentationPrimitive')),
-        "hyperparams_to_tune": ['window_size', 'step', 'flatten_order'],
-        "version": "0.0.1",
-    })
-
-    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
-        """
-        Process the testing data.
-        Args:
-            inputs: Container DataFrame.
-
-        Returns:
-            Container DataFrame after subsequence segmentation.
-        """
-        # Get columns to fit.
-        self._fitted = False
-        self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
-        self._input_column_names = self._training_inputs.columns
-
-        if len(self._training_indices) > 0:
-            self._fitted = True
-        else:  # pragma: no cover
-            if self.hyperparams['error_on_no_input']:
-                raise RuntimeError("No input columns were selected")
-            self.logger.warning("No input columns were selected")
-
-        if not self._fitted:  # pragma: no cover
-            raise PrimitiveNotFittedError("Primitive not fitted.")
-
-        sk_inputs = inputs
-        if self.hyperparams['use_semantic_types']:  # pragma: no cover
-            sk_inputs = inputs.iloc[:, self._training_indices]
-
-        output_columns = []
-        if len(self._training_indices) > 0:
-            sk_output = self._get_sub_matrices(sk_inputs,
-                                               window_size=self.hyperparams['window_size'],
-                                               step=self.hyperparams['step'],
-                                               flatten_order=self.hyperparams['flatten_order'])
-            if sparse.issparse(sk_output):
-                sk_output = sk_output.toarray()
-
-            outputs = self._wrap_predictions(inputs, sk_output)
-
-            if len(outputs.columns) == len(self._input_column_names):
-                outputs.columns = self._input_column_names
-            output_columns = [outputs]
-        else:  # pragma: no cover
-            if self.hyperparams['error_on_no_input']:
-                raise RuntimeError("No input columns were selected")
-            self.logger.warning("No input columns were selected")
-
-        outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
-                                             add_index_columns=self.hyperparams['add_index_columns'],
-                                             inputs=inputs, column_indices=self._training_indices,
-                                             columns_list=output_columns)
-
-        return CallResult(outputs)
-
-    @classmethod
-    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
-        """
-        Select columns to fit.
-        Args:
-            inputs: Container DataFrame
-            hyperparams: d3m.metadata.hyperparams.Hyperparams
-
-        Returns:
-            Tuple of the selected columns (Container DataFrame) and their indices (list)
-        """
-        if not hyperparams['use_semantic_types']:
-            return inputs, list(range(len(inputs.columns)))
-
-        inputs_metadata = inputs.metadata
-
-        def can_produce_column(column_index: int) -> bool:  # pragma: no cover
-            return cls._can_produce_column(inputs_metadata, column_index, hyperparams)
-
-        use_columns = []
-        exclude_columns = []
-
-        if hyperparams['columns_using_method'] == 'name':
-            inputs_cols = inputs.columns.values.tolist()
-            for i in range(len(inputs_cols)):
-                if inputs_cols[i] in hyperparams['use_columns_name']:
-                    use_columns.append(i)
-                elif inputs_cols[i] in hyperparams['exclude_columns_name']:
-                    exclude_columns.append(i)
-        else:
-            use_columns = hyperparams['use_columns']
-            exclude_columns = hyperparams['exclude_columns']
-
-        columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata, use_columns=use_columns, exclude_columns=exclude_columns, can_use_column=can_produce_column)
-        return inputs.iloc[:, columns_to_produce], columns_to_produce
-
-    @classmethod
-    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int, hyperparams: Hyperparams) -> bool:  # pragma: no cover
-        """
-        Output whether a column can be processed.
-        Args:
-            inputs_metadata: d3m.metadata.base.DataMetadata
-            column_index: int
-
-        Returns:
-            bool
-        """
-        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))
-
-        accepted_structural_types = (int, float, np.integer, np.float64)
-        accepted_semantic_types = set()
-        accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")
-        if not issubclass(column_metadata['structural_type'], accepted_structural_types):
-            return False
-
-        semantic_types = set(column_metadata.get('semantic_types', []))
-
-        if len(semantic_types) == 0:
-            cls.logger.warning("No semantic types found in column metadata")
-            return False
-
-        # Make sure all accepted_semantic_types are available in semantic_types.
-        if len(accepted_semantic_types - semantic_types) == 0:
-            return True
-
-        return False
-
-    @classmethod
-    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
-                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:  # pragma: no cover
-        """
-        Update metadata for selected columns.
-        Args:
-            inputs_metadata: metadata_base.DataMetadata
-            outputs: Container DataFrame
-            target_columns_metadata: list
-
-        Returns:
-            d3m.metadata.base.DataMetadata
-        """
-        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)
-
-        for column_index, column_metadata in enumerate(target_columns_metadata):
-            column_metadata.pop("structural_type", None)
-            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)
-
-        return outputs_metadata
-
-    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:  # pragma: no cover
-        """
-        Wrap predictions into a dataframe.
-        Args:
-            inputs: Container DataFrame
-            predictions: array-like data (n_samples, n_features)
-
-        Returns:
-            DataFrame
-        """
-        outputs = d3m_dataframe(predictions, generate_metadata=True)
-        target_columns_metadata = self._add_target_columns_metadata(outputs.metadata, self.hyperparams)
-        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
-        return outputs
-
-    @classmethod
-    def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams):  # pragma: no cover
-        """
-        Add target columns metadata.
-        Args:
-            outputs_metadata: metadata.base.DataMetadata
-            hyperparams: d3m.metadata.hyperparams.Hyperparams
-
-        Returns:
-            List[OrderedDict]
-        """
-        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
-        target_columns_metadata: List[OrderedDict] = []
-        for column_index in range(outputs_length):
-            column_name = "output_{}".format(column_index)
-            column_metadata = OrderedDict()
-            semantic_types = set()
-            semantic_types.add(hyperparams["return_semantic_type"])
-            column_metadata['semantic_types'] = list(semantic_types)
-
-            column_metadata["name"] = str(column_name)
-            target_columns_metadata.append(column_metadata)
-
-        return target_columns_metadata
-
-    def _write(self, inputs: Inputs):  # pragma: no cover
-        inputs.to_csv(str(time.time()) + '.csv')
-
-    def _get_sub_sequences_length(self, n_samples, window_size, step):  # pragma: no cover
-        """
-        Pseudo-chop a univariate time series into subsequences and return the
-        valid length only.
-        Parameters
-        ----------
-        n_samples : int
-            The number of samples in the input series.
-        window_size : int
-            The moving window size.
-        step : int, optional (default=1)
-            The displacement for moving window.
-        Returns
-        -------
-        valid_len : int
-            The number of subsequences.
-        """
-        valid_len = int(np.floor((n_samples - window_size) / step)) + 1
-        return valid_len
-
-    def _get_sub_matrices(self, X, window_size, step=1, flatten_order='F'):  # pragma: no cover
-        """
-        Chop a multivariate time series into subsequences (matrices).
-        Parameters
-        ----------
-        X : numpy array of shape (n_samples, n_sequences)
-            The input samples.
-        window_size : int
-            The moving window size.
-        step : int, optional (default=1)
-            The displacement for moving window.
-        flatten_order : str, optional (default='F')
-            Decide the flattening order for multivariate sequences.
-            'C' flattens in row-major (C-style) order.
-            'F' flattens in column-major (Fortran-style) order.
-            'A' flattens in column-major order if the window is Fortran contiguous
-            in memory, row-major order otherwise.
-        Returns
-        -------
-        X_sub : numpy array of shape (valid_len, window_size * n_sequences)
-            The numpy matrix in which each row is a flattened submatrix.
-        """
-        X = check_array(X).astype(float)
-        n_samples, n_sequences = X.shape[0], X.shape[1]
-
-        # Get the valid length (the number of complete windows).
-        valid_len = self._get_sub_sequences_length(n_samples, window_size, step)
-
-        X_sub = []
-        X_left_inds = []
-        X_right_inds = []
-
-        # Exclude the edge so that every window is complete.
-        steps = list(range(0, n_samples, step))
-        steps = steps[:valid_len]
-
-        for idx, i in enumerate(steps):
-            X_sub.append(X[i: i + window_size, :])
-            X_left_inds.append(i)
-            X_right_inds.append(i + window_size)
-
-        X_sub = np.asarray(X_sub)
-
-        # Flatten each (window_size, n_sequences) submatrix into one row.
-        temp_array = np.zeros([valid_len, window_size * n_sequences])
-        for i in range(valid_len):
-            temp_array[i, :] = X_sub[i, :, :].flatten(order=flatten_order)
-
-        return temp_array  # , np.asarray(X_left_inds), np.asarray(X_right_inds)
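
For reference, a minimal standalone sketch of the sliding-window logic that the deleted _get_sub_sequences_length and _get_sub_matrices implemented; segment_subsequences is an illustrative name, not part of the TODS API:

    import numpy as np

    def segment_subsequences(X, window_size, step=1, flatten_order='F'):
        # Number of complete windows: floor((n_samples - window_size) / step) + 1.
        n_samples, n_sequences = X.shape
        valid_len = (n_samples - window_size) // step + 1
        out = np.zeros((valid_len, window_size * n_sequences))
        for row, start in enumerate(range(0, valid_len * step, step)):
            # Each row is one flattened (window_size x n_sequences) submatrix.
            out[row, :] = X[start:start + window_size, :].flatten(order=flatten_order)
        return out

    # A 6-sample bivariate series with window_size=3 and step=2 yields
    # (6 - 3) // 2 + 1 = 2 windows, each flattened to 3 * 2 = 6 values.
    X = np.arange(12, dtype=float).reshape(6, 2)
    print(segment_subsequences(X, window_size=3, step=2).shape)  # (2, 6)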