diff --git a/pygraph/kernels/structuralspKernel.py b/pygraph/kernels/structuralspKernel.py index dc5da88..ba2f86a 100644 --- a/pygraph/kernels/structuralspKernel.py +++ b/pygraph/kernels/structuralspKernel.py @@ -32,6 +32,8 @@ def structuralspkernel(*args, node_kernels=None, edge_kernels=None, compute_method='naive', +# parallel='imap_unordered', + parallel=None, n_jobs=None, verbose=True): """Calculate mean average structural shortest path kernels between graphs. @@ -112,29 +114,42 @@ def structuralspkernel(*args, start_time = time.time() # get shortest paths of each graph in Gn - splist = [None] * len(Gn) - pool = Pool(n_jobs) - itr = zip(Gn, range(0, len(Gn))) - if len(Gn) < 100 * n_jobs: - chunksize = int(len(Gn) / n_jobs) + 1 - else: - chunksize = 100 - # get shortest path graphs of Gn - if compute_method == 'trie': - getsp_partial = partial(wrapper_getSP_trie, weight, ds_attrs['is_directed']) - else: - getsp_partial = partial(wrapper_getSP_naive, weight, ds_attrs['is_directed']) - if verbose: - iterator = tqdm(pool.imap_unordered(getsp_partial, itr, chunksize), - desc='getting shortest paths', file=sys.stdout) - else: - iterator = pool.imap_unordered(getsp_partial, itr, chunksize) - for i, sp in iterator: - splist[i] = sp -# time.sleep(10) - pool.close() - pool.join() - + if parallel == 'imap_unordered': + splist = [None] * len(Gn) + pool = Pool(n_jobs) + itr = zip(Gn, range(0, len(Gn))) + if len(Gn) < 100 * n_jobs: + chunksize = int(len(Gn) / n_jobs) + 1 + else: + chunksize = 100 + # get shortest path graphs of Gn + if compute_method == 'trie': + getsp_partial = partial(wrapper_getSP_trie, weight, ds_attrs['is_directed']) + else: + getsp_partial = partial(wrapper_getSP_naive, weight, ds_attrs['is_directed']) + if verbose: + iterator = tqdm(pool.imap_unordered(getsp_partial, itr, chunksize), + desc='getting shortest paths', file=sys.stdout) + else: + iterator = pool.imap_unordered(getsp_partial, itr, chunksize) + for i, sp in iterator: + splist[i] = sp + # time.sleep(10) + pool.close() + pool.join() + # ---- direct running, normally use single CPU core. ---- + elif parallel == None: + splist = [] + if verbose: + iterator = tqdm(Gn, desc='getting sp graphs', file=sys.stdout) + else: + iterator = Gn + if compute_method == 'trie': + for g in iterator: + splist.append(get_sps_as_trie(g, weight, ds_attrs['is_directed'])) + else: + for g in iterator: + splist.append(get_shortest_paths(g, weight, ds_attrs['is_directed'])) # ss = 0 # ss += sys.getsizeof(splist) @@ -146,14 +161,7 @@ def structuralspkernel(*args, # time.sleep(20) -# # ---- direct running, normally use single CPU core. ---- -# splist = [] -# if compute_method == 'trie': -# for g in tqdm(Gn, desc='getting sp graphs', file=sys.stdout): -# splist.append(get_sps_as_trie(g, weight, ds_attrs['is_directed'])) -# else: -# for g in tqdm(Gn, desc='getting sp graphs', file=sys.stdout): -# splist.append(get_shortest_paths(g, weight, ds_attrs['is_directed'])) + # # ---- only for the Fast Computation of Shortest Path Kernel (FCSP) # sp_ml = [0] * len(Gn) # shortest path matrices @@ -174,22 +182,45 @@ def structuralspkernel(*args, # print(len(edge_w_g[0])) Kmatrix = np.zeros((len(Gn), len(Gn))) - - # ---- use pool.imap_unordered to parallel and track progress. ---- - def init_worker(spl_toshare, gs_toshare): - global G_spl, G_gs - G_spl = spl_toshare - G_gs = gs_toshare - if compute_method == 'trie': - do_partial = partial(wrapper_ssp_do_trie, ds_attrs, node_label, edge_label, - node_kernels, edge_kernels) - parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker, - glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose) - else: - do_partial = partial(wrapper_ssp_do, ds_attrs, node_label, edge_label, - node_kernels, edge_kernels) - parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker, - glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose) + + # ---- use pool.imap_unordered to parallel and track progress. ---- + if parallel == 'imap_unordered': + def init_worker(spl_toshare, gs_toshare): + global G_spl, G_gs + G_spl = spl_toshare + G_gs = gs_toshare + if compute_method == 'trie': + do_partial = partial(wrapper_ssp_do_trie, ds_attrs, node_label, edge_label, + node_kernels, edge_kernels) + parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker, + glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose) + else: + do_partial = partial(wrapper_ssp_do, ds_attrs, node_label, edge_label, + node_kernels, edge_kernels) + parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker, + glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose) + # ---- direct running, normally use single CPU core. ---- + elif parallel == None: + from itertools import combinations_with_replacement + itr = combinations_with_replacement(range(0, len(Gn)), 2) + if verbose: + iterator = tqdm(itr, desc='calculating kernels', file=sys.stdout) + else: + iterator = itr + if compute_method == 'trie': + for i, j in iterator: + kernel = ssp_do_trie(Gn[i], Gn[j], splist[i], splist[j], + ds_attrs, node_label, edge_label, node_kernels, edge_kernels) + Kmatrix[i][j] = kernel + Kmatrix[j][i] = kernel + else: + for i, j in iterator: + kernel = structuralspkernel_do(Gn[i], Gn[j], splist[i], splist[j], + ds_attrs, node_label, edge_label, node_kernels, edge_kernels) + # if(kernel > 1): + # print("error here ") + Kmatrix[i][j] = kernel + Kmatrix[j][i] = kernel # # ---- use pool.map to parallel. ---- # pool = Pool(n_jobs) @@ -229,23 +260,6 @@ def structuralspkernel(*args, # pool.join() -# # ---- direct running, normally use single CPU core. ---- -# from itertools import combinations_with_replacement -# itr = combinations_with_replacement(range(0, len(Gn)), 2) -# if compute_method == 'trie': -# for i, j in tqdm(itr, desc='calculating kernels', file=sys.stdout): -# kernel = ssp_do_trie(Gn[i], Gn[j], splist[i], splist[j], -# ds_attrs, node_label, edge_label, node_kernels, edge_kernels) -# Kmatrix[i][j] = kernel -# Kmatrix[j][i] = kernel -# else: -# for i, j in tqdm(itr, desc='calculating kernels', file=sys.stdout): -# kernel = structuralspkernel_do(Gn[i], Gn[j], splist[i], splist[j], -# ds_attrs, node_label, edge_label, node_kernels, edge_kernels) -# # if(kernel > 1): -# # print("error here ") -# Kmatrix[i][j] = kernel -# Kmatrix[j][i] = kernel run_time = time.time() - start_time if verbose: @@ -309,8 +323,13 @@ def structuralspkernel_do(g1, g2, spl1, spl2, ds_attrs, node_label, edge_label, for p1, p2 in product(spl1, spl2): if len(p1) == len(p2): kernel += 1 - - kernel = kernel / (len(spl1) * len(spl2)) # calculate mean average + try: + kernel = kernel / (len(spl1) * len(spl2)) # calculate mean average + except ZeroDivisionError: + print(spl1, spl2) + print(g1.nodes(data=True)) + print(g1.edges(data=True)) + raise Exception # # ---- exact implementation of the Fast Computation of Shortest Path Kernel (FCSP), reference [2], sadly it is slower than the current implementation # # compute vertex kernel matrix