
test(mge/dist): add unit tests for megengine.distributed.utils

GitOrigin-RevId: 32336c66f1
Megvii Engine Team (Xinran Xu), tags/v0.4.0
commit d3730036ea
4 changed files with 473 additions and 9 deletions:

  1. python_module/megengine/distributed/__init__.py         (+2,   -0)
  2. python_module/test/run.sh                                (+14,  -9)
  3. python_module/test/unit/distributed/test_functional.py  (+294, -0)
  4. python_module/test/unit/distributed/test_util.py        (+163, -0)

python_module/megengine/distributed/__init__.py (+2, -0)

@@ -17,6 +17,7 @@ from .functional import (
     reduce_sum,
 )
 from .util import (
+    get_backend,
     get_master_ip,
     get_master_port,
     get_rank,
@@ -24,4 +25,5 @@ from .util import (
     group_barrier,
     init_process_group,
     is_distributed,
+    synchronized,
)
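
For context, a minimal sketch (not part of this commit) of the surface the two
newly exported helpers provide, assuming dist.init_process_group(...) has
already run on the current rank:

    import megengine.distributed as dist

    if dist.is_distributed():
        backend = dist.get_backend()  # the backend string passed at init, e.g. "nccl"

    @dist.synchronized
    def checkpoint():
        ...  # with a synchronized function, no rank races ahead of the others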

python_module/test/run.sh (+14, -9)

@@ -1,14 +1,19 @@
 #!/bin/bash -e
 
+ignore_list="--ignore test/unit/module/test_pytorch.py \
+             --ignore test/pytorch_comparison \
+             --ignore test/unit/hub/test_hub.py \
+             --ignore test/unit/data \
+             --ignore test/integration/manual \
+             --ignore megengine/module/pytorch \
+             --ignore test/unit/module/test_external.py"
+test_dirs="megengine test"
+
 pushd $(dirname "${BASH_SOURCE[0]}")/.. >/dev/null
-pytest -xv -m 'not internet' \
+pytest -xv -m 'isolated_distributed' \
+    --json-report --json-report-file=time_python_test.json \
+    $ignore_list $test_dirs
+pytest -xv -m 'not internet and not isolated_distributed' \
     --json-report --json-report-file=time_python_test.json \
-    --ignore test/unit/module/test_pytorch.py \
-    --ignore test/pytorch_comparison \
-    --ignore test/unit/hub/test_hub.py \
-    --ignore test/unit/data \
-    --ignore test/integration/manual \
-    --ignore megengine/module/pytorch \
-    --ignore test/unit/module/test_external.py \
-    megengine test
+    $ignore_list $test_dirs
 popd >/dev/null
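
A side note on the new marker: pytest warns about unknown marks unless
isolated_distributed is registered. One hedged way to do that (a hypothetical
conftest.py, not included in this commit):

    # conftest.py -- register the custom mark so `-m 'isolated_distributed'`
    # filtering works without PytestUnknownMarkWarning.
    def pytest_configure(config):
        config.addinivalue_line(
            "markers",
            "isolated_distributed: distributed tests that need their own pytest run",
        )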

python_module/test/unit/distributed/test_functional.py (+294, -0)

@@ -0,0 +1,294 @@
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

import multiprocessing as mp

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine.core import Parameter, tensor


def _init_process_group_wrapper(world_size, rank, dev, backend, q):
    # rank 0 binds an ephemeral port (port=0) and publishes the actual port
    # through the queue; the other rank reads it and connects.
    if rank == 0:
        dist.init_process_group("localhost", 0, world_size, rank, dev, backend)
        q.put(dist.get_master_port())
    else:
        port = q.get()
        dist.init_process_group("localhost", port, world_size, rank, dev, backend)


@pytest.mark.isolated_distributed
def test_reduce_sum():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.reduce_sum(inp, "x")
        if rank == 0:
            assert np.allclose(output.numpy(), expect)
        else:
            assert np.allclose(output.numpy(), 0)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = x + y
        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, None, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_broadcast():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.broadcast(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = x + 1
        p0 = mp.Process(target=worker, args=(0, x, backend, x, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, x, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_all_gather():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.all_gather(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = np.concatenate((x, y))
        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_reduce_scatter_sum():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.reduce_scatter_sum(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = x + y
        p0 = mp.Process(
            target=worker, args=(0, x, backend, z[: shape[0] // 2], port_queue)
        )
        p1 = mp.Process(
            target=worker, args=(1, y, backend, z[shape[0] // 2 :], port_queue)
        )

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 4), (8, 10), (88, 44)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_all_reduce_sum():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.all_reduce_sum(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = x + y
        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_all_reduce_max():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.all_reduce_max(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = np.maximum(x, y)
        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_all_reduce_min():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = tensor(data)
        output = dist.functional.all_reduce_min(inp, "x")
        assert np.allclose(output.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = np.minimum(x, y)
        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)


@pytest.mark.isolated_distributed
def test_bcast_param():
    world_size = 2

    def worker(rank, data, backend, expect, port_queue):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
        inp = Parameter(data)
        dist.functional.bcast_param(inp, "x")
        assert np.allclose(inp.numpy(), expect)

    def check(shape, backend):
        port_queue = mp.Queue()
        x = np.random.rand(*shape).astype("float32")
        y = x + 1
        p0 = mp.Process(target=worker, args=(0, x, backend, x, port_queue))
        p1 = mp.Process(target=worker, args=(1, y, backend, x, port_queue))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    for shape in [(2, 3), (8, 10), (99, 77)]:
        for backend in ["nccl", "ucx"]:
            check(shape, backend)
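
Every test in this file repeats the same launch scaffold; distilled into one
helper (an illustrative sketch, with worker_fn as a stand-in for the per-rank
closures above), the pattern is:

    import multiprocessing as mp

    def run_two_ranks(worker_fn, *args):
        procs = [mp.Process(target=worker_fn, args=(rank,) + args) for rank in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            # bounded join: a hung collective leaves exitcode None and fails
            # the assertion instead of stalling the whole suite
            p.join(10)
        assert all(p.exitcode == 0 for p in procs)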

python_module/test/unit/distributed/test_util.py (+163, -0)

@@ -0,0 +1,163 @@
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import multiprocessing as mp
import queue

import pytest

import megengine as mge
import megengine._internal as mgb
import megengine.distributed as dist

_LOCALHOST = "127.0.0.1"


def _assert_q_empty(q):
    try:
        q.get(timeout=1)
    except Exception as e:
        assert isinstance(e, queue.Empty)
    else:
        assert False, "queue is not empty"


def _assert_q_val(q, val):
    ret = q.get()
    assert ret == val


def _init_process_group_wrapper(world_size, rank, dev, backend, q):
    if rank == 0:
        dist.init_process_group(_LOCALHOST, 0, world_size, rank, dev, backend)
        q.put(dist.get_master_port())
    else:
        port = q.get()
        dist.init_process_group(_LOCALHOST, port, world_size, rank, dev, backend)


@pytest.mark.isolated_distributed
def test_create_mm_server():
    def worker():
        if not mge.is_cuda_available():
            return
        port = mgb.config.create_mm_server("0.0.0.0", 0)
        assert port > 0
        # asking for the same port again must fail, since it is already bound
        res = mgb.config.create_mm_server("0.0.0.0", port)
        assert res == -1

    p = mp.Process(target=worker)

    p.start()

    p.join(10)

    assert p.exitcode == 0


@pytest.mark.isolated_distributed
def test_init_process_group():
    world_size = 2

    def worker(rank, backend, q):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, q)
        assert dist.is_distributed()
        assert dist.get_master_ip() == _LOCALHOST
        assert dist.get_master_port() > 0
        assert dist.get_world_size() == world_size
        assert dist.get_rank() == rank
        assert dist.get_backend() == backend

    def check(backend):
        Q = mp.Queue()
        p0 = mp.Process(target=worker, args=(0, backend, Q))
        p1 = mp.Process(target=worker, args=(1, backend, Q))

        p0.start()
        p1.start()

        p0.join(10)
        p1.join(10)

        assert p0.exitcode == 0 and p1.exitcode == 0

    check("nccl")
    check("ucx")


@pytest.mark.isolated_distributed
def test_group_barrier():
    world_size = 2
    backend = "nccl"

    def worker(rank, q):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, q)
        dist.group_barrier()
        if rank == 0:
            dist.group_barrier()
            q.put(0)  # to be observed in rank 1
        else:
            _assert_q_empty(q)  # rank 0 has not executed q.put(0) yet
            dist.group_barrier()
            _assert_q_val(q, 0)  # rank 0 executed q.put(0)

    Q = mp.Queue()
    p0 = mp.Process(target=worker, args=(0, Q))
    p1 = mp.Process(target=worker, args=(1, Q))

    p0.start()
    p1.start()

    p0.join(10)
    p1.join(10)

    assert p0.exitcode == 0 and p1.exitcode == 0


@pytest.mark.isolated_distributed
def test_synchronized():
    world_size = 2
    backend = "nccl"

    @dist.synchronized
    def func(rank, q):
        q.put(rank)

    def worker(rank, q):
        if not mge.is_cuda_available():
            return
        _init_process_group_wrapper(world_size, rank, rank, backend, q)
        dist.group_barrier()
        if rank == 0:
            func(0, q)  # q.put(0)
            q.put(2)
        else:
            _assert_q_val(q, 0)  # func executed in rank 0
            _assert_q_empty(q)  # q.put(2) has not run yet in rank 0
            func(1, q)
            _assert_q_val(q, 1)  # func in rank 1 ran before q.put(2) in rank 0
            _assert_q_val(q, 2)  # q.put(2) executed in rank 0

    Q = mp.Queue()
    p0 = mp.Process(target=worker, args=(0, Q))
    p1 = mp.Process(target=worker, args=(1, Q))

    p0.start()
    p1.start()

    p0.join(10)
    p1.join(10)

    assert p0.exitcode == 0 and p1.exitcode == 0
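
The ordering asserted in test_synchronized is consistent with a
run-then-barrier decorator; the sketch below is a guess at those semantics,
not MegEngine's actual implementation:

    import functools

    import megengine.distributed as dist

    def synchronized(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not dist.is_distributed():
                return func(*args, **kwargs)
            ret = func(*args, **kwargs)
            dist.group_barrier()  # no rank returns until all ranks have called func
            return ret
        return wrapper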
