Browse Source

fix Matrix Profile

Former-commit-id: 0508fe1e5b [formerly d0b3fb94dd] [formerly 7c3f333e68 [formerly 4305068a1f]] [formerly 95f4a7b3c9 [formerly 031dc9615f] [formerly 656d01fdc5 [formerly 924889df3c]]] [formerly 312fc70109 [formerly 57e72d9cf8] [formerly abad9b547e [formerly 01e6885b91]] [formerly 9cfbce8dd0 [formerly 8b3731e595] [formerly 3b02e0b6c4 [formerly 66004e8d6b]]]] [formerly a05a7725d8 [formerly dd8cbdebe7] [formerly 2d0d09fd43 [formerly fb3c187cc3]] [formerly 41e49270d3 [formerly d44b99c4d6] [formerly 6e7b553b41 [formerly f915a1c961]]] [formerly 6355c7704b [formerly f5f8be010b] [formerly 6d56c08ca7 [formerly d8cf711712]] [formerly 79dbdc61f6 [formerly 0eef28d35a] [formerly 5486eef3cc [formerly 7b29a01635]]]]] [formerly 8a24c3e899 [formerly 1b589c1017] [formerly 040f062e33 [formerly 84e580cfa0]] [formerly cec9f82649 [formerly e7b85d5c41] [formerly 9c4ea83829 [formerly c02155e676]]] [formerly b2149dc9bf [formerly 09d9c1fd48] [formerly 1360f95ce6 [formerly 1b35ce4662]] [formerly 5e2bff7099 [formerly 3ab76ed0a8] [formerly ddc6f3f5d9 [formerly 41ca6c9dfa]]]] [formerly c48844dddb [formerly 58f86baada] [formerly 0215dc1685 [formerly 7b7d5f9b59]] [formerly 8cbacb9ec4 [formerly 7aa727b7c5] [formerly f3831122d4 [formerly 714eab8192]]] [formerly 0cc1d60ac4 [formerly 6fc93900ec] [formerly e5297790f9 [formerly a8642bec67]] [formerly db79caaeb2 [formerly 4cbb2c66ec] [formerly d774596516 [formerly d37f461764]]]]]]
Former-commit-id: 3015eb1cc4 [formerly a64305ecf4] [formerly 4530cd0caf [formerly e8d18e6952]] [formerly 06fb7cca76 [formerly b8a703c4b7] [formerly cd8c7f568b [formerly 66017cc4a1]]] [formerly ef6ce5f493 [formerly 88cc50cf34] [formerly 73d0297faf [formerly c06b8ca6bf]] [formerly bb8038d0d6 [formerly 80dc4d1250] [formerly e8b478993c [formerly 06d1552277]]]] [formerly 7f98e080de [formerly f67b72cacd] [formerly f7fbb3b29b [formerly cd29a7f67e]] [formerly 437b996566 [formerly 8b2edd8e63] [formerly 72b1f39938 [formerly af56782e86]]] [formerly f536760edf [formerly 8f7bac2c41] [formerly 0dfae2f4f4 [formerly d50b851997]] [formerly ecbc336970 [formerly 3f569a667c] [formerly d774596516]]]]
Former-commit-id: a3ef8e1645 [formerly 0c32def7ea] [formerly 31956558b4 [formerly 79ddf764ec]] [formerly 519a814bba [formerly 8d1f02b718] [formerly dcb257483e [formerly bf314d2963]]] [formerly c252ff726f [formerly dcc58f2b65] [formerly 8e11b7d98b [formerly 4e593a6a4e]] [formerly d69fdc8418 [formerly 39d03951c5] [formerly 897abe34da [formerly effe430c15]]]]
Former-commit-id: 9ed7c019c4 [formerly d5df678f58] [formerly 1fd3454089 [formerly d60807a2f2]] [formerly 050e055a69 [formerly d0a3cafca3] [formerly ed7354fb85 [formerly f51f365643]]]
Former-commit-id: 6e5f45be96 [formerly 9b18b3e2a4] [formerly 53339f81eb [formerly bb990bdcb8]]
Former-commit-id: 8627ac9e6a [formerly ab62a81a38]
Former-commit-id: 01a577e551
master
YileAllenChen1 4 years ago
parent
commit
23ffa7d4e0
2 changed files with 305 additions and 74 deletions
  1. +304
    -73
      tods/detection_algorithm/MatrixProfile.py
  2. +1
    -1
      tods/tests/test_MatrixProfile.py

