diff --git a/imperative/python/megengine/distributed/helper.py b/imperative/python/megengine/distributed/helper.py
index f56cddc0..2be3011f 100644
--- a/imperative/python/megengine/distributed/helper.py
+++ b/imperative/python/megengine/distributed/helper.py
@@ -7,8 +7,11 @@
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 import functools
+import multiprocessing as mp
 from typing import Callable
 
+from megengine.device import get_device_count
+
 from .group import group_barrier, is_distributed
 
 
@@ -26,3 +29,16 @@ def synchronized(func: Callable):
         return ret
 
     return wrapper
+
+
+def get_device_count_by_fork(device_type: str):
+    q = mp.Queue()
+
+    def worker(queue):
+        num = get_device_count(device_type)
+        queue.put(num)
+
+    p = mp.Process(target=worker, args=(q,))
+    p.start()
+    p.join()
+    return q.get()
diff --git a/imperative/python/test/integration/test_dp_correctness.py b/imperative/python/test/integration/test_dp_correctness.py
index b706adb6..1494cbd7 100644
--- a/imperative/python/test/integration/test_dp_correctness.py
+++ b/imperative/python/test/integration/test_dp_correctness.py
@@ -21,6 +21,7 @@ import megengine as mge
 import megengine.distributed as dist
 import megengine.functional as F
 from megengine.device import get_default_device, set_default_device
+from megengine.distributed.helper import get_device_count_by_fork
 from megengine.functional.debug_param import set_conv_execution_strategy
 from megengine.module import AvgPool2d, BatchNorm2d, Conv2d, Linear, Module
 from megengine.optimizer import SGD
@@ -196,6 +197,7 @@ def run_test(
     assert p.exitcode == 0
 
 
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 4, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="windows disable MGB_ENABLE_OPR_MM"
diff --git a/imperative/python/test/unit/functional/test_tensor.py b/imperative/python/test/unit/functional/test_tensor.py
index 8d06d699..8fe8cb8d 100644
--- a/imperative/python/test/unit/functional/test_tensor.py
+++ b/imperative/python/test/unit/functional/test_tensor.py
@@ -6,6 +6,8 @@
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+import platform
+
 import numpy as np
 import pytest
 
@@ -13,6 +15,7 @@ import megengine.functional as F
 from megengine import Buffer, Parameter, is_cuda_available, tensor
 from megengine.core._trace_option import use_tensor_shape
 from megengine.core.tensor.utils import astensor1d
+from megengine.distributed.helper import get_device_count_by_fork
 from megengine.test import assertTensorClose
 
 
@@ -323,17 +326,35 @@ def copy_test(dst, src):
     assert np.allclose(data, y.numpy())
 
 
-@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
+@pytest.mark.skipif(
+    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
+)
+@pytest.mark.skipif(
+    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
+)
+@pytest.mark.skipif(get_device_count_by_fork("gpu") == 0, reason="CUDA is disabled")
 def test_copy_h2d():
     copy_test("cpu0", "gpu0")
 
 
-@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
+@pytest.mark.skipif(
+    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
+)
+@pytest.mark.skipif(
+    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
+)
+@pytest.mark.skipif(get_device_count_by_fork("gpu") == 0, reason="CUDA is disabled")
 def test_copy_d2h():
     copy_test("gpu0", "cpu0")
 
 
-@pytest.mark.skipif(not is_cuda_available(), reason="CUDA is disabled")
+@pytest.mark.skipif(
+    platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
+)
+@pytest.mark.skipif(
+    platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
+)
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
 def test_copy_d2d():
     copy_test("gpu0", "gpu1")
     copy_test("gpu0:0", "gpu0:1")
diff --git a/imperative/python/test/unit/test_distributed.py b/imperative/python/test/unit/test_distributed.py
index 70692eb9..dd94ac05 100644
--- a/imperative/python/test/unit/test_distributed.py
+++ b/imperative/python/test/unit/test_distributed.py
@@ -14,6 +14,7 @@ import pytest
 
 import megengine as mge
 import megengine.distributed as dist
+from megengine.distributed.helper import get_device_count_by_fork
 
 
 def _assert_q_empty(q):
@@ -36,6 +37,7 @@ def _assert_q_val(q, val):
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
 )
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 def test_init_process_group():
     world_size = 2
@@ -43,8 +45,6 @@ def test_init_process_group():
     server = dist.Server(port)
 
     def worker(rank, backend):
-        if mge.get_device_count("gpu") < world_size:
-            return
         dist.init_process_group("localhost", port, world_size, rank, rank, backend)
         assert dist.is_distributed() == True
         assert dist.get_rank() == rank
@@ -82,6 +82,7 @@ def test_init_process_group():
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
 )
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 def test_new_group():
     world_size = 3
@@ -90,8 +91,6 @@ def test_new_group():
     server = dist.Server(port)
 
     def worker(rank):
-        if mge.get_device_count("gpu") < world_size:
-            return
         dist.init_process_group("localhost", port, world_size, rank, rank)
         if rank in ranks:
             group = dist.new_group(ranks)
@@ -117,6 +116,7 @@
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
 )
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 def test_group_barrier():
     world_size = 2
@@ -124,8 +124,6 @@ def test_group_barrier():
     server = dist.Server(port)
 
     def worker(rank, q):
-        if mge.get_device_count("gpu") < world_size:
-            return
         dist.init_process_group("localhost", port, world_size, rank, rank)
         dist.group_barrier()
         if rank == 0:
@@ -154,6 +152,7 @@ def test_group_barrier():
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
 )
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 def test_synchronized():
     world_size = 2
@@ -165,8 +164,6 @@ def test_synchronized():
         q.put(rank)
 
     def worker(rank, q):
-        if mge.get_device_count("gpu") < world_size:
-            return
         dist.init_process_group("localhost", port, world_size, rank, rank)
         dist.group_barrier()
         if rank == 0:
diff --git a/imperative/python/test/unit/test_module.py b/imperative/python/test/unit/test_module.py
index d4be2351..741ad329 100644
--- a/imperative/python/test/unit/test_module.py
+++ b/imperative/python/test/unit/test_module.py
@@ -10,6 +10,14 @@ import platform
 
 import pytest
 
+import megengine as mge
+import megengine.distributed as dist
+from megengine import tensor
+from megengine.distributed.group import Group
+from megengine.distributed.helper import get_device_count_by_fork
+from megengine.module import SyncBatchNorm
+from megengine.test import assertTensorClose
+
 
 @pytest.mark.skipif(
     platform.system() == "Darwin", reason="do not imp GPU mode at macos now"
@@ -17,6 +25,7 @@ import pytest
 @pytest.mark.skipif(
     platform.system() == "Windows", reason="do not imp GPU mode at Windows now"
 )
+@pytest.mark.skipif(get_device_count_by_fork("gpu") < 4, reason="need more gpu device")
 @pytest.mark.isolated_distributed
 def test_syncbn():
     import numpy as np
@@ -39,15 +48,6 @@ def test_syncbn():
     port = server.py_server_port
 
     def worker(rank, data, yv_expect, running_mean, running_var):
-        import megengine as mge
-        import megengine.distributed as dist
-        from megengine import tensor
-        from megengine.module import SyncBatchNorm
-        from megengine.distributed.group import Group
-        from megengine.test import assertTensorClose
-
-        if mge.get_device_count("gpu") < nr_ranks:
-            return
         dist.init_process_group("localhost", port, nr_ranks, rank, rank)
         group = Group([i for i in range(nr_ranks)])
         bn = SyncBatchNorm(nr_chan, eps=eps, momentum=momentum, group=group)
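
Usage note (not part of the diff): a minimal sketch of how the new get_device_count_by_fork helper is used as a collection-time pytest skip condition, mirroring the skipif markers added above. The rationale given in the comments is an assumption based on the helper's name and the removal of the in-worker checks: querying the device count in a forked child keeps the parent pytest process free of GPU/CUDA initialization before the distributed tests fork their own workers. The test name and body below are hypothetical.

    import pytest

    from megengine.distributed.helper import get_device_count_by_fork

    # The device query runs in a short-lived child process (mp.Process in the
    # helper), so the parent process collecting tests never initializes the GPU
    # driver itself. Assumption: this is why the fork-based check replaces the
    # old mge.get_device_count("gpu") guards inside the worker functions.
    @pytest.mark.skipif(
        get_device_count_by_fork("gpu") < 2, reason="need more gpu device"
    )
    def test_needs_two_gpus():  # hypothetical test
        ...  # body that uses gpu0 and gpu1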