# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

"""plugins associated with computing graph"""

import atexit
import collections
import json
import os
import signal
import struct

import numpy as np

from . import mgb as _mgb
from .logconf import get_logger

# One recorded operator input, as returned by InfkernFinder.get_input_values().
InfkernFinderInputValueRec = collections.namedtuple(
    "InfkernFinderInputValueRec", ["var_name", "var_id", "run_id", "value"]
)


class CompGraphProfiler(_mgb._CompGraphProfilerImpl):
    """a plugin to profile computing graphs"""

    def __init__(self, comp_graph):
        """:param comp_graph: the computing graph to attach the profiler to"""
        super().__init__(comp_graph)

    def get(self):
        """get visualizable profiling result on a function

        :return: the profiling result, decoded from the JSON text produced
            by the underlying implementation
        """
        return json.loads(self._get_result())

    def write_json(self, fobj):
        """write the result to a json file

        :param fobj: a file-like object, or a string
        """
        if isinstance(fobj, str):
            # A string is treated as a path: open it and recurse with the
            # resulting file object so the file is always closed.
            with open(fobj, "w") as fout:
                return self.write_json(fout)
        fobj.write(self._get_result())


class NumRangeChecker(_mgb._NumRangeCheckerImpl):
    """check that all numerical float values of variables in a computing graph
    are within given range"""

    def __init__(self, comp_graph, max_abs_val):
        """:param max_abs_val: max absolute value; converted to ``float``
            before being passed to the underlying implementation
        """
        super().__init__(comp_graph, float(max_abs_val))


class TextOprIODump(_mgb._TextOprIODumpImpl):
    """dump all internal results as text to a file"""

    def __init__(self, comp_graph, fpath, *, print_addr=None, max_size=None):
        """
        :param fpath: path of the output text file
        :param print_addr: if not None, forwarded to :meth:`print_addr`
        :param max_size: if not None, forwarded to :meth:`max_size`
        """
        super().__init__(comp_graph, fpath)
        # None means "leave the implementation's own setting untouched".
        if print_addr is not None:
            self.print_addr(print_addr)
        if max_size is not None:
            self.max_size(max_size)

    def print_addr(self, flag):
        """set whether to print var address

        :return: self
        """
        self._print_addr(flag)
        return self

    def max_size(self, size):
        """set the number of elements to be printed for each var

        :return: self
        """
        self._max_size(size)
        return self


class BinaryOprIODump(_mgb._BinaryOprIODumpImpl):
    """dump all internal results binary files to a directory; the values can be
    loaded by :func:`load_tensor_binary`
    """

    def __init__(self, comp_graph, dir_path):
        """:param dir_path: directory that receives the dumped files"""
        super().__init__(comp_graph, dir_path)


class InfkernFinder(_mgb._InfkernFinderImpl):
    """a plugin to find kernels that cause infinite loops"""

    def __init__(self, comp_graph, record_input_value):
        """
        :param record_input_value: whether need to record input var values of
            all operators
        :type record_input_value: bool
        """
        super().__init__(comp_graph, record_input_value)

    def write_to_file(self, fpath):
        """write current execution status to a text file

        :return: ID of the first operator that is still not finished,
            or None if all oprs are finished
        :rtype: int or None
        """
        v = self._write_to_file(fpath)
        # The implementation reports 0 for "everything finished" and
        # (operator ID + 1) otherwise, so shift back to the real ID.
        if v == 0:
            return None
        return v - 1

    def get_input_values(self, opr_id):
        """get recorded input values of a given operator. Return a list
        of :class:`InfkernFinderInputValueRec`. Note that the value in
        each item is either None (if it is not recorded) or a numpy
        array
        """
        ret = []
        # The prepare call returns how many input records are available;
        # the per-index getters below then read them one by one.
        for idx in range(self._get_input_values_prepare(opr_id)):
            vn = self._get_input_values_var_name(idx)
            vi = self._get_input_values_var_idx(idx)
            ri = self._get_input_values_run_id(idx)
            val = self._get_input_values_val(idx)
            # An empty shape marks a value that was not recorded.
            if not val.shape:
                val = None
            else:
                val = val.get_value()
            ret.append(InfkernFinderInputValueRec(vn, vi, ri, val))
        return ret


def fast_signal_hander(signum, callback):
    """bypass python's signal handling system and register a handler that is
    called ASAP in a dedicated thread (in contrast, python calls
    handlers in the main thread)

    :param signum: the signal number to handle
    :param callback: signal callback, taking the signal number as its sole
        argument
    """

    def cb_wrapped():
        try:
            callback(signum)
        except BaseException:
            # Deliberately as broad as the original bare ``except``: the
            # failure is logged rather than propagated to the caller of this
            # wrapper (presumably native code -- TODO confirm).
            get_logger().exception("error calling signal handler for {}".format(signum))

    _mgb._FastSignal.register_handler(signum, cb_wrapped)


