diff --git a/lang/fr/gklearn/kernels/path_up_to_h.py b/lang/fr/gklearn/kernels/path_up_to_h.py
new file mode 100644
index 0000000..1c8b5e2
--- /dev/null
+++ b/lang/fr/gklearn/kernels/path_up_to_h.py
@@ -0,0 +1,603 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Apr 10 18:33:13 2020
+
+@author: ljia
+
+@references:
+
+	[1] Liva Ralaivola, Sanjay J Swamidass, Hiroto Saigo, and Pierre
+	Baldi. Graph kernels for chemical informatics. Neural networks,
+	18(8):1093–1110, 2005.
+"""
+import sys
+from multiprocessing import Pool
+from tqdm import tqdm
+import numpy as np
+import networkx as nx
+from collections import Counter
+from functools import partial
+from gklearn.utils import SpecialLabel
+from gklearn.utils.parallel import parallel_gm, parallel_me
+from gklearn.kernels import GraphKernel
+from gklearn.utils import Trie
+
+
+class PathUpToH(GraphKernel):  # @todo: add function for k_func == None
+
+	def __init__(self, **kwargs):
+		GraphKernel.__init__(self)
+		self.__node_labels = kwargs.get('node_labels', [])
+		self.__edge_labels = kwargs.get('edge_labels', [])
+		self.__depth = int(kwargs.get('depth', 10))
+		self.__k_func = kwargs.get('k_func', 'MinMax')
+		self.__compute_method = kwargs.get('compute_method', 'trie')
+		self.__ds_infos = kwargs.get('ds_infos', {})
+
+
+	def _compute_gm_series(self):
+		self.__add_dummy_labels(self._graphs)
+
+		from itertools import combinations_with_replacement
+		itr_kernel = combinations_with_replacement(range(0, len(self._graphs)), 2)
+		if self._verbose >= 2:
+			iterator_ps = tqdm(range(0, len(self._graphs)), desc='getting paths', file=sys.stdout)
+			iterator_kernel = tqdm(itr_kernel, desc='calculating kernels', file=sys.stdout)
+		else:
+			iterator_ps = range(0, len(self._graphs))
+			iterator_kernel = itr_kernel
+
+		gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
+
+		if self.__compute_method == 'trie':
+			all_paths = [self.__find_all_path_as_trie(self._graphs[i]) for i in iterator_ps]
+			for i, j in iterator_kernel:
+				kernel = self.__kernel_do_trie(all_paths[i], all_paths[j])
+				gram_matrix[i][j] = kernel
+				gram_matrix[j][i] = kernel
+		else:
+			all_paths = [self.__find_all_paths_until_length(self._graphs[i]) for i in iterator_ps]
+			for i, j in iterator_kernel:
+				kernel = self.__kernel_do_naive(all_paths[i], all_paths[j])
+				gram_matrix[i][j] = kernel
+				gram_matrix[j][i] = kernel
+
+		return gram_matrix
+
+
+	def _compute_gm_imap_unordered(self):
+		self.__add_dummy_labels(self._graphs)
+
+		# Get all paths of all graphs before calculating kernels to save time,
+		# but this may cost a lot of memory for large datasets.
+		pool = Pool(self._n_jobs)
+		itr = zip(self._graphs, range(0, len(self._graphs)))
+		if len(self._graphs) < 100 * self._n_jobs:
+			chunksize = int(len(self._graphs) / self._n_jobs) + 1
+		else:
+			chunksize = 100
+		all_paths = [[] for _ in range(len(self._graphs))]
+		if self.__compute_method == 'trie' and self.__k_func is not None:
+			get_ps_fun = self._wrapper_find_all_path_as_trie
+		elif self.__compute_method != 'trie' and self.__k_func is not None:
+			get_ps_fun = partial(self._wrapper_find_all_paths_until_length, True)
+		else:
+			get_ps_fun = partial(self._wrapper_find_all_paths_until_length, False)
+		if self._verbose >= 2:
+			iterator = tqdm(pool.imap_unordered(get_ps_fun, itr, chunksize),
+							desc='getting paths', file=sys.stdout)
+		else:
+			iterator = pool.imap_unordered(get_ps_fun, itr, chunksize)
+		for i, ps in iterator:
+			all_paths[i] = ps
+		pool.close()
+		pool.join()
+
+		# Compute the Gram matrix.
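+		# Note (editorial comment, not in the original code): each branch below
+		# defines an "init_worker" that stashes the shared path lists/tries in
+		# a module-level global inside every worker process, so the data is
+		# transferred once per worker rather than once per (i, j) pair handled
+		# by "parallel_gm".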
+		gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
+
+		if self.__compute_method == 'trie' and self.__k_func is not None:
+			def init_worker(trie_toshare):
+				global G_trie
+				G_trie = trie_toshare
+			do_fun = self._wrapper_kernel_do_trie
+		elif self.__compute_method != 'trie' and self.__k_func is not None:
+			def init_worker(plist_toshare):
+				global G_plist
+				G_plist = plist_toshare
+			do_fun = self._wrapper_kernel_do_naive
+		else:
+			def init_worker(plist_toshare):
+				global G_plist
+				G_plist = plist_toshare
+			do_fun = self.__wrapper_kernel_do_kernelless  # @todo: what is this?
+		parallel_gm(do_fun, gram_matrix, self._graphs, init_worker=init_worker,
+					glbv=(all_paths,), n_jobs=self._n_jobs, verbose=self._verbose)
+
+		return gram_matrix
+
+
+	def _compute_kernel_list_series(self, g1, g_list):
+		self.__add_dummy_labels(g_list + [g1])
+
+		if self._verbose >= 2:
+			iterator_ps = tqdm(g_list, desc='getting paths', file=sys.stdout)
+			iterator_kernel = tqdm(range(len(g_list)), desc='calculating kernels', file=sys.stdout)
+		else:
+			iterator_ps = g_list
+			iterator_kernel = range(len(g_list))
+
+		kernel_list = [None] * len(g_list)
+
+		if self.__compute_method == 'trie':
+			paths_g1 = self.__find_all_path_as_trie(g1)
+			paths_g_list = [self.__find_all_path_as_trie(g) for g in iterator_ps]
+			for i in iterator_kernel:
+				kernel = self.__kernel_do_trie(paths_g1, paths_g_list[i])
+				kernel_list[i] = kernel
+		else:
+			paths_g1 = self.__find_all_paths_until_length(g1)
+			paths_g_list = [self.__find_all_paths_until_length(g) for g in iterator_ps]
+			for i in iterator_kernel:
+				kernel = self.__kernel_do_naive(paths_g1, paths_g_list[i])
+				kernel_list[i] = kernel
+
+		return kernel_list
+
+
+	def _compute_kernel_list_imap_unordered(self, g1, g_list):
+		self.__add_dummy_labels(g_list + [g1])
+
+		# Get all paths of all graphs before calculating kernels to save time,
+		# but this may cost a lot of memory for large datasets.
+		pool = Pool(self._n_jobs)
+		itr = zip(g_list, range(0, len(g_list)))
+		if len(g_list) < 100 * self._n_jobs:
+			chunksize = int(len(g_list) / self._n_jobs) + 1
+		else:
+			chunksize = 100
+		paths_g_list = [[] for _ in range(len(g_list))]
+		if self.__compute_method == 'trie' and self.__k_func is not None:
+			paths_g1 = self.__find_all_path_as_trie(g1)
+			get_ps_fun = self._wrapper_find_all_path_as_trie
+		elif self.__compute_method != 'trie' and self.__k_func is not None:
+			paths_g1 = self.__find_all_paths_until_length(g1)
+			get_ps_fun = partial(self._wrapper_find_all_paths_until_length, True)
+		else:
+			paths_g1 = self.__find_all_paths_until_length(g1)
+			get_ps_fun = partial(self._wrapper_find_all_paths_until_length, False)
+		if self._verbose >= 2:
+			iterator = tqdm(pool.imap_unordered(get_ps_fun, itr, chunksize),
+							desc='getting paths', file=sys.stdout)
+		else:
+			iterator = pool.imap_unordered(get_ps_fun, itr, chunksize)
+		for i, ps in iterator:
+			paths_g_list[i] = ps
+		pool.close()
+		pool.join()
+
+		# Compute the kernel list.
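+		# Note (editorial comment, not in the original code): "parallel_me"
+		# runs "do_fun" in the workers and applies "func_assign" (defined
+		# below) to each returned (index, kernel) pair, writing the value into
+		# "kernel_list" at the right position even though imap_unordered
+		# completes tasks out of order.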
+		kernel_list = [None] * len(g_list)
+
+		def init_worker(p1_toshare, plist_toshare):
+			global G_p1, G_plist
+			G_p1 = p1_toshare
+			G_plist = plist_toshare
+		do_fun = self._wrapper_kernel_list_do
+
+		def func_assign(result, var_to_assign):
+			var_to_assign[result[0]] = result[1]
+		itr = range(len(g_list))
+		len_itr = len(g_list)
+		parallel_me(do_fun, func_assign, kernel_list, itr, len_itr=len_itr,
+					init_worker=init_worker, glbv=(paths_g1, paths_g_list),
+					method='imap_unordered', n_jobs=self._n_jobs,
+					itr_desc='calculating kernels', verbose=self._verbose)
+
+		return kernel_list
+
+
+	def _wrapper_kernel_list_do(self, itr):
+		if self.__compute_method == 'trie' and self.__k_func is not None:
+			return itr, self.__kernel_do_trie(G_p1, G_plist[itr])
+		elif self.__compute_method != 'trie' and self.__k_func is not None:
+			return itr, self.__kernel_do_naive(G_p1, G_plist[itr])
+		else:
+			return itr, self.__kernel_do_kernelless(G_p1, G_plist[itr])
+
+
+	def _compute_single_kernel_series(self, g1, g2):
+		self.__add_dummy_labels([g1] + [g2])
+		if self.__compute_method == 'trie':
+			paths_g1 = self.__find_all_path_as_trie(g1)
+			paths_g2 = self.__find_all_path_as_trie(g2)
+			kernel = self.__kernel_do_trie(paths_g1, paths_g2)
+		else:
+			paths_g1 = self.__find_all_paths_until_length(g1)
+			paths_g2 = self.__find_all_paths_until_length(g2)
+			kernel = self.__kernel_do_naive(paths_g1, paths_g2)
+		return kernel
+
+
+	def __kernel_do_trie(self, trie1, trie2):
+		"""Compute the path kernel up to depth h between two graphs using tries.
+
+		Parameters
+		----------
+		trie1, trie2 : Trie
+			Tries that contain all paths of the two graphs. The kernel function
+			named by "self.__k_func" is applied as the notion of fingerprint
+			similarity.
+
+		Return
+		------
+		kernel : float
+			Path kernel up to h between the two graphs.
+		"""
+		if self.__k_func == 'tanimoto':
+			# Traverse all paths of graph 1 and search for them in graph 2.
+			# Depth-first search is applied.
+			def traverseTrie1t(root, trie2, setlist, pcurrent=[]):
+				for key, node in root['children'].items():
+					pcurrent.append(key)
+					if node['isEndOfWord']:
+						setlist[1] += 1
+						count2 = trie2.searchWord(pcurrent)
+						if count2 != 0:
+							setlist[0] += 1
+					if node['children'] != {}:
+						traverseTrie1t(node, trie2, setlist, pcurrent)
+					else:
+						del pcurrent[-1]
+				if pcurrent != []:
+					del pcurrent[-1]
+
+			# Traverse all paths of graph 2 and find those that are not in
+			# graph 1. Depth-first search is applied.
+			def traverseTrie2t(root, trie1, setlist, pcurrent=[]):
+				for key, node in root['children'].items():
+					pcurrent.append(key)
+					if node['isEndOfWord']:
+						count1 = trie1.searchWord(pcurrent)
+						if count1 == 0:
+							setlist[1] += 1
+					if node['children'] != {}:
+						traverseTrie2t(node, trie1, setlist, pcurrent)
+					else:
+						del pcurrent[-1]
+				if pcurrent != []:
+					del pcurrent[-1]
+
+			setlist = [0, 0]  # intersection and union of the path sets of g1 and g2.
+			traverseTrie1t(trie1.root, trie2, setlist)
+			traverseTrie2t(trie2.root, trie1, setlist)
+			kernel = setlist[0] / setlist[1]
+
+		elif self.__k_func == 'MinMax':  # MinMax kernel
+			# Traverse all paths of graph 1 and search for them in graph 2.
+			# Depth-first search is applied.
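+			# Worked example (illustrative, not from the original source): if a
+			# label sequence occurs 3 times in g1 and once in g2, this pass
+			# adds min(3, 1) = 1 to sumlist[0] and max(3, 1) = 3 to sumlist[1];
+			# sequences occurring only in g2 are added to sumlist[1] by
+			# traverseTrie2m below, so the kernel is sum(min) / sum(max) over
+			# the union of the path sets.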
+			def traverseTrie1m(root, trie2, sumlist, pcurrent=[]):
+				for key, node in root['children'].items():
+					pcurrent.append(key)
+					if node['isEndOfWord']:
+						count1 = node['count']
+						count2 = trie2.searchWord(pcurrent)
+						sumlist[0] += min(count1, count2)
+						sumlist[1] += max(count1, count2)
+					if node['children'] != {}:
+						traverseTrie1m(node, trie2, sumlist, pcurrent)
+					else:
+						del pcurrent[-1]
+				if pcurrent != []:
+					del pcurrent[-1]
+
+			# Traverse all paths of graph 2 and find those that are not in
+			# graph 1. Depth-first search is applied.
+			def traverseTrie2m(root, trie1, sumlist, pcurrent=[]):
+				for key, node in root['children'].items():
+					pcurrent.append(key)
+					if node['isEndOfWord']:
+						count1 = trie1.searchWord(pcurrent)
+						if count1 == 0:
+							sumlist[1] += node['count']
+					if node['children'] != {}:
+						traverseTrie2m(node, trie1, sumlist, pcurrent)
+					else:
+						del pcurrent[-1]
+				if pcurrent != []:
+					del pcurrent[-1]
+
+			sumlist = [0, 0]  # sum of mins and sum of maxs.
+			traverseTrie1m(trie1.root, trie2, sumlist)
+			traverseTrie2m(trie2.root, trie1, sumlist)
+			kernel = sumlist[0] / sumlist[1]
+		else:
+			raise Exception('The given "k_func" cannot be recognized. Possible choices include: "tanimoto", "MinMax".')
+
+		return kernel
+
+
+	def _wrapper_kernel_do_trie(self, itr):
+		i = itr[0]
+		j = itr[1]
+		return i, j, self.__kernel_do_trie(G_trie[i], G_trie[j])
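+	# Worked example for the naive Tanimoto branch below (illustrative
+	# numbers, not from the original source): with 5 distinct label sequences
+	# in g1, 4 in g2 and a union of size 7, the intersection has
+	# 5 + 4 - 7 = 2 elements and the kernel is 2 / 7.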
+	def __kernel_do_naive(self, paths1, paths2):
+		"""Compute the path kernel up to depth h between two graphs naively.
+
+		Parameters
+		----------
+		paths1, paths2 : list
+			Lists of all paths in each of the two graphs, where for unlabeled
+			graphs each path is represented by a list of nodes, while for
+			labeled graphs each path is represented by a sequence consisting of
+			the labels of the nodes and/or edges on that path.
+
+		Return
+		------
+		kernel : float
+			Path kernel up to h between the two graphs.
+		"""
+		all_paths = list(set(paths1 + paths2))
+
+		if self.__k_func == 'tanimoto':
+			length_union = len(set(paths1 + paths2))
+			kernel = (len(set(paths1)) + len(set(paths2)) -
+					  length_union) / length_union
+
+		elif self.__k_func == 'MinMax':  # MinMax kernel
+			path_count1 = Counter(paths1)
+			path_count2 = Counter(paths2)
+			vector1 = [(path_count1[key] if (key in path_count1.keys()) else 0)
+					   for key in all_paths]
+			vector2 = [(path_count2[key] if (key in path_count2.keys()) else 0)
+					   for key in all_paths]
+			kernel = np.sum(np.minimum(vector1, vector2)) / \
+					 np.sum(np.maximum(vector1, vector2))
+
+		elif self.__k_func is None:  # no sub-kernel used; compare paths directly.
+			path_count1 = Counter(paths1)
+			path_count2 = Counter(paths2)
+			vector1 = [(path_count1[key] if (key in path_count1.keys()) else 0)
+					   for key in all_paths]
+			vector2 = [(path_count2[key] if (key in path_count2.keys()) else 0)
+					   for key in all_paths]
+			kernel = np.dot(vector1, vector2)
+
+		else:
+			raise Exception('The given "k_func" cannot be recognized. Possible choices include: "tanimoto", "MinMax" and None.')
+
+		return kernel
+
+
+	def _wrapper_kernel_do_naive(self, itr):
+		i = itr[0]
+		j = itr[1]
+		return i, j, self.__kernel_do_naive(G_plist[i], G_plist[j])
+
+
+	def __find_all_path_as_trie(self, G):
+		# Traverse all paths up to length h in the graph and build a trie from
+		# them. Depth-first search is applied. Note that the reverse of each
+		# path is also stored in the trie, since traversal starts from every
+		# node.
+		def traverseGraph(root, ptrie, G, pcurrent=[]):
+			if len(pcurrent) < self.__depth + 1:
+				for neighbor in G[root]:
+					if neighbor not in pcurrent:
+						pcurrent.append(neighbor)
+						plstr = self.__paths2labelseqs([pcurrent], G)
+						ptrie.insertWord(plstr[0])
+						traverseGraph(neighbor, ptrie, G, pcurrent)
+						del pcurrent[-1]
+
+		ptrie = Trie()
+		path_l = [[n] for n in G.nodes]  # paths of length 0 (single nodes).
+		path_l_str = self.__paths2labelseqs(path_l, G)
+		for p in path_l_str:
+			ptrie.insertWord(p)
+		for n in G.nodes:
+			traverseGraph(n, ptrie, G, pcurrent=[n])
+
+		return ptrie
+
+
+	def _wrapper_find_all_path_as_trie(self, itr_item):
+		g = itr_item[0]
+		i = itr_item[1]
+		return i, self.__find_all_path_as_trie(g)
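+	# Example of what __find_all_path_as_trie above stores (illustrative, not
+	# from the original source): for a two-node graph with node labels A and B
+	# joined by an edge labeled e, traversal starts from both endpoints, so
+	# the trie holds the label sequences for A, B, A-e-B and B-e-A (each as a
+	# tuple of per-element label tuples), together with an occurrence count.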
+	# @todo: (can maybe be removed) this method finds paths repetitively; it
+	# could be faster.
+	def __find_all_paths_until_length(self, G, tolabelseqs=True):
+		"""Find all paths no longer than a certain maximum length in a graph. A
+		recursive depth-first search is applied.
+
+		Parameters
+		----------
+		G : NetworkX graph
+			The graph in which paths are searched.
+		tolabelseqs : boolean
+			Whether to transform the retrieved paths into label sequences. The
+			maximum path length is given by "self.__depth"; node and edge
+			labels are taken from "self.__node_labels" and "self.__edge_labels".
+
+		Return
+		------
+		path : list
+			List of paths retrieved, where for unlabeled graphs each path is
+			represented by a list of nodes, while for labeled graphs each path
+			is represented by a list of strings consisting of the labels of the
+			nodes and/or edges on that path.
+		"""
+		path_l = [[n] for n in G.nodes]  # paths of length l.
+		all_paths = [p.copy() for p in path_l]
+		for l in range(1, self.__depth + 1):
+			path_lplus1 = []
+			for path in path_l:
+				for neighbor in G[path[-1]]:
+					if neighbor not in path:
+						tmp = path + [neighbor]
+						path_lplus1.append(tmp)
+
+			all_paths += path_lplus1
+			path_l = [p.copy() for p in path_lplus1]
+
+		return (self.__paths2labelseqs(all_paths, G) if tolabelseqs else all_paths)
+
+
+	def _wrapper_find_all_paths_until_length(self, tolabelseqs, itr_item):
+		g = itr_item[0]
+		i = itr_item[1]
+		return i, self.__find_all_paths_until_length(g, tolabelseqs=tolabelseqs)
+
+
+	def __paths2labelseqs(self, plist, G):
+		if len(self.__node_labels) > 0:
+			if len(self.__edge_labels) > 0:
+				path_strs = []
+				for path in plist:
+					pths_tmp = []
+					for idx, node in enumerate(path[:-1]):
+						pths_tmp.append(tuple(G.nodes[node][nl] for nl in self.__node_labels))
+						pths_tmp.append(tuple(G[node][path[idx + 1]][el] for el in self.__edge_labels))
+					pths_tmp.append(tuple(G.nodes[path[-1]][nl] for nl in self.__node_labels))
+					path_strs.append(tuple(pths_tmp))
+			else:
+				path_strs = []
+				for path in plist:
+					pths_tmp = []
+					for node in path:
+						pths_tmp.append(tuple(G.nodes[node][nl] for nl in self.__node_labels))
+					path_strs.append(tuple(pths_tmp))
+			return path_strs
+		else:
+			if len(self.__edge_labels) > 0:
+				path_strs = []
+				for path in plist:
+					if len(path) == 1:
+						path_strs.append(tuple())
+					else:
+						pths_tmp = []
+						for idx, node in enumerate(path[:-1]):
+							pths_tmp.append(tuple(G[node][path[idx + 1]][el] for el in self.__edge_labels))
+						path_strs.append(tuple(pths_tmp))
+				return path_strs
+			else:
+				return [tuple(['0' for node in path]) for path in plist]
+
+
+	def __add_dummy_labels(self, Gn):
+		if self.__k_func is not None:
+			if len(self.__node_labels) == 0 or (len(self.__node_labels) == 1 and self.__node_labels[0] == SpecialLabel.DUMMY):
+				for i in range(len(Gn)):
+					nx.set_node_attributes(Gn[i], '0', SpecialLabel.DUMMY)
+				self.__node_labels = [SpecialLabel.DUMMY]
+			if len(self.__edge_labels) == 0 or (len(self.__edge_labels) == 1 and self.__edge_labels[0] == SpecialLabel.DUMMY):
+				for i in range(len(Gn)):
+					nx.set_edge_attributes(Gn[i], '0', SpecialLabel.DUMMY)
+				self.__edge_labels = [SpecialLabel.DUMMY]
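+
+
+if __name__ == '__main__':
+	# Minimal usage sketch (an editorial assumption, not part of the original
+	# module): build two small labeled graphs and compare them with the trie
+	# variant of the MinMax kernel. Public gklearn entry points may differ
+	# between versions, so the series helper defined above is called directly.
+	g1 = nx.path_graph(4)
+	g2 = nx.cycle_graph(4)
+	for g in (g1, g2):
+		nx.set_node_attributes(g, '0', 'atom')
+		nx.set_edge_attributes(g, '0', 'bond_type')
+	kernel = PathUpToH(node_labels=['atom'], edge_labels=['bond_type'],
+					   depth=3, k_func='MinMax', compute_method='trie',
+					   ds_infos={'directed': False})
+	print(kernel._compute_single_kernel_series(g1, g2))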