Browse Source

Add parallel functions to MGE.

v0.2.x
jajupmochi 5 years ago
parent
commit
34e82bfce8
8 changed files with 1708 additions and 1538 deletions
  1. +217
    -65
      gklearn/ged/median/median_graph_estimator.py
  2. +4
    -4
      gklearn/ged/median/test_median_graph_estimator.py
  3. +2
    -0
      gklearn/ged/median/utils.py
  4. +2
    -0
      gklearn/ged/util/util.py
  5. +1468
    -1466
      gklearn/gedlib/gedlibpy.cpp
  6. BIN
      gklearn/gedlib/gedlibpy.cpython-36m-x86_64-linux-gnu.so
  7. +9
    -1
      gklearn/gedlib/gedlibpy.pyx
  8. +6
    -2
      gklearn/preimage/median_preimage_generator.py

+ 217
- 65
gklearn/ged/median/median_graph_estimator.py View File

@@ -13,6 +13,9 @@ import time
from tqdm import tqdm
import sys
import networkx as nx
import multiprocessing
from multiprocessing import Pool
from functools import partial


class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined node?
@@ -47,6 +50,7 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
self.__desired_num_random_inits = 10
self.__use_real_randomness = True
self.__seed = 0
self.__parallel = True
self.__update_order = True
self.__refine = True
self.__time_limit_in_sec = 0
@@ -125,6 +129,16 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
else:
raise Exception('Invalid argument "' + opt_val + '" for option stdout. Usage: options = "[--stdout 0|1|2] [...]"')

elif opt_name == 'parallel':
if opt_val == 'TRUE':
self.__parallel = True
elif opt_val == 'FALSE':
self.__parallel = False
else:
raise Exception('Invalid argument "' + opt_val + '" for option parallel. Usage: options = "[--parallel TRUE|FALSE] [...]"')
elif opt_name == 'update-order':
if opt_val == 'TRUE':
@@ -312,7 +326,6 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
# print(graphs[0].nodes(data=True))
# print(graphs[0].edges(data=True))
# print(nx.adjacency_matrix(graphs[0]))

# Construct initial medians.
medians = []
@@ -356,30 +369,12 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
self.__ged_env.load_nx_graph(median, gen_median_id)
self.__ged_env.init(self.__ged_env.get_init_type())
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress = tqdm(desc='Computing initial node maps', total=len(graph_ids), file=sys.stdout)
# Compute node maps and sum of distances for initial median.
self.__sum_of_distances = 0
self.__node_maps_from_median.clear()
for graph_id in graph_ids:
self.__ged_env.run_method(gen_median_id, graph_id)
self.__node_maps_from_median[graph_id] = self.__ged_env.get_node_map(gen_median_id, graph_id)
# print(self.__node_maps_from_median[graph_id])
self.__sum_of_distances += self.__node_maps_from_median[graph_id].induced_cost()
# print(self.__sum_of_distances)
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress.update(1)
self.__compute_init_node_maps(graph_ids, gen_median_id)
self.__best_init_sum_of_distances = min(self.__best_init_sum_of_distances, self.__sum_of_distances)
self.__ged_env.load_nx_graph(median, set_median_id)
# print(self.__best_init_sum_of_distances)
# Print information about current iteration.
if self.__print_to_stdout == 2:
print('\n')
# Run block gradient descent from initial median.
converged = False
@@ -434,7 +429,7 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
# print(self.__node_maps_from_median[graph_id].induced_cost())
# xxx = self.__node_maps_from_median[graph_id]
self.__ged_env.compute_induced_cost(gen_median_id, graph_id, self.__node_maps_from_median[graph_id])
# print('---------------------------------------')
# print('---------------------------------------')
# print(self.__node_maps_from_median[graph_id].induced_cost())
# @todo:!!!!!!!!!!!!!!!!!!!!!!!!!!!!This value is a slight different from the c++ program, which might be a bug! Use it very carefully!
@@ -637,6 +632,7 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
self.__desired_num_random_inits = 10
self.__use_real_randomness = True
self.__seed = 0
self.__parallel = True
self.__update_order = True
self.__refine = True
self.__time_limit_in_sec = 0
@@ -682,35 +678,123 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
def __compute_medoid(self, graph_ids, timer, initial_medians):
# Use method selected for initialization phase.
self.__ged_env.set_method(self.__init_method, self.__init_options)
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress = tqdm(desc='Computing medoid', total=len(graph_ids), file=sys.stdout)
# Compute the medoid.
medoid_id = graph_ids[0]
best_sum_of_distances = np.inf
for g_id in graph_ids:
if timer.expired():
self.__state = AlgorithmState.CALLED
break
sum_of_distances = 0
for h_id in graph_ids:
self.__ged_env.run_method(g_id, h_id)
sum_of_distances += self.__ged_env.get_upper_bound(g_id, h_id)
if sum_of_distances < best_sum_of_distances:
best_sum_of_distances = sum_of_distances
medoid_id = g_id
if self.__parallel:
# @todo: notice when parallel self.__ged_env is not modified.
sum_of_distances_list = [np.inf] * len(graph_ids)
len_itr = len(graph_ids)
itr = zip(graph_ids, range(0, len(graph_ids)))
n_jobs = multiprocessing.cpu_count()
if len_itr < 100 * n_jobs:
chunksize = int(len_itr / n_jobs) + 1
else:
chunksize = 100
def init_worker(ged_env_toshare):
global G_ged_env
G_ged_env = ged_env_toshare
do_fun = partial(_compute_medoid_parallel, graph_ids)
pool = Pool(processes=n_jobs, initializer=init_worker, initargs=(self.__ged_env,))
if self.__print_to_stdout == 2:
iterator = tqdm(pool.imap_unordered(do_fun, itr, chunksize),
desc='Computing medoid', file=sys.stdout)
else:
iterator = pool.imap_unordered(do_fun, itr, chunksize)
for i, dis in iterator:
sum_of_distances_list[i] = dis
pool.close()
pool.join()
medoid_id = np.argmin(sum_of_distances_list)
best_sum_of_distances = sum_of_distances_list[medoid_id]
initial_medians.append(self.__ged_env.get_nx_graph(medoid_id, True, True, False)) # @todo

