# -*- coding: utf-8 -*- # MegEngine is Licensed under the Apache License, Version 2.0 (the "License") # # Copyright (c) 2014-2020 Megvii Inc. All rights reserved. # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. import io from tempfile import mkstemp import numpy as np import pytest from megengine import tensor from megengine.core.ops import builtin as ops from megengine.core.tensor import megbrain_graph as G from megengine.core.tensor.core import apply from megengine.core.tensor.raw_tensor import as_raw_tensor from megengine.functional import exp, log from megengine.jit import exclude_from_trace, trace def test_trace(): for symbolic in [False, True]: @trace(symbolic=symbolic) def f(x): op = ops.Elemwise(mode="negate") (y,) = apply(op, x) return y x = as_raw_tensor([1]).numpy() y = f.__wrapped__(as_raw_tensor(x)).numpy() for i in range(3): np.testing.assert_equal(f(as_raw_tensor(x)).numpy(), y) def test_exclude_from_trace(): for symbolic in [False, True]: @trace(symbolic=symbolic) def f(x): neg = ops.Elemwise(mode="negate") (x,) = apply(neg, x) with exclude_from_trace(): if i % 2: (x,) = apply(neg, x) (x,) = apply(neg, x) return x x = as_raw_tensor([1]).numpy() for i in range(3): y = f.__wrapped__(as_raw_tensor(x)).numpy() np.testing.assert_equal(f(as_raw_tensor(x)).numpy(), y) def test_print_in_trace(): for symbolic in [False]: # cannot read value in symbolic mode @trace(symbolic=symbolic) def f(x): nonlocal buf neg = ops.Elemwise(mode="negate") (x,) = apply(neg, x) buf = x.numpy() (x,) = apply(neg, x) return x buf = None x = as_raw_tensor([1]).numpy() for i in range(3): y = f.__wrapped__(as_raw_tensor(x)).numpy() z = buf buf = None np.testing.assert_equal(f(as_raw_tensor(x)).numpy(), y) np.testing.assert_equal(z, buf) def test_dump(): @trace(symbolic=True, capture_as_const=True) def f(x): op = ops.Elemwise(mode="negate") (y,) = apply(op, x) return y x = as_raw_tensor([1]).numpy() y = f.__wrapped__(as_raw_tensor(x)).numpy() for i in range(3): np.testing.assert_equal(f(as_raw_tensor(x)).numpy(), y) file = io.BytesIO() f.dump(file) def test_trace_profiler(): for symbolic in [False, True]: @trace(symbolic=symbolic, profiling=True) def f(x): op = ops.Elemwise(mode="negate") (y,) = apply(op, x) return y x = as_raw_tensor([1]).numpy() y = f.__wrapped__(as_raw_tensor(x)).numpy() f(as_raw_tensor(x)) f(as_raw_tensor(x)) # XXX: has to run twice out = f.get_profile() assert out.get("profiler") @pytest.mark.skip(reason="eq_to_unit failed in inplace.cpp") def test_goptions_div_zero(): @trace(symbolic=True, opt_level=0) def f(x): return x / x @trace(symbolic=True, opt_level=1) def g(x): return x / x out = f(tensor(0.0)) if out == out: raise ValueError("actual result should be nan") out = g(tensor(0.0)) if out != out: raise ValueError("actual result should be 1") @pytest.mark.skip(reason="cast to Elemwise failed in inplace.cpp") def test_goptions_log_exp(): @trace(symbolic=True, opt_level=0, capture_as_const=True) def f(x): return log(exp(x)) @trace(symbolic=True, opt_level=1, capture_as_const=True) def g(x): return log(exp(x)) f(tensor(1.0)) _, out = mkstemp() f.dump(out) *_, outputs = G.load_comp_graph_from_file(out) oprs_1 = cgtools.get_oprs_seq(outputs) g(tensor(1.0)) g.dump(out) *_, outputs = G.load_comp_graph_from_file(out) oprs_2 = cgtools.get_oprs_seq(outputs) assert len(oprs_1) - len(oprs_2) == 2 @pytest.mark.skip(reason="need cgtools to check final oprs") def test_goptions_log_sum_exp(): @trace(symbolic=True, opt_level=0, capture_as_const=True) def f(x, y): return log(exp(x) + exp(y)) @trace(symbolic=True, opt_level=1, capture_as_const=True) def g(x, y): return log(exp(x) + exp(y)) f(tensor(1.0), tensor(2.0)) _, out = mkstemp() f.dump(out) *_, outputs = G.load_comp_graph_from_file(out) oprs_1 = cgtools.get_oprs_seq(outputs) g(tensor(1.0), tensor(2.0)) g.dump(out) *_, outputs = G.load_comp_graph_from_file(out) oprs_2 = cgtools.get_oprs_seq(outputs) assert len(oprs_1) - len(oprs_2) == 2 @pytest.mark.skip(reason="need cgtools to check computing input dtype") def test_optimize_for_inference(): @trace(symbolic=True, capture_as_const=True) def f(x): return exp(x) _, out = mkstemp() f(tensor(5.0)) f.dump(out, optimize_for_inference=True, optimize_options={"enable_io16xc32": True}) res = G.load_comp_graph_from_file(out) computing_input = res.output_vars_list[0].owner.inputs[0] assert computing_input.dtype == np.float16