Browse Source

add parallel switch for structural sp kernel.

v0.1
jajupmochi 5 years ago
parent
commit
461de387b6
1 changed files with 85 additions and 66 deletions
  1. +85
    -66
      pygraph/kernels/structuralspKernel.py

+ 85
- 66
pygraph/kernels/structuralspKernel.py View File

@@ -32,6 +32,8 @@ def structuralspkernel(*args,
node_kernels=None,
edge_kernels=None,
compute_method='naive',
# parallel='imap_unordered',
parallel=None,
n_jobs=None,
verbose=True):
"""Calculate mean average structural shortest path kernels between graphs.
@@ -112,29 +114,42 @@ def structuralspkernel(*args,
start_time = time.time()

# get shortest paths of each graph in Gn
splist = [None] * len(Gn)
pool = Pool(n_jobs)
itr = zip(Gn, range(0, len(Gn)))
if len(Gn) < 100 * n_jobs:
chunksize = int(len(Gn) / n_jobs) + 1
else:
chunksize = 100
# get shortest path graphs of Gn
if compute_method == 'trie':
getsp_partial = partial(wrapper_getSP_trie, weight, ds_attrs['is_directed'])
else:
getsp_partial = partial(wrapper_getSP_naive, weight, ds_attrs['is_directed'])
if verbose:
iterator = tqdm(pool.imap_unordered(getsp_partial, itr, chunksize),
desc='getting shortest paths', file=sys.stdout)
else:
iterator = pool.imap_unordered(getsp_partial, itr, chunksize)
for i, sp in iterator:
splist[i] = sp
# time.sleep(10)
pool.close()
pool.join()
if parallel == 'imap_unordered':
splist = [None] * len(Gn)
pool = Pool(n_jobs)
itr = zip(Gn, range(0, len(Gn)))
if len(Gn) < 100 * n_jobs:
chunksize = int(len(Gn) / n_jobs) + 1
else:
chunksize = 100
# get shortest path graphs of Gn
if compute_method == 'trie':
getsp_partial = partial(wrapper_getSP_trie, weight, ds_attrs['is_directed'])
else:
getsp_partial = partial(wrapper_getSP_naive, weight, ds_attrs['is_directed'])
if verbose:
iterator = tqdm(pool.imap_unordered(getsp_partial, itr, chunksize),
desc='getting shortest paths', file=sys.stdout)
else:
iterator = pool.imap_unordered(getsp_partial, itr, chunksize)
for i, sp in iterator:
splist[i] = sp
# time.sleep(10)
pool.close()
pool.join()
# ---- direct running, normally use single CPU core. ----
elif parallel == None:
splist = []
if verbose:
iterator = tqdm(Gn, desc='getting sp graphs', file=sys.stdout)
else:
iterator = Gn
if compute_method == 'trie':
for g in iterator:
splist.append(get_sps_as_trie(g, weight, ds_attrs['is_directed']))
else:
for g in iterator:
splist.append(get_shortest_paths(g, weight, ds_attrs['is_directed']))
# ss = 0
# ss += sys.getsizeof(splist)
@@ -146,14 +161,7 @@ def structuralspkernel(*args,
# time.sleep(20)
# # ---- direct running, normally use single CPU core. ----
# splist = []
# if compute_method == 'trie':
# for g in tqdm(Gn, desc='getting sp graphs', file=sys.stdout):
# splist.append(get_sps_as_trie(g, weight, ds_attrs['is_directed']))
# else:
# for g in tqdm(Gn, desc='getting sp graphs', file=sys.stdout):
# splist.append(get_shortest_paths(g, weight, ds_attrs['is_directed']))


# # ---- only for the Fast Computation of Shortest Path Kernel (FCSP)
# sp_ml = [0] * len(Gn) # shortest path matrices
@@ -174,22 +182,45 @@ def structuralspkernel(*args,
# print(len(edge_w_g[0]))

Kmatrix = np.zeros((len(Gn), len(Gn)))
# ---- use pool.imap_unordered to parallel and track progress. ----
def init_worker(spl_toshare, gs_toshare):
global G_spl, G_gs
G_spl = spl_toshare
G_gs = gs_toshare
if compute_method == 'trie':
do_partial = partial(wrapper_ssp_do_trie, ds_attrs, node_label, edge_label,
node_kernels, edge_kernels)
parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker,
glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose)
else:
do_partial = partial(wrapper_ssp_do, ds_attrs, node_label, edge_label,
node_kernels, edge_kernels)
parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker,
glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose)

# ---- use pool.imap_unordered to parallel and track progress. ----
if parallel == 'imap_unordered':
def init_worker(spl_toshare, gs_toshare):
global G_spl, G_gs
G_spl = spl_toshare
G_gs = gs_toshare
if compute_method == 'trie':
do_partial = partial(wrapper_ssp_do_trie, ds_attrs, node_label, edge_label,
node_kernels, edge_kernels)
parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker,
glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose)
else:
do_partial = partial(wrapper_ssp_do, ds_attrs, node_label, edge_label,
node_kernels, edge_kernels)
parallel_gm(do_partial, Kmatrix, Gn, init_worker=init_worker,
glbv=(splist, Gn), n_jobs=n_jobs, verbose=verbose)
# ---- direct running, normally use single CPU core. ----
elif parallel == None:
from itertools import combinations_with_replacement
itr = combinations_with_replacement(range(0, len(Gn)), 2)
if verbose:
iterator = tqdm(itr, desc='calculating kernels', file=sys.stdout)
else:
iterator = itr
if compute_method == 'trie':
for i, j in iterator:
kernel = ssp_do_trie(Gn[i], Gn[j], splist[i], splist[j],
ds_attrs, node_label, edge_label, node_kernels, edge_kernels)
Kmatrix[i][j] = kernel
Kmatrix[j][i] = kernel
else:
for i, j in iterator:
kernel = structuralspkernel_do(Gn[i], Gn[j], splist[i], splist[j],
ds_attrs, node_label, edge_label, node_kernels, edge_kernels)
# if(kernel > 1):
# print("error here ")
Kmatrix[i][j] = kernel
Kmatrix[j][i] = kernel
# # ---- use pool.map to parallel. ----
# pool = Pool(n_jobs)
@@ -229,23 +260,6 @@ def structuralspkernel(*args,
# pool.join()


# # ---- direct running, normally use single CPU core. ----
# from itertools import combinations_with_replacement
# itr = combinations_with_replacement(range(0, len(Gn)), 2)
# if compute_method == 'trie':
# for i, j in tqdm(itr, desc='calculating kernels', file=sys.stdout):
# kernel = ssp_do_trie(Gn[i], Gn[j], splist[i], splist[j],
# ds_attrs, node_label, edge_label, node_kernels, edge_kernels)
# Kmatrix[i][j] = kernel
# Kmatrix[j][i] = kernel
# else:
# for i, j in tqdm(itr, desc='calculating kernels', file=sys.stdout):
# kernel = structuralspkernel_do(Gn[i], Gn[j], splist[i], splist[j],
# ds_attrs, node_label, edge_label, node_kernels, edge_kernels)
# # if(kernel > 1):
# # print("error here ")
# Kmatrix[i][j] = kernel
# Kmatrix[j][i] = kernel

run_time = time.time() - start_time
if verbose:
@@ -309,8 +323,13 @@ def structuralspkernel_do(g1, g2, spl1, spl2, ds_attrs, node_label, edge_label,
for p1, p2 in product(spl1, spl2):
if len(p1) == len(p2):
kernel += 1

kernel = kernel / (len(spl1) * len(spl2)) # calculate mean average
try:
kernel = kernel / (len(spl1) * len(spl2)) # calculate mean average
except ZeroDivisionError:
print(spl1, spl2)
print(g1.nodes(data=True))
print(g1.edges(data=True))
raise Exception

# # ---- exact implementation of the Fast Computation of Shortest Path Kernel (FCSP), reference [2], sadly it is slower than the current implementation
# # compute vertex kernel matrix


Loading…
Cancel
Save