You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

_queue.py 4.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- coding: utf-8 -*-
  2. import binascii
  3. import os
  4. import queue
  5. import subprocess
  6. from multiprocessing import Queue
  7. import pyarrow
  8. import pyarrow.plasma as plasma
  9. MGE_PLASMA_MEMORY = int(os.environ.get("MGE_PLASMA_MEMORY", 4000000000)) # 4GB
  10. # Each process only need to start one plasma store, so we set it as a global variable.
  11. # TODO: how to share between different processes?
  12. MGE_PLASMA_STORE_MANAGER = None
  13. def _clear_plasma_store():
  14. # `_PlasmaStoreManager.__del__` will not be called automaticly in subprocess,
  15. # so this function should be called explicitly
  16. global MGE_PLASMA_STORE_MANAGER
  17. if MGE_PLASMA_STORE_MANAGER is not None and MGE_PLASMA_STORE_MANAGER.refcount == 0:
  18. del MGE_PLASMA_STORE_MANAGER
  19. MGE_PLASMA_STORE_MANAGER = None
  20. class _PlasmaStoreManager:
  21. __initialized = False
  22. def __init__(self):
  23. self.socket_name = "/tmp/mge_plasma_{}".format(
  24. binascii.hexlify(os.urandom(8)).decode()
  25. )
  26. debug_flag = bool(os.environ.get("MGE_DATALOADER_PLASMA_DEBUG", 0))
  27. # NOTE: this is a hack. Directly use `plasma_store` may make subprocess
  28. # difficult to handle the exception happened in `plasma-store-server`.
  29. # For `plasma_store` is just a wrapper of `plasma-store-server`, which use
  30. # `os.execv` to call the executable `plasma-store-server`.
  31. cmd_path = os.path.join(pyarrow.__path__[0], "plasma-store-server")
  32. self.plasma_store = subprocess.Popen(
  33. [cmd_path, "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),],
  34. stdout=None if debug_flag else subprocess.DEVNULL,
  35. stderr=None if debug_flag else subprocess.DEVNULL,
  36. )
  37. self.__initialized = True
  38. self.refcount = 1
  39. def __del__(self):
  40. if self.__initialized and self.plasma_store.returncode is None:
  41. self.plasma_store.kill()
  42. class PlasmaShmQueue:
  43. def __init__(self, maxsize: int = 0):
  44. r"""Use pyarrow in-memory plasma store to implement shared memory queue.
  45. Compared to native `multiprocess.Queue`, `PlasmaShmQueue` avoid pickle/unpickle
  46. and communication overhead, leading to better performance in multi-process
  47. application.
  48. Args:
  49. maxsize: maximum size of the queue, `None` means no limit. (default: ``None``)
  50. """
  51. # Lazy start the plasma store manager
  52. global MGE_PLASMA_STORE_MANAGER
  53. if MGE_PLASMA_STORE_MANAGER is None:
  54. try:
  55. MGE_PLASMA_STORE_MANAGER = _PlasmaStoreManager()
  56. except Exception as e:
  57. err_info = (
  58. "Please make sure pyarrow installed correctly!\n"
  59. "You can try reinstall pyarrow and see if you can run "
  60. "`plasma_store -s /tmp/mge_plasma_xxx -m 1000` normally."
  61. )
  62. raise RuntimeError(
  63. "Exception happened in starting plasma_store: {}\n"
  64. "Tips: {}".format(str(e), err_info)
  65. )
  66. else:
  67. MGE_PLASMA_STORE_MANAGER.refcount += 1
  68. self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name
  69. # TODO: how to catch the exception happened in `plasma.connect`?
  70. self.client = None
  71. # Used to store the header for the data.(ObjectIDs)
  72. self.queue = Queue(maxsize) # type: Queue
  73. def put(self, data, block=True, timeout=None):
  74. if self.client is None:
  75. self.client = plasma.connect(self.socket_name)
  76. try:
  77. object_id = self.client.put(data)
  78. except plasma.PlasmaStoreFull:
  79. raise RuntimeError("plasma store out of memory!")
  80. try:
  81. self.queue.put(object_id, block, timeout)
  82. except queue.Full:
  83. self.client.delete([object_id])
  84. raise queue.Full
  85. def get(self, block=True, timeout=None):
  86. if self.client is None:
  87. self.client = plasma.connect(self.socket_name)
  88. object_id = self.queue.get(block, timeout)
  89. if not self.client.contains(object_id):
  90. raise RuntimeError(
  91. "ObjectID: {} not found in plasma store".format(object_id)
  92. )
  93. data = self.client.get(object_id)
  94. self.client.delete([object_id])
  95. return data
  96. def qsize(self):
  97. return self.queue.qsize()
  98. def empty(self):
  99. return self.queue.empty()
  100. def join(self):
  101. self.queue.join()
  102. def disconnect_client(self):
  103. if self.client is not None:
  104. self.client.disconnect()
  105. def close(self):
  106. self.queue.close()
  107. self.disconnect_client()
  108. global MGE_PLASMA_STORE_MANAGER
  109. MGE_PLASMA_STORE_MANAGER.refcount -= 1
  110. _clear_plasma_store()
  111. def cancel_join_thread(self):
  112. self.queue.cancel_join_thread()