
test(mge/distributed): check gpu num for multi gpu test

GitOrigin-RevId: 78f4718682
release-0.6
Megvii Engine Team 5 years ago
parent commit: 4ace67ff44
2 changed files with 19 additions and 18 deletions
  1. +11 -11  python_module/test/unit/distributed/test_functional.py
  2. +8  -7   python_module/test/unit/module/test_batchnorm.py
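
The change is the same in every hunk of the first file: the old guard only asked whether CUDA was available at all, so a machine with a single GPU would still spawn the two-rank workers and the collective would fail; the new guard compares the GPU count against the number of ranks the test needs and returns early otherwise. A minimal sketch of the contrast, assuming only the MegEngine 0.6 calls that appear in this diff (everything else is illustrative):

    import megengine as mge

    world_size = 2  # number of ranks each distributed test spawns

    def worker(rank):
        # Old guard: passes on any CUDA machine, even one with a single GPU,
        # so a 2-rank collective could still be launched and fail.
        # if not mge.is_cuda_available():
        #     return

        # New guard: run only when there is at least one GPU per rank.
        if mge.get_device_count("gpu") < world_size:
            return
        # ... init the process group and run the collective under test ...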

+11 -11  python_module/test/unit/distributed/test_functional.py

@@ -30,7 +30,7 @@ def test_reduce_sum():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -66,7 +66,7 @@ def test_gather():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -102,7 +102,7 @@ def test_broadcast():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -134,7 +134,7 @@ def test_scatter():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -170,7 +170,7 @@ def test_all_to_all():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -204,7 +204,7 @@ def test_all_gather():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -237,7 +237,7 @@ def test_reduce_scatter_sum():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -274,7 +274,7 @@ def test_all_reduce_sum():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -307,7 +307,7 @@ def test_all_reduce_max():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -340,7 +340,7 @@ def test_all_reduce_min():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = tensor(data)
@@ -373,7 +373,7 @@ def test_bcast_param():
     world_size = 2
 
     def worker(rank, data, backend, expect, port_queue):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < world_size:
             return
         _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
         inp = Parameter(data)
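
All eleven tests in this file share the same launcher shape: spawn one process per rank, let each process run the guard above, then join the children and check their exit codes. A rough sketch of that pattern, assembled only from calls visible in this commit (the init_process_group signature comes from test_batchnorm.py below; the real tests go through _init_process_group_wrapper and a port queue, elided here):

    import multiprocessing as mp

    import megengine as mge
    import megengine.distributed as dist

    world_size = 2

    def worker(rank):
        if mge.get_device_count("gpu") < world_size:
            return  # too few GPUs: exit cleanly so the parent still sees exitcode 0
        dist.init_process_group("localhost", 2333, world_size, rank, rank)
        # ... build the per-rank input tensor and call the collective under test ...

    def launch():
        procs = [mp.Process(target=worker, args=(rank,)) for rank in range(world_size)]
        for p in procs:
            p.start()
        for p in procs:
            p.join(10)
            assert p.exitcode == 0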


+8 -7  python_module/test/unit/module/test_batchnorm.py

@@ -27,11 +27,12 @@ def test_syncbn():
     running_mean = np.zeros((1, nr_chan, 1, 1), dtype=np.float32)
     running_var = np.ones((1, nr_chan, 1, 1), dtype=np.float32)
     steps = 4
+    nr_ranks = 2
 
     def worker(rank, data, yv_expect, running_mean, running_var):
-        if not mge.is_cuda_available():
+        if mge.get_device_count("gpu") < nr_ranks:
             return
-        dist.init_process_group("localhost", 2333, 4, rank, rank)
+        dist.init_process_group("localhost", 2333, nr_ranks, rank, rank)
         bn = SyncBatchNorm(nr_chan, momentum=momentum, eps=eps)
         data_tensor = tensor()
         for i in range(steps):
@@ -61,19 +62,19 @@ def test_syncbn():
         yv_expect = (xv[i] - mean) / sd
 
     data = []
-    for i in range(4):
+    for i in range(nr_ranks):
         data.append([])
         for j in range(steps):
-            data[i].append(xv[j][:, :, :, i * 4 : i * 4 + 4])
+            data[i].append(xv[j][:, :, :, i * 8 : i * 8 + 8])
 
     procs = []
-    for rank in range(4):
+    for rank in range(nr_ranks):
         p = mp.Process(
             target=worker,
             args=(
                 rank,
                 data[rank],
-                yv_expect[:, :, :, rank * 4 : rank * 4 + 4],
+                yv_expect[:, :, :, rank * 8 : rank * 8 + 8],
                 running_mean,
                 running_var,
             ),
@@ -82,7 +83,7 @@ def test_syncbn():
         procs.append(p)
 
     for p in procs:
-        p.join()
+        p.join(10)
         assert p.exitcode == 0
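
Besides replacing the hard-coded rank count 4 with nr_ranks (and widening each rank's slice from 4 to 8 columns so two ranks still cover the full input), this file changes p.join() to p.join(10). That matters when a worker hangs, e.g. waiting on a peer that never ran its collective: a bare join() blocks the test run indefinitely, while join(10) returns after 10 seconds with exitcode still None, so the assertion fails fast instead of stalling CI. A standard-library-only sketch of the difference:

    import multiprocessing as mp
    import time

    def hung_worker():
        time.sleep(60)  # stand-in for a rank stuck waiting on a missing peer

    if __name__ == "__main__":
        p = mp.Process(target=hung_worker)
        p.start()
        p.join(10)         # returns after at most 10 seconds
        print(p.exitcode)  # None: the child has not exited, so the test's assert would fail
        p.terminate()      # clean up the stuck child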