+ 304
- 73
tods/detection_algorithm/MatrixProfile.py View File

@@ -1,56 +1,99 @@
from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple
from numpy import ndarray
from collections import OrderedDict
from scipy import sparse
import os
import sklearn
import numpy
import typing
import time
from scipy import sparse
from numpy import ndarray
from collections import OrderedDict
from typing import Any, Callable, List, Dict, Union, Optional, Sequence, Tuple

# Custom import commands if any
import warnings
import numpy as np
from sklearn.utils import check_array
from sklearn.exceptions import NotFittedError
# from numba import njit
from pyod.utils.utility import argmaxn
import pandas as pd
import logging, uuid
from scipy import sparse
from numpy import ndarray
from collections import OrderedDict
from common_primitives import dataframe_utils, utils

from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.container import DataFrame as d3m_dataframe
from d3m.metadata import hyperparams, params, base as metadata_base
from d3m import utils
from d3m import container
from d3m.base import utils as base_utils
from d3m.exceptions import PrimitiveNotFittedError
from d3m.container import DataFrame as d3m_dataframe
from d3m.container.numpy import ndarray as d3m_ndarray
from d3m.primitive_interfaces import base, transformer
from d3m.metadata import base as metadata_base, hyperparams
from d3m.metadata import hyperparams, params, base as metadata_base
from d3m.primitive_interfaces.base import CallResult, DockerContainer

# from d3m.primitive_interfaces.supervised_learning import SupervisedLearnerPrimitiveBase
from d3m.primitive_interfaces.unsupervised_learning import UnsupervisedLearnerPrimitiveBase
from d3m.primitive_interfaces.transformer import TransformerPrimitiveBase

from d3m.primitive_interfaces.base import ProbabilisticCompositionalityMixin, ContinueFitMixin
from d3m import exceptions
import pandas
import uuid

from d3m import container, utils as d3m_utils

from .UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
import stumpy
# from typing import Union

Inputs = d3m_dataframe
Outputs = d3m_dataframe
__all__ = ('MatrixProfile',)

Inputs = container.DataFrame
Outputs = container.DataFrame

class PrimitiveCount:
primitive_no = 0

class Params(Params_ODBase):
######## Add more Attributes #######
pass

