import os
import re
import sys
import json
import pickle
import logging
import subprocess

import numpy as np
import torch  # needed by EvolutionWithFlops.get_candidate_flops for tensor-encoded candidates
from collections import deque

from evaluator import Evaluator
from network import ShuffleNetV2OneShot, PARSED_FLOPS

LAYER_CHOICE = "layer_choice"
INPUT_CHOICE = "input_choice"

_logger = logging.getLogger(__name__)


class SPOSEvolution:
    """
    SPOS evolution tuner.

    Parameters
    ----------
    max_epochs : int
        Maximum number of epochs to run.
    num_select : int
        Number of survival candidates of each epoch.
    num_population : int
        Number of candidates at the start of each epoch. If candidates generated by
        crossover and mutation are not enough, the rest will be filled with random
        candidates.
    m_prob : float
        The probability of mutation.
    num_crossover : int
        Number of candidates generated by crossover in each epoch.
    num_mutation : int
        Number of candidates generated by mutation in each epoch.
    """

    def __init__(self, max_epochs=20, num_select=10, num_population=50, m_prob=0.1,
                 num_crossover=25, num_mutation=25, epoch=0):
        assert num_population >= num_select
        self.max_epochs = max_epochs
        self.num_select = num_select
        self.num_population = num_population
        self.m_prob = m_prob
        self.num_crossover = num_crossover
        self.num_mutation = num_mutation
        self.epoch = epoch
        self._search_space = None
        self.random_state = np.random.RandomState(0)
        # self.evl = Evaluator()

        # async status
        self._to_evaluate_queue = deque()
        self._sending_parameter_queue = deque()
        self._pending_result_ids = set()
        self._reward_dict = dict()
        self._id2candidate = dict()
        self._st_callback = None

        self.cand_path = "./checkpoints"
        self.acc_path = "./acc"
        # In epoch 0 there is no previously generated population to load.
        self.candidates = [] if epoch == 0 else self.load_candidates()

    def load_candidates(self):
        # Read back the candidates written to disk by self.export_results(), converting
        # {"LayerChoice1": [false, false, false, true], ...} into
        # {"LayerChoice1": {"_idx": 3, "_value": "3"}, ...}.
        print("## evolution -- load ## begin to load candidates in evolution...\n")
        file_dir, _, files = next(os.walk(self.cand_path))
        files = [i for i in files if "%03d_" % (self.epoch - 1) in i]

        def get_true_index(l):
            return [i for i in range(len(l)) if l[i]][0]

        candidates = []
        for file in files:
            with open(os.path.join(file_dir, file), "r") as f:
                candidate = json.load(f)
            # Convert the one-hot lists into the internal {"_value", "_idx"} form.
            cand = {}
            for key, value in candidate.items():
                v = get_true_index(value)
                value = {"_value": str(v), "_idx": int(v)}
                cand.update({key: value})
            candidates.append(cand)
        print("## evolution -- load ## candidates loaded\n")
        return candidates

    def load_id2candidate(self):
        with open("./id2cand/%03d_id2candidate.json" % (self.epoch - 1), "r") as f:
            self.id2candidate = json.load(f)
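
    # Assumed search-space layout: update_search_space() expects an NNI classic-NAS style
    # dict, where every layer choice carries a "_type" and a "_value" list of choices, e.g.
    #     {
    #         "LayerChoice1": {"_type": "layer_choice", "_value": ["0", "1", "2", "3"]},
    #         ...
    #     }
    # _random_candidate(), _get_mutation() and export_results() all index into
    # self._search_space[key]["_value"], so each entry must provide such a list.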
""" print("## evolution -- update ## updating search space") self._search_space = search_space self._next_round() print("## evolution -- update ## search space updated") def _next_round(self): _logger.info("Epoch %d, generating...", self.epoch) if self.epoch == 0: self._get_random_population() self.export_results(self.candidates) self.evaluate_cands() # 评估全部的模型 else: self.load_id2candidate() self.receive_trial_result() best_candidates = self._select_top_candidates() if self.epoch >= self.max_epochs: return self.candidates = self._get_mutation(best_candidates) + self._get_crossover(best_candidates) self._get_random_population() self.export_results(self.candidates) self.evaluate_cands() # 评估全部的模型 self.epoch += 1 def _random_candidate(self): chosen_arch = dict() for key, val in self._search_space.items(): if val["_type"] == LAYER_CHOICE: choices = val["_value"] index = self.random_state.randint(len(choices)) chosen_arch[key] = {"_value": choices[index], "_idx": index} elif val["_type"] == INPUT_CHOICE: raise NotImplementedError("Input choice is not implemented yet.") return chosen_arch def _add_to_evaluate_queue(self, cand): _logger.info("Generate candidate %s, adding to eval queue.", self._get_architecture_repr(cand)) self._reward_dict[self._hashcode(cand)] = 0. self._to_evaluate_queue.append(cand) def _get_random_population(self): while len(self.candidates) < self.num_population: cand = self._random_candidate() if self._is_legal(cand): _logger.info("Random candidate generated.") self._add_to_evaluate_queue(cand) self.candidates.append(cand) def _get_crossover(self, best): result = [] for _ in range(10 * self.num_crossover): cand_p1 = best[self.random_state.randint(len(best))] cand_p2 = best[self.random_state.randint(len(best))] assert cand_p1.keys() == cand_p2.keys() cand = {k: cand_p1[k] if self.random_state.randint(2) == 0 else cand_p2[k] for k in cand_p1.keys()} if self._is_legal(cand): result.append(cand) self._add_to_evaluate_queue(cand) if len(result) >= self.num_crossover: break _logger.info("Found %d architectures with crossover.", len(result)) return result def _get_mutation(self, best): result = [] for _ in range(10 * self.num_mutation): cand = best[self.random_state.randint(len(best))].copy() mutation_sample = np.random.random_sample(len(cand)) for s, k in zip(mutation_sample, cand): if s < self.m_prob: choices = self._search_space[k]["_value"] index = self.random_state.randint(len(choices)) cand[k] = {"_value": choices[index], "_idx": index} if self._is_legal(cand): result.append(cand) self._add_to_evaluate_queue(cand) if len(result) >= self.num_mutation: break _logger.info("Found %d architectures with mutation.", len(result)) return result def _get_architecture_repr(self, cand): return re.sub(r"\".*?\": \{\"_idx\": (\d+), \"_value\": \".*?\"\}", r"\1", self._hashcode(cand)) def _is_legal(self, cand): if self._hashcode(cand) in self._reward_dict: return False return True # 将模型输出,并重训练、评估 def evaluate_cands(self): """ 1、对输出的模型进行重训练 2、对重训练后的模型进行评估 以上内容通过tester.py脚本完成 """ print("## evolution -- evaluate ## begin to evaluate candidates...") file_dir, _, files = next(os.walk(self.cand_path)) # 获取文件夹下的文件 files = [i for i in files if "%03d_"%self.epoch in i] for file in files: file = os.path.join(file_dir, file) # self.evl.eval_model(epoch=self.epoch, architecture=file) python_interpreter_path = sys.executable subprocess.run([python_interpreter_path,\ "evaluator.py", "--architecture", file, "--epoch", str(self.epoch)]) print("## evolution -- evaluate ## candidates evaluated") def 

    def _select_top_candidates(self):
        print("## evolution -- select ## begin to select top candidates...")
        reward_query = lambda cand: self._reward_dict[self._hashcode(cand)]
        _logger.info("All candidate rewards: %s", list(map(reward_query, self.candidates)))
        result = sorted(self.candidates, key=reward_query, reverse=True)[:self.num_select]
        _logger.info("Best candidate rewards: %s", list(map(reward_query, result)))
        print("## evolution -- select ## selection done")
        return result

    @staticmethod
    def _hashcode(d):
        return json.dumps(d, sort_keys=True)

    def _bind_and_send_parameters(self):
        """
        There are two types of resources: parameter ids and candidates. This function is
        called whenever necessary to bind these resources and send new trials with
        st_callback.
        """
        result = []
        while self._sending_parameter_queue and self._to_evaluate_queue:
            parameter_id = self._sending_parameter_queue.popleft()
            parameters = self._to_evaluate_queue.popleft()
            self._id2candidate[parameter_id] = parameters
            result.append(parameters)
            self._pending_result_ids.add(parameter_id)
            self._st_callback(parameter_id, parameters)
            _logger.info("Send parameter [%d] %s.", parameter_id,
                         self._get_architecture_repr(parameters))
        return result

    def generate_multiple_parameters(self, parameter_id_list, **kwargs):
        """
        Callback function necessary to implement a tuner. This will put more parameter ids
        into the parameter id queue.
        """
        if "st_callback" in kwargs and self._st_callback is None:
            self._st_callback = kwargs["st_callback"]
        for parameter_id in parameter_id_list:
            self._sending_parameter_queue.append(parameter_id)
        self._bind_and_send_parameters()
        return []  # always return an empty list; returning parameters here as well might induce over-sending

    # def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
    #     """
    #     Callback function. Receive a trial result.
    #     """
    #     _logger.info("Candidate %d, reported reward %f", parameter_id, value)
    #     self._reward_dict[self._hashcode(self._id2candidate[parameter_id])] = value

    def receive_trial_result(self):
        # Read the accuracy files of the previous epoch and update self._reward_dict.
        file_dir, _, files = next(os.walk(self.acc_path))
        files = [i for i in files if "%03d_" % (self.epoch - 1) in i]  # epoch - 1: results of the previous round
        acc_dict = {}
        for file in files:
            with open(os.path.join(file_dir, file), "r") as f:
                acc_dict.update(json.load(f))  # e.g. {"000_001.json": 0.56}
        for key, value in acc_dict.items():
            key = os.path.basename(key)  # drop the path, keep only the file name
            self._reward_dict.update({self.id2candidate[key]: value})  # todo {self.id2candidate[key]: key}

    def trial_end(self, parameter_id, success, **kwargs):
        """
        Callback function when a trial is ended and its resource is released.
        """
        self._pending_result_ids.remove(parameter_id)
        if not self._pending_result_ids and not self._to_evaluate_queue:
            # a new epoch starts now
            self._next_round()
            assert self._st_callback is not None
            self._bind_and_send_parameters()

    def export_results(self, result):
        """
        Export a number of candidates to the `checkpoints` dir.

        Parameters
        ----------
        result : list of dict
            Chosen architectures to be exported.
        """
        os.makedirs("checkpoints", exist_ok=True)
        os.makedirs("id2cand", exist_ok=True)
        self.id2candidate = {}
        for i, cand in enumerate(result):
            converted = dict()
            for cand_key, cand_val in cand.items():
                onehot = [k == cand_val["_idx"]
                          for k in range(len(self._search_space[cand_key]["_value"]))]
                converted[cand_key] = onehot
            with open(os.path.join("checkpoints", "%03d_%03d.json" % (self.epoch, i)), "w") as fp:
                json.dump(converted, fp)
            # self.id2candidate maps the exported file name to the candidate hashcode
            # (the sorted JSON string of the candidate), e.g.
            #     {
            #         "000_000.json": '{"LayerChoice1": {"_idx": 3, "_value": "3"}, ...}',
            #         ...
} """ self.id2candidate.update({"%03d_%03d.json" % (self.epoch, i): json.dumps(result[i], sort_keys=True)}) with open("./id2cand/%03d_id2candidate.json"%self.epoch, "w") as f: json.dump(self.id2candidate, f) class EvolutionWithFlops(SPOSEvolution): """ This tuner extends the function of evolution tuner, by limiting the flops generated by tuner. Needs a function to examine the flops. """ def __init__(self, flops_limit=330E6, **kwargs): super().__init__(**kwargs) # self.model = ShuffleNetV2OneShot() self.flops_limit = flops_limit with open(os.path.join(os.path.dirname(__file__), "./data/op_flops_dict.pkl"), "rb") as fp: self._op_flops_dict = pickle.load(fp) def _is_legal(self, cand): if not super()._is_legal(cand): return False if self.get_candidate_flops(cand) > self.flops_limit: return False return True def get_candidate_flops(self, candidate): """ this method is the same with ShuffleNetV2OneShot.get_candidate_flops, but we dont need to initialize that class. """ conv1_flops = self._op_flops_dict["conv1"][(3, 16, 224, 224, 2)] rest_flops = self._op_flops_dict["rest_operation"][(640, 1000, 7, 7, 1)] total_flops = conv1_flops + rest_flops for k, m in candidate.items(): parsed_flops_dict = PARSED_FLOPS[k] if isinstance(m, dict): # to be compatible with classical nas format total_flops += parsed_flops_dict[m["_idx"]] else: total_flops += parsed_flops_dict[torch.max(m, 0)[1]] return total_flops