else:
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress.update(1)
initial_medians.append(self.__ged_env.get_nx_graph(medoid_id, True, True, False)) # @todo
progress = tqdm(desc='Computing medoid', total=len(graph_ids), file=sys.stdout)
# Print information about current iteration.
if self.__print_to_stdout == 2:
print('\n')
medoid_id = graph_ids[0]
best_sum_of_distances = np.inf
for g_id in graph_ids:
if timer.expired():
self.__state = AlgorithmState.CALLED
break
sum_of_distances = 0
for h_id in graph_ids:
self.__ged_env.run_method(g_id, h_id)
sum_of_distances += self.__ged_env.get_upper_bound(g_id, h_id)
if sum_of_distances < best_sum_of_distances:
best_sum_of_distances = sum_of_distances
medoid_id = g_id
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress.update(1)
initial_medians.append(self.__ged_env.get_nx_graph(medoid_id, True, True, False)) # @todo
# Print information about current iteration.
if self.__print_to_stdout == 2:
print('\n')
def __compute_init_node_maps(self, graph_ids, gen_median_id):
	"""Compute node maps and the sum of distances between the initial
	median (``gen_median_id``) and every graph in ``graph_ids``.

	Results are stored on the estimator: ``self.__node_maps_from_median``
	maps each graph id to its node map, and ``self.__sum_of_distances``
	holds the total induced cost. Runs either in parallel (one GED
	computation per worker process) or sequentially, depending on
	``self.__parallel``.
	"""
	# Compute node maps and sum of distances for initial median.
	if self.__parallel:
		# @todo: notice when parallel self.__ged_env is not modified.
		self.__sum_of_distances = 0
		self.__node_maps_from_median.clear()
		# NOTE(review): indexing this list by g_id assumes graph ids are
		# contiguous 0..len-1 — confirm against how the GED env assigns ids.
		sum_of_distances_list = [0] * len(graph_ids)
		len_itr = len(graph_ids)
		itr = graph_ids
		n_jobs = multiprocessing.cpu_count()
		# Small workloads: split evenly across workers; large ones: fixed chunks.
		if len_itr < 100 * n_jobs:
			chunksize = int(len_itr / n_jobs) + 1
		else:
			chunksize = 100
		def init_worker(ged_env_toshare):
			# Share the GED environment with workers via a process global.
			global G_ged_env
			G_ged_env = ged_env_toshare
		do_fun = partial(_compute_init_node_maps_parallel, gen_median_id)
		pool = Pool(processes=n_jobs, initializer=init_worker, initargs=(self.__ged_env,))
		if self.__print_to_stdout == 2:
			iterator = tqdm(pool.imap_unordered(do_fun, itr, chunksize),
							desc='Computing initial node maps', file=sys.stdout)
		else:
			iterator = pool.imap_unordered(do_fun, itr, chunksize)
		for g_id, sod, node_maps in iterator:
			sum_of_distances_list[g_id] = sod
			self.__node_maps_from_median[g_id] = node_maps
		pool.close()
		pool.join()
		self.__sum_of_distances = np.sum(sum_of_distances_list)
		# xxx = self.__node_maps_from_median
	else:
		# Print information about current iteration.
		if self.__print_to_stdout == 2:
			progress = tqdm(desc='Computing initial node maps', total=len(graph_ids), file=sys.stdout)
		self.__sum_of_distances = 0
		self.__node_maps_from_median.clear()
		for graph_id in graph_ids:
			self.__ged_env.run_method(gen_median_id, graph_id)
			self.__node_maps_from_median[graph_id] = self.__ged_env.get_node_map(gen_median_id, graph_id)
			# print(self.__node_maps_from_median[graph_id])
			self.__sum_of_distances += self.__node_maps_from_median[graph_id].induced_cost()
			# print(self.__sum_of_distances)
			# Print information about current iteration.
			if self.__print_to_stdout == 2:
				progress.update(1)
		# Print information about current iteration.
		if self.__print_to_stdout == 2:
			print('\n')

