You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

dataloader.py 25 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3
  1. # -*- coding: utf-8 -*-
  2. import collections
  3. import gc
  4. import itertools
  5. import multiprocessing
  6. import os
  7. import platform
  8. import queue
  9. import random
  10. import threading
  11. import time
  12. import numpy as np
  13. from ..device import _sh, get_default_device
  14. from ..functional.tensor import copy
  15. from ..logger import get_logger
  16. from ..random.rng import _random_seed_generator
  17. from ..tensor import Tensor
  18. from .collator import Collator
  19. from .dataset import Dataset, StreamDataset
  20. from .sampler import MapSampler, Sampler, SequentialSampler, StreamSampler
  21. from .transform import PseudoTransform, Transform
  22. try:
  23. import thread
  24. except:
  25. import _thread as thread
  26. logger = get_logger(__name__)
  27. GLOBAL_TIMEOUT = 5
  28. def raise_timeout_error():
  29. raise RuntimeError("dataloader timeout")
  30. class DataLoader:
  31. r"""Provides a convenient way to iterate on a given dataset.
  32. The process is as follows:
  33. .. mermaid::
  34. :align: center
  35. flowchart LR
  36. Dataset.__len__ -- Sampler --> Indices
  37. batch_size -- Sampler --> Indices
  38. Indices -- Dataset.__getitem__ --> Samples
  39. Samples -- Transform + Collator --> mini-batch
  40. DataLoader combines a :class:`~.Dataset` with
  41. :class:`~.Sampler`, :class:`~.Transform` and :class:`~.Collator`,
  42. make it flexible to get minibatch continually from a dataset.
  43. See :ref:`data-guide` for more details.
  44. Args:
  45. dataset: dataset from which to load the minibatch.
  46. sampler: defines the strategy to sample data from the dataset.
  47. If ``None``, it will sequentially sample from the dataset one by one.
  48. transform: defined the transforming strategy for a sampled batch.
  49. collator: defined the merging strategy for a transformed batch.
  50. num_workers: the number of sub-process to load, transform and collate
  51. the batch. ``0`` means using single-process. Default: 0
  52. timeout: if positive, means the timeout value(second) for collecting a
  53. batch from workers. Default: 0
  54. preload: whether to enable the preloading strategy of the dataloader.
  55. When enabling, the dataloader will preload one batch to the device memory to speed up the whole training process.
  56. parallel_stream: whether to splitting workload across all workers when dataset is streamdataset and num_workers > 0.
  57. When enabling, each worker will collect data from different dataset in order to speed up the whole loading process.
  58. See ref:`streamdataset-example` for more details
  59. .. admonition:: The effect of enabling preload
  60. :class: warning
  61. * All elements in :class:`map`, :class:`list`, and :class:`tuple` will be converted to :class:`~.Tensor` by preloading,
  62. and you will get :class:`~.Tensor` instead of the original Numpy array or Python built-in data structrure.
  63. * Tensors' host2device copy and device kernel execution will be overlapped,
  64. which will improve the training speed at the cost of **higher device memory usage** (due to one more batch data on device memory).
  65. This feature saves more time when your NN training time is short or your machine's host PCIe bandwidth for each device is low.
  66. """
  67. def __init__(
  68. self,
  69. dataset: Dataset,
  70. sampler: Sampler = None,
  71. transform: Transform = None,
  72. collator: Collator = None,
  73. num_workers: int = 0,
  74. timeout: int = 0,
  75. preload: bool = False,
  76. parallel_stream: bool = False,
  77. ):
  78. if num_workers < 0:
  79. raise ValueError("num_workers should not be negative")
  80. if timeout < 0:
  81. raise ValueError("timeout should not be negative")
  82. self.dataset = dataset
  83. self.num_workers = num_workers
  84. self.timeout = timeout
  85. self.preload = preload
  86. self.parallel_stream = parallel_stream
  87. if isinstance(dataset, StreamDataset):
  88. self.sampler = sampler if sampler else StreamSampler(batch_size=1)
  89. assert isinstance(
  90. self.sampler, StreamSampler
  91. ), "types of dataset and sampler do not match"
  92. if parallel_stream is False and self.num_workers > 1:
  93. logger.warning(
  94. "Data time will be affected by getting origin-data, please set parallel_stream in order to speed up dataloader!"
  95. )
  96. self.datakind = "stream"
  97. else:
  98. assert isinstance(
  99. dataset, Dataset
  100. ), "Can not recognize this kind of dataset: %s" % type(dataset)
  101. self.sampler = (
  102. sampler
  103. if sampler
  104. else SequentialSampler(dataset, batch_size=1, drop_last=False)
  105. )
  106. assert isinstance(
  107. self.sampler, MapSampler
  108. ), "types of dataset and sampler do not match"
  109. self.datakind = "map"
  110. if transform is None:
  111. self.transform = PseudoTransform()
  112. else:
  113. self.transform = transform
  114. if collator is None:
  115. self.collator = Collator()
  116. else:
  117. self.collator = collator
  118. if platform.system() == "Linux" and self.num_workers > 0:
  119. self.check_memory_rationality()
  120. def __iter__(self):
  121. if platform.system() == "Windows" and self.num_workers > 0:
  122. print(
  123. "pyarrow.plasma does not support ParallelDataLoader on windows, changing num_workers to be zero"
  124. )
  125. self.num_workers = 0
  126. if os.getenv("TERMUX_VERSION"):
  127. # FIXME: termux install pyarrow will build error now
  128. # remove this logic after pyarrow fix this issue
  129. print(
  130. "pyarrow do not support on termux env now, changing num_workers to be zero"
  131. )
  132. self.num_workers = 0
  133. if isinstance(self.dataset, StreamDataset):
  134. if not self.num_workers:
  135. return _SerialStreamDataLoaderIter(self, self.preload)
  136. else:
  137. return _ParallelStreamDataLoaderIter(self, self.preload)
  138. else:
  139. assert isinstance(
  140. self.dataset, Dataset
  141. ), "Can not recognize this kind of dataset: %s" % type(self.dataset)
  142. if not self.num_workers:
  143. return _SerialMapDataLoaderIter(self, self.preload)
  144. else:
  145. return _ParallelMapDataLoaderIter(self, self.preload)
  146. def __len__(self):
  147. return len(self.sampler)
  148. def check_memory_rationality(self):
  149. import psutil
  150. main_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024
  151. total_memory = (self.num_workers + 1) * main_memory
  152. current_memory = (
  153. int(os.popen("cat /sys/fs/cgroup/memory/memory.limit_in_bytes").read())
  154. / 1024
  155. / 1024
  156. / 1024
  157. )
  158. if current_memory < total_memory:
  159. logger.warning(
  160. "Each worker need to read the shared meta-data, which will be increasing the reference count."
  161. "Copy-On-Write propety will lead to 'memory leak', the memory usage will end up being "
  162. + str(total_memory)
  163. + " GB, "
  164. "However the current requested memory is "
  165. + str(current_memory)
  166. + " GB, "
  167. "Maybe you can request more memory or uesd np-array to save meta-data rather than List or Tuple"
  168. )
  169. class PreLoader:
  170. def __init__(self, loader, preload):
  171. self.dataset = loader.dataset
  172. self.sampler = loader.sampler
  173. self.seed = _random_seed_generator().__next__()
  174. self.transform = loader.transform
  175. self.collator = loader.collator
  176. self.num_workers = loader.num_workers
  177. self.timeout = loader.timeout
  178. self.num_processed = 0
  179. self.datakind = loader.datakind
  180. self.parallel_stream = loader.parallel_stream
  181. if preload:
  182. self.default_device = get_default_device()
  183. self.pre_load_device = self.default_device + ":" + str(_sh.get_next())
  184. self.pre_load_device_cache = None
  185. self.preload = preload
  186. def __iter__(self):
  187. return self
  188. """
  189. strategy one: load from numpy data, and generate dtype tensor
  190. """
  191. def _load_tensor(self, batch, cached=True):
  192. if isinstance(batch, np.ndarray):
  193. device = self.pre_load_device if cached else self.default_device
  194. return Tensor(batch, device=device)
  195. elif isinstance(batch, collections.abc.Mapping):
  196. return {k: self._load_tensor(v, cached) for k, v in batch.items()}
  197. elif isinstance(batch, tuple) and hasattr(batch, "_fields"): # namedtuple
  198. return type(batch)(*(self._load_tensor(value, cached) for value in batch))
  199. elif isinstance(batch, collections.abc.Sequence):
  200. return [self._load_tensor(value, cached) for value in batch]
  201. else:
  202. return batch
  203. """
  204. strategy two: load from cache that is already tensor just do d2d copy
  205. """
  206. def _load_cache(self, data):
  207. if isinstance(data, Tensor):
  208. if data.device == self.default_device:
  209. return data
  210. return copy(data, device=self.default_device)
  211. elif isinstance(data, collections.abc.Mapping):
  212. return {k: self._load_cache(v) for k, v in data.items()}
  213. elif isinstance(data, tuple) and hasattr(data, "_fields"): # namedtuple
  214. return type(data)(*(self._load_cache(value) for value in data))
  215. elif isinstance(data, collections.abc.Sequence):
  216. return [self._load_cache(value) for value in data]
  217. else:
  218. return data
  219. def _swap_out_cache(self):
  220. out = self._load_cache(self.pre_load_device_cache)
  221. self.pre_load_device_cache = None # clean cache
  222. return out
  223. class _ParallelDataLoaderIter:
  224. def __init__(self):
  225. self._worker_queue_idx_cycle = itertools.cycle(range(self.num_workers))
  226. from .tools._queue import PlasmaShmQueue
  227. self._worker_result_queue = PlasmaShmQueue()
  228. self._shutdown = False
  229. self._workers_done_event = multiprocessing.Event()
  230. self._index_queues = []
  231. self._workers = []
  232. for i in range(self.num_workers):
  233. index_queue = multiprocessing.Queue()
  234. index_queue.cancel_join_thread()
  235. w = multiprocessing.Process(
  236. target=_worker_loop,
  237. args=(
  238. self.dataset,
  239. index_queue,
  240. self._worker_result_queue,
  241. self._workers_done_event,
  242. self.transform,
  243. self.collator,
  244. self.sampler.batch_size,
  245. self.seed + i,
  246. i,
  247. self.num_workers,
  248. self.datakind,
  249. self.parallel_stream,
  250. ),
  251. daemon=True,
  252. )
  253. gc.collect()
  254. w.start()
  255. self._index_queues.append(index_queue)
  256. self._workers.append(w)
  257. self._data_queue = self._worker_result_queue
  258. self._reset()
  259. def _try_put_index(self):
  260. raise NotImplementedError
  261. def _reset(self):
  262. self._sampler_iter = iter(self.sampler)
  263. self._send_idx = 0
  264. self._rcvd_idx = 0
  265. self._task_info = {}
  266. self._workers_status = [True for _ in range(self.num_workers)]
  267. for _ in range(2 * self.num_workers):
  268. self._try_put_index()
  269. def _process_data(self, data):
  270. self._rcvd_idx += 1
  271. self._try_put_index()
  272. return data
  273. def _get_data(self):
  274. if self.timeout > 0:
  275. success, data = self._try_get_data(self.timeout)
  276. if success:
  277. return data
  278. else:
  279. raise_timeout_error()
  280. else:
  281. while True:
  282. success, data = self._try_get_data()
  283. if success:
  284. return data
  285. def _get_next_batch(self):
  286. while True:
  287. while self._rcvd_idx < self._send_idx:
  288. info = self._task_info[self._rcvd_idx]
  289. worker_id = info[0]
  290. if (
  291. len(info) == 2 or self._workers_status[worker_id]
  292. ): # has data or work is still active
  293. break
  294. del self._task_info[self._rcvd_idx]
  295. self._rcvd_idx += 1
  296. else:
  297. self._shutdown_workers()
  298. raise StopIteration
  299. if len(self._task_info[self._rcvd_idx]) == 2:
  300. data = self._task_info.pop(self._rcvd_idx)[1]
  301. return self._process_data(data)
  302. idx, data = self._get_data()
  303. if isinstance(data, int): # Check if StopIteration in StreamDataset
  304. self._mark_worker_as_unavailable(data)
  305. self._try_put_index()
  306. continue
  307. if idx != self._rcvd_idx:
  308. self._task_info[idx] += (data,)
  309. else:
  310. del self._task_info[idx]
  311. return self._process_data(data)
  312. def _try_get_data(self, timeout=GLOBAL_TIMEOUT):
  313. try:
  314. data = self._data_queue.get(timeout=timeout)
  315. return (True, data)
  316. except Exception as e:
  317. failed_workers = []
  318. for worker_id, w in enumerate(self._workers):
  319. if self._workers_status[worker_id] and not w.is_alive():
  320. failed_workers.append((worker_id, w))
  321. self._mark_worker_as_unavailable(worker_id)
  322. if w.exitcode == -9:
  323. logger.debug(
  324. "Maybe memory is not enough, please request for more memory!"
  325. )
  326. if len(failed_workers) > 0:
  327. pids_str = ", ".join(str(w_info[1].pid) for w_info in failed_workers)
  328. w_ids_str = ", ".join(str(w_info[0]) for w_info in failed_workers)
  329. exitcode_str = ", ".join(
  330. str(w_info[1].exitcode) for w_info in failed_workers
  331. )
  332. raise RuntimeError(
  333. "DataLoader worker (worker(s): {} , pid(s): {}) exited unexpectedly, exitcode(s): {}".format(
  334. w_ids_str, pids_str, exitcode_str
  335. )
  336. )
  337. if isinstance(e, queue.Empty):
  338. return (False, None)
  339. def _mark_worker_as_unavailable(self, worker_id, shutdown=False):
  340. q = self._index_queues[worker_id]
  341. q.put(None)
  342. self._workers_status[worker_id] = False
  343. assert self._workers_done_event.is_set() == shutdown
  344. def _shutdown_workers(self):
  345. if not self._shutdown:
  346. self._shutdown = True
  347. try:
  348. self._workers_done_event.set()
  349. for worker_id in range(len(self._workers)):
  350. if self._workers_status[worker_id]:
  351. self._mark_worker_as_unavailable(worker_id, shutdown=True)
  352. for w in self._workers:
  353. w.join(timeout=GLOBAL_TIMEOUT)
  354. for q in self._index_queues:
  355. q.cancel_join_thread()
  356. q.close()
  357. self._data_queue.cancel_join_thread()
  358. self._data_queue.close()
  359. finally:
  360. for w in self._workers:
  361. if w.is_alive():
  362. w.terminate()
  363. def __del__(self):
  364. self._shutdown_workers()
  365. class _BaseMapDataLoaderIter(PreLoader):
  366. def __init__(self, loader, preload):
  367. super().__init__(loader, preload)
  368. def __len__(self):
  369. return len(self.sampler)
  370. def __next__(self):
  371. if self.preload:
  372. cached = self.pre_load_device_cache
  373. if cached is None: # first and last
  374. if self.num_processed >= len(self): # last
  375. raise StopIteration
  376. elif self.num_processed == 0: # first
  377. self._try_load_tensor(cached=False) # first do the h2d
  378. out = self._swap_out_cache()
  379. self._try_load_tensor()
  380. return out
  381. else:
  382. data = self._get_next_batch()
  383. return data
  384. def _try_load_tensor(self, cached=True):
  385. if self.num_processed >= len(self):
  386. return
  387. else:
  388. self.num_processed += 1
  389. batch = self._get_next_batch()
  390. self.pre_load_device_cache = self._load_tensor(batch, cached)
  391. class _SerialMapDataLoaderIter(_BaseMapDataLoaderIter):
  392. def __init__(self, loader, preload):
  393. super(_SerialMapDataLoaderIter, self).__init__(loader, preload)
  394. self._sampler_iter = iter(self.sampler)
  395. def _get_next_batch(self):
  396. indices = next(self._sampler_iter)
  397. items = [self.dataset[idx] for idx in indices]
  398. trans_items = self.transform.apply_batch(items)
  399. return self.collator.apply(trans_items)
  400. class _ParallelMapDataLoaderIter(_BaseMapDataLoaderIter, _ParallelDataLoaderIter):
  401. def __init__(self, loader, preload):
  402. _BaseMapDataLoaderIter.__init__(self, loader, preload)
  403. _ParallelDataLoaderIter.__init__(self)
  404. def _try_put_index(self):
  405. try:
  406. index = next(self._sampler_iter)
  407. except StopIteration:
  408. return
  409. for _ in range(self.num_workers): # find the next active worker, if any
  410. worker_queue_idx = next(self._worker_queue_idx_cycle)
  411. if self._workers_status[worker_queue_idx]:
  412. break
  413. self._index_queues[worker_queue_idx].put((self._send_idx, index))
  414. self._task_info[self._send_idx] = (worker_queue_idx,)
  415. self._send_idx += 1