class Hyperparams(hyperparams.Hyperparams):
window_size = hyperparams.UniformInt(
lower = 0,
upper = 100, #TODO: Define the correct the upper bound
default=50,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="window size to calculate"
)
# Keep previous
dataframe_resource = hyperparams.Hyperparameter[typing.Union[str, None]](
default=None,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Resource ID of a DataFrame to extract if there are multiple tabular resources inside a Dataset and none is a dataset entry point.",
)
use_columns = hyperparams.Set(
elements=hyperparams.Hyperparameter[int](-1),
default=(2,),
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
)
exclude_columns = hyperparams.Set(
elements=hyperparams.Hyperparameter[int](-1),
default=(0,1,3,),
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
)
return_result = hyperparams.Enumeration(
values=['append', 'replace', 'new'],
default='new',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
)
use_semantic_types = hyperparams.UniformBool(
default=False,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Controls whether semantic_types metadata will be used for filtering columns in input dataframe. Setting this to false makes the code ignore return_result and will produce only the output dataframe"
)
add_index_columns = hyperparams.UniformBool(
default=False,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
)
error_on_no_input = hyperparams.UniformBool(
default=True,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking set this to False.",
)
return_semantic_type = hyperparams.Enumeration[str](
values=['https://metadata.datadrivendiscovery.org/types/Attribute',
'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
default='https://metadata.datadrivendiscovery.org/types/Attribute',
description='Decides what semantic type to attach to generated attributes',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
)

class Hyperparams(Hyperparams_ODBase):
######## Add more Attributes #######
pass

class MP:
"""
@@ -80,10 +123,9 @@ class MP:
#transformed_columns[col]=output
#print(transformed_columns)
return transformed_columns
class MatrixProfile(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Params, Hyperparams]):
"""

class MatrixProfile(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
"""
A primitive that performs matrix profile on a DataFrame using Stumpy package
Stumpy documentation: https://stumpy.readthedocs.io/en/latest/index.html

@@ -99,7 +141,7 @@ class MatrixProfile(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Params, Hyp
ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
to `False`. Default is `True`.
Returnsfdsf
Returns
-------
out : ndarray
The first column consists of the matrix profile, the second column
@@ -109,6 +151,7 @@ class MatrixProfile(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Params, Hyp
"""

metadata = metadata_base.PrimitiveMetadata({
'__author__': "DATA Lab @Texas A&M University",
'name': "Matrix Profile",
@@ -124,66 +167,254 @@ class MatrixProfile(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Params, Hyp
})


def __init__(self, *,
hyperparams: Hyperparams, #
random_seed: int = 0,
docker_containers: Dict[str, DockerContainer] = None) -> None:
super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
def __init__(self, *, hyperparams: Hyperparams) -> None:
super().__init__(hyperparams=hyperparams)
self._clf = MP(window_size = hyperparams['window_size'])
self.primitiveNo = PrimitiveCount.primitive_no
PrimitiveCount.primitive_no+=1

self._clf = MP(window_size=hyperparams['window_size'])
def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:

def set_training_data(self, *, inputs: Inputs) -> None:
"""
Set training data for outlier detection.
Args:

inputs: Container DataFrame

timeout: Default

iterations: Default

Returns:
None

Container DataFrame containing Matrix Profile of selected columns
"""
super().set_training_data(inputs=inputs)

def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:
# Get cols to fit.
self._fitted = False
self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
self._input_column_names = self._training_inputs.columns


if len(self._training_indices) > 0:
self._fitted = True
else:
if self.hyperparams['error_on_no_input']:
raise RuntimeError("No input columns were selected")
self.logger.warn("No input columns were selected")

if not self._fitted:
raise PrimitiveNotFittedError("Primitive not fitted.")
sk_inputs = inputs
if self.hyperparams['use_semantic_types']:
sk_inputs = inputs.iloc[:, self._training_indices]
output_columns = []
if len(self._training_indices) > 0:
sk_output = self._clf.produce(sk_inputs)
if sparse.issparse(sk_output):
sk_output = sk_output.toarray()
outputs = self._wrap_predictions(inputs, sk_output)
if len(outputs.columns) == len(self._input_column_names):
outputs.columns = self._input_column_names
output_columns = [outputs]

else:
if self.hyperparams['error_on_no_input']:
raise RuntimeError("No input columns were selected")
self.logger.warn("No input columns were selected")

outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
add_index_columns=self.hyperparams['add_index_columns'],
inputs=inputs, column_indices=self._training_indices,
columns_list=output_columns)
#print(outputs)
#CallResult(outputs)
#print("___")
print(outputs.columns)
#outputs.columns = [str(x) for x in outputs.columns]

return CallResult(outputs)

# assert isinstance(inputs, container.DataFrame), type(container.DataFrame)
# _, self._columns_to_produce = self._get_columns_to_fit(inputs, self.hyperparams)
# #print("columns_to_produce ", self._columns_to_produce)
# outputs = inputs
# if len(self._columns_to_produce) > 0:
# for col in self.hyperparams['use_columns']:
# output = self._clf.produce(inputs.iloc[ : ,col])
# outputs = pd.concat((outputs, pd.DataFrame({inputs.columns[col]+'_matrix_profile': output[:,0],
# inputs.columns[col]+'_matrix_profile_indices': output[:,1],
# inputs.columns[col]+'_left_matrix_profile_indices': output[:,2],
# inputs.columns[col]+'_right_matrix_profile_indices': output[:,3]})), axis = 1)

# else:
# if self.hyperparams['error_on_no_input']:
# raise RuntimeError("No input columns were selected")
# self.logger.warn("No input columns were selected")

# #print(outputs)
# self._update_metadata(outputs)

# return base.CallResult(outputs)



def _update_metadata(self, outputs):
outputs.metadata = outputs.metadata.generate(outputs)
@classmethod
def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):

"""
Fit model with training data.
Args:
*: Container DataFrame. Time series data up to fit.

Returns:
None
Select columns to fit.
Args:
inputs: Container DataFrame
hyperparams: d3m.metadata.hyperparams.Hyperparams

Returns:
list

"""
return super().fit()

def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
if not hyperparams['use_semantic_types']:
return inputs, list(range(len(inputs.columns)))

inputs_metadata = inputs.metadata


def can_produce_column(column_index: int) -> bool:
return cls._can_produce_column(inputs_metadata, column_index, hyperparams)

columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata,
use_columns=hyperparams['use_columns'],
exclude_columns=hyperparams['exclude_columns'],
can_use_column=can_produce_column)


