|
|
@@ -0,0 +1,404 @@ |
|
|
|
# -*- coding: utf-8 -*- |
|
|
|
# Copyright 2021 Huawei Technologies Co., Ltd |
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
# you may not use this file except in compliance with the License. |
|
|
|
# You may obtain a copy of the License at |
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
# Unless required by applicable law or agreed to in writing, software |
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
|
# See the License for the specific language governing permissions and |
|
|
|
# limitations under the License. |
|
|
|
# ============================================================================ |
|
|
|
|
|
|
|
""" |
|
|
|
Concpt drift module |
|
|
|
""" |
|
|
|
|
|
|
|
from itertools import groupby |
|
|
|
import numpy as np |
|
|
|
import matplotlib.pyplot as plt |
|
|
|
from mindarmour.utils._check_param import _check_array_not_empty |
|
|
|
from mindarmour.utils._check_param import check_param_type, check_param_in_range |
|
|
|
|
|
|
|
|
|
|
|
class ConceptDriftCheckTimeSeries: |
|
|
|
""" |
|
|
|
Concept is used for example series distribution change detection. |
|
|
|
|
|
|
|
Args: |
|
|
|
window_size(int): Size of a concept window, belongs to [10, 1/3*len(input data)]. |
|
|
|
If the data is periodic, usually window_size equals 2-5 periods, such as, |
|
|
|
for monthly/weekly data, the data volume of 30/7 days is a period. Default: 100. |
|
|
|
rolling_window(int): Smoothing window size, belongs to [1, window_size]. Default:10. |
|
|
|
step(int): The jump length of the sliding window, belongs to [1, window_size]. Default:10. |
|
|
|
threshold_index(float): The threshold index, (-∞, +∞), Default: 1.5. |
|
|
|
need_label(bool): False or True. If need_label=True, concept drift labels are needed. |
|
|
|
Default: False. |
|
|
|
|
|
|
|
Examples: |
|
|
|
>>> concept = ConceptDriftCheckTimeSeries(window_size=100, rolling_window=10, |
|
|
|
>>> step=10, threshold_index=1.5, need_label=False) |
|
|
|
>>> data_example = np.array([np.random.rand(1000), |
|
|
|
>>> np.random.rand(1000),np.random.rand(1000)]).T |
|
|
|
>>> score, threshold, concept_drift_location = concept.concept_check(data_example) |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self, window_size=100, rolling_window=10, |
|
|
|
step=10, threshold_index=1.5, need_label=False): |
|
|
|
self.window_size = check_param_type('window_size', window_size, int) |
|
|
|
self.rolling_window = check_param_type('rolling_window', rolling_window, int) |
|
|
|
self.rolling_window = check_param_in_range('rolling_window', |
|
|
|
rolling_window, 1, window_size) |
|
|
|
self.step = check_param_type('step', step, int) |
|
|
|
self.step = check_param_in_range('step', step, 1, window_size) |
|
|
|
self.threshold_index = check_param_type('threshold_index', threshold_index, float) |
|
|
|
self.need_label = check_param_type('need_label', need_label, bool) |
|
|
|
self._in_size = window_size |
|
|
|
self._out_size = window_size |
|
|
|
self._res_size = int(0.1*window_size) |
|
|
|
|
|
|
|
def _reservoir_model_feature(self, window_data): |
|
|
|
""" |
|
|
|
Extract example features in reservoir model. |
|
|
|
|
|
|
|
Args: |
|
|
|
window_data(numpy.ndarray): The input data (in one window). |
|
|
|
|
|
|
|
Returns: |
|
|
|
w_out(numpy.ndarray): The output weight of reservoir model. |
|
|
|
x_state(numpy.ndarray): The state of the reservoir model in the latent space. |
|
|
|
|
|
|
|
Examples: |
|
|
|
>>> input_data = np.random.rand(100) |
|
|
|
>>> w, x = ConceptDriftCheckTimeSeries._reservoir_model_feature(window_data) |
|
|
|
""" |
|
|
|
# Initialize weights |
|
|
|
res_size = self._res_size |
|
|
|
x_state = _w_generate(res_size, len(window_data), window_data) |
|
|
|
x_state_t = x_state.T |
|
|
|
# Data reshape |
|
|
|
data_channel = None |
|
|
|
if window_data.ndim == 2: |
|
|
|
data_channel = window_data.shape[1] |
|
|
|
if window_data.ndim == 1: |
|
|
|
data_channel = 1 |
|
|
|
y_t = window_data.reshape(len(window_data), data_channel) |
|
|
|
reg = 1e-8 |
|
|
|
# Calculate w_out |
|
|
|
w_out = np.dot(np.dot(y_t, x_state_t), |
|
|
|
np.linalg.inv(np.dot(x_state, x_state_t) + reg*np.eye(res_size))) |
|
|
|
return w_out, x_state |
|
|
|
|
|
|
|
def _concept_distance(self, data_x, data_y): |
|
|
|
""" |
|
|
|
Calculate the distance of two example blocks. |
|
|
|
|
|
|
|
Args: |
|
|
|
data_x(numpy.ndarray): Data x. |
|
|
|
data_y(numpy.ndarray): Data y. |
|
|
|
|
|
|
|
Returns: |
|
|
|
distance_score_mean(float): Distance between data_x and data_y. |
|
|
|
|
|
|
|
Examples: |
|
|
|
>>> x = np.random.rand(100) |
|
|
|
>>> y = np.random.rand(100) |
|
|
|
>>> score = ConceptDriftCheckTimeSeries._concept_distance(x, y) |
|
|
|
""" |
|
|
|
# Feature extraction |
|
|
|
feature_x = self._reservoir_model_feature(data_x) |
|
|
|
feature_y = self._reservoir_model_feature(data_y) |
|
|
|
# Calculate distance |
|
|
|
distance_wx = sum(abs(np.dot(feature_x[0], feature_x[1]) |
|
|
|
- np.dot(feature_y[0], feature_y[1])))/len(data_x) |
|
|
|
statistic_feature = abs(data_x.mean() - data_y.mean()).mean() |
|
|
|
distance_score = (distance_wx + statistic_feature) / (1 + distance_wx + statistic_feature) |
|
|
|
distance_score_mean = distance_score.mean() |
|
|
|
return distance_score_mean |
|
|
|
|
|
|
|
def _data_process(self, data): |
|
|
|
""" |
|
|
|
Data processing. |
|
|
|
|
|
|
|
Args: |
|
|
|
data(numpy.ndarray): Input data. |
|
|
|
|
|
|
|
Returns: |
|
|
|
smooth_data(numpy.ndarray): Data after smoothing. |
|
|
|
|
|
|
|
Examples: |
|
|
|
>>> data_example = np.random.rand(100) |
|
|
|
>>> data_example = ConceptDriftCheckTimeSeries._data_process(data_example) |
|
|
|
""" |
|
|
|
temp = [] |
|
|
|
data_channel = None |
|
|
|
if data.ndim == 2: |
|
|
|
data_channel = data.shape[1] |
|
|
|
if data.ndim == 1: |
|
|
|
data_channel = 1 |
|
|
|
data = data.reshape(len(data), data_channel) |
|
|
|
# Moving average |
|
|
|
for i in range(data_channel): |
|
|
|
data_av = np.convolve(data[:, i], |
|
|
|
np.ones((self.rolling_window,)) / self.rolling_window, 'valid') |
|
|
|
data_av = np.append(data_av, np.ones(self.rolling_window - 1)*data_av[-1]) |
|
|
|
data_av = (data_av - data_av.min()) / (data_av.max() - data_av.min()) |
|
|
|
temp.append(data_av) |
|
|
|
smooth_data = np.array(temp).T |
|
|
|
return smooth_data |
|
|
|
|
|
|
|
def concept_check(self, data): |
|
|
|
""" |
|
|
|
Find concept drift locations in a example series. |
|
|
|
|
|
|
|
Args: |
|
|
|
data(numpy.ndarray): Input data. The shape of data could be (n,1) or (n,m). |
|
|
|
Note that each column (m columns) is one data series. |
|
|
|
|
|
|
|
Returns: |
|
|
|
drift_score(numpy.ndarray): The concept drift score of the example series. |
|
|
|
threshold(float): The threshold to judge concept drift. |
|
|
|
concept_drift_location(list): The location of the concept drift. |
|
|
|
|
|
|
|
Examples: |
|
|
|
>>> concept = ConceptDriftCheckTimeSeries(window_size=100, rolling_window=10, |
|
|
|
>>> step=10, threshold_index=1.5, need_label=False) |
|
|
|
>>> data_example = np.array([np.random.rand(1000), |
|
|
|
>>> np.random.rand(1000), np.random.rand(1000)]).T |
|
|
|
>>> score, drift_threshold, point = concept.concept_check(data_example) |
|
|
|
""" |
|
|
|
# data check |
|
|
|
data = _check_array_not_empty('data', data) |
|
|
|
data = check_param_type('data', data, np.ndarray) |
|
|
|
check_param_in_range('window_size', self.window_size, 10, int((1 / 3)*len(data))) |
|
|
|
original_data = data |
|
|
|
data = self._data_process(data) |
|
|
|
# calculate drift score |
|
|
|
drift_score = np.zeros(len(data)) |
|
|
|
step_size = self.step |
|
|
|
for i in range(0, len(data) - 2*self.window_size, step_size): |
|
|
|
data_x = data[i: i + self.window_size] |
|
|
|
data_y = data[i + self.window_size:i + 2*self.window_size] |
|
|
|
drift_score[i + self.window_size] = self._concept_distance(data_x, data_y) |
|
|
|
threshold = _cal_threshold(drift_score, self.threshold_index) |
|
|
|
# original label |
|
|
|
label, label_location = _original_label(data, threshold, drift_score, |
|
|
|
self.window_size, step_size) |
|
|
|
# label continue |
|
|
|
label_continue = _label_continue_process(label) |
|
|
|
# find drift blocks |
|
|
|
concept_drift_location, drift_point = _drift_blocks(drift_score, |
|
|
|
label_continue, label_location) |
|
|
|
# show result |
|
|
|
_plot_show(original_data, threshold, concept_drift_location, |
|
|
|
drift_point, drift_score) |
|
|
|
return drift_score, threshold, concept_drift_location |
|
|
|
|
|
|
|
|
|
|
|
def _plot_show(original_data, threshold, concept_location, drift_point, drift_score): |
|
|
|
""" |
|
|
|
To show the result. |
|
|
|
|
|
|
|
Args: |
|
|
|
original_data(numpy.ndarray): The input data. |
|
|
|
threshold(float): The concept drift threshold. |
|
|
|
concept_location(list): The concept drift locations(x-axis). |
|
|
|
drift_point(list): The precise drift location of a drift. |
|
|
|
drift_score(numpy.ndarray): The drift score of input data. |
|
|
|
""" |
|
|
|
plt.figure(figsize=(20, 8)) |
|
|
|
plt.subplot(2, 1, 1) |
|
|
|
# Plot input data and drift points |
|
|
|
plt.plot(original_data, label="data") |
|
|
|
plt.title('concept drift check, threshold=' + str(threshold)) |
|
|
|
plt.scatter(concept_location, np.ones(len(concept_location)), |
|
|
|
marker='*', s=200, color="b", label="concept drift occurred") |
|
|
|
for _, i in enumerate(drift_point): |
|
|
|
plt.axvline(x=i, color='r', linestyle='--') |
|
|
|
plt.legend() |
|
|
|
plt.subplot(2, 1, 2) |
|
|
|
# Plot drift score |
|
|
|
plt.plot(drift_score, label="drift_score") |
|
|
|
plt.axhline(y=threshold, color='r', linestyle='--', label="threshold") |
|
|
|
plt.legend() |
|
|
|
plt.show() |
|
|
|
|
|
|
|
|
|
|
|
def _original_label(original_data, threshold, drift_score, window_size, step_size): |
|
|
|
""" |
|
|
|
To obtain a original drift label of time series. |
|
|
|
|
|
|
|
Args: |
|
|
|
original_data(numpy.ndarray): The input data. |
|
|
|
threshold(float): The drift threshold. |
|
|
|
drift_score(numpy.ndarray): The drift score of the input data. |
|
|
|
window_size(int): Size of a concept window. |
|
|
|
Usually 3 periods of the input data if it is periodic. |
|
|
|
step_size(int): The jump length of the sliding window. |
|
|
|
|
|
|
|
Returns: |
|
|
|
label(list): The drift label of input data. |
|
|
|
0 means no drift, and 1 means drift happens. |
|
|
|
label_location(list): The locations of drifts(x-axis). |
|
|
|
""" |
|
|
|
label = [] |
|
|
|
label_location = [] |
|
|
|
# Label: label=0, no drifts; label=1, drift happens. |
|
|
|
for i in range(0, len(original_data) - 2*window_size, step_size): |
|
|
|
label_location.append(i + window_size) |
|
|
|
if drift_score[i + window_size] >= threshold: |
|
|
|
label.append(1) |
|
|
|
else: |
|
|
|
label.append(0) |
|
|
|
return label, label_location |
|
|
|
|
|
|
|
|
|
|
|
def _label_continue_process(label): |
|
|
|
""" |
|
|
|
To obtain a continual drift label of time series. |
|
|
|
|
|
|
|
Args: |
|
|
|
label(list): The original drift label. |
|
|
|
|
|
|
|
Returns: |
|
|
|
label_continue(numpy.ndarray): The continual drift label. |
|
|
|
The drift may happen occasionally, we hope to avoid |
|
|
|
frequent alarms, so label continue process is necessary. |
|
|
|
""" |
|
|
|
if label[-1] == 1 and label[-2] == 0 and label[-3] == 0 and label[-4] == 0: |
|
|
|
label[-1] = 0 |
|
|
|
if label[0] == 1 and label[1] == 0 and label[2] == 0 and label[3] == 0: |
|
|
|
label[0] = 0 |
|
|
|
label_continue = np.array(label) |
|
|
|
# Label continue process |
|
|
|
for i in range(1, len(label) - 1): |
|
|
|
if label[i - 1] == 0 and label[i + 1] == 0: |
|
|
|
label_continue[i - 1:i + 1] = 0 |
|
|
|
return label_continue |
|
|
|
|
|
|
|
|
|
|
|
def _find_loc(label_location): |
|
|
|
return label_location[1] - label_location[0] |
|
|
|
|
|
|
|
|
|
|
|
def _continue_block(location): |
|
|
|
""" |
|
|
|
Find continue blocks of concept drift. |
|
|
|
|
|
|
|
Args: |
|
|
|
location(numpy.ndarray): The locations of concept drift. |
|
|
|
|
|
|
|
Returns: |
|
|
|
area(list): Continue blocks of concept drift. |
|
|
|
""" |
|
|
|
area = [] |
|
|
|
for _, loc in groupby(enumerate(location), _find_loc): |
|
|
|
l_1 = [j for i, j in loc] |
|
|
|
area.append(l_1) |
|
|
|
return area |
|
|
|
|
|
|
|
|
|
|
|
def _drift_blocks(drift_score, label_continue, label_location): |
|
|
|
""" |
|
|
|
Find the concept drift areas. |
|
|
|
|
|
|
|
Args: |
|
|
|
drift_score(numpy.ndarray): The drift score of the data series. |
|
|
|
label_continue(numpy.ndarray): The concept drift continual label. |
|
|
|
label_location(numpy.ndarray): The locations of concept drift(x-axis). |
|
|
|
|
|
|
|
Returns: |
|
|
|
concept_location(list): The concept drift locations(x-axis) after continual blocks finding. |
|
|
|
drift_point(list): Return a precise beginning location of a drift. |
|
|
|
""" |
|
|
|
# Find drift blocks |
|
|
|
area = _continue_block(np.where(label_continue == 1)[0]) |
|
|
|
label_continue = np.array(label_continue) |
|
|
|
label_location = np.array(label_location) |
|
|
|
label_continue = label_continue[label_continue == 1] |
|
|
|
concept_location = [] |
|
|
|
drift_point = [] |
|
|
|
# Find drift points |
|
|
|
for _, lo_ in enumerate(area): |
|
|
|
location = label_location[lo_] |
|
|
|
concept_location.extend(location) |
|
|
|
if label_continue.size > 0: |
|
|
|
drift_point.append(location[np.where(drift_score[location] |
|
|
|
== np.max(drift_score[location]))[0]]) |
|
|
|
else: |
|
|
|
drift_point.append(None) |
|
|
|
return concept_location, drift_point |
|
|
|
|
|
|
|
|
|
|
|
def _w_generate(res_size, in_size, input_data): |
|
|
|
""" |
|
|
|
Randomly generate weights of the reservoir model. |
|
|
|
|
|
|
|
Args: |
|
|
|
res_size(int): The number of reservoir nodes. |
|
|
|
in_size(int): The input size of reservoir model. |
|
|
|
input_data(numpy.ndarray): Input data. |
|
|
|
|
|
|
|
Returns: |
|
|
|
x_state(numpy.ndarray): The state of reservoir. |
|
|
|
""" |
|
|
|
# Weight generates randomly |
|
|
|
np.random.seed(42) |
|
|
|
w_in = np.random.rand(res_size, in_size) - 0.5 |
|
|
|
w_0 = np.random.rand(res_size, res_size) - 0.5 |
|
|
|
w_0 *= 0.8 |
|
|
|
a_speed = 0.3 |
|
|
|
# Data reshape |
|
|
|
data_channel = None |
|
|
|
if input_data.ndim == 2: |
|
|
|
data_channel = input_data.shape[1] |
|
|
|
if input_data.ndim == 1: |
|
|
|
data_channel = 1 |
|
|
|
# Reservoir state |
|
|
|
x_state = np.zeros((res_size, data_channel)) |
|
|
|
u_input = input_data.reshape(len(input_data), data_channel) |
|
|
|
for _ in range(50): |
|
|
|
x_state = (1 - a_speed)*x_state + \ |
|
|
|
a_speed*np.tanh(np.dot(w_in, u_input) + np.dot(w_0, x_state)) |
|
|
|
return x_state |
|
|
|
|
|
|
|
|
|
|
|
def _cal_distance(matrix1, matrix2): |
|
|
|
""" |
|
|
|
Calculate distance between two matrix. |
|
|
|
|
|
|
|
Args: |
|
|
|
matrix1(numpy.ndarray): Input array. |
|
|
|
matrix2(numpy.ndarray): Input array. |
|
|
|
|
|
|
|
Returns: |
|
|
|
distance(numpy.ndarray): Distance between two arrays. |
|
|
|
""" |
|
|
|
w_mean_x = np.mean(matrix1, axis=0) |
|
|
|
w_mean_y = np.mean(matrix2, axis=0) |
|
|
|
distance = sum(abs(w_mean_x - w_mean_y)) |
|
|
|
return distance |
|
|
|
|
|
|
|
|
|
|
|
def _cal_threshold(distance, threshold_index): |
|
|
|
""" |
|
|
|
Calculate the threshold of concept drift. |
|
|
|
|
|
|
|
Args: |
|
|
|
distance(numpy.ndarray): The distance between two data series. |
|
|
|
threshold_index(float): Threshold adjusted index, [-∞, +∞]. |
|
|
|
|
|
|
|
Returns: |
|
|
|
threshold(float): [0, 1]. |
|
|
|
""" |
|
|
|
distance = distance[distance > 0] |
|
|
|
# Threshold calculation |
|
|
|
if distance.size > 0: |
|
|
|
q_1 = np.percentile(distance, 25) |
|
|
|
q_3 = np.percentile(distance, 75) |
|
|
|
q_diff = q_3 - q_1 |
|
|
|
threshold = np.clip(0.1 + threshold_index*q_diff, 0, 1) |
|
|
|
else: |
|
|
|
threshold = 1 |
|
|
|
return threshold |