def __termination_criterion_met(self, converged, timer, itr, itrs_without_update):
if timer.expired() or (itr >= self.__max_itrs if self.__max_itrs >= 0 else False):
@@ -816,26 +900,57 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no


def __update_node_maps(self):
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress = tqdm(desc='Updating node maps', total=len(self.__node_maps_from_median), file=sys.stdout)
# Update the node maps.
node_maps_were_modified = False
for graph_id, node_map in self.__node_maps_from_median.items():
self.__ged_env.run_method(self.__median_id, graph_id)
if self.__ged_env.get_upper_bound(self.__median_id, graph_id) < node_map.induced_cost() - self.__epsilon:
# xxx = self.__node_maps_from_median[graph_id]
self.__node_maps_from_median[graph_id] = self.__ged_env.get_node_map(self.__median_id, graph_id)
# yyy = self.__node_maps_from_median[graph_id]
node_maps_were_modified = True
if self.__parallel:
# @todo: notice when parallel self.__ged_env is not modified.
node_maps_were_modified = False
# xxx = self.__node_maps_from_median.copy()
len_itr = len(self.__node_maps_from_median)
itr = [item for item in self.__node_maps_from_median.items()]
n_jobs = multiprocessing.cpu_count()
if len_itr < 100 * n_jobs:
chunksize = int(len_itr / n_jobs) + 1
else:
chunksize = 100
def init_worker(ged_env_toshare):
global G_ged_env
G_ged_env = ged_env_toshare
do_fun = partial(_update_node_maps_parallel, self.__median_id, self.__epsilon)
pool = Pool(processes=n_jobs, initializer=init_worker, initargs=(self.__ged_env,))
if self.__print_to_stdout == 2:
iterator = tqdm(pool.imap_unordered(do_fun, itr, chunksize),
desc='Updating node maps', file=sys.stdout)
else:
iterator = pool.imap_unordered(do_fun, itr, chunksize)
for g_id, node_map, nm_modified in iterator:
self.__node_maps_from_median[g_id] = node_map
if nm_modified:
node_maps_were_modified = True
pool.close()
pool.join()
# yyy = self.__node_maps_from_median.copy()

else:
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress.update(1)
# Print information about current iteration.
if self.__print_to_stdout == 2:
print('\n')
progress = tqdm(desc='Updating node maps', total=len(self.__node_maps_from_median), file=sys.stdout)
node_maps_were_modified = False
for graph_id, node_map in self.__node_maps_from_median.items():
self.__ged_env.run_method(self.__median_id, graph_id)
if self.__ged_env.get_upper_bound(self.__median_id, graph_id) < node_map.induced_cost() - self.__epsilon:
# xxx = self.__node_maps_from_median[graph_id]
self.__node_maps_from_median[graph_id] = self.__ged_env.get_node_map(self.__median_id, graph_id)
# yyy = self.__node_maps_from_median[graph_id]
node_maps_were_modified = True
# Print information about current iteration.
if self.__print_to_stdout == 2:
progress.update(1)
# Print information about current iteration.
if self.__print_to_stdout == 2:
print('\n')
# Return true if the node maps were modified.
return node_maps_were_modified
@@ -1230,7 +1345,8 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
def __add_node_to_median(self, best_config, best_label, median):
# Update the median.
median.add_node(nx.number_of_nodes(median), **best_label)
nb_nodes_median = nx.number_of_nodes(median)
median.add_node(nb_nodes_median, **best_label)
# Update the node maps.
for graph_id, node_map in self.__node_maps_from_median.items():
@@ -1494,4 +1610,40 @@ class MedianGraphEstimator(object): # @todo: differ dummy_node from undifined no
# median_label = {}
# for key, val in median.items():
# median_label[key] = str(val)
# return median_label
# return median_label


