You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

_queue.py 3.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # -*- coding: utf-8 -*-
  2. # MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
  3. #
  4. # Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
  5. #
  6. # Unless required by applicable law or agreed to in writing,
  7. # software distributed under the License is distributed on an
  8. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. import binascii
  10. import os
  11. import queue
  12. import subprocess
  13. from multiprocessing import Queue
  14. import pyarrow.plasma as plasma
  15. MGE_PLASMA_MEMORY = int(os.environ.get("MGE_PLASMA_MEMORY", 4000000000)) # 4GB
  16. class _PlasmaStoreManager:
  17. def __init__(self):
  18. self.socket_name = "/tmp/mge_plasma_{}".format(
  19. binascii.hexlify(os.urandom(8)).decode()
  20. )
  21. debug_flag = bool(os.environ.get("MGE_DATALOADER_PLASMA_DEBUG", 0))
  22. self.plasma_store = subprocess.Popen(
  23. ["plasma_store", "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),],
  24. stdout=None if debug_flag else subprocess.DEVNULL,
  25. stderr=None if debug_flag else subprocess.DEVNULL,
  26. )
  27. def __del__(self):
  28. if self.plasma_store and self.plasma_store.returncode is None:
  29. self.plasma_store.kill()
  30. # Each process only need to start one plasma store, so we set it as a global variable.
  31. # TODO: how to share between different processes?
  32. MGE_PLASMA_STORE_MANAGER = _PlasmaStoreManager()
  33. class PlasmaShmQueue:
  34. def __init__(self, maxsize: int = 0):
  35. r"""Use pyarrow in-memory plasma store to implement shared memory queue.
  36. Compared to native `multiprocess.Queue`, `PlasmaShmQueue` avoid pickle/unpickle
  37. and communication overhead, leading to better performance in multi-process
  38. application.
  39. :type maxsize: int
  40. :param maxsize: maximum size of the queue, `None` means no limit. (default: ``None``)
  41. """
  42. self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name
  43. # TODO: how to catch the exception happened in `plasma.connect`?
  44. self.client = None
  45. # Used to store the header for the data.(ObjectIDs)
  46. self.queue = Queue(maxsize) # type: Queue
  47. def put(self, data, block=True, timeout=None):
  48. if self.client is None:
  49. self.client = plasma.connect(self.socket_name)
  50. try:
  51. object_id = self.client.put(data)
  52. except plasma.PlasmaStoreFull:
  53. raise RuntimeError("plasma store out of memory!")
  54. try:
  55. self.queue.put(object_id, block, timeout)
  56. except queue.Full:
  57. self.client.delete([object_id])
  58. raise queue.Full
  59. def get(self, block=True, timeout=None):
  60. if self.client is None:
  61. self.client = plasma.connect(self.socket_name)
  62. object_id = self.queue.get(block, timeout)
  63. if not self.client.contains(object_id):
  64. raise RuntimeError(
  65. "ObjectID: {} not found in plasma store".format(object_id)
  66. )
  67. data = self.client.get(object_id)
  68. self.client.delete([object_id])
  69. return data
  70. def qsize(self):
  71. return self.queue.qsize()
  72. def empty(self):
  73. return self.queue.empty()
  74. def join(self):
  75. self.queue.join()
  76. def disconnect_client(self):
  77. if self.client is not None:
  78. self.client.disconnect()
  79. def close(self):
  80. self.queue.close()
  81. self.disconnect_client()
  82. def cancel_join_thread(self):
  83. self.queue.cancel_join_thread()

MegEngine 安装包中集成了使用 GPU 运行代码所需的 CUDA 环境,不用区分 CPU 和 GPU 版。 如果想要运行 GPU 程序,请确保机器本身配有 GPU 硬件设备并安装好驱动。 如果你想体验在云端 GPU 算力平台进行深度学习开发的感觉,欢迎访问 MegStudio 平台

Contributors (1)