|
- # -*- coding: utf-8 -*-
- # MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
- #
- # Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- import base64
- import json
- import os
- import re
- from typing import Iterable, List, Optional
-
- from ..core._imperative_rt import OperatorNodeConfig, ProfileEntry
- from ..core._imperative_rt import ProfilerImpl as _Profiler
- from ..core._imperative_rt.imperative import sync
- from ..core._imperative_rt.ops import CollectiveComm
-
-
- def _make_dict(**kwargs):
- unused_keys = []
- for k, v in kwargs.items():
- if v is None:
- unused_keys.append(k)
- for k in unused_keys:
- del kwargs[k]
- return kwargs
-
-
- def _print_opnode_config(config):
- return _make_dict(
- name=config.name, dtype=config.dtype, comp_node_arr=config.comp_node_arr,
- )
-
-
- def _dump_chrome_timeline(entries: List[ProfileEntry], path: str):
- pid = os.getpid()
- trace_events = []
-
- def append_event(**kwargs):
- trace_events.append(_make_dict(**kwargs))
-
- for id, entry in enumerate(entries):
- op = entry.op
- name = type(op).__name__
- host_begin, host_end = entry.host
- device_list = entry.device_list
- args = Profiler.fetch_attrs(op)
- args["__id__"] = "[{}]".format(id)
- cat = name
- for ts, ph in [(host_begin, "B"), (host_end, "E")]:
- append_event(
- name=name, ph=ph, ts=ts * 1000, pid=pid, tid="host", args=args, cat=cat,
- )
- for device, device_begin, device_end in device_list:
- for ts, ph in [(device_begin(), "B"), (device_end(), "E")]:
- append_event(
- name=name, ph=ph, ts=ts * 1000, pid=pid, tid=str(device), args=args,
- )
- with open("{}.chrome_timeline.json".format(path), "w") as f:
- json.dump(trace_events, f, indent=2)
-
-
- def _dump_compatible(entries: List[ProfileEntry], path: str):
- obj = {
- "graph_exec": {"var": [], "operator": {}},
- "profiler": {"device": {}, "host": {}, "opr_footprint": {}},
- }
- var_list = obj["graph_exec"]["var"]
- operator_dict = obj["graph_exec"]["operator"]
- device_dict = obj["profiler"]["device"]
- host_dict = obj["profiler"]["host"]
- opr_foot_print_dict = obj["profiler"]["opr_footprint"]
-
- def add_var(var) -> int:
- var_id = len(var_list)
- var_list.append(
- {"comp_node": str(var[2]),}
- )
- return var_id
-
- for op_id, entry in enumerate(entries):
- operator_dict[op_id] = {
- "input": [add_var(var) for var in entry.inputs],
- "output": [add_var(var) for var in entry.outputs],
- "name": str(entry.op.ctype()),
- "type": "imperative",
- "id": entry.id,
- }
- op_device_dict = {}
- for device, device_begin, device_end in entry.device_list:
- op_device_dict[str(device)] = {
- "start": device_begin(),
- "kern": device_begin(),
- "end": device_end(),
- }
- device_dict[op_id] = op_device_dict
- host_begin, host_end = entry.host
- host_dict[op_id] = {
- "host": {"start": host_begin, "kern": host_begin, "end": host_end}
- }
- opr_footprint = {
- "out_shapes": [oup[1] for oup in entry.outputs],
- "in_shapes": [inp[1] for inp in entry.inputs],
- "params": {},
- }
- if entry.memory > 0:
- opr_footprint["memory"] = entry.memory
- if entry.computation > 0:
- opr_footprint["computation"] = entry.computation
- opr_foot_print_dict[op_id] = opr_footprint
- with open("{}.compatible.json".format(path), "w") as f:
- json.dump(obj, f, indent=2)
-
-
- def _dump_graphviz(entries: List[ProfileEntry], path: str):
- import json
-
- import graphviz
-
- graph = graphviz.Digraph()
- graph.graph_attr["ordering"] = "out"
- var_cache = {}
-
- def cache_var(var_id, var_shape):
- if var_id not in var_cache:
- var_name = "var({})".format(var_id)
- var_label = "{}\nshape:{}\n".format(var_name, shape)
- graph.node(var_name, var_label)
- var_cache[var_id] = var_name
- return var_cache[var_id]
-
- for op_id, entry in enumerate(entries):
- op = entry.op
- op_name = "op({})".format(op_id)
- op_type = type(op).__name__
- op_attrs = Profiler.fetch_attrs(op)
- label_lines = []
- if "param" in op_attrs:
- del op_attrs["param"]
- label_lines.append("{}:{}".format(op_name, op_type))
- for k, v in op_attrs.items():
- label_lines.append("attr[{}]: {}".format(k, v))
- op_param_str = entry.param
- if len(op_param_str) > 0:
- op_param = json.loads(op_param_str)
- for k, v in op_param.items():
- label_lines.append("param[{}]:{}".format(k, v))
- host_begin, host_end = entry.host
- label_lines.append("time[host]: {:f}ms".format(host_end - host_begin))
- for device, device_begin, device_end in entry.device_list:
- device_time = device_end() - device_begin()
- label_lines.append("time[{}]: {:f}ms".format(device, device_time))
- op_label = "\n".join(label_lines)
- graph.node(op_name, op_label, shape="rectangle")
- for var_id, shape, device in entry.inputs:
- graph.edge(cache_var(var_id, shape), op_name)
- for var_id, shape, device in entry.outputs:
- graph.edge(op_name, cache_var(var_id, shape))
- graph.save("{}.graphviz.dot".format(path))
-
-
- class Profiler:
- r"""
- Profile graph execution in imperative mode.
-
- :type path: Optional[str]
- :param path: default path prefix for profiler to dump.
-
- Examples:
-
- .. code-block::
-
- import megengine as mge
- import megengine.module as M
- from megengine.utils.profiler import Profiler
-
- # With Learnable Parameters
- for iter in range(0, 10):
- # Only profile record of last iter would be saved
- with Profiler("profile"):
- # your code here
-
- # Then open the profile file in chrome timeline window
- """
-
- CHROME_TIMELINE = "chrome_timeline"
- COMPATIBLE = "compatible"
- GRAPHVIZ = "graphviz"
-
- WITH_FOOTPRINT = 1
-
- _type_map = {
- OperatorNodeConfig: lambda x: _print_opnode_config(x),
- bytes: lambda x: base64.encodebytes(x).decode("ascii"),
- CollectiveComm.Mode: lambda x: str(x),
- }
-
- _dumper_map = {
- CHROME_TIMELINE: _dump_chrome_timeline,
- COMPATIBLE: _dump_compatible,
- GRAPHVIZ: _dump_graphviz,
- }
-
- def __init__(
- self,
- path: str = "profile",
- *,
- formats: Iterable[str] = (CHROME_TIMELINE,),
- type_filter: str = ".*",
- exit_dump: bool = True
- ) -> None:
- self._impl = _Profiler()
- self._path = path
-
- if isinstance(formats, str):
- formats = (formats,)
-
- self._filter = type_filter
- self._dumpers = [Profiler._dumper_map[fmt] for fmt in formats]
- self._exit_dump = exit_dump
-
- def __enter__(self):
- sync()
- self._impl.start(Profiler.WITH_FOOTPRINT)
- return self
-
- def __exit__(self, val, tp, trace):
- if self._exit_dump:
- self.dump()
- sync()
- self._impl.stop()
- self._impl.clear()
-
- @classmethod
- def fetch_attrs(cls, op):
- attrs = dir(op)
- results = {}
- for attr in attrs:
- if attr.startswith("_"):
- continue
- value = op.__getattribute__(attr)
- if callable(value):
- continue
- value_type = type(value)
- if value_type in cls._type_map:
- value = cls._type_map[value_type](value)
- results[attr] = value
- return results
-
- def dump(self, path: Optional[str] = None):
- sync()
- raw = [
- entry
- for entry in self._impl.dump()
- if re.match(self._filter, type(entry.op).__name__)
- ]
- if path is None:
- path = self._path
- for dumper in self._dumpers:
- dumper(raw, path)
-
- def __call__(self, func):
- def wrapper(*args, **kwargs):
- with self:
- return func(*args, **kwargs)
-
- return wrapper
-
-
- profile = Profiler
|