
fix unit test for MP SKI interface

Branch: master
lhenry15 · 4 years ago
commit 24f9b24e11
4 changed files with 557 additions and 4 deletions
  1. +39  -4  tods/detection_algorithm/MatrixProfile.py
  2. +415 -0  tods/feature_analysis/MatrixProfile.py
  3. +102 -0  tods/tests/feature_analysis/test_MatrixProfile.py
  4. +1   -0  tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py

+39 -4  tods/detection_algorithm/MatrixProfile.py

@@ -37,6 +37,8 @@ from d3m import container, utils as d3m_utils

from .UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
import stumpy

from sklearn.preprocessing import MinMaxScaler
# from typing import Union

Inputs = d3m_dataframe
@@ -51,14 +53,20 @@ class Params(Params_ODBase):

class Hyperparams(Hyperparams_ODBase):
######## Add more Attributes #######
pass
#pass
window_size = hyperparams.Hyperparameter[int](
default=3,
description='The moving window size.',
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
)

class MP:
"""
This is the class for matrix profile function
"""
def __init__(self, window_size):
def __init__(self, window_size, step_size):
self._window_size = window_size
self._step_size = step_size
return

def fit(self, X, y=None):
@@ -102,7 +110,7 @@ class MP:
nparray

"""
"""
#only keep first two columns of MP results, the second column is left index, use windowsize to get right index
transformed_columns=utils.pandas.DataFrame()
for col in data.transpose(): #data.reshape(1,len(data)):
@@ -112,6 +120,33 @@ class MP:
output = self._get_right_inds(output)
transformed_columns=pd.concat([transformed_columns,output], axis=1)
return transformed_columns
"""
#data = np.random.rand(3, 1000)
#data = np.array([[1., 2., 3., 4.], [5., 6., 7., 8.], [9., 10., 11., 12.]])
#data = np.array([1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11., 12.])
#data = np.array([[3., 4., 8.6, 13., 22.5, 17, 19.2, 36.1, 127, -23, 59.2, -10],[3., 4., 8.6, 13., 22.5, 17, 19.2, 36.1, 127, -23, 59.2, -10],[3., 4., 8.6, 13., 22.5, 17, 19.2, 36.1, 127, -23, 59.2, -10]])

matrix_profile, matrix_profile_indices = stumpy.mstump(data.transpose(), m = self._window_size)
#matrix_profile, matrix_profile_indices = stumpy.mstump(data, m = self._window_size)

left_inds_ = numpy.arange(0, len(matrix_profile), self._step_size)
right_inds_ = left_inds_ + self._window_size
right_inds_[right_inds_ > len(matrix_profile)] = len(matrix_profile)
left_inds_ = np.array([left_inds_]).transpose()
right_inds_ = np.array([right_inds_]).transpose()
# apply min-max scaling
scaler = MinMaxScaler()
scaler = scaler.fit(matrix_profile)
matrix_profile = scaler.transform(matrix_profile)
output = []
for timestamp in matrix_profile:
timestamp = sum(timestamp)
output.append([timestamp])

output = np.concatenate((output, left_inds_, right_inds_),axis=1)
return output
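
For reference, a minimal hedged sketch (not a copy of the committed produce body) of the mstump call this method relies on, with illustrative post-processing into the (score, left index, right index) layout; the toy data and parameter values are made up:

```python
# Hedged sketch: shapes follow the stumpy documentation; the post-processing
# only illustrates the (score, left index, right index) layout.
import numpy as np
import stumpy
from sklearn.preprocessing import MinMaxScaler

window_size = 3
data = np.random.rand(100, 2)             # 100 timestamps, 2 features

# stumpy.mstump expects shape (d, n): one row per dimension
profile, profile_idx = stumpy.mstump(data.T, m=window_size)
print(profile.shape)                      # (2, 98), i.e. (d, n - m + 1)

# scale each dimension's profile to [0, 1], sum into one score per subsequence
scores = MinMaxScaler().fit_transform(profile.T).sum(axis=1)

# attach left/right window bounds for each subsequence start
left = np.arange(len(scores))
right = np.minimum(left + window_size, len(scores))
output = np.column_stack((scores, left, right))
print(output.shape)                       # (98, 3)
```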

def predict(self, data):
return self.produce(data)
@@ -168,7 +203,7 @@ class MatrixProfilePrimitive(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Pa
docker_containers: Dict[str, DockerContainer] = None) -> None:
super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)

self._clf = MP(window_size=hyperparams['window_size'])
self._clf = MP(window_size=hyperparams['window_size'], step_size=hyperparams['step_size'])

def set_training_data(self, *, inputs: Inputs) -> None:
"""


+415 -0  tods/feature_analysis/MatrixProfile.py

@@ -0,0 +1,415 @@
from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
from numpy import ndarray
from collections import OrderedDict
from scipy import sparse
import os
import sklearn
import numpy
import typing
import pandas as pd

# Custom import commands if any
import warnings
import numpy as np
from sklearn.utils import check_array
from sklearn.exceptions import NotFittedError
# from numba import njit
from pyod.utils.utility import argmaxn

from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.container import DataFrame as d3m_dataframe
from d3m.metadata import hyperparams, params, base as metadata_base
from d3m import utils
from d3m.base import utils as base_utils
from d3m.exceptions import PrimitiveNotFittedError
from d3m.primitive_interfaces.base import CallResult, DockerContainer

from d3m.primitive_interfaces.transformer import TransformerPrimitiveBase
from d3m.primitive_interfaces import base, transformer
from d3m.primitive_interfaces.base import ProbabilisticCompositionalityMixin, ContinueFitMixin
from d3m import exceptions
import pandas
import uuid

from d3m import container, utils as d3m_utils

import stumpy

from sklearn.preprocessing import MinMaxScaler
# from typing import Union

Inputs = d3m_dataframe
Outputs = d3m_dataframe


class Hyperparams(hyperparams.Hyperparams):
######## Add more Attributes #######
#pass
window_size = hyperparams.Hyperparameter[int](
default=3,
description='The moving window size.',
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
)

# Keep previous
dataframe_resource = hyperparams.Hyperparameter[typing.Union[str, None]](
default=None,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Resource ID of a DataFrame to extract if there are multiple tabular resources inside a Dataset and none is a dataset entry point.",
)
use_columns = hyperparams.Set(
elements=hyperparams.Hyperparameter[int](-1),
default=(2,),
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
)
exclude_columns = hyperparams.Set(
elements=hyperparams.Hyperparameter[int](-1),
default=(0,1,3,),
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
)
return_result = hyperparams.Enumeration(
values=['append', 'replace', 'new'],
default='new',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
)
use_semantic_types = hyperparams.UniformBool(
default=False,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Controls whether semantic_types metadata will be used for filtering columns in input dataframe. Setting this to false makes the code ignore return_result and will produce only the output dataframe"
)
add_index_columns = hyperparams.UniformBool(
default=False,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
)
error_on_no_input = hyperparams.UniformBool(
default=True,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking set this to False.",
)

return_semantic_type = hyperparams.Enumeration[str](
values=['https://metadata.datadrivendiscovery.org/types/Attribute',
'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
default='https://metadata.datadrivendiscovery.org/types/Attribute',
description='Decides what semantic type to attach to generated attributes',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
)

class MP:
"""
This is the class for the matrix profile function
"""
def __init__(self, window_size): #, step_size):
self._window_size = window_size
#self._step_size = step_size
return
def produce(self, data):

"""
Args:
data: dataframe column
Returns:
nparray
"""
"""
#only keep first two columns of MP results, the second column is left index, use windowsize to get right index
transformed_columns=utils.pandas.DataFrame()
for col in data.transpose(): #data.reshape(1,len(data)):
output = stumpy.stump(col, m = self._window_size)
output = pd.DataFrame(output)
output=output.drop(columns=[2,3])
output = self._get_right_inds(output)
transformed_columns=pd.concat([transformed_columns,output], axis=1)
return transformed_columns
"""
matrix_profile, matrix_profile_indices = stumpy.mstump(data, m = self._window_size)
print('matrix profile ', matrix_profile)
#output = np.concatenate((output, left_inds_, right_inds_),axis=1)
return matrix_profile
class MatrixProfilePrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
"""

A primitive that computes the matrix profile of a DataFrame using the Stumpy package
Stumpy documentation: https://stumpy.readthedocs.io/en/latest/index.html

Parameters
----------
T_A : ndarray
The time series or sequence for which to compute the matrix profile
m : int
Window size
T_B : ndarray
The time series or sequence that contain your query subsequences
of interest. Default is `None` which corresponds to a self-join.
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
to `False`. Default is `True`.
Returns
-------
out : ndarray
The first column consists of the matrix profile, the second column
consists of the matrix profile indices, the third column consists of
the left matrix profile indices, and the fourth column consists of
the right matrix profile indices.
"""

metadata = metadata_base.PrimitiveMetadata({
'__author__': "DATA Lab @Texas A&M University",
'name': "Matrix Profile",
'python_path': 'd3m.primitives.tods.feature_analysis.matrix_profile',
'source': {
'name': "DATA Lab @Texas A&M University",
'contact': 'mailto:khlai037@tamu.edu',
},
'hyperparams_to_tune': ['window_size'],
'version': '0.0.2',
'algorithm_types': [
metadata_base.PrimitiveAlgorithmType.TODS_PRIMITIVE,
],
'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
'id': str(uuid.uuid3(uuid.NAMESPACE_DNS, 'MatrixProfilePrimitive')),
})


def __init__(self, *,
hyperparams: Hyperparams, #
random_seed: int = 0,
docker_containers: Dict[str, DockerContainer] = None) -> None:
super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)

self._clf = MP(window_size=hyperparams['window_size']) #, step_size=hyperparams['step_size'])


def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data to compute the matrix profile on.

Returns:
Container DataFrame containing the matrix profile of the selected columns.
"""
assert isinstance(inputs, container.DataFrame), type(container.DataFrame)

_, self._columns_to_produce = self._get_columns_to_fit(inputs, self.hyperparams)
self._input_column_names = inputs.columns

# print('columns_to_produce=', self._columns_to_produce)


sk_inputs = inputs
if self.hyperparams['use_semantic_types']: # pragma: no cover
sk_inputs = inputs.iloc[:, self._columns_to_produce]
output_columns = []
if len(self._columns_to_produce) > 0:
sk_output = self._clf.produce(sk_inputs)
if sparse.issparse(sk_output): # pragma: no cover
sk_output = sk_output.toarray()
outputs = self._wrap_predictions(inputs, sk_output)
if len(outputs.columns) == len(self._input_column_names):
outputs.columns = self._input_column_names
output_columns = [outputs]
else: # pragma: no cover
if self.hyperparams['error_on_no_input']:
raise RuntimeError("No input columns were selected")
self.logger.warn("No input columns were selected")
outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
add_index_columns=self.hyperparams['add_index_columns'],
inputs=inputs, column_indices=self._columns_to_produce,
columns_list=output_columns)

# outputs = inputs
return base.CallResult(outputs)

@classmethod
def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams): # pragma: no cover
"""
Select columns to fit.
Args:
inputs: Container DataFrame
hyperparams: d3m.metadata.hyperparams.Hyperparams

Returns:
list
"""
# print('======_get_columns_to_fit======')

if not hyperparams['use_semantic_types']:
return inputs, list(range(len(inputs.columns)))

inputs_metadata = inputs.metadata

def can_produce_column(column_index: int) -> bool:
return cls._can_produce_column(inputs_metadata, column_index, hyperparams)

columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata,
use_columns=hyperparams[
'use_columns'],
exclude_columns=hyperparams[
'exclude_columns'],
can_use_column=can_produce_column)

return inputs.iloc[:, columns_to_produce], columns_to_produce
# return columns_to_produce

@classmethod
def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int,
hyperparams: Hyperparams) -> bool: # pragma: no cover
"""
Output whether a column can be processed.
Args:
inputs_metadata: d3m.metadata.base.DataMetadata
column_index: int

Returns:
bool
"""
column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

accepted_structural_types = (int, float, numpy.integer, numpy.float64)
accepted_semantic_types = set()
accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")

# print(column_metadata)
# print(column_metadata['structural_type'], accepted_structural_types)

if not issubclass(column_metadata['structural_type'], accepted_structural_types):
return False

semantic_types = set(column_metadata.get('semantic_types', []))

# print(column_metadata)
# print(semantic_types, accepted_semantic_types)

if len(semantic_types) == 0:
cls.logger.warning("No semantic types found in column metadata")
return False

# Making sure all accepted_semantic_types are available in semantic_types
if len(accepted_semantic_types - semantic_types) == 0:
return True

return False

@classmethod
def _get_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams) -> List[
OrderedDict]: # pragma: no cover
"""
Output metadata of selected columns.
Args:
outputs_metadata: metadata_base.DataMetadata
hyperparams: d3m.metadata.hyperparams.Hyperparams

Returns:
d3m.metadata.base.DataMetadata
"""
outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']

target_columns_metadata: List[OrderedDict] = []
for column_index in range(outputs_length):
column_metadata = OrderedDict(outputs_metadata.query_column(column_index))

# Update semantic types and prepare it for predicted targets.
semantic_types = set(column_metadata.get('semantic_types', []))
semantic_types_to_remove = set([])
add_semantic_types = []
add_semantic_types.add(hyperparams["return_semantic_type"])
semantic_types = semantic_types - semantic_types_to_remove
semantic_types = semantic_types.union(add_semantic_types)
column_metadata['semantic_types'] = list(semantic_types)

target_columns_metadata.append(column_metadata)

return target_columns_metadata

@classmethod
def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata: # pragma: no cover
"""
Update metadata for selected columns.
Args:
inputs_metadata: metadata_base.DataMetadata
outputs: Container Dataframe
target_columns_metadata: list

Returns:
d3m.metadata.base.DataMetadata
"""
outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)

for column_index, column_metadata in enumerate(target_columns_metadata):
column_metadata.pop("structural_type", None)
outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)

return outputs_metadata

def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs: # pragma: no cover
"""
Wrap predictions into dataframe
Args:
inputs: Container Dataframe
predictions: array-like data (n_samples, n_features)

Returns:
Dataframe
"""
outputs = container.DataFrame(predictions, generate_metadata=True)
target_columns_metadata = self._copy_inputs_metadata(inputs.metadata, self._columns_to_produce, outputs.metadata,
self.hyperparams)
outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
return outputs

@classmethod
def _copy_inputs_metadata(cls, inputs_metadata: metadata_base.DataMetadata, input_indices: List[int],
outputs_metadata: metadata_base.DataMetadata, hyperparams): # pragma: no cover
"""
Update metadata for selected columns.
Args:
inputs_metadata: metadata.base.DataMetadata
input_indices: list
outputs_metadata: metadata.base.DataMetadata
hyperparams: d3m.metadata.hyperparams.Hyperparams

Returns:
d3m.metadata.base.DataMetadata
"""
outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
target_columns_metadata: List[OrderedDict] = []
for column_index in input_indices:
column_name = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index)).get("name")
if column_name is None:
column_name = "output_{}".format(column_index)

column_metadata = OrderedDict(inputs_metadata.query_column(column_index))
semantic_types = set(column_metadata.get('semantic_types', []))
semantic_types_to_remove = set([])
add_semantic_types = set()
add_semantic_types.add(hyperparams["return_semantic_type"])
semantic_types = semantic_types - semantic_types_to_remove
semantic_types = semantic_types.union(add_semantic_types)
column_metadata['semantic_types'] = list(semantic_types)
# print(column_metadata['semantic_types'])

column_metadata["name"] = str(column_name)
target_columns_metadata.append(column_metadata)

# If outputs has more columns than index, add Attribute Type to all remaining
if outputs_length > len(input_indices):
for column_index in range(len(input_indices), outputs_length):
column_metadata = OrderedDict()
semantic_types = set()
semantic_types.add(hyperparams["return_semantic_type"])
column_name = "output_{}".format(column_index)
column_metadata["semantic_types"] = list(semantic_types)
column_metadata["name"] = str(column_name)
target_columns_metadata.append(column_metadata)

# print(target_columns_metadata)
return target_columns_metadata
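
The class docstring earlier in this file documents the four output columns of stumpy.stump; a short hedged usage sketch of that call, reusing one of the toy series from the commented-out test data above with an illustrative window size:

```python
# Hedged sketch of stumpy.stump output, matching the four columns described in
# the class docstring; the series and window size here are illustrative only.
import numpy as np
import stumpy

series = np.array([3., 4., 8.6, 13., 22.5, 17., 19.2, 36.1, 127., -23., 59.2, -10.])
out = stumpy.stump(series, m=3)      # shape (n - m + 1, 4)

profile      = out[:, 0]             # matrix profile distances
profile_idx  = out[:, 1]             # nearest-neighbor indices
left_idx     = out[:, 2]             # left matrix profile indices
right_idx    = out[:, 3]             # right matrix profile indices
print(out.shape)                     # (10, 4)
```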

+102 -0  tods/tests/feature_analysis/test_MatrixProfile.py

@@ -0,0 +1,102 @@
import unittest

from d3m import container, utils
from d3m.metadata import base as metadata_base
from tods.feature_analysis.MatrixProfile import MatrixProfilePrimitive



class MatrixProfileTest(unittest.TestCase):
def test_basic(self):
self.maxDiff = None
main = container.DataFrame({'a': [1., 2., 3., 4., 5., 6., 7., 8., 9.],
'b': [2., 3., 4., 5., 6., 7., 8., 9., 10.],
'c': [3., 4., 5., 6., 7., 8., 9., 10., 11.]},
columns=['a', 'b', 'c'],
generate_metadata=True)

#print(main)


self.assertEqual(utils.to_json_structure(main.metadata.to_internal_simple_structure()), [{
'selector': [],
'metadata': {
# 'top_level': 'main',
'schema': metadata_base.CONTAINER_SCHEMA_VERSION,
'structural_type': 'd3m.container.pandas.DataFrame',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/Table'],
'dimension': {
'name': 'rows',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularRow'],
'length': 9,
},
},
}, {
'selector': ['__ALL_ELEMENTS__'],
'metadata': {
'dimension': {
'name': 'columns',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularColumn'],
'length': 3,
},
},
}, {
'selector': ['__ALL_ELEMENTS__', 0],
'metadata': {'structural_type': 'numpy.float64', 'name': 'a'},
}, {
'selector': ['__ALL_ELEMENTS__', 1],
'metadata': {'structural_type': 'numpy.float64', 'name': 'b'},
}, {
'selector': ['__ALL_ELEMENTS__', 2],
'metadata': {'structural_type': 'numpy.float64', 'name': 'c'}
}])


self.assertIsInstance(main, container.DataFrame)


hyperparams_class = MatrixProfilePrimitive.metadata.get_hyperparams()
hyperparams = hyperparams_class.defaults()
hyperparams = hyperparams.replace({'window_size': 3})
#print(type(main))
primitive = MatrixProfilePrimitive(hyperparams=hyperparams)
new_main = primitive.produce(inputs=main).value
print(new_main)


self.assertEqual(utils.to_json_structure(main.metadata.to_internal_simple_structure()), [{
'selector': [],
'metadata': {
# 'top_level': 'main',
'schema': metadata_base.CONTAINER_SCHEMA_VERSION,
'structural_type': 'd3m.container.pandas.DataFrame',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/Table'],
'dimension': {
'name': 'rows',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularRow'],
'length': 9,
},
},
}, {
'selector': ['__ALL_ELEMENTS__'],
'metadata': {
'dimension': {
'name': 'columns',
'semantic_types': ['https://metadata.datadrivendiscovery.org/types/TabularColumn'],
'length': 3,
},
},
}, {
'selector': ['__ALL_ELEMENTS__', 0],
'metadata': {'structural_type': 'numpy.float64', 'name': 'a'},
}, {
'selector': ['__ALL_ELEMENTS__', 1],
'metadata': {'structural_type': 'numpy.float64', 'name': 'b'},
}, {
'selector': ['__ALL_ELEMENTS__', 2],
'metadata': {'structural_type': 'numpy.float64', 'name': 'c'}
}])


if __name__ == '__main__':
unittest.main()

+1 -0  tods/tests/sk_interface/detection_algorithm/test_ski_MatrixProfile.py

@@ -40,6 +40,7 @@ class MatrixProfileSKI_TestCase(unittest.TestCase):
def test_prediction_score(self):
pred_scores = self.transformer.predict_score(self.X_test)
assert_equal(pred_scores.shape[0], self.y_test.shape[0])
pred_scores = pred_scores[:,0]
assert_greater_equal(roc_auc_score(self.y_test, pred_scores), self.roc_floor)
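
The one-line addition above is the actual fix named in the commit message: with the reworked MP.produce, predict_score appears to return three columns per row (score, left index, right index), so only the first column can be fed to roc_auc_score. A minimal sketch of the idea, with made-up arrays:

```python
# Minimal sketch of why the test now slices column 0: roc_auc_score needs a
# 1-D score vector, while the predicted output is assumed to carry
# (score, left index, right index) per row. Arrays below are made up.
import numpy as np
from sklearn.metrics import roc_auc_score

pred_scores = np.array([[0.1, 0, 3],
                        [0.9, 1, 4],
                        [0.2, 2, 5],
                        [0.8, 3, 6]])
y_test = np.array([0, 1, 0, 1])

print(roc_auc_score(y_test, pred_scores[:, 0]))   # 1.0
```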



