diff --git a/examples/build_System_Wise_Detection_pipeline.py b/examples/build_System_Wise_Detection_pipeline.py
new file mode 100644
index 0000000..aa5ea69
--- /dev/null
+++ b/examples/build_System_Wise_Detection_pipeline.py
@@ -0,0 +1,74 @@
+from d3m import index
+from d3m.metadata.base import ArgumentType
+from d3m.metadata.pipeline import Pipeline, PrimitiveStep
+
+# -> dataset_to_dataframe -> column_parser -> extract_columns_by_semantic_types(attributes) -> pyod_ae -> system_wise_detection
+#    extract_columns_by_semantic_types(targets) -> ^
+
+# Creating pipeline
+pipeline_description = Pipeline()
+pipeline_description.add_input(name='inputs')
+
+# Step 0: dataset_to_dataframe
+step_0 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.dataset_to_dataframe.Common'))
+step_0.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='inputs.0')
+step_0.add_output('produce')
+pipeline_description.add_step(step_0)
+
+# Step 1: column_parser
+step_1 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.column_parser.Common'))
+step_1.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.0.produce')
+step_1.add_output('produce')
+pipeline_description.add_step(step_1)
+
+# Step 2: extract_columns_by_semantic_types(attributes)
+step_2 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
+step_2.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.1.produce')
+step_2.add_output('produce')
+step_2.add_hyperparameter(name='semantic_types', argument_type=ArgumentType.VALUE,
+                          data=['https://metadata.datadrivendiscovery.org/types/Attribute'])
+pipeline_description.add_step(step_2)
+
+# Step 3: extract_columns_by_semantic_types(targets)
+step_3 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
+step_3.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.0.produce')
+step_3.add_output('produce')
+step_3.add_hyperparameter(name='semantic_types', argument_type=ArgumentType.VALUE,
+                          data=['https://metadata.datadrivendiscovery.org/types/TrueTarget'])
+pipeline_description.add_step(step_3)
+
+attributes = 'steps.2.produce'
+targets = 'steps.3.produce'
+
+# Step 4: auto encoder
+step_4 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.tods.detection_algorithm.pyod_ae'))
+step_4.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference=attributes)
+step_4.add_output('produce_score')
+#step_4.add_hyperparameter(name='use_columns', argument_type=ArgumentType.VALUE, data=[2])
+#step_4.add_hyperparameter(name='use_semantic_types', argument_type=ArgumentType.VALUE, data=True)
+step_4.add_hyperparameter(name='return_result', argument_type=ArgumentType.VALUE, data='append')
+pipeline_description.add_step(step_4)
+
+# Step 5: system-wise detection
+step_5 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.tods.detection_algorithm.system_wise_detection'))
+step_5.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.4.produce_score')
+step_5.add_hyperparameter(name='return_result', argument_type=ArgumentType.VALUE, data='new')
+
+step_5.add_output('produce')
+pipeline_description.add_step(step_5)
+
+
+# Final Output
+pipeline_description.add_output(name='output predictions', data_reference='steps.5.produce')
+
+# Output to YAML
+#yaml = pipeline_description.to_yaml()
+#with open('pipeline.yml', 'w') as f:
+#    f.write(yaml)
+#print(yaml)
+
+# Output to JSON
+data = pipeline_description.to_json()
+with open('example_pipeline.json', 'w') as f:
+    f.write(data)
+    print(data)
diff --git a/tods/detection_algorithm/SystemWiseDetection.py b/tods/detection_algorithm/SystemWiseDetection.py
new file mode 100644
index 0000000..d505522
--- /dev/null
+++ b/tods/detection_algorithm/SystemWiseDetection.py
@@ -0,0 +1,465 @@
+import os
+import time
+from collections import OrderedDict
+from typing import Any, Optional, List
+
+import numpy as np
+from numpy import ndarray
+from scipy import sparse
+
+from d3m import container, utils as d3m_utils
+from d3m.primitive_interfaces import base, transformer
+from d3m.container import DataFrame as d3m_dataframe
+from d3m.metadata import hyperparams, params, base as metadata_base
+from d3m.base import utils as base_utils
+from d3m.exceptions import PrimitiveNotFittedError
+
+__all__ = ('SystemWiseDetectionPrimitive',)
+
+Inputs = container.DataFrame
+Outputs = container.DataFrame
+
+class Params(params.Params):
+    # TODO: make params dynamic
+    use_column_names: Optional[Any]
+
+
+class Hyperparams(hyperparams.Hyperparams):
+
+    # Tuning parameters
+    window_size = hyperparams.Hyperparameter(default=10, semantic_types=[
+        'https://metadata.datadrivendiscovery.org/types/TuningParameter',
+    ], description="Window size for the sliding-window scoring methods.")
+
+    method_type = hyperparams.Enumeration(
+        values=['max', 'avg', 'sliding_window_sum', 'majority_voting_sliding_window_sum', 'majority_voting_sliding_window_max'],
+        default='majority_voting_sliding_window_max',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="The method used to find anomalous systems.",
+    )
+    contamination = hyperparams.Uniform(
+        lower=0.,
+        upper=0.5,
+        default=0.1,
+        description='The amount of contamination of the data set, i.e. the proportion of outliers in the data set.',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
+    )
+
+    # Control parameters
+    use_columns = hyperparams.Set(
+        elements=hyperparams.Hyperparameter[int](-1),
+        default=(),
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="A set of column indices to force primitive to operate on. If any specified column cannot be parsed, it is skipped.",
+    )
+    exclude_columns = hyperparams.Set(
+        elements=hyperparams.Hyperparameter[int](-1),
+        default=(),
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
+    )
+    return_result = hyperparams.Enumeration(
+        values=['append', 'replace', 'new'],
+        default='new',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
+    )
+    use_semantic_types = hyperparams.UniformBool(
+        default=False,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe.",
+    )
+    add_index_columns = hyperparams.UniformBool(
+        default=False,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
+    )
+    error_on_no_input = hyperparams.UniformBool(
+        default=True,
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
+        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
+    )
+
+    return_semantic_type = hyperparams.Enumeration[str](
+        values=['https://metadata.datadrivendiscovery.org/types/Attribute',
+                'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
+        default='https://metadata.datadrivendiscovery.org/types/Attribute',
+        description='Decides what semantic type to attach to generated attributes.',
+        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
+    )
+
+
+class SystemWiseDetectionPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
+    """
+    Primitive that labels anomalous systems based on system-wise scores over time series reconstruction errors.
+    """
+
+    __author__ = "DATA Lab at Texas A&M University"
+    metadata = metadata_base.PrimitiveMetadata(
+        {
+            'id': '3726fa29-28c5-4529-aec5-2f8b4ff2ef9e',
+            'version': '0.1.0',
+            'name': 'System_Wise_Anomaly_Detection_Primitive',
+            'python_path': 'd3m.primitives.tods.detection_algorithm.system_wise_detection',
+            'keywords': ['Time Series', 'Anomalous System'],
+            'hyperparams_to_tune': ['window_size', 'method_type', 'contamination'],
+            'source': {
+                'name': 'DATA Lab at Texas A&M University',
+                'uris': ['https://gitlab.com/lhenry15/tods.git',
+                         'https://gitlab.com/lhenry15/tods/-/blob/devesh/tods/detection_algorithm/SystemWiseDetection.py'],
+                'contact': 'mailto:khlai037@tamu.edu'
+            },
+            'installation': [
+                {'type': metadata_base.PrimitiveInstallationType.PIP,
+                 'package_uri': 'git+https://gitlab.com/lhenry15/tods.git@{git_commit}#egg=TODS'.format(
+                     git_commit=d3m_utils.current_git_commit(os.path.dirname(__file__)),
+                 ),
+                 }
+            ],
+            'algorithm_types': [
+                metadata_base.PrimitiveAlgorithmType.DATA_PROFILING,
+            ],
+            'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
+        }
+    )
+
+    def __init__(self, *, hyperparams: Hyperparams) -> None:
+        super().__init__(hyperparams=hyperparams)
+        self.primitiveNo = 0
+
+    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
+        """
+        Args:
+            inputs: Container DataFrame
+            timeout: Default
+            iterations: Default
+
+        Returns:
+            Container DataFrame with one (system id, anomaly label) row per system
+        """
+
+        self.logger.info('System-wise detection primitive called')
+
+        # Get columns to operate on.
+        self._fitted = False
+        self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
+        self._input_column_names = self._training_inputs.columns
+
+        if len(self._training_indices) > 0:
+            self._fitted = True
+        else:
+            if self.hyperparams['error_on_no_input']:
+                raise RuntimeError("No input columns were selected")
+            self.logger.warning("No input columns were selected")
+
+        if not self._fitted:
+            raise PrimitiveNotFittedError("Primitive not fitted.")
+        system_wise_detection_input = inputs
+        if self.hyperparams['use_semantic_types']:
+            system_wise_detection_input = inputs.iloc[:, self._training_indices]
+        output_columns = []
+        if len(self._training_indices) > 0:
+            system_wise_detection_output = self._system_wise_detection(
+                system_wise_detection_input,
+                self.hyperparams["method_type"],
+                self.hyperparams["window_size"],
+                self.hyperparams["contamination"])
+
+            if sparse.issparse(system_wise_detection_output):
+                system_wise_detection_output = system_wise_detection_output.toarray()
+            outputs = self._wrap_predictions(inputs, system_wise_detection_output)
+
+            output_columns = [outputs]
+        else:
+            if self.hyperparams['error_on_no_input']:
+                raise RuntimeError("No input columns were selected")
+            self.logger.warning("No input columns were selected")
+
+        self.logger.info('System-wise detection primitive returned')
+        outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
+                                             add_index_columns=self.hyperparams['add_index_columns'],
+                                             inputs=inputs, column_indices=self._training_indices,
+                                             columns_list=output_columns)
+        return base.CallResult(outputs)
+
+    @classmethod
+    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
+        """
+        Select columns to operate on.
+        Args:
+            inputs: Container DataFrame
+            hyperparams: d3m.metadata.hyperparams.Hyperparams
+
+        Returns:
+            list
+        """
+        if not hyperparams['use_semantic_types']:
+            return inputs, list(range(len(inputs.columns)))
+
+        inputs_metadata = inputs.metadata
+
+        def can_produce_column(column_index: int) -> bool:
+            return cls._can_produce_column(inputs_metadata, column_index, hyperparams)
+
+        use_columns = hyperparams['use_columns']
+        exclude_columns = hyperparams['exclude_columns']
+
+        columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata,
+                                                                                   use_columns=use_columns,
+                                                                                   exclude_columns=exclude_columns,
+                                                                                   can_use_column=can_produce_column)
+        return inputs.iloc[:, columns_to_produce], columns_to_produce
+
+    @classmethod
+    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int,
+                            hyperparams: Hyperparams) -> bool:
+        """
+        Output whether a column can be processed.
+        Args:
+            inputs_metadata: d3m.metadata.base.DataMetadata
+            column_index: int
+
+        Returns:
+            bool
+        """
+        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))
+
+        accepted_structural_types = (int, float, np.integer, np.float64)
+        accepted_semantic_types = set()
+        accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")
+        if not issubclass(column_metadata['structural_type'], accepted_structural_types):
+            return False
+
+        semantic_types = set(column_metadata.get('semantic_types', []))
+        if len(semantic_types) == 0:
+            cls.logger.warning("No semantic types found in column metadata")
+            return False
+
+        # Make sure all accepted_semantic_types are available in semantic_types.
+        if len(accepted_semantic_types - semantic_types) == 0:
+            return True
+
+        return False
+
+    @classmethod
+    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
+                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
+        """
+        Update metadata for selected columns.
+        Args:
+            inputs_metadata: metadata_base.DataMetadata
+            outputs: Container DataFrame
+            target_columns_metadata: list
+
+        Returns:
+            d3m.metadata.base.DataMetadata
+        """
+        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)
+
+        for column_index, column_metadata in enumerate(target_columns_metadata):
+            column_metadata.pop("structural_type", None)
+            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)
+
+        return outputs_metadata
+
+    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:
+        """
+        Wrap predictions into a dataframe.
+        Args:
+            inputs: Container DataFrame
+            predictions: array-like data (n_samples, n_features)
+
+        Returns:
+            DataFrame
+        """
+        outputs = d3m_dataframe(predictions, generate_metadata=True)
+        target_columns_metadata = self._add_target_columns_metadata(outputs.metadata, self.hyperparams, self.primitiveNo)
+        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)
+
+        return outputs
+
+    @classmethod
+    def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams, primitiveNo):
+        """
+        Add target columns metadata.
+        Args:
+            outputs_metadata: metadata.base.DataMetadata
+            hyperparams: d3m.metadata.hyperparams.Hyperparams
+
+        Returns:
+            List[OrderedDict]
+        """
+        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
+        target_columns_metadata: List[OrderedDict] = []
+        for column_index in range(outputs_length):
+            column_name = "{0}{1}_{2}".format(cls.metadata.query()['name'], primitiveNo, column_index)
+            column_metadata = OrderedDict()
+            semantic_types = set()
+            semantic_types.add(hyperparams["return_semantic_type"])
+            column_metadata['semantic_types'] = list(semantic_types)
+
+            column_metadata["name"] = str(column_name)
+            target_columns_metadata.append(column_metadata)
+
+        return target_columns_metadata
+
+    def _write(self, inputs: Inputs):
+        inputs.to_csv(str(time.time()) + '.csv')
+
+    def _system_wise_detection(self, X, method_type, window_size, contamination):
+        systemIds = X.system_id.unique()
+        groupedX = X.groupby(X.system_id)
+
+        def label_systems(score_list):
+            # Threshold the per-system scores at the (1 - contamination)
+            # quantile; systems at or above it are labeled anomalous (1).
+            scores = np.asarray(score_list, dtype=float)
+            idx = min(int((1 - contamination) * len(scores)), len(scores) - 1)
+            threshold = np.sort(scores)[idx]
+            self.threshold = threshold
+            labels = (scores >= threshold).astype(int)
+            return [[systemIds[i], labels[i]] for i in range(len(systemIds))]
+
+        transformed_X = []
+        if method_type == "max":
+            # Systems are ranked by the maximum of their reconstruction errors.
+            maxOutlierScorePerSystemList = []
+            for systemId in systemIds:
+                systemDf = groupedX.get_group(systemId)
+                maxOutlierScorePerSystemList.append(np.max(np.abs(systemDf["value_0"].values)))
+            transformed_X = label_systems(maxOutlierScorePerSystemList)
+
+        if method_type == "avg":
+            # Systems are ranked by the average of their reconstruction errors.
+            avgOutlierScorePerSystemList = []
+            for systemId in systemIds:
+                systemDf = groupedX.get_group(systemId)
+                avgOutlierScorePerSystemList.append(np.mean(np.abs(systemDf["value_0"].values)))
+            transformed_X = label_systems(avgOutlierScorePerSystemList)
+
+        if method_type == "sliding_window_sum":
+            # Systems are ranked by the maximum windowed sum of their
+            # reconstruction errors.
+            OutlierScorePerSystemList = []
+            for systemId in systemIds:
+                systemDf = groupedX.get_group(systemId)
+                column_value = systemDf["value_0"].values
+                column_score = np.zeros(len(column_value))
+                for i in range(window_size - 1, len(column_value)):
+                    sequence = column_value[i - window_size + 1:i + 1]
+                    column_score[i] = np.sum(np.abs(sequence))
+                column_score[:window_size - 1] = column_score[window_size - 1]
+                OutlierScorePerSystemList.append(column_score.tolist())
+            # Assumes every system has a series of the same length.
+            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
+
+            maxOutlierScorePerSystemList = OutlierScorePerSystemList.max(axis=1).tolist()
+            transformed_X = label_systems(maxOutlierScorePerSystemList)
+
+        if method_type == "majority_voting_sliding_window_sum":
+            # Each timestep votes for the system with the largest windowed sum
+            # of reconstruction errors; systems are ranked by their vote counts.
+            OutlierScorePerSystemList = []
+            for systemId in systemIds:
+                systemDf = groupedX.get_group(systemId)
+                column_value = systemDf["value_0"].values
+                column_score = np.zeros(len(column_value))
+                for i in range(window_size - 1, len(column_value)):
+                    sequence = column_value[i - window_size + 1:i + 1]
+                    column_score[i] = np.sum(np.abs(sequence))
+                column_score[:window_size - 1] = column_score[window_size - 1]
+                OutlierScorePerSystemList.append(column_score.tolist())
+            # Assumes every system has a series of the same length.
+            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
+            OutlierScorePerSystemList = (
+                OutlierScorePerSystemList == OutlierScorePerSystemList.max(axis=0)[None, :]).astype(int)
+
+            maxOutlierScorePerSystemList = OutlierScorePerSystemList.sum(axis=1).tolist()
+            transformed_X = label_systems(maxOutlierScorePerSystemList)
+
+        if method_type == "majority_voting_sliding_window_max":
+            # Each timestep votes for the system with the largest windowed max
+            # of reconstruction errors; systems are ranked by their vote counts.
+            OutlierScorePerSystemList = []
+            for systemId in systemIds:
+                systemDf = groupedX.get_group(systemId)
+                column_value = systemDf["value_0"].values
+                column_score = np.zeros(len(column_value))
+                for i in range(window_size - 1, len(column_value)):
+                    sequence = column_value[i - window_size + 1:i + 1]
+                    column_score[i] = np.max(np.abs(sequence))
+                column_score[:window_size - 1] = column_score[window_size - 1]
+                OutlierScorePerSystemList.append(column_score.tolist())
+            # Assumes every system has a series of the same length.
+            OutlierScorePerSystemList = np.asarray(OutlierScorePerSystemList)
+            OutlierScorePerSystemList = (
+                OutlierScorePerSystemList == OutlierScorePerSystemList.max(axis=0)[None, :]).astype(int)
+
+            maxOutlierScorePerSystemList = OutlierScorePerSystemList.sum(axis=1).tolist()
+            transformed_X = label_systems(maxOutlierScorePerSystemList)
+
+        return transformed_X
diff --git a/tods/resources/.entry_points.ini b/tods/resources/.entry_points.ini
index a3767f2..f51c82a 100644
--- a/tods/resources/.entry_points.ini
+++ b/tods/resources/.entry_points.ini
@@ -79,3 +79,6 @@ tods.detection_algorithm.telemanom = tods.detection_algorithm.Telemanom:Telemanom
 tods.detection_algorithm.Ensemble = tods.detection_algorithm.Ensemble:Ensemble
 tods.reinforcement.rule_filter = tods.reinforcement.RuleBasedFilter:RuleBasedFilter
 
+
+
+tods.detection_algorithm.system_wise_detection = tods.detection_algorithm.SystemWiseDetection:SystemWiseDetectionPrimitive
\ No newline at end of file
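
Note: all five method_type branches of _system_wise_detection share the same labeling rule: score each system, then threshold the scores at the (1 - contamination) quantile. A minimal standalone sketch of that rule, using hypothetical per-system scores (not output from the primitive):

    import numpy as np

    scores = np.asarray([0.2, 0.9, 0.1, 0.4, 0.8])  # hypothetical per-system outlier scores
    contamination = 0.4                             # expected fraction of anomalous systems
    idx = min(int((1 - contamination) * len(scores)), len(scores) - 1)
    threshold = np.sort(scores)[idx]                # the (1 - contamination) quantile
    labels = (scores >= threshold).astype(int)      # 1 marks an anomalous system
    print(threshold, labels)                        # 0.8 [0 1 0 0 1]

With contamination = 0.4 and five systems, the two highest-scoring systems are flagged.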
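The example_pipeline.json written by the example script can then be executed with the d3m reference runtime. A minimal sketch, assuming a D3M-formatted dataset on disk; the dataset path below is hypothetical:

    from d3m.container.dataset import Dataset
    from d3m.metadata import base as metadata_base
    from d3m.metadata.pipeline import Pipeline
    from d3m.runtime import Runtime

    # Load the pipeline description produced by the example script.
    with open('example_pipeline.json') as f:
        pipeline = Pipeline.from_json(f.read())

    # Load a D3M dataset and fit the pipeline on it.
    dataset = Dataset.load('file:///path/to/datasetDoc.json')
    runtime = Runtime(pipeline=pipeline, context=metadata_base.Context.TESTING)
    result = runtime.fit(inputs=[dataset])
    result.check_success()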