@@ -24,23 +24,24 @@ from mindarmour.utils._check_param import check_param_type, check_param_in_range
class ConceptDriftCheckTimeSeries:
"""
Concept is used for example series distribution change detection.
ConceptDriftCheckTimeSeries is used for example series distribution change detection.
Args:
window_size(int): Size of a concept window, belongs to [10, 1/3*len(input data)].
If the data is periodic, usually window_size equals 2-5 periods, such as,
for monthly/weekly data, the data volume of 30/7 days is a period. Default: 100.
window_size(int): Size of a concept window, no less than 10. If given the input data,
window_size belongs to [10, 1/3*len(input data)]. If the data is periodic, usually
window_size equals 2-5 periods, such as, for monthly/weekly data, the data volume
of 30/7 days is a period. Default: 100.
rolling_window(int): Smoothing window size, belongs to [1, window_size]. Default:10.
step(int): The jump length of the sliding window, belongs to [1, window_size]. Default:10.
threshold_index(float): The threshold index, (-∞, +∞), Default: 1.5.
threshold_index(float): The threshold index, :math:`(-\infty, +\infty)`. Default: 1.5.
need_label(bool): False or True. If need_label=True, concept drift labels are needed.
Default: False.
Examples:
>>> concept = ConceptDriftCheckTimeSeries(window_size=100, rolling_window=10,
>>> step=10, threshold_index=1.5, need_label=False)
>>> data_example = np.array([np.random.rand(1000),
>>> np.random.rand(1000),np.random.rand(1000)]).T
>>> data_example = 5*np.random.rand(1000)
>>> data_example[200: 800] = 20*np.random.rand(600)
>>> score, threshold, concept_drift_location = concept.concept_check(data_example)
"""
@@ -48,10 +49,7 @@ class ConceptDriftCheckTimeSeries:
step=10, threshold_index=1.5, need_label=False):
self.window_size = check_param_type('window_size', window_size, int)
self.rolling_window = check_param_type('rolling_window', rolling_window, int)
self.rolling_window = check_param_in_range('rolling_window',
rolling_window, 1, window_size)
self.step = check_param_type('step', step, int)
self.step = check_param_in_range('step', step, 1, window_size)
self.threshold_index = check_param_type('threshold_index', threshold_index, float)
self.need_label = check_param_type('need_label', need_label, bool)
self._in_size = window_size
@@ -66,8 +64,8 @@ class ConceptDriftCheckTimeSeries:
window_data(numpy.ndarray): The input data (in one window).
Returns:
w_out(numpy.ndarray): T he output weight of reservoir model.
x_state(numpy.ndarray): T he state of the reservoir model in the latent space.
- numpy.ndarray, t he output weight of reservoir model.
- numpy.ndarray, t he state of the reservoir model in the latent space.
Examples:
>>> input_data = np.random.rand(100)
@@ -99,7 +97,7 @@ class ConceptDriftCheckTimeSeries:
data_y(numpy.ndarray): Data y.
Returns:
distance_score_mean(float): D istance between data_x and data_y.
- float, d istance between data_x and data_y.
Examples:
>>> x = np.random.rand(100)
@@ -125,7 +123,7 @@ class ConceptDriftCheckTimeSeries:
data(numpy.ndarray): Input data.
Returns:
smooth_data(numpy.ndarray): D ata after smoothing.
- numpy.ndarray, d ata after smoothing.
Examples:
>>> data_example = np.random.rand(100)
@@ -150,28 +148,30 @@ class ConceptDriftCheckTimeSeries:
def concept_check(self, data):
"""
Find concept drift locations in a example series.
Find concept drift locations in a data series.
Args:
data(numpy.ndarray): Input data. The shape of data could be (n,1) or (n,m).
Note that each column (m columns) is one data series.
Returns:
drift_score(numpy.ndarray): T he concept drift score of the example series.
threshold(float): T he threshold to judge concept drift.
concept_drift_location(list): T he location of the concept drift.
- numpy.ndarray, t he concept drift score of the example series.
- float, t he threshold to judge concept drift.
- list, t he location of the concept drift.
Examples:
>>> concept = ConceptDriftCheckTimeSeries(window_size=100, rolling_window=10,
>>> step=10, threshold_index=1.5, need_label=False)
>>> data_example = np.array([np.random.rand(1000),
>>> np.random.rand(1000), np.random.rand(1000)]).T
>>> score, drift_threshold, point = concept.concept_check(data_example)
>>> data_example = 5*np.random.rand(1000)
>>> data_example[200: 800] = 20*np.random.rand(600)
>>> score, drift_threshold, drift_location = concept.concept_check(data_example)
"""
# data check
data = _check_array_not_empty('data', data)
data = check_param_type('data', data, np.ndarray)
check_param_in_range('window_size', self.window_size, 10, int((1 / 3)*len(data)))
check_param_in_range('rolling_window', self.rolling_window, 1, self.window_size)
check_param_in_range('step', self.step, 1, self.window_size)
original_data = data
data = self._data_process(data)
# calculate drift score
@@ -190,39 +190,38 @@ class ConceptDriftCheckTimeSeries:
# find drift blocks
concept_drift_location, drift_point = _drift_blocks(drift_score,
label_continue, label_location)
# show result
_plot_show(original_data, threshold, concept_drift_location,
drift_point, drift_score)
# save result
_result_save(original_data, threshold, concept_drift_location, drift_point, drift_score)
return drift_score, threshold, concept_drift_location
def _plot_show (original_data, threshold, concept_location, drift_point, drift_score):
def _result_save (original_data, threshold, concept_location, drift_point, drift_score):
"""
To show the result.
To save the result.
Args:
original_data(numpy.ndarray): The input data.
threshold(float): The concept drift threshold.
concept_location(list): The concept drift locations(x-axis).
drift_point(list): The precise drift location of a drift.
drift_point(list): The precise drift point of a drift.
drift_score(numpy.ndarray): The drift score of input data.
"""
plt.figure(figsize=(20, 8))
plt.subplot(2, 1, 1)
# Plot input data and drift points
plt.plot(original_data, label="data")
plt.title('concept drift check, threshold=' + str(threshold))
plt.title('concept drift check, threshold=' + str(threshold), fontsize=25 )
plt.scatter(concept_location, np.ones(len(concept_location)),
marker='*', s=200, color="b", label="concept drift occurred")
for _, i in enumerate(drift_point):
plt.axvline(x=i, color='r', linestyle='--')
plt.legend()
plt.legend(fontsize=15 )
plt.subplot(2, 1, 2)
# Plot drift score
plt.plot(drift_score, label="drift_score")
plt.axhline(y=threshold, color='r', linestyle='--', label="threshold")
plt.legend()
plt.show( )
plt.legend(fontsize=15 )
plt.savefig('concept_drift_check.pdf' )
def _original_label(original_data, threshold, drift_score, window_size, step_size):
@@ -238,9 +237,9 @@ def _original_label(original_data, threshold, drift_score, window_size, step_siz
step_size(int): The jump length of the sliding window.
Returns:
label(list): T he drift label of input data.
- list, t he drift label of input data.
0 means no drift, and 1 means drift happens.
label_location(list): T he locations of drifts(x-axis).
- list, t he locations of drifts(x-axis).
"""
label = []
label_location = []
@@ -262,7 +261,7 @@ def _label_continue_process(label):
label(list): The original drift label.
Returns:
label_continue(numpy.ndarray): The continual drift label.
- numpy.ndarray, The continual drift label.
The drift may happen occasionally, we hope to avoid
frequent alarms, so label continue process is necessary.
"""
@@ -290,7 +289,7 @@ def _continue_block(location):
location(numpy.ndarray): The locations of concept drift.
Returns:
area(list): C ontinue blocks of concept drift.
- list, c ontinue blocks of concept drift.
"""
area = []
for _, loc in groupby(enumerate(location), _find_loc):
@@ -309,8 +308,8 @@ def _drift_blocks(drift_score, label_continue, label_location):
label_location(numpy.ndarray): The locations of concept drift(x-axis).
Returns:
concept_location(list): T he concept drift locations(x-axis) after continual blocks finding.
drift_point(list): R eturn a precise beginning location of a drift.
- list, t he concept drift locations(x-axis) after continual blocks finding.
- list, r eturn a precise beginning location of a drift.
"""
# Find drift blocks
area = _continue_block(np.where(label_continue == 1)[0])
@@ -341,7 +340,7 @@ def _w_generate(res_size, in_size, input_data):
input_data(numpy.ndarray): Input data.
Returns:
x_state(numpy.ndarray): T he state of reservoir.
- numpy.ndarray, t he state of reservoir.
"""
# Weight generates randomly
np.random.seed(42)
@@ -373,7 +372,7 @@ def _cal_distance(matrix1, matrix2):
matrix2(numpy.ndarray): Input array.
Returns:
distance(numpy.ndarray): D istance between two arrays.
- numpy.ndarray, d istance between two arrays.
"""
w_mean_x = np.mean(matrix1, axis=0)
w_mean_y = np.mean(matrix2, axis=0)
@@ -390,7 +389,7 @@ def _cal_threshold(distance, threshold_index):
threshold_index(float): Threshold adjusted index, [-∞, +∞].
Returns:
threshold(float): [0, 1].
- float, [0, 1].
"""
distance = distance[distance > 0]
# Threshold calculation