@@ -27,9 +27,9 @@ from d3m.primitive_interfaces.base import CallResult, DockerContainer
from d3m.primitive_interfaces.unsupervised_learning import UnsupervisedLearnerPrimitiveBase
from d3m.primitive_interfaces.transformer import TransformerPrimitiveBase
from .UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
from detection_algorithm.UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
from .core.CollectiveBase import CollectiveBaseDetector
from detection_algorithm.core.CollectiveBase import CollectiveBaseDetector
from sklearn.utils import check_array
@@ -39,9 +39,9 @@ from d3m import exceptions
# from detection_algorithm.UODBasePrimitive import Params_ODBase, Hyperparams_ODBase, UnsupervisedOutlierDetectorBase
from .core.utils.errors import Errors
from .core.utils.channel import Channel
from .core.utils.modeling import Model
from detection_algorithm.core.utils.errors import Errors
from detection_algorithm.core.utils.channel import Channel
from detection_algorithm.core.utils.modeling import Model
# from pyod.models.base import BaseDetector
@@ -53,394 +53,396 @@ Inputs = container.DataFrame
Outputs = container.DataFrame
class Params(Params_ODBase):
######## Add more Attributes #######
pass
class Hyperparams(Hyperparams_ODBase):
smoothing_perc = hyperparams.Hyperparameter[float](
default=0.05,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="determines window size used in EWMA smoothing (percentage of total values for channel)"
)
window_size_ = hyperparams.Hyperparameter[int](
default=100,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="number of trailing batches to use in error calculation"
)
error_buffer = hyperparams.Hyperparameter[int](
default=50,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="number of values surrounding an error that are brought into the sequence (promotes grouping on nearby sequences"
)
batch_size = hyperparams.Hyperparameter[int](
default=70,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Batch size while predicting"
)
# LSTM Model Parameters
dropout = hyperparams.Hyperparameter[float](
default=0.3,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="Dropout rate"
)
validation_split = hyperparams.Hyperparameter[float](
default=0.2,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Validation split"
)
optimizer = hyperparams.Hyperparameter[typing.Union[str, None]](
default='Adam',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Optimizer"
)
lstm_batch_size = hyperparams.Hyperparameter[int](
default=64,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="lstm model training batch size"
)
loss_metric = hyperparams.Hyperparameter[typing.Union[str, None]](
default='mean_squared_error',
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="loss function"
)
layers = hyperparams.List(
elements=hyperparams.Hyperparameter[int](1),
default=[10,10],
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="No of units for the 2 lstm layers"
)
# Training Parameters
epochs = hyperparams.Hyperparameter[int](
default=1,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="Epoch"
)
patience = hyperparams.Hyperparameter[int](
default=10,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="Number of consequetive training iterations to allow without decreasing the val_loss by at least min_delta"
)
min_delta = hyperparams.Hyperparameter[float](
default=0.0003,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="Number of consequetive training iterations to allow without decreasing the val_loss by at least min_delta"
)
l_s = hyperparams.Hyperparameter[int](
default=100,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="num previous timesteps provided to model to predict future values"
)
n_predictions = hyperparams.Hyperparameter[int](
default=10,
semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
description="number of steps ahead to predict"
)
# Error thresholding parameters
# ==================================
p = hyperparams.Hyperparameter[float](
default=0.05,
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter'],
description="minimum percent decrease between max errors in anomalous sequences (used for pruning)"
)
# Contamination
contamination = hyperparams.Uniform(
lower=0.,
upper=0.5,
default=0.1,
description='The amount of contamination of the data set, i.e. the proportion of outliers in the data set. Used when fitting to define the threshold on the decision function.',
semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
)
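# Illustrative sketch (not part of the original file): overriding a few of the
# hyperparameters above when instantiating the primitive. This assumes the
# standard d3m Hyperparams API (defaults() / replace()); the chosen values are
# arbitrary examples.
#
#   hp = Hyperparams.defaults().replace({'epochs': 5, 'l_s': 50, 'n_predictions': 5})
#   primitive = TelemanomPrimitive(hyperparams=hp)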
class TelemanomPrimitive(UnsupervisedOutlierDetectorBase[Inputs, Outputs, Params, Hyperparams]):
"""
A primitive that uses Telemanom for outlier detection
Parameters
----------
"""
__author__ = "Data Lab"
metadata = metadata_base.PrimitiveMetadata(
{
'__author__' : "DATA Lab at Texas A&M University",
'name': "Telemanom",
'python_path': 'd3m.primitives.tods.detection_algorithm.telemanom',
'source': {
'name': 'DATA Lab at Texas A&M University',
'contact': 'mailto:khlai037@tamu.edu',
'uris': [
'https://gitlab.com/lhenry15/tods.git',
'https://gitlab.com/lhenry15/tods/-/blob/purav/anomaly-primitives/anomaly_primitives/telemanom.py',
],
},
'algorithm_types': [
metadata_base.PrimitiveAlgorithmType.TELEMANOM,
],
'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
'id': 'c7259da6-7ce6-42ad-83c6-15238679f5fa',
'hyperparameters_to_tune':['layers','loss_metric','optimizer','epochs','p','l_s','patience','min_delta','dropout','smoothing_perc'],
'version': '0.0.1',
},
)
def __init__(self, *,
hyperparams: Hyperparams,
random_seed: int = 0,
docker_containers: Dict[str, DockerContainer] = None) -> None:
super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
self._clf = Detector(smoothing_perc=self.hyperparams['smoothing_perc'],
window_size=self.hyperparams['window_size_'],
error_buffer=self.hyperparams['error_buffer'],
batch_size = self.hyperparams['batch_size'],
validation_split = self.hyperparams['validation_split'],
optimizer = self.hyperparams['optimizer'],
lstm_batch_size = self.hyperparams['lstm_batch_size'],
loss_metric = self.hyperparams['loss_metric'],
layers = self.hyperparams['layers'],
epochs = self.hyperparams['epochs'],
patience = self.hyperparams['patience'],
min_delta = self.hyperparams['min_delta'],
l_s = self.hyperparams['l_s'],
n_predictions = self.hyperparams['n_predictions'],
p = self.hyperparams['p'],
dropout = self.hyperparams['dropout'],
contamination = self.hyperparams['contamination']
)
def set_training_data(self, *, inputs: Inputs) -> None:
"""
Set training data for outlier detection.
Args:
inputs: Container DataFrame
Returns:
None
"""
super().set_training_data(inputs=inputs)
def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:
"""
Fit model with training data.
Args:
None. Fits on the training data previously supplied via set_training_data.
Returns:
None
"""
return super().fit()
def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data to run outlier detection on.
Returns:
Container DataFrame
1 marks Outliers, 0 marks normal.
"""
return super().produce(inputs=inputs, timeout=timeout, iterations=iterations)
def produce_score(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data to run outlier detection on.
Returns:
Container DataFrame
Outlier score of input DataFrame.
"""
return super().produce_score(inputs=inputs, timeout=timeout, iterations=iterations)
def get_params(self) -> Params:
"""
Return parameters.
Args:
None
Returns:
class Params
"""
return super().get_params()
def set_params(self, *, params: Params) -> None:
"""
Set parameters for outlier detection.
Args:
params: class Params
Returns:
None
"""
super().set_params(params=params)
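# Illustrative usage sketch (assumption, not from the original file): the typical
# call sequence for this primitive, where `train_df` and `test_df` are
# container.DataFrame objects holding the time series values.
#
#   primitive = TelemanomPrimitive(hyperparams=Hyperparams.defaults())
#   primitive.set_training_data(inputs=train_df)
#   primitive.fit()
#   labels = primitive.produce(inputs=test_df).value        # 1 marks outliers, 0 normal
#   scores = primitive.produce_score(inputs=test_df).value  # raw anomaly scores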
"""
A primitive that uses telmanom for outlier detection
Parameters
----------
"""
__author__ = "Data Lab"
metadata = metadata_base.PrimitiveMetadata(
{
'__author__' : "DATA Lab at Texas A&M University",
'name': "Telemanom",
'python_path': 'd3m.primitives.tods.detection_algorithm.telemanom',
'source': {
'name': 'DATA Lab at Texas A&M University',
'contact': 'mailto:khlai037@tamu.edu',
'uris': [
'https://gitlab.com/lhenry15/tods.git',
'https://gitlab.com/lhenry15/tods/-/blob/purav/anomaly-primitives/anomaly_primitives/telemanom.py',
],
},
'algorithm_types': [
metadata_base.PrimitiveAlgorithmType.TELEMANOM,
],
'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
'id': 'c7259da6-7ce6-42ad-83c6-15238679f5fa',
'hyperparameters_to_tune':['layers','loss_metric','optimizer','epochs','p','l_s','patience','min_delta','dropout','smoothing_perc'],
'version': '0.0.1',
},
)
def __init__(self, *,
hyperparams: Hyperparams, #
random_seed: int = 0,
docker_containers: Dict[str, DockerContainer] = None) -> None:
super().__init__(hyperparams=hyperparams, random_seed=random_seed, docker_containers=docker_containers)
self._clf = Detector(smoothing_perc=self.hyperparams['smoothing_perc'],
window_size=self.hyperparams['window_size_'],
error_buffer=self.hyperparams['error_buffer'],
batch_size = self.hyperparams['batch_size'],
validation_split = self.hyperparams['validation_split'],
optimizer = self.hyperparams['optimizer'],
lstm_batch_size = self.hyperparams['lstm_batch_size'],
loss_metric = self.hyperparams['loss_metric'],
layers = self.hyperparams['layers'],
epochs = self.hyperparams['epochs'],
patience = self.hyperparams['patience'],
min_delta = self.hyperparams['min_delta'],
l_s = self.hyperparams['l_s'],
n_predictions = self.hyperparams['n_predictions'],
p = self.hyperparams['p'],
contamination=hyperparams['contamination']
)
def set_training_data(self, *, inputs: Inputs) -> None:
"""
Set training data for outlier detection.
Args:
inputs: Container DataFrame
Returns:
None
"""
super().set_training_data(inputs=inputs)
def fit(self, *, timeout: float = None, iterations: int = None) -> CallResult[None]:
"""
Fit model with training data.
Args:
*: Container DataFrame. Time series data up to fit.
Returns:
None
"""
return super().fit()
def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data up to outlier detection.
Returns:
Container DataFrame
1 marks Outliers, 0 marks normal.
"""
return super().produce(inputs=inputs, timeout=timeout, iterations=iterations)
def produce_score(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> CallResult[Outputs]:
"""
Process the testing data.
Args:
inputs: Container DataFrame. Time series data up to outlier detection.
Returns:
Container DataFrame
Outlier score of input DataFrame.
"""
return super().produce_score(inputs=inputs, timeout=timeout, iterations=iterations)
def get_params(self) -> Params:
"""
Return parameters.
Args:
None
Returns:
class Params
"""
return super().get_params()
def set_params(self, *, params: Params) -> None:
"""
Set parameters for outlier detection.
Args:
params: class Params
Returns:
None
"""
super().set_params(params=params)
class Detector(CollectiveBaseDetector):
"""Class to Implement Deep Log LSTM based on "https://www.cs.utah.edu/~lifeifei/papers/deeplog.pdf
Only Parameter Value anomaly detection layer has been implemented for time series data"""
def __init__(self, smoothing_perc=0.05, window_size=10, error_buffer=5, batch_size=30,
dropout=0.3, validation_split=0.2, optimizer='adam', lstm_batch_size=64, loss_metric='mean_squared_error',
layers=[40,40], epochs=1, patience=10, min_delta=0.0003, l_s=5, n_predictions=2, p=0.05, contamination=0.1):
# super(Detector, self).__init__(contamination=contamination)
super(Detector, self).__init__(contamination=contamination,
window_size=l_s,
step_size=1,
)
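# Passing window_size=l_s and step_size=1 to the collective base detector means
# scores and labels are produced per length-l_s sliding subsequence rather than per point.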
self._smoothing_perc = smoothing_perc
self._window_size = window_size
self._error_buffer = error_buffer
self._batch_size = batch_size
self._dropout = dropout
self._validation_split = validation_split
self._optimizer = optimizer
self._lstm_batch_size = lstm_batch_size
self._loss_metric = loss_metric
self._layers = layers
self._epochs = epochs
self._patience = patience
self._min_delta = min_delta
self._l_s = l_s
self._n_predictions = n_predictions
self._p = p
self.contamination = contamination
# self.y_hat = None
self.results = []
self.result_df = None
self._model = None
self._channel = None
def fit(self,X,y=None):
"""
Fit data to LSTM model.
Args:
X : ndarray of shape (n_samples, n_features)
Returns:
return : self object with trained model
"""
X = check_array(X).astype(np.float64)
self._set_n_classes(None)
inputs = X
self._channel = Channel(n_predictions = self._n_predictions,l_s = self._l_s)
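# shape_train_data slides windows of length l_s over the input and pairs each
# window with the following n_predictions values as LSTM regression targets.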
self._channel.shape_train_data(inputs)
self._model = Model(self._channel,patience = self._patience,
min_delta =self._min_delta,
layers = self._layers,
dropout = self._dropout,
n_predictions = self._n_predictions,
loss_metric = self._loss_metric,
optimizer = self._optimizer,
lstm_batch_size = self._lstm_batch_size,
epochs = self._epochs,
validation_split = self._validation_split,
batch_size = self._batch_size,
l_s = self._l_s
)
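# Model wraps the LSTM defined by `layers`/`dropout` and trains it on the shaped
# channel data, with early stopping governed by patience and min_delta.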
self.decision_scores_, self.left_inds_, self.right_inds_ = self.decision_function(X)
self._process_decision_scores()
return self
def decision_function(self, X: np.array):
"""Predict raw anomaly scores of X using the fitted detector.
The anomaly score of an input sample is computed based on the fitted
detector. For consistency, outliers are assigned with
higher anomaly scores.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples. Sparse matrices are accepted only
if they are supported by the base estimator.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
X = check_array(X).astype(np.float64)
self._set_n_classes(None)
inputs = X
self._channel.shape_test_data(inputs)
self._channel = self._model.batch_predict(channel = self._channel)
errors = Errors(channel = self._channel,
window_size = self._window_size,
batch_size = self._batch_size,
smoothing_perc = self._smoothing_perc,
n_predictions = self._n_predictions,
l_s = self._l_s,
error_buffer = self._error_buffer,
p = self._p
)
# smoothed prediction errors
prediction_errors = np.reshape(errors.e_s,(self._channel.X_test.shape[0],self._channel.X_test.shape[2]))
prediction_errors = np.sum(prediction_errors,axis=1)
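# Summing the smoothed errors across features gives a single score per test window;
# the loop below assigns score i to the input span [i, i + l_s).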
left_indices = []
right_indices = []
scores = []
for i in range(len(prediction_errors)):
left_indices.append(i)
right_indices.append(i+self._l_s)
scores.append(prediction_errors[i])
return np.asarray(scores),np.asarray(left_indices),np.asarray(right_indices)
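# Minimal standalone sketch (illustrative only, not part of the original file):
# fit the Detector directly on a small synthetic multivariate series and inspect
# the window-level anomaly scores. The l_s / n_predictions / epochs values are
# assumptions chosen to keep the toy run small, and X_demo is made-up input data.
if __name__ == "__main__":
    import numpy as np  # numpy is already imported at module level; repeated for clarity
    rng = np.random.RandomState(0)
    X_demo = rng.normal(size=(300, 2))   # 300 timesteps, 2 features
    X_demo[150:160] += 5.0               # injected anomalous segment
    det = Detector(l_s=20, n_predictions=5, epochs=1, contamination=0.05)
    det.fit(X_demo)                      # trains the LSTM and computes decision_scores_
    print(det.decision_scores_[:5])      # anomaly score per sliding window
    print(det.left_inds_[:5], det.right_inds_[:5])  # window boundaries for each score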
"""Class to Implement Deep Log LSTM based on "https://www.cs.utah.edu/~lifeifei/papers/deeplog.pdf
Only Parameter Value anomaly detection layer has been implemented for time series data"""
def __init__(self,smoothing_perc=0.05,window_size = 10,error_buffer = 5,batch_size =30, \
dropout = 0.3, validation_split=0.2,optimizer='adam',lstm_batch_size=64,loss_metric='mean_squared_error', \
layers=[40,40],epochs = 1,patience =10,min_delta=0.0003,l_s=5,n_predictions=2,p = 0.05,contamination=0.1):
# super(Detector, self).__init__(contamination=contamination)
super(Detector, self).__init__(contamination=contamination,
window_size=l_s,
step_size=1,
)
self._smoothin_perc = smoothing_perc
self._window_size =window_size
self._error_buffer = error_buffer
self._batch_size = batch_size
self._dropout = dropout
self._validation_split = validation_split
self._optimizer = optimizer
self._lstm_batch_size = lstm_batch_size
self._loss_metric = loss_metric
self._layers = layers
self._epochs = epochs
self._patience = patience
self._min_delta = min_delta
self._l_s = l_s
self._n_predictions = n_predictions
self._p = p
self.contamination = contamination
# self.y_hat = None
self.results = []
self.result_df = None
self._model = None
self._channel = None
def fit(self,X,y=None):
"""
Fit data to LSTM model.
Args:
inputs : X , ndarray of size (number of sample,features)
Returns:
return : self object with trained model
"""
X = check_array(X).astype(np.float)
self._set_n_classes(None)
inputs = X
self._channel = Channel(n_predictions = self._n_predictions,l_s = self._l_s)
self._channel.shape_train_data(inputs)
self._model = Model(self._channel,patience = self._patience,
min_delta =self._min_delta,
layers = self._layers,
dropout = self._dropout,
n_predictions = self._n_predictions,
loss_metric = self._loss_metric,
optimizer = self._optimizer,
lstm_batch_size = self._lstm_batch_size,
epochs = self._epochs,
validation_split = self._validation_split,
batch_size = self._batch_size,
l_s = self._l_s
)
self.decision_scores_, self.left_inds_, self.right_inds_ = self.decision_function(X)
self._process_decision_scores()
return self
def decision_function(self, X: np.array):
"""Predict raw anomaly scores of X using the fitted detector.
The anomaly score of an input sample is computed based on the fitted
detector. For consistency, outliers are assigned with
higher anomaly scores.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples. Sparse matrices are accepted only
if they are supported by the base estimator.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
X = check_array(X).astype(np.float)
self._set_n_classes(None)
inputs = X
self._channel.shape_test_data(inputs)
self._channel = self._model.batch_predict(channel = self._channel)
errors = Errors(channel = self._channel,
window_size = self._window_size,
batch_size = self._batch_size,
smoothing_perc = self._smoothin_perc,
n_predictions = self._n_predictions,
l_s = self._l_s,
error_buffer = self._error_buffer,
p = self._p
)
# prediciton smoothed error
prediction_errors = np.reshape(errors.e_s,(self._channel.X_test.shape[0],self._channel.X_test.shape[2]))
prediction_errors = np.sum(prediction_errors,axis=1)
left_indices = []
right_indices = []
scores = []
for i in range(len(prediction_errors)):
left_indices.append(i)
right_indices.append(i+self._l_s)
scores.append(prediction_errors[i])
return np.asarray(scores),np.asarray(left_indices),np.asarray(right_indices)