def _compute_medoid_parallel(graph_ids, itr):
	"""Worker: total GED upper bound from one candidate graph to all graphs.

	``itr`` is a ``(graph_id, position)`` pair produced by the caller's
	``zip(graph_ids, range(...))``; the position is echoed back so the
	caller can slot the result into its distance list regardless of the
	order ``imap_unordered`` delivers results in.

	Relies on the process-global ``G_ged_env`` installed by the pool
	initializer.
	"""
	g_id, pos = itr
	# @todo: timer not considered here.
	total = 0
	for other_id in graph_ids:
		G_ged_env.run_method(g_id, other_id)
		total += G_ged_env.get_upper_bound(g_id, other_id)
	return pos, total


def _compute_init_node_maps_parallel(gen_median_id, itr):
	"""Worker: node map and induced cost between the initial median and one graph.

	``itr`` is a single graph id. Returns ``(graph_id, induced_cost,
	node_map)`` so the caller can rebuild its node-map dict and distance
	list from unordered results.

	Relies on the process-global ``G_ged_env`` installed by the pool
	initializer.
	"""
	graph_id = itr
	G_ged_env.run_method(gen_median_id, graph_id)
	node_map = G_ged_env.get_node_map(gen_median_id, graph_id)
	cost = node_map.induced_cost()
	return graph_id, cost, node_map


def _update_node_maps_parallel(median_id, epsilon, itr):
	"""Worker: re-run GED from the median to one graph, keeping the cheaper map.

	``itr`` is a ``(graph_id, node_map)`` item from the caller's node-map
	dict. Returns ``(graph_id, node_map, modified)`` where ``modified``
	tells whether a strictly better map (improvement beyond ``epsilon``)
	replaced the previous one.

	Relies on the process-global ``G_ged_env`` installed by the pool
	initializer.
	"""
	graph_id, node_map = itr
	G_ged_env.run_method(median_id, graph_id)
	modified = False
	if G_ged_env.get_upper_bound(median_id, graph_id) < node_map.induced_cost() - epsilon:
		node_map = G_ged_env.get_node_map(median_id, graph_id)
		modified = True
	return graph_id, node_map, modified

+ 4
- 4
gklearn/ged/median/test_median_graph_estimator.py View File

@@ -53,7 +53,7 @@ def test_median_graph_estimator():
mge.set_refine_method(algo, '--threads ' + str(threads) + ' --initial-solutions ' + str(initial_solutions) + ' --ratio-runs-from-initial-solutions 1')
mge_options = '--time-limit ' + str(time_limit) + ' --stdout 2 --init-type ' + init_type
mge_options += ' --random-inits ' + str(num_inits) + ' --seed ' + '1' + ' --update-order TRUE --refine FALSE --randomness PSEUDO '# @todo: std::to_string(rng())
mge_options += ' --random-inits ' + str(num_inits) + ' --seed ' + '1' + ' --update-order TRUE --refine FALSE --randomness PSEUDO --parallel TRUE '# @todo: std::to_string(rng())
# Select the GED algorithm.
algo_options = '--threads ' + str(threads) + algo_options_suffix
@@ -127,7 +127,7 @@ def test_median_graph_estimator_symb():
mge.set_refine_method(algo, '--threads ' + str(threads) + ' --initial-solutions ' + str(initial_solutions) + ' --ratio-runs-from-initial-solutions 1')
mge_options = '--time-limit ' + str(time_limit) + ' --stdout 2 --init-type ' + init_type
mge_options += ' --random-inits ' + str(num_inits) + ' --seed ' + '1' + ' --update-order TRUE --refine FALSE'# @todo: std::to_string(rng())
mge_options += ' --random-inits ' + str(num_inits) + ' --seed ' + '1' + ' --update-order TRUE --refine FALSE --randomness PSEUDO --parallel TRUE '# @todo: std::to_string(rng())
# Select the GED algorithm.
algo_options = '--threads ' + str(threads) + algo_options_suffix
@@ -155,5 +155,5 @@ def test_median_graph_estimator_symb():


