# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

import os
import unittest

import numpy as np

from megenginelite import *

set_log_level(2)


def test_version():
    print("Lite version: {}".format(version))


def test_network_io():
    input_io1 = LiteIO("data1", is_host=False, io_type=LiteIOType.LITE_IO_VALUE)
    input_io2 = LiteIO(
        "data2",
        is_host=True,
        io_type=LiteIOType.LITE_IO_SHAPE,
        layout=LiteLayout([2, 4, 4]),
    )
    io = LiteNetworkIO()
    io.add_input(input_io1)
    io.add_input(input_io2)

    output_io1 = LiteIO("out1", is_host=False)
    output_io2 = LiteIO("out2", is_host=True, layout=LiteLayout([1, 1000]))

    io.add_output(output_io1)
    io.add_output(output_io2)

    assert len(io.inputs) == 2
    assert len(io.outputs) == 2

    assert io.inputs[0] == input_io1
    assert io.outputs[0] == output_io1

    c_io = io._create_network_io()

    assert c_io.input_size == 2
    assert c_io.output_size == 2


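# Shared fixture: the ShuffleNet model plus a reference input/output pair are
# loaded once from the directory named by the LITE_TEST_RESOUCE environment
# variable; subclasses drive inference through do_forward/check_correct.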
class TestShuffleNet(unittest.TestCase):
    source_dir = os.getenv("LITE_TEST_RESOUCE")
    input_data_path = os.path.join(source_dir, "input_data.npy")
    correct_data_path = os.path.join(source_dir, "output_data.npy")
    model_path = os.path.join(source_dir, "shufflenet.mge")
    correct_data = np.load(correct_data_path).flatten()
    input_data = np.load(input_data_path)

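    # Compare the flattened output element-wise against the reference output.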
    def check_correct(self, out_data, error=1e-4):
        out_data = out_data.flatten()
        assert np.isfinite(out_data.sum())
        assert self.correct_data.size == out_data.size
        for i in range(out_data.size):
            assert abs(out_data[i] - self.correct_data[i]) < error

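    # Copy the reference input into the network, run `times` synchronous forward
    # passes, and verify the result.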
    def do_forward(self, network, times=3):
        input_name = network.get_input_name(0)
        input_tensor = network.get_io_tensor(input_name)
        output_name = network.get_output_name(0)
        output_tensor = network.get_io_tensor(output_name)

        input_tensor.set_data_by_copy(self.input_data)
        for i in range(times):
            network.forward()
            network.wait()

        output_data = output_tensor.to_numpy()
        self.check_correct(output_data)


class TestNetwork(TestShuffleNet):
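    # Decrypt a bare model encrypted with the "AES_default" algorithm by setting
    # bare_model_cryption_name before loading.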
    def test_decryption(self):
        model_path = os.path.join(self.source_dir, "shufflenet_crypt_aes.mge")
        config = LiteConfig()
        config.bare_model_cryption_name = "AES_default".encode("utf-8")
        network = LiteNetwork(config)
        network.load(model_path)
        self.do_forward(network)

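    # A packed .lite model (RC4-encrypted, per its name) loads without any extra
    # cryption configuration.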
    def test_pack_model(self):
        model_path = os.path.join(self.source_dir, "test_packed_model_rc4.lite")
        network = LiteNetwork()
        network.load(model_path)
        self.do_forward(network)

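    # The ShuffleNet model expects a single 1x3x224x224 float32 input.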
    def test_network_basic(self):
        network = LiteNetwork()
        network.load(self.model_path)

        input_name = network.get_input_name(0)
        input_tensor = network.get_io_tensor(input_name)
        output_name = network.get_output_name(0)
        output_tensor = network.get_io_tensor(output_name)

        assert input_tensor.layout.shapes[0] == 1
        assert input_tensor.layout.shapes[1] == 3
        assert input_tensor.layout.shapes[2] == 224
        assert input_tensor.layout.shapes[3] == 224
        assert input_tensor.layout.data_type == LiteDataType.LITE_FLOAT
        assert input_tensor.layout.ndim == 4

        self.do_forward(network)

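    # set_data_by_share hands the input buffer to the tensor without copying it.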
    def test_network_shared_data(self):
        network = LiteNetwork()
        network.load(self.model_path)

        input_name = network.get_input_name(0)
        input_tensor = network.get_io_tensor(input_name)
        output_name = network.get_output_name(0)
        output_tensor = network.get_io_tensor(output_name)

        input_tensor.set_data_by_share(self.input_data)
        for i in range(3):
            network.forward()
            network.wait()

        output_data = output_tensor.to_numpy()
        self.check_correct(output_data)

    def test_network_get_name(self):
        network = LiteNetwork()
        network.load(self.model_path)

        input_names = network.get_all_input_name()
        assert input_names[0] == "data"
        output_names = network.get_all_output_name()
        assert output_names[0] == network.get_output_name(0)

        self.do_forward(network)

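    # device_id (like stream_id and threads_number in the next two tests) can only
    # be changed before load(); afterwards the setter raises RuntimeError.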
    def test_network_set_device_id(self):
        network = LiteNetwork()
        assert network.device_id == 0

        network.device_id = 1
        network.load(self.model_path)
        assert network.device_id == 1

        with self.assertRaises(RuntimeError):
            network.device_id = 1

        self.do_forward(network)

    def test_network_set_stream_id(self):
        network = LiteNetwork()
        assert network.stream_id == 0

        network.stream_id = 1
        network.load(self.model_path)
        assert network.stream_id == 1

        with self.assertRaises(RuntimeError):
            network.stream_id = 1

        self.do_forward(network)

    def test_network_set_thread_number(self):
        network = LiteNetwork()
        assert network.threads_number == 1

        network.threads_number = 2
        network.load(self.model_path)
        assert network.threads_number == 2

        with self.assertRaises(RuntimeError):
            network.threads_number = 2

        self.do_forward(network)

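    # CPU inplace mode must likewise be enabled before the model is loaded.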
    def test_network_cpu_inplace(self):
        network = LiteNetwork()
        assert not network.is_cpu_inplace_mode()

        network.enable_cpu_inplace_mode()
        network.load(self.model_path)
        assert network.is_cpu_inplace_mode()

        with self.assertRaises(RuntimeError):
            network.enable_cpu_inplace_mode()

        self.do_forward(network)

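    # Exercise LiteOptions: enable weight preprocessing and skip the first-run
    # var sanity check.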
    def test_network_option(self):
        option = LiteOptions()
        option.weight_preprocess = 1
        option.var_sanity_check_first_run = 0

        config = LiteConfig(option=option)
        network = LiteNetwork(config=config)
        network.load(self.model_path)

        self.do_forward(network)

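    # Build the network with an explicit LiteNetworkIO that registers only the
    # "data" input.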
    def test_network_reset_io(self):
        option = LiteOptions()
        option.var_sanity_check_first_run = 0
        config = LiteConfig(option=option)

        input_io = LiteIO("data")
        ios = LiteNetworkIO()
        ios.add_input(input_io)
        network = LiteNetwork(config=config, io=ios)
        network.load(self.model_path)

        input_tensor = network.get_io_tensor("data")
        assert input_tensor.device_type == LiteDeviceType.LITE_CPU

        self.do_forward(network)

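    # Feed the network by sharing memory between a temporary tensor and the
    # network's input tensor instead of copying the data in.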
    def test_network_by_share(self):
        network = LiteNetwork()
        network.load(self.model_path)

        input_name = network.get_input_name(0)
        input_tensor = network.get_io_tensor(input_name)
        output_name = network.get_output_name(0)
        output_tensor = network.get_io_tensor(output_name)

        assert input_tensor.device_type == LiteDeviceType.LITE_CPU
        layout = LiteLayout(self.input_data.shape, self.input_data.dtype)
        tensor_tmp = LiteTensor(layout=layout)
        tensor_tmp.set_data_by_share(self.input_data)
        input_tensor.share_memory_with(tensor_tmp)

        for i in range(3):
            network.forward()
            network.wait()

        output_data = output_tensor.to_numpy()
        self.check_correct(output_data)

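    # A second network reuses the weights of an already-loaded network via
    # share_weights_with; note that new_network never calls load() itself.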
    def test_network_share_weights(self):
        option = LiteOptions()
        option.var_sanity_check_first_run = 0
        config = LiteConfig(option=option)

        src_network = LiteNetwork(config=config)
        src_network.load(self.model_path)

        new_network = LiteNetwork()
        new_network.enable_cpu_inplace_mode()
        new_network.share_weights_with(src_network)

        self.do_forward(src_network)
        self.do_forward(new_network)

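    # Runtime (workspace) memory can be shared between networks; the binding's
    # method name is spelled share_runtime_memroy.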
    def test_network_share_runtime_memory(self):
        option = LiteOptions()
        option.var_sanity_check_first_run = 0
        config = LiteConfig(option=option)

        src_network = LiteNetwork(config=config)
        src_network.load(self.model_path)

        new_network = LiteNetwork()
        new_network.enable_cpu_inplace_mode()
        new_network.share_runtime_memroy(src_network)
        new_network.load(self.model_path)

        self.do_forward(src_network)
        self.do_forward(new_network)

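    # The async-execution and start/finish-callback tests below are currently
    # disabled but kept for reference.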
    # def test_network_async(self):
    #     count = 0
    #     finished = False
    #
    #     def async_callback():
    #         nonlocal finished
    #         finished = True
    #         return 0
    #
    #     option = LiteOptions()
    #     option.var_sanity_check_first_run = 0
    #     config = LiteConfig(option=option)
    #
    #     network = LiteNetwork(config=config)
    #     network.load(self.model_path)
    #
    #     network.async_with_callback(async_callback)
    #
    #     input_tensor = network.get_io_tensor(network.get_input_name(0))
    #     output_tensor = network.get_io_tensor(network.get_output_name(0))
    #
    #     input_tensor.set_data_by_share(self.input_data)
    #     network.forward()
    #
    #     while not finished:
    #         count += 1
    #
    #     assert count > 0
    #     output_data = output_tensor.to_numpy()
    #     self.check_correct(output_data)
    #
    # def test_network_start_callback(self):
    #     network = LiteNetwork()
    #     network.load(self.model_path)
    #     start_checked = False
    #
    #     @start_finish_callback
    #     def start_callback(ios):
    #         nonlocal start_checked
    #         start_checked = True
    #         assert len(ios) == 1
    #         for key in ios:
    #             io = key
    #             data = ios[key].to_numpy().flatten()
    #             input_data = self.input_data.flatten()
    #             assert data.size == input_data.size
    #             assert io.name.decode("utf-8") == "data"
    #             for i in range(data.size):
    #                 assert data[i] == input_data[i]
    #         return 0
    #
    #     network.set_start_callback(start_callback)
    #     self.do_forward(network, 1)
    #     assert start_checked == True
    #
    # def test_network_finish_callback(self):
    #     network = LiteNetwork()
    #     network.load(self.model_path)
    #     finish_checked = False
    #
    #     @start_finish_callback
    #     def finish_callback(ios):
    #         nonlocal finish_checked
    #         finish_checked = True
    #         assert len(ios) == 1
    #         for key in ios:
    #             io = key
    #             data = ios[key].to_numpy().flatten()
    #             output_data = self.correct_data.flatten()
    #             assert data.size == output_data.size
    #             for i in range(data.size):
    #                 assert data[i] == output_data[i]
    #         return 0
    #
    #     network.set_finish_callback(finish_callback)
    #     self.do_forward(network, 1)
    #     assert finish_checked == True

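    # Profiling results are dumped to the given JSON file; the test only checks
    # that the file is produced.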
    def test_enable_profile(self):
        network = LiteNetwork()
        network.load(self.model_path)
        network.enable_profile_performance("./profile.json")

        self.do_forward(network)

        assert os.path.exists("./profile.json")
        os.remove("./profile.json")

    def test_io_txt_dump(self):
        network = LiteNetwork()
        network.load(self.model_path)
        network.io_txt_dump("./io_txt.txt")
        self.do_forward(network)

    def test_io_bin_dump(self):
        import shutil

        folder = "./out"
        network = LiteNetwork()
        network.load(self.model_path)
        if not os.path.exists(folder):
            os.mkdir(folder)
        network.io_bin_dump(folder)
        self.do_forward(network)
        shutil.rmtree(folder)

    def test_algo_workspace_limit(self):
        network = LiteNetwork()
        network.load(self.model_path)
        print("modify the workspace limit.")
        network.set_network_algo_workspace_limit(10000)
        self.do_forward(network)

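    # Algorithm selection strategies are bit flags: profile, but only among
    # reproducible algorithms.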
    def test_network_algo_policy(self):
        network = LiteNetwork()
        network.load(self.model_path)
        network.set_network_algo_policy(
            LiteAlgoSelectStrategy.LITE_ALGO_PROFILE
            | LiteAlgoSelectStrategy.LITE_ALGO_REPRODUCIBLE
        )
        self.do_forward(network)

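    # Profile with a shared batch size of 1 and require binary-equal results
    # between batches.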
    def test_network_algo_policy_ignore_batch(self):
        network = LiteNetwork()
        network.load(self.model_path)
        network.set_network_algo_policy(
            LiteAlgoSelectStrategy.LITE_ALGO_PROFILE,
            shared_batch_size=1,
            binary_equal_between_batch=True,
        )
        self.do_forward(network)
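

# Run the unittest-style tests directly when this file is executed as a script.
# This is a minimal sketch; the module-level test functions above (test_version,
# test_network_io) are collected by pytest rather than unittest.
if __name__ == "__main__":
    unittest.main()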