# Per-worker metadata, set by _worker_loop inside worker processes for stream
# datasets; stays None in the main process. Read back via get_worker_info().
_worker_info = None
  417. class WorkerInfo(object):
  418. __initialized = False
  419. def __init__(self, **kwargs):
  420. for k, v in kwargs.items():
  421. setattr(self, k, v)
  422. self.__keys = tuple(kwargs.keys())
  423. self.__initialized = True
  424. def __setattr__(self, key, val):
  425. if self.__initialized:
  426. raise RuntimeError(
  427. "Cannot assign attributes to {} objects".format(self.__class__.__name__)
  428. )
  429. return super(WorkerInfo, self).__setattr__(key, val)
  430. def __repr__(self):
  431. items = []
  432. for k in self.__keys:
  433. items.append("{}={}".format(k, getattr(self, k)))
  434. return "{}({})".format(self.__class__.__name__, ", ".join(items))
def get_worker_info():
    """Return the current worker's :class:`WorkerInfo` (set by _worker_loop
    for stream datasets), or ``None`` when called outside a worker process."""
    return _worker_info
  437. class _BaseStreamDataLoaderIter(PreLoader):
  438. def __init__(self, loader, preload):
  439. super().__init__(loader, preload)
  440. self.dataset_iter = iter(self.dataset)
  441. def __next__(self):
  442. if self.preload:
  443. if self.pre_load_device_cache is None:
  444. self._try_load_tensor(cached=False) # load in current
  445. out = self._swap_out_cache()
  446. self._try_load_tensor() # load in cached
  447. return out
  448. else:
  449. return self._get_next_batch()
  450. def _try_load_tensor(self, cached=True):
  451. batch = self._get_next_batch()
  452. self.pre_load_device_cache = self._load_tensor(batch, cached)