if __name__ == '__main__':
set_median, gen_median = test_median_graph_estimator()
# set_median, gen_median = test_median_graph_estimator_symb()
# set_median, gen_median = test_median_graph_estimator()
set_median, gen_median = test_median_graph_estimator_symb()

+ 2
- 0
gklearn/ged/median/utils.py View File

@@ -30,6 +30,8 @@ def mge_options_to_string(options):
opt_str += '--randomness ' + str(val) + ' '
elif key == 'verbose':
opt_str += '--stdout ' + str(val) + ' '
elif key == 'parallel':
opt_str += '--parallel ' + ('TRUE' if val else 'FALSE') + ' '
elif key == 'update_order':
opt_str += '--update-order ' + ('TRUE' if val else 'FALSE') + ' '
elif key == 'refine':


+ 2
- 0
gklearn/ged/util/util.py View File

@@ -54,6 +54,8 @@ def compute_geds(graphs, options={}, parallel=False):
ged_env.add_nx_graph(g, '')
listID = ged_env.get_all_graph_ids()
ged_env.init()
if parallel:
options['threads'] = 1
ged_env.set_method(options['method'], ged_options_to_string(options))
ged_env.init_method()



+ 1468
- 1466
gklearn/gedlib/gedlibpy.cpp
File diff suppressed because it is too large
View File


BIN
gklearn/gedlib/gedlibpy.cpython-36m-x86_64-linux-gnu.so View File


+ 9
- 1
gklearn/gedlib/gedlibpy.pyx View File

@@ -112,6 +112,7 @@ cdef extern from "src/GedLibBind.hpp" namespace "pyged":
##CYTHON WRAPPER INTERFACES##
#############################

# import cython
import numpy as np
import networkx as nx
from gklearn.ged.env import NodeMap
@@ -177,14 +178,16 @@ def get_dummy_node() :
return getDummyNode()

# @cython.auto_pickle(True)
cdef class GEDEnv:
"""Cython wrapper class for C++ class PyGEDEnv
"""
# cdef PyGEDEnv c_env # Hold a C++ instance which we're wrapping
# cdef PyGEDEnv c_env # Hold a C++ instance which we're wrapping
cdef PyGEDEnv* c_env # hold a pointer to the C++ instance which we're wrapping


def __cinit__(self):
# self.c_env = PyGEDEnv()
self.c_env = new PyGEDEnv()
@@ -192,6 +195,11 @@ cdef class GEDEnv:
del self.c_env


# def __reduce__(self):
# # return GEDEnv, (self.c_env,)
# return GEDEnv, tuple()


def is_initialized(self) :
"""
Checks and returns if the computation environment is initialized or not.


+ 6
- 2
gklearn/preimage/median_preimage_generator.py View File

@@ -669,6 +669,7 @@ class MedianPreimageGenerator(PreimageGenerator):
options = self.__mge_options.copy()
if not 'seed' in options:
options['seed'] = int(round(time.time() * 1000)) # @todo: may not work correctly for possible parallel usage.
options['parallel'] = self.__parallel
# Select the GED algorithm.
self.__mge.set_options(mge_options_to_string(options))
@@ -676,8 +677,11 @@ class MedianPreimageGenerator(PreimageGenerator):
edge_labels=self._dataset.edge_labels,
node_attrs=self._dataset.node_attrs,
edge_attrs=self._dataset.edge_attrs)
self.__mge.set_init_method(self.__ged_options['method'], ged_options_to_string(self.__ged_options))
self.__mge.set_descent_method(self.__ged_options['method'], ged_options_to_string(self.__ged_options))
ged_options = self.__ged_options.copy()
if self.__parallel:
ged_options['threads'] = 1
self.__mge.set_init_method(self.__ged_options['method'], ged_options_to_string(ged_options))
self.__mge.set_descent_method(self.__ged_options['method'], ged_options_to_string(ged_options))
# Run the estimator.
self.__mge.run(graph_ids, set_median_id, gen_median_id)


Loading…
Cancel
Save