From 980dd1c9cf857f2808c69c2a6383abc1da8e48f8 Mon Sep 17 00:00:00 2001 From: jajupmochi Date: Tue, 2 Jun 2020 11:34:57 +0200 Subject: [PATCH] Add parallel to RPG. --- gklearn/preimage/random_preimage_generator.py | 205 +++++++++++++++++++------- 1 file changed, 151 insertions(+), 54 deletions(-) diff --git a/gklearn/preimage/random_preimage_generator.py b/gklearn/preimage/random_preimage_generator.py index b2da2b2..f3dfb48 100644 --- a/gklearn/preimage/random_preimage_generator.py +++ b/gklearn/preimage/random_preimage_generator.py @@ -10,9 +10,11 @@ import numpy as np import time import random import sys -import tqdm +from tqdm import tqdm import multiprocessing import networkx as nx +from multiprocessing import Pool +from functools import partial from gklearn.preimage import PreimageGenerator from gklearn.preimage.utils import compute_k_dis from gklearn.utils import Timer @@ -144,12 +146,14 @@ class RandomPreimageGenerator(PreimageGenerator): dihat_list = [] r = 0 dis_of_each_itr = [dhat] + if self.__parallel: + self._kernel_options['parallel'] = None while r < self.__r_max: print('\n- r =', r) found = False dis_bests = dis_gs + dihat_list - # compute numbers of nodes to be inserted/deleted. + # compute numbers of edges to be inserted/deleted. # @todo what if the log is negetive? how to choose alpha (scalar)? fdgs_list = np.array(dis_bests) if np.min(fdgs_list) < 1: @@ -161,54 +165,7 @@ class RandomPreimageGenerator(PreimageGenerator): for ig, gs in enumerate(Gs_nearest + gihat_list): if self._verbose >= 2: print('-- computing', ig + 1, 'graphs out of', len(Gs_nearest) + len(gihat_list)) - for trail in range(0, self.__l): - if self._verbose >= 2: - print('---', trail + 1, 'trail out of', self.__l) - - # add and delete edges. - gtemp = gs.copy() - np.random.seed() # @todo: may not work for possible parallel. - # which edges to change. - # @todo: should we use just half of the adjacency matrix for undirected graphs? - nb_vpairs = nx.number_of_nodes(gs) * (nx.number_of_nodes(gs) - 1) - # @todo: what if fdgs is bigger than nb_vpairs? - idx_change = random.sample(range(nb_vpairs), fdgs_list[ig] if - fdgs_list[ig] < nb_vpairs else nb_vpairs) - for item in idx_change: - node1 = int(item / (nx.number_of_nodes(gs) - 1)) - node2 = (item - node1 * (nx.number_of_nodes(gs) - 1)) - if node2 >= node1: # skip the self pair. - node2 += 1 - # @todo: is the randomness correct? - if not gtemp.has_edge(node1, node2): - gtemp.add_edge(node1, node2) - else: - gtemp.remove_edge(node1, node2) - - # compute new distances. - kernels_to_gtmp, _ = self._graph_kernel.compute(gtemp, D_N, **self._kernel_options) - kernel_gtmp, _ = self._graph_kernel.compute(gtemp, gtemp, **self._kernel_options) - kernels_to_gtmp = [kernels_to_gtmp[i] / np.sqrt(self.__gram_matrix_unnorm[i, i] * kernel_gtmp) for i in range(len(kernels_to_gtmp))] # normalize - # @todo: not correct kernel value - gram_with_gtmp = np.concatenate((np.array([kernels_to_gtmp]), np.copy(self._graph_kernel.gram_matrix)), axis=0) - gram_with_gtmp = np.concatenate((np.array([[1] + kernels_to_gtmp]).T, gram_with_gtmp), axis=1) - dnew = compute_k_dis(0, range(1, 1 + len(D_N)), self.__alphas, gram_with_gtmp, term3=term3, withterm3=True) - - # get the better graph preimage. - if dnew <= dhat: # @todo: the new distance is smaller or also equal? - if dnew < dhat: - if self._verbose >= 2: - print('trail =', str(trail)) - print('\nI am smaller!') - print('index (as in D_k U {gihat} =', str(ig)) - print('distance:', dhat, '->', dnew) - self.__num_updates += 1 - elif dnew == dhat: - if self._verbose >= 2: - print('I am equal!') - dhat = dnew - gnew = gtemp.copy() - found = True # found better graph. + gnew, dhat, found = self.__generate_l_graphs(gs, fdgs_list[ig], dhat, ig, found, term3) if found: r = 0 @@ -220,10 +177,9 @@ class RandomPreimageGenerator(PreimageGenerator): dis_of_each_itr.append(dhat) self.__itrs += 1 if self._verbose >= 2: - print('Total number of iterations is', self.__itrs) + print('Total number of iterations is', self.__itrs, '.') print('The preimage is updated', self.__num_updates, 'times.') - print('The shortest distances for previous iterations are', dis_of_each_itr) - + print('The shortest distances for previous iterations are', dis_of_each_itr, '.') # get results and print. @@ -245,8 +201,149 @@ class RandomPreimageGenerator(PreimageGenerator): print('Time to generate pre-images:', self.__runtime_generate_preimage) print('Total time:', self.__runtime_total) print('=============================================================================') - print() + print() + + + def __generate_l_graphs(self, g_init, fdgs, dhat, ig, found, term3): + if self.__parallel: + gnew, dhat, found = self.__generate_l_graphs_parallel(g_init, fdgs, dhat, ig, found, term3) + else: + gnew, dhat, found = self.__generate_l_graphs_series(g_init, fdgs, dhat, ig, found, term3) + return gnew, dhat, found + + + def __generate_l_graphs_series(self, g_init, fdgs, dhat, ig, found, term3): + gnew = None + for trail in range(0, self.__l): + if self._verbose >= 2: + print('---', trail + 1, 'trail out of', self.__l) + + # add and delete edges. + gtemp = g_init.copy() + np.random.seed() # @todo: may not work for possible parallel. + # which edges to change. + # @todo: should we use just half of the adjacency matrix for undirected graphs? + nb_vpairs = nx.number_of_nodes(g_init) * (nx.number_of_nodes(g_init) - 1) + # @todo: what if fdgs is bigger than nb_vpairs? + idx_change = random.sample(range(nb_vpairs), fdgs if + fdgs < nb_vpairs else nb_vpairs) + for item in idx_change: + node1 = int(item / (nx.number_of_nodes(g_init) - 1)) + node2 = (item - node1 * (nx.number_of_nodes(g_init) - 1)) + if node2 >= node1: # skip the self pair. + node2 += 1 + # @todo: is the randomness correct? + if not gtemp.has_edge(node1, node2): + gtemp.add_edge(node1, node2) + else: + gtemp.remove_edge(node1, node2) + + # compute new distances. + kernels_to_gtmp, _ = self._graph_kernel.compute(gtemp, self._dataset.graphs, **self._kernel_options) + kernel_gtmp, _ = self._graph_kernel.compute(gtemp, gtemp, **self._kernel_options) + kernels_to_gtmp = [kernels_to_gtmp[i] / np.sqrt(self.__gram_matrix_unnorm[i, i] * kernel_gtmp) for i in range(len(kernels_to_gtmp))] # normalize + # @todo: not correct kernel value + gram_with_gtmp = np.concatenate((np.array([kernels_to_gtmp]), np.copy(self._graph_kernel.gram_matrix)), axis=0) + gram_with_gtmp = np.concatenate((np.array([[1] + kernels_to_gtmp]).T, gram_with_gtmp), axis=1) + dnew = compute_k_dis(0, range(1, 1 + len(self._dataset.graphs)), self.__alphas, gram_with_gtmp, term3=term3, withterm3=True) + + # get the better graph preimage. + if dnew <= dhat: # @todo: the new distance is smaller or also equal? + if dnew < dhat: + if self._verbose >= 2: + print('trail =', str(trail)) + print('\nI am smaller!') + print('index (as in D_k U {gihat} =', str(ig)) + print('distance:', dhat, '->', dnew) + self.__num_updates += 1 + elif dnew == dhat: + if self._verbose >= 2: + print('I am equal!') + dhat = dnew + gnew = gtemp.copy() + found = True # found better graph. + + return gnew, dhat, found + + + def __generate_l_graphs_parallel(self, g_init, fdgs, dhat, ig, found, term3): + gnew = None + len_itr = self.__l + gnew_list = [None] * len_itr + dnew_list = [None] * len_itr + itr = range(0, len_itr) + n_jobs = multiprocessing.cpu_count() + if len_itr < 100 * n_jobs: + chunksize = int(len_itr / n_jobs) + 1 + else: + chunksize = 100 + do_fun = partial(self._generate_graph_parallel, g_init, fdgs, term3) + pool = Pool(processes=n_jobs) + if self._verbose >= 2: + iterator = tqdm(pool.imap_unordered(do_fun, itr, chunksize), + desc='Generating l graphs', file=sys.stdout) + else: + iterator = pool.imap_unordered(do_fun, itr, chunksize) + for idx, gnew, dnew in iterator: + gnew_list[idx] = gnew + dnew_list[idx] = dnew + pool.close() + pool.join() + + # check if get the better graph preimage. + idx_min = np.argmin(dnew_list) + dnew = dnew_list[idx_min] + if dnew <= dhat: # @todo: the new distance is smaller or also equal? + if dnew < dhat: + if self._verbose >= 2: + print('\nI am smaller!') + print('index (as in D_k U {gihat} =', str(ig)) + print('distance:', dhat, '->', dnew) + self.__num_updates += 1 + elif dnew == dhat: + if self._verbose >= 2: + print('I am equal!') + dhat = dnew + gnew = gnew_list[idx_min] + found = True # found better graph. + + return gnew, dhat, found + + + def _generate_graph_parallel(self, g_init, fdgs, term3, itr): + trail = itr + # add and delete edges. + gtemp = g_init.copy() + np.random.seed() # @todo: may not work for possible parallel. + # which edges to change. + # @todo: should we use just half of the adjacency matrix for undirected graphs? + nb_vpairs = nx.number_of_nodes(g_init) * (nx.number_of_nodes(g_init) - 1) + # @todo: what if fdgs is bigger than nb_vpairs? + idx_change = random.sample(range(nb_vpairs), fdgs if + fdgs < nb_vpairs else nb_vpairs) + for item in idx_change: + node1 = int(item / (nx.number_of_nodes(g_init) - 1)) + node2 = (item - node1 * (nx.number_of_nodes(g_init) - 1)) + if node2 >= node1: # skip the self pair. + node2 += 1 + # @todo: is the randomness correct? + if not gtemp.has_edge(node1, node2): + gtemp.add_edge(node1, node2) + else: + gtemp.remove_edge(node1, node2) + + # compute new distances. + kernels_to_gtmp, _ = self._graph_kernel.compute(gtemp, self._dataset.graphs, **self._kernel_options) + kernel_gtmp, _ = self._graph_kernel.compute(gtemp, gtemp, **self._kernel_options) + kernels_to_gtmp = [kernels_to_gtmp[i] / np.sqrt(self.__gram_matrix_unnorm[i, i] * kernel_gtmp) for i in range(len(kernels_to_gtmp))] # normalize + # @todo: not correct kernel value + gram_with_gtmp = np.concatenate((np.array([kernels_to_gtmp]), np.copy(self._graph_kernel.gram_matrix)), axis=0) + gram_with_gtmp = np.concatenate((np.array([[1] + kernels_to_gtmp]).T, gram_with_gtmp), axis=1) + dnew = compute_k_dis(0, range(1, 1 + len(self._dataset.graphs)), self.__alphas, gram_with_gtmp, term3=term3, withterm3=True) + + return trail, gtemp, dnew + def get_results(self): results = {}