@@ -0,0 +1,2 @@
mindspore:
'mindspore/mindspore/version/202110/20211016/r1.5_20211016173415_b550abc902290739ca38bda0d01cfea7e053e77d/'
@@ -35,13 +35,13 @@ if __name__ == '__main__':
model = Model(net)
# load data
ds_train = np.load('../../tests/ut/python/dataset/concept_train_lenet.npy')
ds_test1 = np.load('../../tests/ut/python/dataset/concept_test_lenet1.npy')
ds_test2 = np.load('../../tests/ut/python/dataset/concept_test_lenet2.npy')
ds_eval = np.load('../../tests/ut/python/dataset/concept_test_lenet1.npy')
ds_test = np.load('../../tests/ut/python/dataset/concept_test_lenet2.npy')
# ood detector initialization
detector = OodDetectorFeatureCluster(model, ds_train, n_cluster=10, layer='output[:Tensor]')
# get optimal threshold with ds_test1
num = int(len(ds_test1) / 2)
# get optimal threshold with ds_eval
num = int(len(ds_eval) / 2)
label = np.concatenate((np.zeros(num), np.ones(num)), axis=0) # ID data = 0, OOD data = 1
optimal_threshold = detector.get_optimal_threshold(label, ds_test1)
# get result of ds_test2. We can also set threshold by ourself.
result = detector.ood_predict(optimal_threshold, ds_test2)
optimal_threshold = detector.get_optimal_threshold(label, ds_eval)
# get result of ds_test. We can also set the threshold ourselves.
result = detector.ood_predict(optimal_threshold, ds_test)
@@ -35,13 +35,13 @@ if __name__ == '__main__':
model = Model(net)
# load data
ds_train = np.load('train.npy')
ds_test1 = np.load('test1.npy')
ds_test2 = np.load('test2.npy')
ds_eval = np.load('test1.npy')
ds_test = np.load('test2.npy')
# ood detector initialization
detector = OodDetectorFeatureCluster(model, ds_train, n_cluster=10, layer='output[:Tensor]')
# get optimal threshold with ds_test1
num = int(len(ds_test1) / 2)
# get optimal threshold with ds_eval
num = int(len(ds_eval) / 2)
label = np.concatenate((np.zeros(num), np.ones(num)), axis=0) # ID data = 0, OOD data = 1
optimal_threshold = detector.get_optimal_threshold(label, ds_test1)
# get result of ds_test2. We can also set threshold by ourself.
result = detector.ood_predict(optimal_threshold, ds_test2)
optimal_threshold = detector.get_optimal_threshold(label, ds_eval)
# get result of ds_test. We can also set the threshold ourselves.
result = detector.ood_predict(optimal_threshold, ds_test)
@@ -274,11 +274,12 @@ class SimilarityDetector(Detector):
"""
Filter adversarial noise in input samples.
Args:
inputs (Union[numpy.ndarray, list, tuple]): Data used as references to create adversarial examples.
Raises:
NotImplementedError: This function is not available
in class `SimilarityDetector`.
NotImplementedError: This function is not available in class `SimilarityDetector`.
"""
msg = 'The function transform() is not available in the class ' \
'`SimilarityDetector`.'
msg = 'The function transform() is not available in the class `SimilarityDetector`.'
LOGGER.error(TAG, msg)
raise NotImplementedError(msg)
@@ -133,8 +133,11 @@ from mindarmour.reliability.concept_drift.concept_drift_check_images import OodD
#### Load Classification Model
For convenience, we use the pre-trained model file `checkpoint_lenet-10_1875.ckpt`,
located at 'mindarmour/tests/ut/python/dataset/trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'.
```python
ckpt_path = '../../dataset/trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
ckpt_path = 'checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_path)
load_param_into_net(net, load_dict)
@@ -143,21 +146,111 @@ model = Model(net)
> `ckpt_path(str)`: the model path.
We can also use a self-constructed model.
It is important to name the model layers and to get the layer outputs.
Take LeNet as an example.
Firstly, we import the `TensorSummary` module.
Secondly, we initialize it as `self.summary = TensorSummary()`.
Finally, we add `self.summary('name', x)` after each layer we pay attention to. Here, the `name` of each layer is given by the user.
After the above process, we can train the model and load it (a brief training sketch follows the code below).
```python
from mindspore import nn
from mindspore.common.initializer import TruncatedNormal
from mindspore.ops import TensorSummary


def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
    """Wrap conv."""
    weight = weight_variable()
    return nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding,
                     weight_init=weight, has_bias=False, pad_mode="valid")


def fc_with_initialize(input_channels, out_channels):
    """Wrap initialize method of full connection layer."""
    weight = weight_variable()
    bias = weight_variable()
    return nn.Dense(input_channels, out_channels, weight, bias)


def weight_variable():
    """Wrap initialize variable."""
    return TruncatedNormal(0.05)


class LeNet5(nn.Cell):
    """
    LeNet network
    """
    def __init__(self):
        super(LeNet5, self).__init__()
        self.conv1 = conv(1, 6, 5)
        self.conv2 = conv(6, 16, 5)
        self.fc1 = fc_with_initialize(16*5*5, 120)
        self.fc2 = fc_with_initialize(120, 84)
        self.fc3 = fc_with_initialize(84, 10)
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()
        self.summary = TensorSummary()

    def construct(self, x):
        """
        Construct the network architecture.

        Returns:
            x (tensor): network output
        """
        # The names recorded below ('1' ... '11', 'output') are the layer names
        # later referenced by the OOD detector as 'name[:Tensor]', e.g. 'output[:Tensor]'.
        x = self.conv1(x)
        self.summary('1', x)
        x = self.relu(x)
        self.summary('2', x)
        x = self.max_pool2d(x)
        self.summary('3', x)
        x = self.conv2(x)
        self.summary('4', x)
        x = self.relu(x)
        self.summary('5', x)
        x = self.max_pool2d(x)
        self.summary('6', x)
        x = self.flatten(x)
        self.summary('7', x)
        x = self.fc1(x)
        self.summary('8', x)
        x = self.relu(x)
        self.summary('9', x)
        x = self.fc2(x)
        self.summary('10', x)
        x = self.relu(x)
        self.summary('11', x)
        x = self.fc3(x)
        self.summary('output', x)
        return x
```
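The training step itself is not part of this change. Purely as a hedged sketch, assuming you have already built a MindSpore dataset pipeline of `(image, label)` batches named `train_ds` (an illustrative placeholder, e.g. built from `mindspore.dataset.MnistDataset` with the usual transforms), training and saving the named-layer LeNet could look roughly like this; the loss, optimizer, and epoch count are illustrative choices, not the project's fixed recipe.
```python
import mindspore.nn as nn
from mindspore import Model, save_checkpoint

# `train_ds` is an assumed placeholder: a dataset pipeline prepared elsewhere.
net = LeNet5()
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction='mean')
optimizer = nn.Momentum(net.trainable_params(), learning_rate=0.01, momentum=0.9)
model = Model(net, loss_fn=loss_fn, optimizer=optimizer, metrics={'accuracy'})

model.train(10, train_ds)                              # 10 epochs, illustrative only
save_checkpoint(net, 'checkpoint_lenet-10_1875.ckpt')  # reload as shown in "Load Classification Model"
```
The checkpoint name above simply mirrors the pre-trained file used earlier; any name works as long as `ckpt_path` points to it.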
#### Load Data
We prepare three datasets: the training dataset, which is the same dataset used to train the LeNet, and two testing datasets. The first testing dataset comes with OOD labels (0 for non-OOD, 1 for OOD) and is used to find an optimal threshold for OOD detection.
The second testing dataset is used for OOD validation. The first testing dataset is not necessary if we would like to set the threshold ourselves.
```python
ds_train = np.load('../../dataset/concept_train_lenet.npy')
ds_test1 = np.load('../../dataset/concept_test_lenet1.npy')
ds_test2 = np.load('../../dataset/concept_test_lenet2.npy')
ds_eval = np.load('../../dataset/concept_test_lenet1.npy')
ds_test = np.load('../../dataset/concept_test_lenet2.npy')
```
> `ds_train(numpy.ndarray)`: the training data.
> `ds_test1(numpy.ndarray)`: the data for finding an optimal threshold. This dataset is not necessary.
> `ds_test2(numpy.ndarray)`: the test data for ood detection.
> `ds_eval(numpy.ndarray)`: the data for finding an optimal threshold. This dataset is optional.
> `ds_test(numpy.ndarray)`: the test data for OOD detection.
#### OOD detector initialization
@@ -172,30 +265,32 @@ detector = OodDetectorFeatureCluster(model, ds_train, n_cluster=10, layer='outpu
> `model(Model)`: the model trained on `ds_train`.
> `ds_train(numpy.ndarray)`: the training data.
> `n_cluster(int)`: the feature cluster number.
> `layer(str)`: the feature extraction layer. In our example, The layer name could be 'output[:Tensor]', '9[:Tensor]', '10[:Tensor]', '11[:Tensor]' for LeNet.
> `layer(str)`: the name of the feature extraction layer.
In our example, we use the layer name `output[:Tensor]`; for LeNet it could also be `9[:Tensor]`, `10[:Tensor]`, or `11[:Tensor]`.
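To make the roles of `n_cluster` and `layer` concrete, here is a rough, self-contained sketch of the feature-cluster idea only (it is not the MindArmour implementation, which pulls the named layer's output through `TensorSummary`): cluster the training features, then score each test sample by its distance to the nearest cluster center, so that larger scores indicate more OOD-like samples.
```python
import numpy as np
from sklearn.cluster import KMeans

def feature_cluster_scores(train_feats, test_feats, n_cluster=10):
    """Illustrative OOD score: distance to the nearest training-feature cluster center."""
    kmeans = KMeans(n_clusters=n_cluster, random_state=0).fit(train_feats)
    centers = kmeans.cluster_centers_                                  # (n_cluster, dim)
    dists = np.linalg.norm(test_feats[:, None, :] - centers[None, :, :], axis=2)
    score = dists.min(axis=1)                                          # distance to the closest center
    return score / (score.max() + 1e-12)                              # rough normalization to [0, 1]
```
Samples whose score exceeds the detection threshold are flagged as OOD, which is what the threshold found in the next step is for.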
#### Optimal Threshold
This step is optional. If we have a labeled dataset, named ds_test1, we can use the following code to find the optimal detection threshold.
This step is optional. If we have a labeled dataset named `ds_eval`, we can use the following code to find the optimal detection threshold.
```python
# get optimal threshold with ds_test1
num = int(len(ds_test1) / 2)
# get optimal threshold with ds_eval
num = int(len(ds_eval) / 2)
label = np.concatenate((np.zeros(num), np.ones(num)), axis=0) # ID data = 0, OOD data = 1
optimal_threshold = detector.get_optimal_threshold(label, ds_test1)
optimal_threshold = detector.get_optimal_threshold(label, ds_eval)
```
> `ds_test1(numpy.ndarray)`: the data for finding an optimal threshold. .
> `label(numpy.ndarray)`: the ood label of ds_test1. 0 means non-ood data, and 1 means ood data.
> `ds_eval(numpy.ndarray)`: the data for finding an optimal threshold.
> `label(numpy.ndarray)`: the OOD label of `ds_eval`: 0 means non-OOD data, and 1 means OOD data.
#### Detection result
```python
result = detector.ood_predict(optimal_threshold, ds_test2)
result = detector.ood_predict(optimal_threshold, ds_test)
```
> `ds_test2(numpy.ndarray)`: the testing data for ood detection.
> `ds_test(numpy.ndarray)`: the testing data for OOD detection.
> `optimal_threshold(float)`: the optimal threshold for judging out-of-distribution data. We can also set the threshold value ourselves.
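If we skip the optimal-threshold step, a manually chosen threshold can be passed directly; the value below is purely illustrative.
```python
# threshold set by experience instead of get_optimal_threshold(); 0.5 is only an example value
result = detector.ood_predict(0.5, ds_test)
```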
## Script Description
@@ -24,7 +24,6 @@ from mindspore.train.summary.summary_record import _get_summary_tensor_data
"""
Out-of-Distribution detection for images.
The sample can be run on Ascend 910 AI processor.
"""
@@ -46,32 +45,60 @@ class OodDetector:
Args:
model (Model): The model for extracting features.
data (numpy.ndarray): Input data.
layer (str): The feature layer. The layer name could be 'output[:Tensor]',
'9[:Tensor]', '10[:Tensor]', '11[:Tensor]' for LeNet, and 'output[:Tensor]',
'1[:Tensor]' for Resnet.
layer (str): The name of the feature layer, represented as
'name[:Tensor]', where 'name' is the layer name given by the user when training the model.
See 'README.md' for details on how to name the model layers.
Returns:
numpy.ndarray, the feature of input data.
numpy.ndarray, the data feature extracted by a certain neural layer.
"""
model.predict(Tensor(data))
layer_out = _get_summary_tensor_data()
return layer_out[layer].asnumpy()
def get_optimal_threshold(self, score, label, ds_test1):
def get_optimal_threshold(self, label, ds_eval):
"""
Get the optimal threshold.
Args:
label (numpy.ndarray): The label indicating whether an image is in-distribution or out-of-distribution.
ds_eval (numpy.ndarray): The testing dataset to help find the threshold.
Returns:
- float, the optimal threshold.
"""
pass
def ood_predict(self, threshold, ds_test2):
def ood_predict(self, threshold, ds_test):
"""
The out-of-distribution detection.
Args:
threshold (float): the threshold to judge ood data. One can set the value by experience
or use the function get_optimal_threshold.
ds_test (numpy.ndarray): The testing dataset.
Returns:
- numpy.ndarray, the detection result. 0 means the data is not ood, 1 means the data is ood.
"""
pass
class OodDetectorFeatureCluster(OodDetector):
"""
Train the OOD detector.
Train the OOD detector. Extract the training data features and obtain the clustering centers. The distance between
the testing data features and the clustering centers determines whether an image is an out-of-distribution (OOD)
image or not.
Args:
model (Model): The training model.
ds_train (numpy.ndarray): The training dataset.
n_cluster (int): The cluster number.
n_cluster (int): The cluster number, in the range [2, 100].
Usually, n_cluster equals the number of classes in the training dataset.
If the OOD detector performs poorly on the testing dataset, we can increase the value of n_cluster
appropriately.
layer (str): The name of the feature layer, represented as
'name[:Tensor]', where 'name' is the layer name given by the user when training the model.
See 'README.md' for details on how to name the model layers.
"""
def __init__(self, model, ds_train, n_cluster, layer):
@@ -118,21 +145,20 @@ class OodDetectorFeatureCluster(OodDetector):
score = np.array(score)
return score
def get_optimal_threshold(self, label, test_data_threshold):
def get_optimal_threshold(self, label, ds_eval):
"""
Get the optimal threshold.
Args:
score (numpy.ndarray): The detection score of images.
label (numpy.ndarray): The label indicating whether an image is in-distribution or out-of-distribution.
test_data_threshold (numpy.ndarray): The testing dataset to help find the threshold.
ds_eval (numpy.ndarray): The testing dataset to help find the threshold.
Returns:
- float, the optimal threshold.
"""
check_param_type('label', label, np.ndarray)
check_param_type('ds_test1', test_data_threshold, np.ndarray)
score = self._get_ood_score(test_data_threshold)
check_param_type('ds_eval', ds_eval, np.ndarray)
score = self._get_ood_score(ds_eval)
acc = []
threshold = []
for threshold_change in np.arange(0.0, 1.0, 0.01):
@@ -154,7 +180,7 @@ class OodDetectorFeatureCluster(OodDetector):
The out-of-distribution detection.
Args:
threshold (float): the threshold to judge ood data. One can set the value by experience
or use the function get_optimal_threshold.
or use the function get_optimal_threshold.
ds_test (numpy.ndarray): The testing dataset.
Returns:
@@ -49,16 +49,16 @@ def test_cp():
model = Model(net)
# load data
ds_train = np.load('../../dataset/concept_train_lenet.npy')
ds_test1 = np.load('../../dataset/concept_test_lenet1.npy')
ds_test2 = np.load('../../dataset/concept_test_lenet2.npy')
ds_eval = np.load('../../dataset/concept_test_lenet1.npy')
ds_test = np.load('../../dataset/concept_test_lenet2.npy')
# ood detector initialization
detector = OodDetectorFeatureCluster(model, ds_train, n_cluster=10, layer='output[:Tensor]')
# get optimal threshold with ds_test1
num = int(len(ds_test1) / 2)
# get optimal threshold with ds_eval
num = int(len(ds_eval) / 2)
label = np.concatenate((np.zeros(num), np.ones(num)), axis=0) # ID data = 0, OOD data = 1
optimal_threshold = detector.get_optimal_threshold(label, ds_test1)
# get result of ds_test2. We can also set threshold by ourself.
result = detector.ood_predict(optimal_threshold, ds_test2)
optimal_threshold = detector.get_optimal_threshold(label, ds_eval)
# get result of ds_test. We can also set the threshold ourselves.
result = detector.ood_predict(optimal_threshold, ds_test)
# result log
LOGGER.set_level(logging.DEBUG)
LOGGER.debug(TAG, '--start ood test--')