diff --git a/gklearn/utils/parallel.py b/gklearn/utils/parallel.py index a1862c0..254df8a 100644 --- a/gklearn/utils/parallel.py +++ b/gklearn/utils/parallel.py @@ -7,60 +7,64 @@ Parallel aid functions. """ import multiprocessing from multiprocessing import Pool -from tqdm import tqdm import sys +from gklearn.utils import get_iters -def parallel_me(func, func_assign, var_to_assign, itr, len_itr=None, init_worker=None, - glbv=None, method=None, n_jobs=None, chunksize=None, itr_desc='', - verbose=True): - ''' - ''' - if method == 'imap_unordered': - if glbv: # global varibles required. -# def init_worker(v_share): -# global G_var -# G_var = v_share - if n_jobs == None: - n_jobs = multiprocessing.cpu_count() - with Pool(processes=n_jobs, initializer=init_worker, - initargs=glbv) as pool: - if chunksize is None: - if len_itr < 100 * n_jobs: - chunksize = int(len_itr / n_jobs) + 1 - else: - chunksize = 100 - for result in (tqdm(pool.imap_unordered(func, itr, chunksize), - desc=itr_desc, file=sys.stdout) if verbose else - pool.imap_unordered(func, itr, chunksize)): - func_assign(result, var_to_assign) - pool.close() - pool.join() - else: - if n_jobs == None: - n_jobs = multiprocessing.cpu_count() - with Pool(processes=n_jobs) as pool: - if chunksize is None: - if len_itr < 100 * n_jobs: - chunksize = int(len_itr / n_jobs) + 1 - else: - chunksize = 100 - for result in (tqdm(pool.imap_unordered(func, itr, chunksize), - desc=itr_desc, file=sys.stdout) if verbose else - pool.imap_unordered(func, itr, chunksize)): - func_assign(result, var_to_assign) - pool.close() - pool.join() - -def parallel_gm(func, Kmatrix, Gn, init_worker=None, glbv=None, - method='imap_unordered', n_jobs=None, chunksize=None, - verbose=True): # @todo: Gn seems not necessary. - from itertools import combinations_with_replacement - def func_assign(result, var_to_assign): - var_to_assign[result[0]][result[1]] = result[2] - var_to_assign[result[1]][result[0]] = result[2] - itr = combinations_with_replacement(range(0, len(Gn)), 2) - len_itr = int(len(Gn) * (len(Gn) + 1) / 2) - parallel_me(func, func_assign, Kmatrix, itr, len_itr=len_itr, - init_worker=init_worker, glbv=glbv, method=method, n_jobs=n_jobs, - chunksize=chunksize, itr_desc='Computing kernels', verbose=verbose) \ No newline at end of file +def parallel_me(func, func_assign, var_to_assign, itr, len_itr=None, init_worker=None, + glbv=None, method=None, n_jobs=None, chunksize=None, itr_desc='', + verbose=True): + ''' + ''' + if method == 'imap_unordered': + if glbv: # global varibles required. +# def init_worker(v_share): +# global G_var +# G_var = v_share + if n_jobs == None: + n_jobs = multiprocessing.cpu_count() + with Pool(processes=n_jobs, initializer=init_worker, + initargs=glbv) as pool: + if chunksize is None: + if len_itr < 100 * n_jobs: + chunksize = int(len_itr / n_jobs) + 1 + else: + chunksize = 100 + + iterator = get_iters(pool.imap_unordered(func, itr, chunksize), + desc=itr_desc, file=sys.stdout, length=len_itr, + verbose=(verbose >= 2)) + for result in iterator: + func_assign(result, var_to_assign) + pool.close() + pool.join() + else: + if n_jobs == None: + n_jobs = multiprocessing.cpu_count() + with Pool(processes=n_jobs) as pool: + if chunksize is None: + if len_itr < 100 * n_jobs: + chunksize = int(len_itr / n_jobs) + 1 + else: + chunksize = 100 + iterator = get_iters(pool.imap_unordered(func, itr, chunksize), + desc=itr_desc, file=sys.stdout, length=len_itr, + verbose=(verbose >= 2)) + for result in iterator: + func_assign(result, var_to_assign) + pool.close() + pool.join() + + +def parallel_gm(func, Kmatrix, Gn, init_worker=None, glbv=None, + method='imap_unordered', n_jobs=None, chunksize=None, + verbose=True): # @todo: Gn seems not necessary. + from itertools import combinations_with_replacement + def func_assign(result, var_to_assign): + var_to_assign[result[0]][result[1]] = result[2] + var_to_assign[result[1]][result[0]] = result[2] + itr = combinations_with_replacement(range(0, len(Gn)), 2) + len_itr = int(len(Gn) * (len(Gn) + 1) / 2) + parallel_me(func, func_assign, Kmatrix, itr, len_itr=len_itr, + init_worker=init_worker, glbv=glbv, method=method, n_jobs=n_jobs, + chunksize=chunksize, itr_desc='Computing kernels', verbose=verbose) \ No newline at end of file