# Make sure the fast-signal machinery is shut down when the interpreter exits.
atexit.register(_mgb._FastSignal.shutdown)


class GlobalInfkernFinder:
    """
    manage a list of :class:`InfkernFinder` objects; when this process is
    signaled with SIGUSR1, an interactive IPython shell would be presented for
    further investigation
    """

    # signal that triggers the interactive session
    _signal = signal.SIGUSR1
    # all InfkernFinder objects created by add_graph()
    _registry = []
    # factory for the interactive shell; set on first registration
    _shell_maker = None

    @classmethod
    def add_graph(cls, comp_graph):
        """register a graph so it can be tracked by :class:`InfkernFinder`

        Does nothing unless the environment variable
        ``MGB_DBG_INFKERN_FINDER`` is set to a non-empty value; ``"1"``
        disables and ``"2"`` enables recording of input values.
        """
        enabled = os.getenv("MGB_DBG_INFKERN_FINDER")
        if not enabled:
            return

        if enabled == "1":
            record_input_value = False
        else:
            assert enabled == "2", (
                "MGB_DBG_INFKERN_FINDER must be either 1 or 2, indicating "
                "whether to record input values"
            )
            record_input_value = True

        finder = InfkernFinder(comp_graph, record_input_value)
        get_logger().warning(
            "interactive InfkernFinder {} registered to graph {}; all input "
            "var values would be recorded and the graph would never be "
            "reclaimed. You can enter the interactive debug session by "
            'executing "kill -{} {}". record_input_value={}'.format(
                finder, comp_graph, cls._signal, os.getpid(), record_input_value
            )
        )

        if not cls._registry:
            # First registration: import IPython lazily (only needed when the
            # debug feature is enabled) and install the signal handler once.
            from IPython.terminal.embed import InteractiveShellEmbed

            cls._shell_maker = InteractiveShellEmbed
            # Register on cls._signal so the handler always matches the
            # signal number advertised in the log message above.
            fast_signal_hander(cls._signal, cls._on_signal)

        cls._registry.append(finder)

    @classmethod
    def _on_signal(cls, signum):
        """signal callback: open an interactive shell exposing the registered
        finders as variable ``f``"""
        shell = cls._shell_maker()
        shell(
            header="Enter interactive InfkernFinder session; the registered "
            "finder objects can be found in variable f",
            local_ns={"f": cls._registry},
        )


def load_tensor_binary(fobj):
    """load a tensor dumped by the :class:`BinaryOprIODump` plugin; the actual
    tensor value dump is implemented by ``mgb::debug::dump_tensor``.

    Multiple values can be compared by ``tools/compare_binary_iodump.py``.

    :param fobj: file object, or a string that contains the file name
    :return: tuple ``(tensor_value, tensor_name)``
    :raises KeyError: if the dtype enum stored in the file is unknown
    :raises AssertionError: if the dtype is known but cannot be loaded
    """
    if isinstance(fobj, str):
        with open(fobj, "rb") as fin:
            return load_tensor_binary(fin)

    # Maps the dtype enum stored in the file header to a numpy dtype; None
    # marks an enum value that is known but cannot be loaded.
    DTYPE_LIST = {
        0: np.float32,
        1: np.uint8,
        2: np.int8,
        3: np.int16,
        4: np.int32,
        5: _mgb.intb1,
        6: _mgb.intb2,
        7: _mgb.intb4,
        8: None,
        9: np.float16,
        # quantized dtype start from 100000
        # see MEGDNN_PARAMETERIZED_DTYPE_ENUM_BASE in
        # dnn/include/megdnn/dtype.h
        100000: np.uint8,
        100001: np.int32,
        100002: np.int8,
    }

    # Header: three native unsigned ints (name length, dtype enum, max ndim).
    header_fmt = struct.Struct("III")
    name_len, dtype, max_ndim = header_fmt.unpack(fobj.read(header_fmt.size))
    if dtype not in DTYPE_LIST:
        # Same exception type as a plain dict lookup, but with a message.
        raise KeyError("unknown dtype enum {} in tensor dump".format(dtype))
    assert (
        DTYPE_LIST[dtype] is not None
    ), "Cannot load this tensor: dtype Byte is unsupported."

    # The shape is stored as max_ndim unsigned ints; trailing zeros are
    # dropped. The emptiness check avoids an IndexError when every stored
    # dimension is zero (or max_ndim itself is zero).
    shape = list(struct.unpack("I" * max_ndim, fobj.read(max_ndim * 4)))
    while shape and shape[-1] == 0:
        shape.pop(-1)
    name = fobj.read(name_len).decode("ascii")
    # Everything after the name is the raw tensor data.
    return np.fromfile(fobj, dtype=DTYPE_LIST[dtype]).reshape(shape), name