diff --git a/lang/fr/gklearn/utils/parallel.py b/lang/fr/gklearn/utils/parallel.py new file mode 100644 index 0000000..71bb47c --- /dev/null +++ b/lang/fr/gklearn/utils/parallel.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Tue Dec 11 11:39:46 2018 +Parallel aid functions. +@author: ljia +""" +import multiprocessing +from multiprocessing import Pool +from tqdm import tqdm +import sys + +def parallel_me(func, func_assign, var_to_assign, itr, len_itr=None, init_worker=None, + glbv=None, method=None, n_jobs=None, chunksize=None, itr_desc='', + verbose=True): + ''' + ''' + if method == 'imap_unordered': + if glbv: # global varibles required. +# def init_worker(v_share): +# global G_var +# G_var = v_share + if n_jobs == None: + n_jobs = multiprocessing.cpu_count() + with Pool(processes=n_jobs, initializer=init_worker, + initargs=glbv) as pool: + if chunksize is None: + if len_itr < 100 * n_jobs: + chunksize = int(len_itr / n_jobs) + 1 + else: + chunksize = 100 + for result in (tqdm(pool.imap_unordered(func, itr, chunksize), + desc=itr_desc, file=sys.stdout) if verbose else + pool.imap_unordered(func, itr, chunksize)): + func_assign(result, var_to_assign) + pool.close() + pool.join() + else: + if n_jobs == None: + n_jobs = multiprocessing.cpu_count() + with Pool(processes=n_jobs) as pool: + if chunksize is None: + if len_itr < 100 * n_jobs: + chunksize = int(len_itr / n_jobs) + 1 + else: + chunksize = 100 + for result in (tqdm(pool.imap_unordered(func, itr, chunksize), + desc=itr_desc, file=sys.stdout) if verbose else + pool.imap_unordered(func, itr, chunksize)): + func_assign(result, var_to_assign) + pool.close() + pool.join() + + +def parallel_gm(func, Kmatrix, Gn, init_worker=None, glbv=None, + method='imap_unordered', n_jobs=None, chunksize=None, + verbose=True): # @todo: Gn seems not necessary. + from itertools import combinations_with_replacement + def func_assign(result, var_to_assign): + var_to_assign[result[0]][result[1]] = result[2] + var_to_assign[result[1]][result[0]] = result[2] + itr = combinations_with_replacement(range(0, len(Gn)), 2) + len_itr = int(len(Gn) * (len(Gn) + 1) / 2) + parallel_me(func, func_assign, Kmatrix, itr, len_itr=len_itr, + init_worker=init_worker, glbv=glbv, method=method, n_jobs=n_jobs, + chunksize=chunksize, itr_desc='calculating kernels', verbose=verbose) \ No newline at end of file