- from typing import Any, Dict, List, Optional, Sequence
- from collections import OrderedDict
- import uuid
-
- import numpy
- from numpy import ndarray
-
- # Custom import commands if any
- from sklearn.preprocessing import Normalizer
-
- from d3m.container import DataFrame as d3m_dataframe
- from d3m.metadata import hyperparams, params, base as metadata_base
- from d3m.base import utils as base_utils
- from d3m.primitive_interfaces import base
- from d3m.primitive_interfaces.base import CallResult, DockerContainer
- from d3m.primitive_interfaces.unsupervised_learning import UnsupervisedLearnerPrimitiveBase
-
- Inputs = d3m_dataframe
- Outputs = d3m_dataframe
-
-
- class Params(params.Params):
-     input_column_names: Optional[Any]
-     target_names_: Optional[Sequence[Any]]
-     training_indices_: Optional[Sequence[int]]
-     target_column_indices_: Optional[Sequence[int]]
-     target_columns_metadata_: Optional[List[OrderedDict]]
-
-
-
- class Hyperparams(hyperparams.Hyperparams):
-     # Added by Mia
-     endog = hyperparams.Bounded[int](
-         lower=2,
-         upper=None,
-         default=3,
-         description='Array-like time series.',
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
-     )
-
-     threshold = hyperparams.Bounded[float](
-         lower=0.0,
-         upper=1.0,
-         default=0.5,
-         description='Scores above this threshold are labeled anomalous.',
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
-     )
-
-     # keep previous
-     norm = hyperparams.Enumeration[str](
-         default='l2',
-         values=['l1', 'l2', 'max'],
-         description='The norm to use to normalize each non-zero sample.',
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
-     )
-
-     use_columns = hyperparams.Set(
-         elements=hyperparams.Hyperparameter[int](-1),
-         default=(),
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="A set of column indices to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.",
-     )
-     exclude_columns = hyperparams.Set(
-         elements=hyperparams.Hyperparameter[int](-1),
-         default=(),
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
-     )
-     return_result = hyperparams.Enumeration(
-         values=['append', 'replace', 'new'],
-         default='new',
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if \"use_semantic_types\" is set to false.",
-     )
-     use_semantic_types = hyperparams.UniformBool(
-         default=False,
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="Controls whether semantic types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore \"return_result\" and produce only the output dataframe.",
-     )
-     add_index_columns = hyperparams.UniformBool(
-         default=False,
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
-     )
-     error_on_no_input = hyperparams.UniformBool(
-         default=True,
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-         description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
-     )
-     return_semantic_type = hyperparams.Enumeration[str](
-         values=['https://metadata.datadrivendiscovery.org/types/Attribute', 'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
-         default='https://metadata.datadrivendiscovery.org/types/Attribute',
-         description='Decides what semantic type to attach to generated attributes.',
-         semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
-     )
-
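-
- # A minimal sketch (not used by the primitive) of the majority-voting scheme
- # the class implements: threshold per-detector scores into 0/1 votes, then
- # count the votes per system. The helper name here is hypothetical.
- def _majority_vote_sketch(scores, threshold: float = 0.5):
-     """``scores`` is assumed to be a pandas DataFrame with 'system_id' and
-     'scores' columns, mirroring the layout ``produce`` expects."""
-     votes = numpy.where(scores['scores'] > threshold, 1, 0)
-     return scores.assign(results=votes).groupby('system_id')['results'].sum()
-
-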
- class EnsemblePrimitive(UnsupervisedLearnerPrimitiveBase[Inputs, Outputs, Params, Hyperparams]):
-     """
-     Ensemble method for detection algorithms.
-
-     Calculates the maximum/minimum/average of the scores produced by the
-     detection algorithms, and labels anomalies by majority voting based on
-     the threshold set for the score.
-     """
-     metadata = metadata_base.PrimitiveMetadata({
-         "__author__": "DATA Lab at Texas A&M University",
-         "name": "Ensemble",
-         "python_path": "d3m.primitives.tods.detection_algorithm.Ensemble",
-         "source": {
-             'name': 'DATA Lab at Texas A&M University',
-             'contact': 'mailto:khlai037@tamu.edu',
-         },
-         "version": "0.0.1",
-         "hyperparams_to_tune": ['use_columns'],
-         "algorithm_types": [
-             metadata_base.PrimitiveAlgorithmType.TODS_PRIMITIVE,
-         ],
-         "primitive_family": metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
-         'id': str(uuid.uuid3(uuid.NAMESPACE_DNS, 'EnsemblePrimitive')),
-     })
-
-     def __init__(self, *,
-                  hyperparams: Hyperparams,
-                  random_seed: int = 0,
-                  docker_containers: Dict[str, DockerContainer] = None) -> None:
-
-         super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
-
-         # Underlying scikit-learn estimator fitted on the selected columns.
-         self._clf = Normalizer(
-             norm=self.hyperparams['norm'],
-         )
-
-         self._inputs = None
-         self._outputs = None
-         self._training_inputs = None
-         self._training_outputs = None
-         self._target_names = None
-         self._training_indices = None
-         self._target_column_indices = None
-         self._target_columns_metadata: List[OrderedDict] = None
-         self._input_column_names = None
-         self._fitted = False
-
-     def set_training_data(self, *, inputs: Inputs) -> None:
-         self._inputs = inputs
-         self._fitted = False
-
-     def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:
-         if self._fitted:
-             return CallResult(None)
-
-         self._training_inputs, self._training_indices = self._get_columns_to_fit(self._inputs, self.hyperparams)
-
-         if self._training_inputs is None:
-             return CallResult(None)
-
-         self._input_column_names = self._training_inputs.columns
-
-         if len(self._training_indices) > 0:
-             self._clf.fit(self._training_inputs)
-             self._fitted = True
-         else:
-             if self.hyperparams['error_on_no_input']:
-                 raise RuntimeError("No input columns were selected")
-             self.logger.warning("No input columns were selected")
-         return CallResult(None)
-
-     def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
-         outputs = inputs
-         outputs.columns = ['timestamp', 'value', 'system_id', 'scores']
-
-         # Alternative ensemble reductions kept for reference:
-         #   outputs.groupby('system_id')['scores'].max()   # maximum score
-         #   outputs.groupby('system_id')['scores'].min()   # minimum score
-         #   outputs.groupby('system_id')['scores'].mean()  # average score
-
-         # Label each row as anomalous (1) or normal (0) by thresholding its score.
-         outputs['results'] = numpy.where(outputs['scores'] > self.hyperparams['threshold'], 1, 0)
-
-         # Majority voting: count anomaly votes per system, per (timestamp, system),
-         # and per (system, value).
-         outputs_xy = outputs.groupby('system_id')['results'].sum().reset_index()
-         self.logger.debug("majority_sum_xy:\n%s", outputs_xy)
-
-         outputs_sum_x = outputs.groupby(['timestamp', 'system_id'])['results'].sum()
-         outputs_sum_y = outputs.groupby(['system_id', 'value'])['results'].sum()
-         self.logger.debug("majority_max_x:\n%s", outputs_sum_x)
-         self.logger.debug("majority_max_y:\n%s", outputs_sum_y)
-
-         return base.CallResult(outputs)
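-
-     # Example of the per-system vote count computed above (illustrative values):
-     #   >>> import pandas as pd
-     #   >>> df = pd.DataFrame({'system_id': [0, 0, 1], 'results': [1, 0, 1]})
-     #   >>> df.groupby('system_id')['results'].sum()
-     #   system_id
-     #   0    1
-     #   1    1
-     #   Name: results, dtype: int64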
-
-     def _update_metadata(self, outputs):
-         outputs.metadata = outputs.metadata.generate(outputs)
-
-     def get_params(self) -> Params:
-         # The same parameters are returned whether or not the primitive is
-         # fitted; unfitted attributes are simply None.
-         return Params(
-             input_column_names=self._input_column_names,
-             training_indices_=self._training_indices,
-             target_names_=self._target_names,
-             target_column_indices_=self._target_column_indices,
-             target_columns_metadata_=self._target_columns_metadata
-         )
-
-     def set_params(self, *, params: Params) -> None:
-         self._input_column_names = params['input_column_names']
-         self._training_indices = params['training_indices_']
-         self._target_names = params['target_names_']
-         self._target_column_indices = params['target_column_indices_']
-         self._target_columns_metadata = params['target_columns_metadata_']
-         self._fitted = True
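-
-     # Round-trip sketch (assumed d3m convention; 'fitted' and 'restored' are
-     # hypothetical instances):
-     #   restored = EnsemblePrimitive(hyperparams=fitted.hyperparams)
-     #   restored.set_params(params=fitted.get_params())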
-
-
-
-
-
-     @classmethod
-     def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
-         if not hyperparams['use_semantic_types']:
-             return inputs, list(range(len(inputs.columns)))
-
-         inputs_metadata = inputs.metadata
-
-         def can_produce_column(column_index: int) -> bool:
-             return cls._can_produce_column(inputs_metadata, column_index, hyperparams)
-
-         columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata,
-                                                                                    use_columns=hyperparams['use_columns'],
-                                                                                    exclude_columns=hyperparams['exclude_columns'],
-                                                                                    can_use_column=can_produce_column)
-         return inputs.iloc[:, columns_to_produce], columns_to_produce
-
-     @classmethod
-     def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int, hyperparams: Hyperparams) -> bool:
-         column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))
-
-         accepted_structural_types = (int, float, numpy.integer, numpy.float64)
-         accepted_semantic_types = set()
-         accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")
-         if not issubclass(column_metadata['structural_type'], accepted_structural_types):
-             return False
-
-         semantic_types = set(column_metadata.get('semantic_types', []))
-
-         if len(semantic_types) == 0:
-             cls.logger.warning("No semantic types found in column metadata")
-             return False
-
-         # Make sure all accepted_semantic_types are present in semantic_types.
-         return len(accepted_semantic_types - semantic_types) == 0
-
-
-     @classmethod
-     def _get_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams) -> List[OrderedDict]:
-         outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
-
-         target_columns_metadata: List[OrderedDict] = []
-         for column_index in range(outputs_length):
-             column_metadata = OrderedDict(outputs_metadata.query_column(column_index))
-
-             # Update semantic types and prepare them for predicted targets.
-             semantic_types = set(column_metadata.get('semantic_types', []))
-             semantic_types_to_remove = set()
-             add_semantic_types = set()
-             add_semantic_types.add(hyperparams["return_semantic_type"])
-             semantic_types = semantic_types - semantic_types_to_remove
-             semantic_types = semantic_types.union(add_semantic_types)
-             column_metadata['semantic_types'] = list(semantic_types)
-
-             target_columns_metadata.append(column_metadata)
-
-         return target_columns_metadata
-
-     @classmethod
-     def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
-                                      target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
-         outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)
-
-         for column_index, column_metadata in enumerate(target_columns_metadata):
-             column_metadata.pop("structural_type", None)
-             outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)
-
-         return outputs_metadata
-
-     def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:
-         outputs = d3m_dataframe(predictions, generate_metadata=True)
-         target_columns_metadata = self._copy_inputs_metadata(inputs.metadata, self._training_indices, outputs.metadata, self.hyperparams)
-         outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
-         return outputs
-
-
-     @classmethod
-     def _copy_inputs_metadata(cls, inputs_metadata: metadata_base.DataMetadata, input_indices: List[int],
-                               outputs_metadata: metadata_base.DataMetadata, hyperparams):
-         outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
-         target_columns_metadata: List[OrderedDict] = []
-         for column_index in input_indices:
-             column_name = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index)).get("name")
-             if column_name is None:
-                 column_name = "output_{}".format(column_index)
-
-             column_metadata = OrderedDict(inputs_metadata.query_column(column_index))
-             semantic_types = set(column_metadata.get('semantic_types', []))
-             semantic_types_to_remove = set()
-             add_semantic_types = set()
-             add_semantic_types.add(hyperparams["return_semantic_type"])
-             semantic_types = semantic_types - semantic_types_to_remove
-             semantic_types = semantic_types.union(add_semantic_types)
-             column_metadata['semantic_types'] = list(semantic_types)
-
-             column_metadata["name"] = str(column_name)
-             target_columns_metadata.append(column_metadata)
-
-         # If outputs have more columns than the input indices, mark all
-         # remaining columns with the configured semantic type.
-         if outputs_length > len(input_indices):
-             for column_index in range(len(input_indices), outputs_length):
-                 column_metadata = OrderedDict()
-                 semantic_types = set()
-                 semantic_types.add(hyperparams["return_semantic_type"])
-                 column_name = "output_{}".format(column_index)
-                 column_metadata["semantic_types"] = list(semantic_types)
-                 column_metadata["name"] = str(column_name)
-                 target_columns_metadata.append(column_metadata)
-
-         return target_columns_metadata
-
-
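- # A minimal usage sketch, assuming a plain Python environment with d3m
- # installed and default hyperparameters; the toy column values are made up.
- if __name__ == '__main__':
-     import pandas
-
-     toy = d3m_dataframe(pandas.DataFrame({
-         'timestamp': [1, 2, 1, 2],
-         'value': [0.1, 0.9, 0.2, 0.8],
-         'system_id': [0, 0, 1, 1],
-         'scores': [0.01, 0.92, 0.03, 0.77],
-     }), generate_metadata=True)
-
-     primitive = EnsemblePrimitive(hyperparams=Hyperparams.defaults())
-     primitive.set_training_data(inputs=toy)
-     primitive.fit()
-     # produce() relabels the columns and appends a 0/1 'results' column.
-     print(primitive.produce(inputs=toy).value)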