- # Copyright 2019 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """evaluate example"""
- import sys
- import os
- import time
- import numpy as np
- from scipy.special import softmax
-
- from lenet5_net import LeNet5
- from mindspore import Model
- from mindspore import Tensor
- from mindspore import context
- from mindspore import nn
- from mindspore.nn import Cell
- from mindspore.ops.operations import TensorAdd
- from mindspore.nn import SoftmaxCrossEntropyWithLogits
- from mindspore.train.serialization import load_checkpoint, load_param_into_net
-
- from mindarmour.attacks import FastGradientSignMethod
- from mindarmour.attacks import GeneticAttack
- from mindarmour.attacks.black.black_model import BlackModel
- from mindarmour.defenses import NaturalAdversarialDefense
- from mindarmour.evaluations import BlackDefenseEvaluate
- from mindarmour.evaluations import DefenseEvaluate
- from mindarmour.utils.logger import LogUtil
- from mindarmour.detectors.black.similarity_detector import SimilarityDetector
-
- sys.path.append("..")
- from data_processing import generate_mnist_dataset
-
- context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
-
- LOGGER = LogUtil.get_instance()
- TAG = 'Defense_Evaluate_Example'
-
-
- def get_detector(train_images):
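- """Return a similarity-based detector built on a simple encoder and fitted with benign training images."""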
- encoder = Model(EncoderNet(encode_dim=256))
- detector = SimilarityDetector(max_k_neighbor=50, trans_model=encoder)
- detector.fit(inputs=train_images)
- return detector
-
-
- class EncoderNet(Cell):
- """
- Similarity encoder for input data
- """
-
- def __init__(self, encode_dim):
- super(EncoderNet, self).__init__()
- self._encode_dim = encode_dim
- self.add = TensorAdd()
-
- def construct(self, inputs):
- """
- construct the neural network
- Args:
- inputs (Tensor): input data to neural network.
- Returns:
- Tensor, output of neural network.
- """
- return self.add(inputs, inputs)
-
- def get_encode_dim(self):
- """
- Get the dimension of encoded inputs
-
- Returns:
- int, dimension of encoded inputs.
- """
- return self._encode_dim
-
-
- class ModelToBeAttacked(BlackModel):
- """
- Model to be attacked, optionally protected by a similarity-based query detector.
- """
-
- def __init__(self, network, defense=False, train_images=None):
- super(ModelToBeAttacked, self).__init__()
- self._network = network
- self._queries = []
- self._defense = defense
- self._detector = None
- self._detected_res = []
- if self._defense:
- self._detector = get_detector(train_images)
-
- def predict(self, inputs):
- """
- Predict labels for inputs; when defense is enabled, each query is screened by the detector and detected queries receive noise-perturbed outputs.
- """
- query_num = inputs.shape[0]
- results = []
- if self._detector:
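- # with the detector enabled, screen each query individually and perturb the output of detected ones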
- for i in range(query_num):
- query = np.expand_dims(inputs[i].astype(np.float32), axis=0)
- result = self._network(Tensor(query)).asnumpy()
- det_num = len(self._detector.get_detected_queries())
- self._detector.detect([query])
- new_det_num = len(self._detector.get_detected_queries())
- # If an attack query is detected, add random noise to the prediction
- if new_det_num > det_num:
- results.append(result + np.random.rand(*result.shape))
- self._detected_res.append(True)
- else:
- results.append(result)
- self._detected_res.append(False)
- results = np.concatenate(results)
- else:
- results = self._network(Tensor(inputs.astype(np.float32))).asnumpy()
- return results
-
- def get_detected_result(self):
- return self._detected_res
-
-
- def test_black_defense():
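- """Evaluate defenses against a white-box FGSM attack and a black-box genetic attack on LeNet5/MNIST."""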
- # load trained network
- current_dir = os.path.dirname(os.path.abspath(__file__))
- ckpt_name = os.path.abspath(os.path.join(
- current_dir, './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'))
- wb_net = LeNet5()
- load_dict = load_checkpoint(ckpt_name)
- load_param_into_net(wb_net, load_dict)
-
- # get test data
- data_list = "./MNIST_unzip/test"
- batch_size = 32
- ds_test = generate_mnist_dataset(data_list, batch_size=batch_size,
- sparse=False)
- inputs = []
- labels = []
- for data in ds_test.create_tuple_iterator():
- inputs.append(data[0].astype(np.float32))
- labels.append(data[1])
- inputs = np.concatenate(inputs).astype(np.float32)
- labels = np.concatenate(labels).astype(np.float32)
- labels_sparse = np.argmax(labels, axis=1)
-
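- # draw a random target class for each sample, re-sampling until it differs from the true label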
- target_label = np.random.randint(0, 10, size=labels_sparse.shape[0])
- for idx in range(labels_sparse.shape[0]):
- while target_label[idx] == labels_sparse[idx]:
- target_label[idx] = np.random.randint(0, 10)
- target_label = np.eye(10)[target_label].astype(np.float32)
-
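- # number of samples to attack and number of benign samples used as normal queries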
- attacked_size = 50
- benign_size = 500
-
- attacked_sample = inputs[:attacked_size]
- attacked_true_label = labels[:attacked_size]
- benign_sample = inputs[attacked_size:attacked_size + benign_size]
-
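- # wrap the trained network in the black-box model interface (no defense)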
- wb_model = ModelToBeAttacked(wb_net)
-
- # gen white-box adversarial examples of test data
- wb_attack = FastGradientSignMethod(wb_net, eps=0.3)
- wb_adv_sample = wb_attack.generate(attacked_sample,
- attacked_true_label)
-
- wb_raw_preds = softmax(wb_model.predict(wb_adv_sample), axis=1)
- accuracy_test = np.mean(
- np.equal(np.argmax(wb_model.predict(attacked_sample), axis=1),
- np.argmax(attacked_true_label, axis=1)))
- LOGGER.info(TAG, "prediction accuracy before white-box attack is : %s",
- accuracy_test)
- accuracy_adv = np.mean(np.equal(np.argmax(wb_raw_preds, axis=1),
- np.argmax(attacked_true_label, axis=1)))
- LOGGER.info(TAG, "prediction accuracy after white-box attack is : %s",
- accuracy_adv)
-
- # improve the robustness of the model with white-box adversarial examples (natural adversarial defense)
- loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=False)
- opt = nn.Momentum(wb_net.trainable_params(), 0.01, 0.09)
-
- nad = NaturalAdversarialDefense(wb_net, loss_fn=loss, optimizer=opt,
- bounds=(0.0, 1.0), eps=0.3)
- wb_net.set_train(False)
- nad.batch_defense(inputs[:5000], labels[:5000], batch_size=32, epochs=10)
-
- wb_def_preds = wb_net(Tensor(wb_adv_sample)).asnumpy()
- wb_def_preds = softmax(wb_def_preds, axis=1)
- accuracy_def = np.mean(np.equal(np.argmax(wb_def_preds, axis=1),
- np.argmax(attacked_true_label, axis=1)))
- LOGGER.info(TAG, "prediction accuracy after defense is : %s", accuracy_def)
-
- # calculate defense evaluation metrics for defense against white-box attack
- wb_def_evaluate = DefenseEvaluate(wb_raw_preds, wb_def_preds,
- np.argmax(attacked_true_label, axis=1))
- LOGGER.info(TAG, 'defense evaluation for white-box adversarial attack')
- LOGGER.info(TAG,
- 'classification accuracy variance (CAV) is : {:.2f}'.format(
- wb_def_evaluate.cav()))
- LOGGER.info(TAG, 'classification rectify ratio (CRR) is : {:.2f}'.format(
- wb_def_evaluate.crr()))
- LOGGER.info(TAG, 'classification sacrifice ratio (CSR) is : {:.2f}'.format(
- wb_def_evaluate.csr()))
- LOGGER.info(TAG,
- 'classification confidence variance (CCV) is : {:.2f}'.format(
- wb_def_evaluate.ccv()))
- LOGGER.info(TAG, 'classification output stability is : {:.2f}'.format(
- wb_def_evaluate.cos()))
-
- # calculate defense evaluation metrics for defense against black-box attack
- LOGGER.info(TAG, 'defense evaluation for black-box adversarial attack')
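- # per-sample predictions, query counts, query times and detection counts for the raw and defended models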
- bb_raw_preds = []
- bb_def_preds = []
- raw_query_counts = []
- raw_query_time = []
- def_query_counts = []
- def_query_time = []
- def_detection_counts = []
-
- # gen black-box adversarial examples of test data
- bb_net = LeNet5()
- load_param_into_net(bb_net, load_dict)
- bb_model = ModelToBeAttacked(bb_net, defense=False)
- attack_rm = GeneticAttack(model=bb_model, pop_size=6, mutation_rate=0.05,
- per_bounds=0.1, step_size=0.25, temp=0.1,
- sparse=False)
- attack_target_label = target_label[:attacked_size]
- true_label = labels_sparse[:attacked_size + benign_size]
- # evaluate robustness of the original model by generating black-box adversarial examples
- for idx in range(attacked_size):
- raw_st = time.time()
- raw_sl, raw_a, raw_qc = attack_rm.generate(
- np.expand_dims(attacked_sample[idx], axis=0),
- np.expand_dims(attack_target_label[idx], axis=0))
- raw_t = time.time() - raw_st
- bb_raw_preds.extend(softmax(bb_model.predict(raw_a), axis=1))
- raw_query_counts.extend(raw_qc)
- raw_query_time.append(raw_t)
-
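- # query the undefended model with benign samples to record baseline predictions and response times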
- for idx in range(benign_size):
- raw_st = time.time()
- bb_raw_pred = softmax(
- bb_model.predict(np.expand_dims(benign_sample[idx], axis=0)),
- axis=1)
- raw_t = time.time() - raw_st
- bb_raw_preds.extend(bb_raw_pred)
- raw_query_counts.extend([0])
- raw_query_time.append(raw_t)
-
- accuracy_test = np.mean(
- np.equal(np.argmax(bb_raw_preds[0:len(attack_target_label)], axis=1),
- np.argmax(attack_target_label, axis=1)))
- LOGGER.info(TAG, "attack success before adv defense is : %s",
- accuracy_test)
-
- # improve the robustness of the model with a similarity-based query detector
- bb_def_model = ModelToBeAttacked(bb_net, defense=True,
- train_images=inputs[0:6000])
- # construct a genetic attack against the defended model
- attack_dm = GeneticAttack(model=bb_def_model, pop_size=6,
- mutation_rate=0.05,
- per_bounds=0.1, step_size=0.25, temp=0.1,
- sparse=False)
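- # run the attack per sample, recording detections, query counts and generation time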
- for idx in range(attacked_size):
- def_st = time.time()
- def_sl, def_a, def_qc = attack_dm.generate(
- np.expand_dims(attacked_sample[idx], axis=0),
- np.expand_dims(attack_target_label[idx], axis=0))
- def_t = time.time() - def_st
- det_res = bb_def_model.get_detected_result()
- def_detection_counts.append(np.sum(det_res[-def_qc[0]:]))
- bb_def_preds.extend(softmax(bb_def_model.predict(def_a), axis=1))
- def_query_counts.extend(def_qc)
- def_query_time.append(def_t)
-
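- # benign queries against the defended model to measure detector false positives and response-time overhead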
- for idx in range(benign_size):
- def_st = time.time()
- bb_def_pred = softmax(
- bb_def_model.predict(np.expand_dims(benign_sample[idx], axis=0)),
- axis=1)
- def_t = time.time() - def_st
- det_res = bb_def_model.get_detected_result()
- def_detection_counts.append(np.sum(det_res[-1]))
- bb_def_preds.extend(bb_def_pred)
- def_query_counts.extend([0])
- def_query_time.append(def_t)
-
- accuracy_adv = np.mean(
- np.equal(np.argmax(bb_def_preds[0:len(attack_target_label)], axis=1),
- np.argmax(attack_target_label, axis=1)))
- LOGGER.info(TAG, "attack success rate after adv defense is : %s",
- accuracy_adv)
-
- bb_raw_preds = np.array(bb_raw_preds).astype(np.float32)
- bb_def_preds = np.array(bb_def_preds).astype(np.float32)
- # maximum number of model queries per sample considered in the evaluation metrics
- max_queries = 6000
-
- def_evaluate = BlackDefenseEvaluate(bb_raw_preds, bb_def_preds,
- np.array(raw_query_counts),
- np.array(def_query_counts),
- np.array(raw_query_time),
- np.array(def_query_time),
- np.array(def_detection_counts),
- true_label, max_queries)
-
- LOGGER.info(TAG, 'query count variance of adversaries is : {:.2f}'.format(
- def_evaluate.qcv()))
- LOGGER.info(TAG, 'attack success rate variance of adversaries '
- 'is : {:.2f}'.format(def_evaluate.asv()))
- LOGGER.info(TAG, 'false positive rate (FPR) of the query-based detector '
- 'is : {:.2f}'.format(def_evaluate.fpr()))
- LOGGER.info(TAG, 'the benign query response time variance (QRV) '
- 'is : {:.2f}'.format(def_evaluate.qrv()))
-
-
- if __name__ == '__main__':
- test_black_defense()