Browse Source

refactor(mge/dataloader): make plasma_store starting more robust

GitOrigin-RevId: e1f78db99a
tags/v0.4.0
Megvii Engine Team Xinran Xu 5 years ago
parent
commit
6ffcfb4ce2
1 changed files with 21 additions and 7 deletions
  1. +21
    -7
      python_module/megengine/data/_queue.py

+ 21
- 7
python_module/megengine/data/_queue.py View File

@@ -12,6 +12,7 @@ import queue
import subprocess
from multiprocessing import Queue

import pyarrow
import pyarrow.plasma as plasma

MGE_PLASMA_MEMORY = int(os.environ.get("MGE_PLASMA_MEMORY", 4000000000)) # 4GB
@@ -22,7 +23,7 @@ MGE_PLASMA_STORE_MANAGER = None


def _clear_plasma_store():
# `_PlasmaStoreManager.__del__` will not ne called automaticly in subprocess,
# `_PlasmaStoreManager.__del__` will not be called automaticly in subprocess,
# so this function should be called explicitly
global MGE_PLASMA_STORE_MANAGER
if MGE_PLASMA_STORE_MANAGER is not None:
@@ -31,19 +32,27 @@ def _clear_plasma_store():


class _PlasmaStoreManager:
__initialized = False

def __init__(self):
self.socket_name = "/tmp/mge_plasma_{}".format(
binascii.hexlify(os.urandom(8)).decode()
)
debug_flag = bool(os.environ.get("MGE_DATALOADER_PLASMA_DEBUG", 0))
# NOTE: this is a hack. Directly use `plasma_store` may make subprocess
# difficult to handle the exception happened in `plasma-store-server`.
# For `plasma_store` is just a wrapper of `plasma-store-server`, which use
# `os.execv` to call the executable `plasma-store-server`.
cmd_path = os.path.join(pyarrow.__path__[0], "plasma-store-server")
self.plasma_store = subprocess.Popen(
["plasma_store", "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),],
[cmd_path, "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),],
stdout=None if debug_flag else subprocess.DEVNULL,
stderr=None if debug_flag else subprocess.DEVNULL,
)
self.__initialized = True

def __del__(self):
if self.plasma_store and self.plasma_store.returncode is None:
if self.__initialized and self.plasma_store.returncode is None:
self.plasma_store.kill()


@@ -64,10 +73,15 @@ class PlasmaShmQueue:
if MGE_PLASMA_STORE_MANAGER is None:
try:
MGE_PLASMA_STORE_MANAGER = _PlasmaStoreManager()
except FileNotFoundError as e:
raise FileNotFoundError(
"command 'plasma_store' not found in your $PATH!"
"Please make sure pyarrow installed and add into $PATH."
except Exception as e:
err_info = (
"Please make sure pyarrow installed correctly!\n"
"You can try reinstall pyarrow and see if you can run "
"`plasma_store -s /tmp/mge_plasma_xxx -m 1000` normally."
)
raise RuntimeError(
"Exception happened in starting plasma_store: {}\n"
"Tips: {}".format(str(e), err_info)
)

self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name


Loading…
Cancel
Save