
test(mge/distributed): add get_device_count_by_fork to fix distributed test skip
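
The helper runs get_device_count(device_type) in a forked child process and
returns the result through a multiprocessing queue, so GPU-dependent skip
conditions can be evaluated at pytest collection time (presumably without
initializing CUDA in the parent process). Tests that previously returned
silently from each worker when too few GPUs were available are now skipped
up front via pytest.mark.skipif(get_device_count_by_fork("gpu") < N, ...).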

GitOrigin-RevId: 9ffd8a6149

tags/v1.0.0-rc1
Megvii Engine Team, 4 years ago
parent commit e1fba6ece7
5 changed files with 56 additions and 20 deletions:

  1. imperative/python/megengine/distributed/helper.py (+16, -0)
  2. imperative/python/test/integration/test_dp_correctness.py (+2, -0)
  3. imperative/python/test/unit/functional/test_tensor.py (+24, -3)
  4. imperative/python/test/unit/test_distributed.py (+5, -8)
  5. imperative/python/test/unit/test_module.py (+9, -9)

imperative/python/megengine/distributed/helper.py (+16, -0)

@@ -7,8 +7,11 @@
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import functools
import multiprocessing as mp
from typing import Callable

from megengine.device import get_device_count

from .group import group_barrier, is_distributed


@@ -26,3 +29,16 @@ def synchronized(func: Callable):
        return ret

    return wrapper


def get_device_count_by_fork(device_type: str):
    q = mp.Queue()

    def worker(queue):
        num = get_device_count(device_type)
        queue.put(num)

    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()
    return q.get()
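
Below is a minimal usage sketch (not part of this commit) of how the helper is
consumed by the test suite; the test name is hypothetical, and forking is
presumably used so that the collecting pytest process never initializes CUDA
itself:

import pytest

from megengine.distributed.helper import get_device_count_by_fork


# The skip condition is evaluated at collection time; the GPU count comes from
# a forked child process, so the parent process is expected to stay free of
# CUDA state.
@pytest.mark.skipif(
    get_device_count_by_fork("gpu") < 2, reason="need more gpu device"
)
def test_needs_two_gpus():
    ...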

imperative/python/test/integration/test_dp_correctness.py (+2, -0)

@@ -21,6 +21,7 @@ import megengine as mge
import megengine.distributed as dist
import megengine.functional as F
from megengine.device import get_default_device, set_default_device
from megengine.distributed.helper import get_device_count_by_fork
from megengine.functional.debug_param import set_conv_execution_strategy
from megengine.module import AvgPool2d, BatchNorm2d, Conv2d, Linear, Module
from megengine.optimizer import SGD
@@ -196,6 +197,7 @@ def run_test(
        assert p.exitcode == 0


@pytest.mark.skipif(get_device_count_by_fork("gpu") < 4, reason="need more gpu device")
@pytest.mark.isolated_distributed
@pytest.mark.skipif(
platform.system() == "Windows", reason="windows disable MGB_ENABLE_OPR_MM"


imperative/python/test/unit/functional/test_tensor.py (+24, -3)

@@ -6,6 +6,8 @@
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import platform

import numpy as np
import pytest

@@ -13,6 +15,7 @@ import megengine.functional as F
from megengine import Buffer, Parameter, is_cuda_available, tensor
from megengine.core._trace_option import use_tensor_shape
from megengine.core.tensor.utils import astensor1d
from megengine.distributed.helper import get_device_count_by_fork
from megengine.test import assertTensorClose


@@ -323,17 +326,35 @@ def copy_test(dst, src):
    assert np.allclose(data, y.numpy())


@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
@pytest.mark.skipif(
    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") == 0, reason="CUDA is disabled")
def test_copy_h2d():
    copy_test("cpu0", "gpu0")


@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
@pytest.mark.skipif(
    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") == 0, reason="CUDA is disabled")
def test_copy_d2h():
    copy_test("gpu0", "cpu0")


@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
@pytest.mark.skipif(
    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
def test_copy_d2d():
    copy_test("gpu0", "gpu1")
    copy_test("gpu0:0", "gpu0:1")


imperative/python/test/unit/test_distributed.py (+5, -8)

@@ -14,6 +14,7 @@ import pytest

import megengine as mge
import megengine.distributed as dist
from megengine.distributed.helper import get_device_count_by_fork


def _assert_q_empty(q):
@@ -36,6 +37,7 @@ def _assert_q_val(q, val):
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
def test_init_process_group():
    world_size = 2
@@ -43,8 +45,6 @@ def test_init_process_group():
    server = dist.Server(port)

    def worker(rank, backend):
        if mge.get_device_count("gpu") < world_size:
            return
        dist.init_process_group("localhost", port, world_size, rank, rank, backend)
        assert dist.is_distributed() == True
        assert dist.get_rank() == rank
@@ -82,6 +82,7 @@ def test_init_process_group():
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
def test_new_group():
    world_size = 3
@@ -90,8 +91,6 @@ def test_new_group():
    server = dist.Server(port)

    def worker(rank):
        if mge.get_device_count("gpu") < world_size:
            return
        dist.init_process_group("localhost", port, world_size, rank, rank)
        if rank in ranks:
            group = dist.new_group(ranks)
@@ -117,6 +116,7 @@ def test_new_group():
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
def test_group_barrier():
    world_size = 2
@@ -124,8 +124,6 @@ def test_group_barrier():
    server = dist.Server(port)

    def worker(rank, q):
        if mge.get_device_count("gpu") < world_size:
            return
        dist.init_process_group("localhost", port, world_size, rank, rank)
        dist.group_barrier()
        if rank == 0:
@@ -154,6 +152,7 @@ def test_group_barrier():
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
def test_synchronized():
    world_size = 2
@@ -165,8 +164,6 @@ def test_synchronized():
        q.put(rank)

    def worker(rank, q):
        if mge.get_device_count("gpu") < world_size:
            return
        dist.init_process_group("localhost", port, world_size, rank, rank)
        dist.group_barrier()
        if rank == 0:


imperative/python/test/unit/test_module.py (+9, -9)

@@ -10,6 +10,14 @@ import platform

import pytest

import megengine as mge
import megengine.distributed as dist
from megengine import tensor
from megengine.distributed.group import Group
from megengine.distributed.helper import get_device_count_by_fork
from megengine.module import SyncBatchNorm
from megengine.test import assertTensorClose


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
@@ -17,6 +25,7 @@ import pytest
@pytest.mark.skipif(
    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 4, reason="need more gpu device")
@pytest.mark.isolated_distributed
def test_syncbn():
    import numpy as np
@@ -39,15 +48,6 @@ def test_syncbn():
    port = server.py_server_port

    def worker(rank, data, yv_expect, running_mean, running_var):
        import megengine as mge
        import megengine.distributed as dist
        from megengine import tensor
        from megengine.module import SyncBatchNorm
        from megengine.distributed.group import Group
        from megengine.test import assertTensorClose

        if mge.get_device_count("gpu") < nr_ranks:
            return
        dist.init_process_group("localhost", port, nr_ranks, rank, rank)
        group = Group([i for i in range(nr_ranks)])
        bn = SyncBatchNorm(nr_chan, eps=eps, momentum=momentum, group=group)

