
parallel.py 2.3 kB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 11 11:39:46 2018
Parallel aid functions.
@author: ljia
"""
import multiprocessing
from multiprocessing import Pool
import sys

from gklearn.utils import get_iters


def parallel_me(func, func_assign, var_to_assign, itr, len_itr=None,
                init_worker=None, glbv=None, method=None, n_jobs=None,
                chunksize=None, itr_desc='', verbose=True):
    """Apply func to each item of itr in parallel (multiprocessing
    imap_unordered) and pass each result to func_assign, which writes it
    into var_to_assign.
    """
    if method == 'imap_unordered':
        if glbv:  # global variables required.
            # def init_worker(v_share):
            #     global G_var
            #     G_var = v_share
            if n_jobs is None:
                n_jobs = multiprocessing.cpu_count()
            with Pool(processes=n_jobs, initializer=init_worker,
                      initargs=glbv) as pool:
                if chunksize is None:
                    if len_itr < 100 * n_jobs:
                        chunksize = int(len_itr / n_jobs) + 1
                    else:
                        chunksize = 100
                iterator = get_iters(pool.imap_unordered(func, itr, chunksize),
                                     desc=itr_desc, file=sys.stdout,
                                     length=len_itr, verbose=(verbose >= 2))
                for result in iterator:
                    func_assign(result, var_to_assign)
                pool.close()
                pool.join()
        else:
            if n_jobs is None:
                n_jobs = multiprocessing.cpu_count()
            with Pool(processes=n_jobs) as pool:
                if chunksize is None:
                    if len_itr < 100 * n_jobs:
                        chunksize = int(len_itr / n_jobs) + 1
                    else:
                        chunksize = 100
                iterator = get_iters(pool.imap_unordered(func, itr, chunksize),
                                     desc=itr_desc, file=sys.stdout,
                                     length=len_itr, verbose=(verbose >= 2))
                for result in iterator:
                    func_assign(result, var_to_assign)
                pool.close()
                pool.join()


def parallel_gm(func, Kmatrix, Gn, init_worker=None, glbv=None,
                method='imap_unordered', n_jobs=None, chunksize=None,
                verbose=True):  # @todo: Gn seems not necessary.
    """Compute a Gram (kernel) matrix in parallel: each result is a tuple
    (i, j, value) that is written symmetrically into Kmatrix.
    """
    from itertools import combinations_with_replacement

    def func_assign(result, var_to_assign):
        var_to_assign[result[0]][result[1]] = result[2]
        var_to_assign[result[1]][result[0]] = result[2]

    itr = combinations_with_replacement(range(0, len(Gn)), 2)
    len_itr = int(len(Gn) * (len(Gn) + 1) / 2)
    parallel_me(func, func_assign, Kmatrix, itr, len_itr=len_itr,
                init_worker=init_worker, glbv=glbv, method=method,
                n_jobs=n_jobs, chunksize=chunksize,
                itr_desc='Computing kernels', verbose=verbose)
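Below is a minimal usage sketch of parallel_gm, assuming gklearn is installed and this file is importable as gklearn.utils.parallel. The pairwise worker is a toy dot product over random vectors rather than one of gklearn's graph kernels, and the names _init_worker, _wrapper_dot and G_vectors are illustrative, not part of the package API.

# Illustrative sketch only: _init_worker, _wrapper_dot and G_vectors are
# hypothetical names; the "kernel" is a toy dot product, not a graph kernel.
import numpy as np

from gklearn.utils.parallel import parallel_gm  # assumed import path


def _init_worker(vectors_toshare):
    # Runs once in every worker process; stores the shared data in a
    # module-level global so it is not pickled again for every item.
    global G_vectors
    G_vectors = vectors_toshare


def _wrapper_dot(itr):
    # itr is an index pair from combinations_with_replacement; return
    # (i, j, value) so parallel_gm can fill Kmatrix symmetrically.
    i, j = itr
    return i, j, float(np.dot(G_vectors[i], G_vectors[j]))


if __name__ == '__main__':
    vectors = np.random.rand(10, 5)  # stand-ins for graphs
    Kmatrix = np.zeros((len(vectors), len(vectors)))
    parallel_gm(_wrapper_dot, Kmatrix, vectors,
                init_worker=_init_worker, glbv=(vectors,),
                n_jobs=2, verbose=False)
    print(Kmatrix)

The worker functions sit at module level so multiprocessing can pickle them, and the shared data is handed to each process once through init_worker/glbv instead of being serialized with every (i, j) pair.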

A Python package for graph kernels, graph edit distances and the graph pre-image problem.