# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import functools
import multiprocessing as mp
import platform

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine import Tensor
from megengine.module import BatchNorm1d, BatchNorm2d, SyncBatchNorm

# allclose with loose float32 tolerances, shared by all tests below
_assert_allclose = functools.partial(np.testing.assert_allclose, atol=5e-6, rtol=5e-6)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not supported on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="GPU mode is not supported on Windows yet"
)
@pytest.mark.isolated_distributed
def test_syncbn():
    nr_chan = 8
    data_shape = (3, nr_chan, 4, 16)
    momentum = 0.9
    eps = 1e-5
    running_mean = np.zeros((1, nr_chan, 1, 1), dtype=np.float32)
    running_var = np.ones((1, nr_chan, 1, 1), dtype=np.float32)
    steps = 4
    nr_ranks = 2
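    # Port 0 lets the OS pick a free port for the rendezvous server; the
    # chosen port is read back via py_server_port and passed to the workers.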
    server = dist.Server(0)
    port = server.py_server_port

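    # Each worker runs on its own GPU and normalizes only its shard of the
    # input; SyncBatchNorm reduces mean/variance across ranks, so every rank
    # must reproduce the single-process reference computed below.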
    def worker(rank, data, yv_expect, running_mean, running_var):
        if mge.get_device_count("gpu") < nr_ranks:
            return
        # the rank doubles as the GPU device index here
        dist.init_process_group("localhost", port, nr_ranks, rank, rank)
        bn = SyncBatchNorm(nr_chan, momentum=momentum, eps=eps)
        for i in range(steps):
            yv = bn(Tensor(data[i]))

        # check the final step's output and the accumulated running statistics
        _assert_allclose(yv.numpy(), yv_expect)
        _assert_allclose(bn.running_mean.numpy(), running_mean)
        _assert_allclose(bn.running_var.numpy(), running_var)

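    # Reference computation: flatten each batch to (N * H * W, C) so the
    # per-channel statistics are plain column statistics. The biased variance
    # normalizes the output, while the unbiased (ddof=1) variance feeds the
    # running-variance update.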
    xv = []
    for i in range(steps):
        xv.append(np.random.normal(loc=2.3, size=data_shape).astype(np.float32))
        xv_transposed = np.transpose(xv[i], [0, 2, 3, 1]).reshape(
            (data_shape[0] * data_shape[2] * data_shape[3], nr_chan)
        )

        mean = np.mean(xv_transposed, axis=0).reshape(1, nr_chan, 1, 1)

        var_biased = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1, 1))
        sd = np.sqrt(var_biased + eps)

        var_unbiased = np.var(xv_transposed, axis=0, ddof=1).reshape((1, nr_chan, 1, 1))
        running_mean = running_mean * momentum + mean * (1 - momentum)
        running_var = running_var * momentum + var_unbiased * (1 - momentum)

        yv_expect = (xv[i] - mean) / sd

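    # Shard the input along the width axis: rank i receives columns
    # [i * 8, (i + 1) * 8). The synchronized statistics over both shards
    # must equal the statistics of the full tensor.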
    data = []
    for i in range(nr_ranks):
        data.append([])
        for j in range(steps):
            data[i].append(xv[j][:, :, :, i * 8 : i * 8 + 8])

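    # Launch one process per rank; each asserts on its own slice of the
    # expected output and must exit cleanly within the timeout.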
    procs = []
    for rank in range(nr_ranks):
        p = mp.Process(
            target=worker,
            args=(
                rank,
                data[rank],
                yv_expect[:, :, :, rank * 8 : rank * 8 + 8],
                running_mean,
                running_var,
            ),
        )
        p.start()
        procs.append(p)

    for p in procs:
        p.join(10)
        assert p.exitcode == 0


def test_batchnorm():
    nr_chan = 8
    data_shape = (3, nr_chan, 4)
    momentum = 0.9
    bn = BatchNorm1d(nr_chan, momentum=momentum)
    running_mean = np.zeros((1, nr_chan, 1), dtype=np.float32)
    running_var = np.ones((1, nr_chan, 1), dtype=np.float32)
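    # The running statistics follow
    #   running = running * momentum + batch_stat * (1 - momentum),
    # i.e. momentum weights the previous value (unlike PyTorch's convention,
    # where momentum weights the new batch statistic), hence momentum=0.9.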
    for i in range(3):
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        mean = np.mean(np.mean(xv, axis=0, keepdims=True), axis=2, keepdims=True)
        xv_transposed = np.transpose(xv, [0, 2, 1]).reshape(
            (data_shape[0] * data_shape[2], nr_chan)
        )

        var_biased = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1))
        sd = np.sqrt(var_biased + bn.eps)

        var_unbiased = np.var(xv_transposed, axis=0, ddof=1).reshape((1, nr_chan, 1))
        running_mean = running_mean * momentum + mean * (1 - momentum)
        running_var = running_var * momentum + var_unbiased * (1 - momentum)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)
        _assert_allclose(bn.running_mean.numpy().reshape(-1), running_mean.reshape(-1))
        _assert_allclose(bn.running_var.numpy().reshape(-1), running_var.reshape(-1))

    # with training=False the running statistics are frozen and used for
    # normalization, so repeated forward passes must give identical results
    mean_backup = bn.running_mean.numpy()
    var_backup = bn.running_var.numpy()
    bn.training = False
    xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
    data = Tensor(xv)
    yv1 = bn(data)
    yv2 = bn(data)
    np.testing.assert_equal(yv1.numpy(), yv2.numpy())
    np.testing.assert_equal(mean_backup, bn.running_mean.numpy())
    np.testing.assert_equal(var_backup, bn.running_var.numpy())
    yv_expect = (xv - running_mean) / np.sqrt(running_var + bn.eps)
    _assert_allclose(yv1.numpy(), yv_expect)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not supported on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="GPU mode is not supported on Windows yet"
)
@pytest.mark.isolated_distributed
def test_syncbn1d():
    nr_chan = 8
    data_shape = (3, nr_chan, 4)
    momentum = 0.9
    bn = SyncBatchNorm(nr_chan, momentum=momentum)
    running_mean = np.zeros((1, nr_chan, 1), dtype=np.float32)
    running_var = np.ones((1, nr_chan, 1), dtype=np.float32)
    for i in range(3):
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        mean = np.mean(np.mean(xv, axis=0, keepdims=True), axis=2, keepdims=True)
        xv_transposed = np.transpose(xv, [0, 2, 1]).reshape(
            (data_shape[0] * data_shape[2], nr_chan)
        )

        var_biased = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1))
        sd = np.sqrt(var_biased + bn.eps)

        var_unbiased = np.var(xv_transposed, axis=0, ddof=1).reshape((1, nr_chan, 1))
        running_mean = running_mean * momentum + mean * (1 - momentum)
        running_var = running_var * momentum + var_unbiased * (1 - momentum)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)
        _assert_allclose(bn.running_mean.numpy().reshape(-1), running_mean.reshape(-1))
        _assert_allclose(bn.running_var.numpy().reshape(-1), running_var.reshape(-1))

    # with training=False the running statistics are frozen and used for
    # normalization, so repeated forward passes must give identical results
    mean_backup = bn.running_mean.numpy()
    var_backup = bn.running_var.numpy()
    bn.training = False
    xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
    data = Tensor(xv)
    yv1 = bn(data)
    yv2 = bn(data)
    np.testing.assert_equal(yv1.numpy(), yv2.numpy())
    np.testing.assert_equal(mean_backup, bn.running_mean.numpy())
    np.testing.assert_equal(var_backup, bn.running_var.numpy())
    yv_expect = (xv - running_mean) / np.sqrt(running_var + bn.eps)
    _assert_allclose(yv1.numpy(), yv_expect)


def test_batchnorm2d():
    nr_chan = 8
    data_shape = (3, nr_chan, 16, 16)
    momentum = 0.9
    bn = BatchNorm2d(nr_chan, momentum=momentum)
    running_mean = np.zeros((1, nr_chan, 1, 1), dtype=np.float32)
    running_var = np.ones((1, nr_chan, 1, 1), dtype=np.float32)
    for i in range(3):
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        xv_transposed = np.transpose(xv, [0, 2, 3, 1]).reshape(
            (data_shape[0] * data_shape[2] * data_shape[3], nr_chan)
        )

        mean = np.mean(xv_transposed, axis=0).reshape(1, nr_chan, 1, 1)

        var_biased = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1, 1))
        sd = np.sqrt(var_biased + bn.eps)

        var_unbiased = np.var(xv_transposed, axis=0, ddof=1).reshape((1, nr_chan, 1, 1))
        running_mean = running_mean * momentum + mean * (1 - momentum)
        running_var = running_var * momentum + var_unbiased * (1 - momentum)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)
        _assert_allclose(bn.running_mean.numpy(), running_mean)
        _assert_allclose(bn.running_var.numpy(), running_var)

    # with training=False the running statistics are frozen and used for
    # normalization, so repeated forward passes must give identical results
    mean_backup = bn.running_mean.numpy()
    var_backup = bn.running_var.numpy()
    bn.training = False
    xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
    data = Tensor(xv)
    yv1 = bn(data)
    yv2 = bn(data)
    np.testing.assert_equal(yv1.numpy(), yv2.numpy())
    np.testing.assert_equal(mean_backup, bn.running_mean.numpy())
    np.testing.assert_equal(var_backup, bn.running_var.numpy())
    yv_expect = (xv - running_mean) / np.sqrt(running_var + bn.eps)
    _assert_allclose(yv1.numpy(), yv_expect)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not supported on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="GPU mode is not supported on Windows yet"
)
@pytest.mark.isolated_distributed
def test_syncbn2d():
    nr_chan = 8
    data_shape = (3, nr_chan, 16, 16)
    momentum = 0.9
    bn = SyncBatchNorm(nr_chan, momentum=momentum)
    running_mean = np.zeros((1, nr_chan, 1, 1), dtype=np.float32)
    running_var = np.ones((1, nr_chan, 1, 1), dtype=np.float32)
    for i in range(3):
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        xv_transposed = np.transpose(xv, [0, 2, 3, 1]).reshape(
            (data_shape[0] * data_shape[2] * data_shape[3], nr_chan)
        )

        mean = np.mean(xv_transposed, axis=0).reshape(1, nr_chan, 1, 1)

        var_biased = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1, 1))
        sd = np.sqrt(var_biased + bn.eps)

        var_unbiased = np.var(xv_transposed, axis=0, ddof=1).reshape((1, nr_chan, 1, 1))
        running_mean = running_mean * momentum + mean * (1 - momentum)
        running_var = running_var * momentum + var_unbiased * (1 - momentum)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)
        _assert_allclose(bn.running_mean.numpy(), running_mean)
        _assert_allclose(bn.running_var.numpy(), running_var)

    # with training=False the running statistics are frozen and used for
    # normalization, so repeated forward passes must give identical results
    mean_backup = bn.running_mean.numpy()
    var_backup = bn.running_var.numpy()
    bn.training = False
    xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
    data = Tensor(xv)
    yv1 = bn(data)
    yv2 = bn(data)
    np.testing.assert_equal(yv1.numpy(), yv2.numpy())
    np.testing.assert_equal(mean_backup, bn.running_mean.numpy())
    np.testing.assert_equal(var_backup, bn.running_var.numpy())
    yv_expect = (xv - running_mean) / np.sqrt(running_var + bn.eps)
    _assert_allclose(yv1.numpy(), yv_expect)


def test_batchnorm_no_stats():
    nr_chan = 8
    data_shape = (3, nr_chan, 4)
    bn = BatchNorm1d(nr_chan, track_running_stats=False)
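    # With track_running_stats=False there are no running statistics at all:
    # the layer always normalizes with the current batch's (biased) mean and
    # variance, so flipping bn.training at i == 2 must not change the
    # expected output formula.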
    for i in range(4):
        if i == 2:
            bn.training = False
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        mean = np.mean(np.mean(xv, axis=0, keepdims=True), axis=2, keepdims=True)
        var = np.var(
            np.transpose(xv, [0, 2, 1]).reshape(
                (data_shape[0] * data_shape[2], nr_chan)
            ),
            axis=0,
        ).reshape((1, nr_chan, 1))
        sd = np.sqrt(var + bn.eps)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not supported on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="GPU mode is not supported on Windows yet"
)
@pytest.mark.isolated_distributed
def test_syncbn_no_stats():
    nr_chan = 8
    data_shape = (3, nr_chan, 4)
    bn = SyncBatchNorm(nr_chan, track_running_stats=False)
    for i in range(4):
        if i == 2:
            bn.training = False
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        mean = np.mean(np.mean(xv, axis=0, keepdims=True), axis=2, keepdims=True)
        var = np.var(
            np.transpose(xv, [0, 2, 1]).reshape(
                (data_shape[0] * data_shape[2], nr_chan)
            ),
            axis=0,
        ).reshape((1, nr_chan, 1))
        sd = np.sqrt(var + bn.eps)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)


def test_batchnorm2d_no_stats():
    nr_chan = 8
    data_shape = (3, nr_chan, 16, 16)
    bn = BatchNorm2d(nr_chan, track_running_stats=False)
    for i in range(4):
        if i == 2:
            bn.training = False
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        xv_transposed = np.transpose(xv, [0, 2, 3, 1]).reshape(
            (data_shape[0] * data_shape[2] * data_shape[3], nr_chan)
        )

        mean = np.mean(xv_transposed, axis=0).reshape(1, nr_chan, 1, 1)
        var = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1, 1))
        sd = np.sqrt(var + bn.eps)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not supported on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="GPU mode is not supported on Windows yet"
)
@pytest.mark.isolated_distributed
def test_syncbn2d_no_stats():
    nr_chan = 8
    data_shape = (3, nr_chan, 16, 16)
    bn = SyncBatchNorm(nr_chan, track_running_stats=False)
    for i in range(4):
        if i == 2:
            bn.training = False
        xv = np.random.normal(loc=2.3, size=data_shape).astype(np.float32)
        xv_transposed = np.transpose(xv, [0, 2, 3, 1]).reshape(
            (data_shape[0] * data_shape[2] * data_shape[3], nr_chan)
        )

        mean = np.mean(xv_transposed, axis=0).reshape(1, nr_chan, 1, 1)
        var = np.var(xv_transposed, axis=0).reshape((1, nr_chan, 1, 1))
        sd = np.sqrt(var + bn.eps)

        yv = bn(Tensor(xv))
        yv_expect = (xv - mean) / sd

        _assert_allclose(yv.numpy(), yv_expect)