
launcher.py 3.7 kB

# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import functools
import multiprocessing as mp

from ..core._imperative_rt.core2 import sync
from .group import group_barrier, init_process_group
from .helper import get_device_count_by_fork
from .server import Client, Server


def _run_wrapped(
    func, is_multimachine, master_ip, port, world_size, rank, dev, args, kwargs
):
    """Init distributed process group and run wrapped function."""
    init_process_group(
        master_ip=master_ip, port=port, world_size=world_size, rank=rank, device=dev
    )
    if is_multimachine:
        group_barrier()
    func(*args, **kwargs)
    sync()
    if is_multimachine:
        group_barrier()


class launcher:
    """Decorator for launching multiple processes in single-machine multi-gpu training.

    :param func: the function you want to launch in distributed mode.
    :param n_gpus: how many devices each node has.
    :param world_size: how many devices there are in total.
    :param rank_start: start number for rank.
    :param master_ip: ip address of the master node (where rank 0 runs).
    :param port: server port for the distributed server.
    """

    def __new__(cls, *args, **kwargs):
        # support both bare ``@launcher`` and parameterized ``@launcher(...)`` usage:
        # with keyword arguments only, return a partial that waits for the function
        if not args:
            return functools.partial(cls, **kwargs)
        return super().__new__(cls)

    def __init__(
        self,
        func,
        n_gpus=None,
        world_size=None,
        rank_start=0,
        master_ip="localhost",
        port=0,
    ):
        self.func = func
        self.n_gpus = n_gpus if n_gpus is not None else get_device_count_by_fork("gpu")
        self.world_size = world_size if world_size is not None else self.n_gpus
        self.rank_start = rank_start
        self.master_ip = master_ip
        self.port = port
        # the master node creates the distributed server
        if self.rank_start == 0:
            self.server = Server(self.port)
            self.port = self.server.py_server_port
        else:
            assert self.port != 0, "you have to assign a port for distributed server"

    def __call__(self, *args, **kwargs):
        # spawn one worker process per local device
        procs = []
        for dev in range(self.n_gpus):
            p = mp.Process(
                target=_run_wrapped,
                args=(
                    self.func,
                    self.world_size > self.n_gpus,
                    self.master_ip,
                    self.port,
                    self.world_size,
                    dev + self.rank_start,
                    dev,
                    args,
                    kwargs,
                ),
            )
            p.start()
            procs.append(p)

        devs = list(range(self.n_gpus))

        while len(devs) > 0:
            left = []
            # poll all remaining processes within roughly one second
            time_to_wait = 1.0 / len(devs)
            for dev in devs:
                procs[dev].join(time_to_wait)
                code = procs[dev].exitcode
                # terminate all processes if one of them has failed
                if code is not None and code != 0:
                    for i in devs:
                        procs[i].terminate()
                assert code is None or code == 0, "subprocess {} exit with code {}".format(
                    dev + self.rank_start, code
                )
                if code is None:
                    left.append(dev)
            devs = left
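
For reference, a minimal usage sketch of the decorator above, assuming this module is exposed as `megengine.distributed` (as in MegEngine's public API); the `train` function, GPU count, and batch size are illustrative assumptions, not part of this file:

import megengine.distributed as dist

# hypothetical training entry point; omit n_gpus to use every detected GPU
@dist.launcher(n_gpus=2)
def train(batch_size=32):
    # dist.get_rank() is assumed from MegEngine's public distributed API
    print("running on rank", dist.get_rank(), "with batch size", batch_size)

if __name__ == "__main__":
    # spawns one process per GPU, forwards the arguments, and joins them
    train(batch_size=64)

Because of the `__new__` trick, `@dist.launcher` with no arguments also works: called with a function as the first positional argument, the class constructs an instance directly; called with keyword arguments only, it returns a `functools.partial` that consumes the function on the next call.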

The MegEngine installation package bundles the CUDA environment needed to run code on GPU, so there are no separate CPU and GPU builds. If you want to run GPU programs, make sure the machine has GPU hardware and the driver installed. If you would like to try deep-learning development on a cloud GPU platform, you are welcome to visit the MegStudio platform.
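
A quick way to verify that the bundled CUDA environment can see your GPU, a minimal sketch assuming `megengine.is_cuda_available()` and `megengine.device.get_device_count()` from MegEngine's public API:

import megengine

# check whether MegEngine can reach a CUDA device on this machine
if megengine.is_cuda_available():
    print("GPU available:", megengine.device.get_device_count("gpu"), "device(s)")
else:
    print("no GPU detected; MegEngine will run on CPU")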