class _SerialStreamDataLoaderIter(_BaseStreamDataLoaderIter):
    # Single-process stream iterator. The timeout is enforced by a timer
    # thread that calls thread.interrupt_main(), which raises
    # KeyboardInterrupt in this (main) thread if next() blocks too long.

    def __init__(self, loader, preload):
        super().__init__(loader, preload)
        self.dataset_iter = iter(self.dataset)

    def _try_get_raw_data(self, start_time):
        # Fetch one raw sample, retrying until a truthy value arrives.
        # NOTE(review): a legitimately falsy sample (0, empty tuple, ...)
        # would make this loop forever — confirm datasets never yield one.
        raw_data = None
        while not raw_data:
            try:
                if self.timeout > 0:
                    timer = threading.Timer(self.timeout, thread.interrupt_main)
                    timer.start()
                raw_data = next(self.dataset_iter)
                if self.timeout > 0:
                    timer.cancel()
            except AttributeError as error:
                # Genuine programming errors must not be swallowed below.
                raise error
            except:
                # Bare except is deliberate: it must catch the
                # KeyboardInterrupt injected by the timer thread.
                if self.timeout > 0:
                    timer.cancel()
                waited_time = time.time() - start_time
                if waited_time > self.timeout:
                    # NOTE(review): with timeout == 0 any exception here
                    # (including StopIteration) still trips the timeout
                    # error, since waited_time > 0 — confirm intended.
                    raise_timeout_error()
        return raw_data

    def _get_next_batch(self):
        # Accumulate transformed samples until a full batch, then collate.
        ret = []
        start_time = time.time()
        while len(ret) < self.sampler.batch_size:
            raw_data = self._try_get_raw_data(start_time)
            ret.append(self.transform.apply(raw_data))
        return self.collator.apply(ret)
  483. class _ParallelStreamDataLoaderIter(_BaseStreamDataLoaderIter, _ParallelDataLoaderIter):
  484. def __init__(self, loader, preload):
  485. _BaseStreamDataLoaderIter.__init__(self, loader, preload)
  486. _ParallelDataLoaderIter.__init__(self)
  487. def _get_remaind_data(self, place_holder):
  488. num = self.sampler.batch_size
  489. for _ in range(num - 1):
  490. place_holder.append(next(self.dataset_iter))
  491. return place_holder
  492. def _try_put_index(self):
  493. try:
  494. if self.parallel_stream is False:
  495. start_time = time.time()
  496. place_holder = [next(self.dataset_iter)]
  497. waited_time = time.time() - start_time
  498. if self.timeout > 0 and waited_time > self.timeout:
  499. raise_timeout_error()
  500. place_holder = self._get_remaind_data(place_holder)
  501. else:
  502. place_holder = next(self._sampler_iter)
  503. except StopIteration:
  504. return
  505. for _ in range(self.num_workers):
  506. worker_queue_idx = next(self._worker_queue_idx_cycle)
  507. if self._workers_status[worker_queue_idx]:
  508. break
  509. else:
  510. return
  511. self._index_queues[worker_queue_idx].put((self._send_idx, place_holder))
  512. self._task_info[self._send_idx] = (worker_queue_idx,)
  513. self._send_idx += 1
  514. class ManagerWatchdog(object):
  515. def __init__(self):
  516. self.manager_pid = os.getppid()
  517. self.manager_dead = False
  518. def is_alive(self):
  519. if not self.manager_dead:
  520. self.manager_dead = os.getppid() != self.manager_pid
  521. return not self.manager_dead
  522. def stream_fetcher(
  523. dataset_iter, place_holder, transform, collate, parallel_stream, batch_size
  524. ):
  525. data = []
  526. for idx in place_holder:
  527. try:
  528. if parallel_stream is False:
  529. raw_data = idx
  530. else:
  531. raw_data = next(dataset_iter)
  532. trans_items = transform.apply(raw_data)
  533. data.append(trans_items)
  534. except StopIteration:
  535. break
  536. if len(data) == 0:
  537. raise StopIteration
  538. data = collate.apply(data)
  539. return data
  540. def map_fetcher(dataset, place_holder, transform, collate, parallel_stream, batch_size):
  541. items = [dataset[idx] for idx in place_holder]
  542. trans_items = transform.apply_batch(items)
  543. data = collate.apply(trans_items)
  544. return data
