
SubsequenceClustering

master
jamielxu committed 4 years ago
parent commit 61e0bb010a
1 changed file with 473 additions and 0 deletions

tods/timeseries_processing/SubsequenceClustering.py  +473  -0

@@ -0,0 +1,473 @@
from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
from collections import OrderedDict
import time
import typing

import numpy as np
import pandas as pd
from numpy import ndarray
from scipy import sparse
from sklearn.utils import check_array

from d3m import container, utils
from d3m.base import utils as base_utils
from d3m.container import DataFrame as d3m_dataframe
from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.exceptions import PrimitiveNotFittedError
from d3m.metadata import base as metadata_base, hyperparams, params
from d3m.primitive_interfaces import base, transformer
from d3m.primitive_interfaces.base import CallResult, DockerContainer


__all__ = ('SubsequenceClustering',)

Inputs = container.DataFrame
Outputs = container.DataFrame


class Hyperparams(hyperparams.Hyperparams):
    # Tuning
    window_size = hyperparams.Hyperparameter[int](
        default=1,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
        description="The moving window size.",
    )
    step = hyperparams.Hyperparameter[int](
        default=1,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
        description="The displacement for the moving window.",
    )
    # return_numpy = hyperparams.UniformBool(
    #     default=True,
    #     semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
    #     description="If True, return the data format in a 3d numpy array."
    # )
    # flatten = hyperparams.UniformBool(
    #     default=True,
    #     semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
    #     description="If True, flatten the returned array in 2d."
    # )
    flatten_order = hyperparams.Enumeration(
        values=['C', 'F', 'A'],
        default='F',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="The order in which to flatten multivariate subsequences.",
    )
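
    # Illustrative note (not part of the primitive): for a single window
    #   [[1, 2],
    #    [3, 4]]
    # (rows = time steps, columns = variables), order='C' flattens row-major
    # to [1, 2, 3, 4], while the default order='F' flattens column-major to
    # [1, 3, 2, 4], keeping each variable's values contiguous.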


    # Control
    columns_using_method = hyperparams.Enumeration(
        values=['name', 'index'],
        default='index',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Choose whether to select columns by name or by index. If 'name', \"use_columns_name\" or \"exclude_columns_name\" is used. If 'index', \"use_columns\" or \"exclude_columns\" is used.",
    )
    use_columns_name = hyperparams.Set(
        elements=hyperparams.Hyperparameter[str](''),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column names to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.",
    )
    exclude_columns_name = hyperparams.Set(
        elements=hyperparams.Hyperparameter[str](''),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column names to not operate on. Applicable only if \"use_columns_name\" is not provided.",
    )
    use_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.",
    )
    exclude_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
    )
    return_result = hyperparams.Enumeration(
        values=['append', 'replace', 'new'],
        default='replace',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
    )
    use_semantic_types = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Controls whether semantic_types metadata will be used to filter columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.",
    )
    add_index_columns = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Also include primary index columns if the input data has them. Applicable only if \"return_result\" is set to \"new\".",
    )
    error_on_no_input = hyperparams.UniformBool(
        default=True,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
    )
    return_semantic_type = hyperparams.Enumeration[str](
        values=['https://metadata.datadrivendiscovery.org/types/Attribute', 'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
        default='https://metadata.datadrivendiscovery.org/types/Attribute',
        description="Decides what semantic type to attach to generated attributes.",
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
    )


class SubsequenceClustering(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
    """
    Subsequence Time Series Clustering.

    Parameters
    ----------
    window_size : int
        The moving window size.

    step : int, optional (default=1)
        The displacement for the moving window.

    flatten_order : str, optional (default='F')
        The order in which to flatten multivariate subsequences.
        'C' flattens in row-major (C-style) order.
        'F' flattens in column-major (Fortran-style) order.
        'A' flattens in column-major order if the window is Fortran
        contiguous in memory, row-major order otherwise.

    use_columns : Set
        A set of column indices to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.

    exclude_columns : Set
        A set of column indices to not operate on. Applicable only if "use_columns" is not provided.

    return_result : Enumeration
        Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.

    use_semantic_types : Bool
        Controls whether semantic_types metadata will be used to filter columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.

    add_index_columns : Bool
        Also include primary index columns if the input data has them. Applicable only if "return_result" is set to "new".

    error_on_no_input : Bool
        Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.

    return_semantic_type : Enumeration[str]
        Decides what semantic type to attach to generated attributes.
    """

    __author__ = "DATA Lab at Texas A&M University"
    metadata = metadata_base.PrimitiveMetadata({
        "name": "Subsequence Clustering Primitive",
        "python_path": "d3m.primitives.tods.timeseries_processing.subsequence_clustering",
        "source": {'name': 'DATA Lab at Texas A&M University', 'contact': 'mailto:khlai037@tamu.edu',
                   'uris': ['https://gitlab.com/lhenry15/tods.git', ]},
        "algorithm_types": [metadata_base.PrimitiveAlgorithmType.BK_FILTER, ],
        "primitive_family": metadata_base.PrimitiveFamily.DATA_PREPROCESSING,
        "id": "cf0bd4c1-9e09-4471-a2a3-6956deed17ac",
        "hyperparams_to_tune": ['window_size', 'step', 'flatten_order'],
        "version": "0.0.1",
    })
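
    # A minimal usage sketch (illustrative only; assumes a d3m container
    # DataFrame `df` with numeric columns and default column selection):
    #
    #   hp = Hyperparams.defaults().replace({'window_size': 3, 'step': 1})
    #   primitive = SubsequenceClustering(hyperparams=hp)
    #   outputs = primitive.produce(inputs=df).value
    #
    # Each output row is one flattened subsequence window of the input.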


    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
        """
        Process the testing data.
        Args:
            inputs: Container DataFrame.

        Returns:
            Container DataFrame of flattened subsequences.
        """
        # Get columns to fit.
        self._fitted = False
        self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
        self._input_column_names = self._training_inputs.columns

        if len(self._training_indices) > 0:
            # self._clf.fit(self._training_inputs)
            self._fitted = True
        else:
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            self.logger.warning("No input columns were selected")

        if not self._fitted:
            raise PrimitiveNotFittedError("Primitive not fitted.")

        sk_inputs = inputs
        if self.hyperparams['use_semantic_types']:
            sk_inputs = inputs.iloc[:, self._training_indices]

        output_columns = []
        if len(self._training_indices) > 0:
            sk_output = self._get_sub_matrices(sk_inputs,
                                               window_size=self.hyperparams['window_size'],
                                               step=self.hyperparams['step'],
                                               flatten_order=self.hyperparams['flatten_order'])
            if sparse.issparse(sk_output):
                sk_output = sk_output.toarray()

            outputs = self._wrap_predictions(inputs, sk_output)

            if len(outputs.columns) == len(self._input_column_names):
                outputs.columns = self._input_column_names
            output_columns = [outputs]
        else:
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            self.logger.warning("No input columns were selected")

        # outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
        #                                      add_index_columns=self.hyperparams['add_index_columns'],
        #                                      inputs=inputs, column_indices=self._training_indices,
        #                                      columns_list=output_columns)
        self._write(outputs)
        return CallResult(outputs)

    @classmethod
    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
        """
        Select columns to fit.
        Args:
            inputs: Container DataFrame
            hyperparams: d3m.metadata.hyperparams.Hyperparams

        Returns:
            tuple of (selected columns, their indices)
        """
        if not hyperparams['use_semantic_types']:
            return inputs, list(range(len(inputs.columns)))

        inputs_metadata = inputs.metadata

        def can_produce_column(column_index: int) -> bool:
            return cls._can_produce_column(inputs_metadata, column_index, hyperparams)

        use_columns = []
        exclude_columns = []

        # Name-based selection (see the 'columns_using_method' hyperparam) is
        # not enabled in this version:
        # if hyperparams['columns_using_method'] == 'name':
        #     inputs_cols = inputs.columns.values.tolist()
        #     for i in range(len(inputs_cols)):
        #         if inputs_cols[i] in hyperparams['use_columns_name']:
        #             use_columns.append(i)
        #         elif inputs_cols[i] in hyperparams['exclude_columns_name']:
        #             exclude_columns.append(i)
        # else:
        use_columns = hyperparams['use_columns']
        exclude_columns = hyperparams['exclude_columns']

        columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(
            inputs_metadata, use_columns=use_columns, exclude_columns=exclude_columns,
            can_use_column=can_produce_column)
        return inputs.iloc[:, columns_to_produce], columns_to_produce

    @classmethod
    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int, hyperparams: Hyperparams) -> bool:
        """
        Output whether a column can be processed.
        Args:
            inputs_metadata: d3m.metadata.base.DataMetadata
            column_index: int

        Returns:
            bool
        """
        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

        accepted_structural_types = (int, float, np.integer, np.float64)
        accepted_semantic_types = set()
        accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")

        if not issubclass(column_metadata['structural_type'], accepted_structural_types):
            return False

        semantic_types = set(column_metadata.get('semantic_types', []))

        if len(semantic_types) == 0:
            cls.logger.warning("No semantic types found in column metadata")
            return False

        # Make sure all accepted_semantic_types are present in semantic_types.
        if len(accepted_semantic_types - semantic_types) == 0:
            return True

        return False
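
    # Illustrative example (assumed metadata layout): a float column whose
    # semantic_types include
    # "https://metadata.datadrivendiscovery.org/types/Attribute" passes the
    # check above; a string column, or one lacking the Attribute type, is
    # skipped.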

    @classmethod
    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
        """
        Update metadata for selected columns.
        Args:
            inputs_metadata: metadata_base.DataMetadata
            outputs: Container DataFrame
            target_columns_metadata: list

        Returns:
            d3m.metadata.base.DataMetadata
        """
        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)

        for column_index, column_metadata in enumerate(target_columns_metadata):
            column_metadata.pop("structural_type", None)
            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)

        return outputs_metadata

    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:
        """
        Wrap predictions into a DataFrame.
        Args:
            inputs: Container DataFrame
            predictions: array-like data (n_samples, n_features)

        Returns:
            DataFrame
        """
        outputs = d3m_dataframe(predictions, generate_metadata=True)
        target_columns_metadata = self._add_target_columns_metadata(outputs.metadata, self.hyperparams)
        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
        return outputs


    @classmethod
    def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams):
        """
        Add target columns metadata.
        Args:
            outputs_metadata: metadata.base.DataMetadata
            hyperparams: d3m.metadata.hyperparams.Hyperparams

        Returns:
            List[OrderedDict]
        """
        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
        target_columns_metadata: List[OrderedDict] = []
        for column_index in range(outputs_length):
            column_name = "output_{}".format(column_index)
            column_metadata = OrderedDict()
            semantic_types = set()
            semantic_types.add(hyperparams["return_semantic_type"])
            column_metadata['semantic_types'] = list(semantic_types)
            column_metadata["name"] = str(column_name)
            target_columns_metadata.append(column_metadata)

        return target_columns_metadata

    def _write(self, inputs: Inputs):
        # Dump the outputs to a timestamped CSV file for inspection.
        inputs.to_csv(str(time.time()) + '.csv')

    def _get_sub_sequences_length(self, n_samples, window_size, step):
        """
        Pseudo-chop a univariate time series into subsequences. Return the
        valid length only.
        Parameters
        ----------
        n_samples : int
            The number of samples in the input series.
        window_size : int
            The moving window size.
        step : int, optional (default=1)
            The displacement for the moving window.
        Returns
        -------
        valid_len : int
            The number of subsequences.
        """
        valid_len = int(np.floor((n_samples - window_size) / step)) + 1
        return valid_len
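
    # Worked example (illustrative): with n_samples=10, window_size=4 and
    # step=2, valid_len = floor((10 - 4) / 2) + 1 = 4, i.e. windows starting
    # at indices 0, 2, 4 and 6.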


    def _get_sub_matrices(self, X, window_size, step=1, flatten_order='F'):
        """
        Chop a multivariate time series into subsequences (matrices).
        Parameters
        ----------
        X : numpy array of shape (n_samples, n_sequences)
            The input samples.
        window_size : int
            The moving window size.
        step : int, optional (default=1)
            The displacement for the moving window.
        flatten_order : str, optional (default='F')
            The order in which to flatten multivariate subsequences.
            'C' flattens in row-major (C-style) order.
            'F' flattens in column-major (Fortran-style) order.
            'A' flattens in column-major order if the window is Fortran
            contiguous in memory, row-major order otherwise.
        Returns
        -------
        X_sub : numpy array of shape (valid_len, window_size * n_sequences)
            A matrix in which each row is a flattened submatrix (window).
        """
        X = check_array(X).astype(float)
        n_samples, n_sequences = X.shape[0], X.shape[1]

        # Get the valid length.
        valid_len = self._get_sub_sequences_length(n_samples, window_size, step)

        X_sub = []
        X_left_inds = []
        X_right_inds = []

        # Exclude windows that would run past the edge.
        steps = list(range(0, n_samples, step))
        steps = steps[:valid_len]

        for i in steps:
            X_sub.append(X[i: i + window_size, :])
            # Window boundaries (currently unused; kept for reference).
            X_left_inds.append(i)
            X_right_inds.append(i + window_size)

        # Shape: (valid_len, window_size, n_sequences).
        X_sub = np.asarray(X_sub)

        # Flatten each window into one row, honoring flatten_order.
        temp_array = np.zeros([valid_len, window_size * n_sequences])
        for i in range(valid_len):
            temp_array[i, :] = X_sub[i, :, :].flatten(order=flatten_order)
        return temp_array  # , np.asarray(X_left_inds), np.asarray(X_right_inds)
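
    # A minimal sketch of the windowing (illustrative, outside the D3M
    # pipeline): a 2-column series of 5 samples with window_size=3 and
    # step=1 yields a (3, 6) matrix, one flattened window per row:
    #
    #   import numpy as np
    #   X = np.arange(10).reshape(5, 2)   # columns are two variables
    #   # valid_len = floor((5 - 3) / 1) + 1 = 3
    #   # with order='F', row 0 is [0, 2, 4, 1, 3, 5]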




