""" @author: linlin @references: Suard F, Rakotomamonjy A, Bensrhair A. Kernel on Bag of Paths For Measuring Similarity of Shapes. InESANN 2007 Apr 25 (pp. 355-360). """ import sys import pathlib sys.path.insert(0, "../") import time import itertools from tqdm import tqdm import networkx as nx import numpy as np from gklearn.kernels.deltaKernel import deltakernel from gklearn.utils.graphdataset import get_dataset_attributes def pathkernel(*args, node_label='atom', edge_label='bond_type'): """Calculate mean average path kernels between graphs. Parameters ---------- Gn : List of NetworkX graph List of graphs between which the kernels are calculated. / G1, G2 : NetworkX graphs 2 graphs between which the kernel is calculated. node_label : string node attribute used as label. The default node label is atom. edge_label : string edge attribute used as label. The default edge label is bond_type. Return ------ Kmatrix/kernel : Numpy matrix/float Kernel matrix, each element of which is the path kernel between 2 praphs. / Path kernel between 2 graphs. """ Gn = args[0] if len(args) == 1 else [args[0], args[1]] Kmatrix = np.zeros((len(Gn), len(Gn))) ds_attrs = get_dataset_attributes( Gn, attr_names=['node_labeled', 'edge_labeled', 'is_directed'], node_label=node_label, edge_label=edge_label) try: some_weight = list(nx.get_edge_attributes(Gn[0], edge_label).values())[0] weight = edge_label if isinstance(some_weight, float) or isinstance( some_weight, int) else None except: weight = None start_time = time.time() splist = [ get_shortest_paths(Gn[i], weight) for i in tqdm( range(0, len(Gn)), desc='getting shortest paths', file=sys.stdout) ] pbar = tqdm( total=((len(Gn) + 1) * len(Gn) / 2), desc='calculating kernels', file=sys.stdout) if ds_attrs['node_labeled']: if ds_attrs['edge_labeled']: for i in range(0, len(Gn)): for j in range(i, len(Gn)): Kmatrix[i][j] = _pathkernel_do_l(Gn[i], Gn[j], splist[i], splist[j], node_label, edge_label) Kmatrix[j][i] = Kmatrix[i][j] pbar.update(1) else: for i in range(0, len(Gn)): for j in range(i, len(Gn)): Kmatrix[i][j] = _pathkernel_do_nl(Gn[i], Gn[j], splist[i], splist[j], node_label) Kmatrix[j][i] = Kmatrix[i][j] pbar.update(1) else: if ds_attrs['edge_labeled']: for i in range(0, len(Gn)): for j in range(i, len(Gn)): Kmatrix[i][j] = _pathkernel_do_el(Gn[i], Gn[j], splist[i], splist[j], edge_label) Kmatrix[j][i] = Kmatrix[i][j] pbar.update(1) else: for i in range(0, len(Gn)): for j in range(i, len(Gn)): Kmatrix[i][j] = _pathkernel_do_unl(Gn[i], Gn[j], splist[i], splist[j]) Kmatrix[j][i] = Kmatrix[i][j] pbar.update(1) run_time = time.time() - start_time print( "\n --- mean average path kernel matrix of size %d built in %s seconds ---" % (len(Gn), run_time)) return Kmatrix, run_time def _pathkernel_do_l(G1, G2, sp1, sp2, node_label, edge_label): """Calculate mean average path kernel between 2 fully-labeled graphs. Parameters ---------- G1, G2 : NetworkX graphs 2 graphs between which the kernel is calculated. sp1, sp2 : list of list List of shortest paths of 2 graphs, where each path is represented by a list of nodes. node_label : string node attribute used as label. The default node label is atom. edge_label : string edge attribute used as label. The default edge label is bond_type. Return ------ kernel : float Path Kernel between 2 graphs. """ # calculate kernel kernel = 0 # if len(sp1) == 0 or len(sp2) == 0: # return 0 # @todo: should it be zero? for path1 in sp1: for path2 in sp2: if len(path1) == len(path2): kernel_path = (G1.node[path1[0]][node_label] == G2.node[path2[ 0]][node_label]) if kernel_path: for i in range(1, len(path1)): # kernel = 1 if all corresponding nodes and edges in the 2 paths have same labels, otherwise 0 if G1[path1[i - 1]][path1[i]][edge_label] != G2[path2[i - 1]][path2[i]][edge_label] or G1.node[path1[i]][node_label] != G2.node[path2[i]][node_label]: kernel_path = 0 break kernel += kernel_path # add up kernels of all paths kernel = kernel / (len(sp1) * len(sp2)) # calculate mean average return kernel def _pathkernel_do_nl(G1, G2, sp1, sp2, node_label): """Calculate mean average path kernel between 2 node-labeled graphs. """ # calculate kernel kernel = 0 # if len(sp1) == 0 or len(sp2) == 0: # return 0 # @todo: should it be zero? for path1 in sp1: for path2 in sp2: if len(path1) == len(path2): kernel_path = 1 for i in range(0, len(path1)): # kernel = 1 if all corresponding nodes in the 2 paths have same labels, otherwise 0 if G1.node[path1[i]][node_label] != G2.node[path2[i]][node_label]: kernel_path = 0 break kernel += kernel_path kernel = kernel / (len(sp1) * len(sp2)) # calculate mean average return kernel def _pathkernel_do_el(G1, G2, sp1, sp2, edge_label): """Calculate mean average path kernel between 2 edge-labeled graphs. """ # calculate kernel kernel = 0 for path1 in sp1: for path2 in sp2: if len(path1) == len(path2): if len(path1) == 0: kernel += 1 else: kernel_path = 1 for i in range(0, len(path1) - 1): # kernel = 1 if all corresponding edges in the 2 paths have same labels, otherwise 0 if G1[path1[i]][path1[i + 1]][edge_label] != G2[path2[ i]][path2[i + 1]][edge_label]: kernel_path = 0 break kernel += kernel_path kernel = kernel / (len(sp1) * len(sp2)) # calculate mean average return kernel def _pathkernel_do_unl(G1, G2, sp1, sp2): """Calculate mean average path kernel between 2 unlabeled graphs. """ # calculate kernel kernel = 0 for path1 in sp1: for path2 in sp2: if len(path1) == len(path2): kernel += 1 kernel = kernel / (len(sp1) * len(sp2)) # calculate mean average return kernel def get_shortest_paths(G, weight): """Get all shortest paths of a graph. Parameters ---------- G : NetworkX graphs The graphs whose paths are calculated. weight : string/None edge attribute used as weight to calculate the shortest path. Return ------ sp : list of list List of shortest paths of the graph, where each path is represented by a list of nodes. """ sp = [] for n1, n2 in itertools.combinations(G.nodes(), 2): try: sp.append(nx.shortest_path(G, n1, n2, weight=weight)) except nx.NetworkXNoPath: # nodes not connected sp.append([]) # add single nodes as length 0 paths. sp += [[n] for n in G.nodes()] return sp