"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data up to outlier detection.
Encountered error: when hyperparams['use_columns'] = (2,3) and hyperparams['exclude_columns'] is (1,2)
columns_to_produce is still [2]
"""
return inputs.iloc[:, columns_to_produce], columns_to_produce

@classmethod
def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int, hyperparams: Hyperparams) -> bool:

Returns:
Container DataFrame
1 marks Outliers, 0 marks normal.
"""
return super().produce(inputs=inputs, timeout=timeout, iterations=iterations)

def get_params(self) -> Params:
Output whether a column can be processed.
Args:
inputs_metadata: d3m.metadata.base.DataMetadata
column_index: int

Returns:
bool

"""
Return parameters.

column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

accepted_structural_types = (int, float, np.integer, np.float64) #changed numpy to np
accepted_semantic_types = set()
accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")

# print(column_metadata)
# print(column_metadata['structural_type'], accepted_structural_types)

if not issubclass(column_metadata['structural_type'], accepted_structural_types):
return False

semantic_types = set(column_metadata.get('semantic_types', []))

# print(column_metadata)
# print(semantic_types, accepted_semantic_types)

if len(semantic_types) == 0:
cls.logger.warning("No semantic types found in column metadata")
return False

# Making sure all accepted_semantic_types are available in semantic_types
if len(accepted_semantic_types - semantic_types) == 0:
return True

return False

def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:

"""

Wrap predictions into dataframe
Args:
None
inputs: Container Dataframe
predictions: array-like data (n_samples, n_features)

Returns:
class Params
Dataframe

"""

outputs = d3m_dataframe(predictions, generate_metadata=True)
target_columns_metadata = self._add_target_columns_metadata(outputs.metadata, self.hyperparams, self.primitiveNo)
outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
return outputs



@classmethod
def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:

"""
return super().get_params()

def set_params(self, *, params: Params) -> None:
Updata metadata for selected columns.
Args:
inputs_metadata: metadata_base.DataMetadata
outputs: Container Dataframe
target_columns_metadata: list

Returns:
d3m.metadata.base.DataMetadata

"""

outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)

for column_index, column_metadata in enumerate(target_columns_metadata):
column_metadata.pop("structural_type", None)
outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)

return outputs_metadata


@classmethod
def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams, primitiveNo):
"""
Set parameters for outlier detection.
Add target columns metadata
Args:
params: class Params
outputs_metadata: metadata.base.DataMetadata
hyperparams: d3m.metadata.hyperparams.Hyperparams

Returns:
None
List[OrderedDict]
"""
super().set_params(params=params)
outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
target_columns_metadata: List[OrderedDict] = []
for column_index in range(outputs_length):
column_name = "{0}{1}_{2}".format(cls.metadata.query()['name'], primitiveNo, column_index)
column_metadata = OrderedDict()
semantic_types = set()
semantic_types.add(hyperparams["return_semantic_type"])
column_metadata['semantic_types'] = list(semantic_types)

column_metadata["name"] = str(column_name)
target_columns_metadata.append(column_metadata)
return target_columns_metadata

+ 1
- 1
tods/tests/test_MatrixProfile.py View File

@@ -15,7 +15,7 @@ class MatrixProfileTest(unittest.TestCase):
columns=['a', 'b', 'c'],
generate_metadata=True)

print(main)
#print(main)


self.assertEqual(utils.to_json_structure(main.metadata.to_internal_simple_structure()), [{


Loading…
Cancel
Save