def _worker_loop(
    dataset,
    index_queue,
    data_queue,
    done_event,
    transform,
    collate,
    batch_size,
    seed,
    worker_id,
    num_workers,
    datakind,
    parallel_stream,
):
    # Entry point of each DataLoader worker process.
    #
    # Repeatedly pulls (idx, place_holder) work items from ``index_queue``,
    # builds the batch with the kind-appropriate fetcher, and pushes
    # (idx, data) onto ``data_queue``. A None work item (sent by
    # _mark_worker_as_unavailable) or ``done_event`` ends the loop.
    random.seed(seed)
    np.random.seed(seed)
    watchdog = ManagerWatchdog()
    iteration_end = False
    fetcher = map_fetcher
    if datakind == "stream":
        # Expose per-worker metadata to StreamDataset code via get_worker_info().
        global _worker_info
        _worker_info = WorkerInfo(idx=worker_id, worker=num_workers, seed=seed)
        dataset = iter(dataset)
        fetcher = stream_fetcher
    while watchdog.is_alive():  # stop if the parent process died
        try:
            r = index_queue.get(timeout=GLOBAL_TIMEOUT)
        except queue.Empty:
            continue
        if r is None:
            # Shutdown sentinel.
            assert done_event.is_set() or iteration_end
            break
        elif done_event.is_set() or iteration_end:
            # Drain leftover work items without processing them.
            continue
        idx, place_holder = r
        try:
            data = fetcher(
                dataset, place_holder, transform, collate, parallel_stream, batch_size
            )
        except Exception as e:
            if isinstance(e, StopIteration) and datakind == "stream":
                # Stream exhausted: send our worker_id (a bare int) so the
                # main process marks this worker as unavailable.
                data = worker_id
                iteration_end = True
            else:
                raise e
        data_queue.put((idx, data))
        # Presumably dropped promptly to release (shared-memory) buffers
        # before blocking on the next item — TODO confirm.
        del data, idx, place_holder, r
    if done_event.is_set():
        data_queue.disconnect_client()
        data_queue.close()