|
- """
- @author: linlin
- @references: Suard F, Rakotomamonjy A, Bensrhair A. Kernel on Bag of Paths For Measuring Similarity of Shapes. InESANN 2007 Apr 25 (pp. 355-360).
- """
-
- import sys
- import pathlib
- sys.path.insert(0, "../")
-
- import networkx as nx
- import numpy as np
- import time
-
- from pygraph.kernels.deltaKernel import deltakernel
-
- def pathkernel(*args, node_label = 'atom', edge_label = 'bond_type'):
- """Calculate mean average path kernels between graphs.
-
- Parameters
- ----------
- Gn : List of NetworkX graph
- List of graphs between which the kernels are calculated.
- /
- G1, G2 : NetworkX graphs
- 2 graphs between which the kernel is calculated.
- node_label : string
- node attribute used as label. The default node label is atom.
- edge_label : string
- edge attribute used as label. The default edge label is bond_type.
-
- Return
- ------
- Kmatrix/kernel : Numpy matrix/float
- Kernel matrix, each element of which is the path kernel between 2 praphs. / Path kernel between 2 graphs.
- """
- some_graph = args[0][0] if len(args) == 1 else args[0] # only edge attributes of type int or float can be used as edge weight to calculate the shortest paths.
- some_weight = list(nx.get_edge_attributes(some_graph, edge_label).values())[0]
- weight = edge_label if isinstance(some_weight, float) or isinstance(some_weight, int) else None
-
- if len(args) == 1: # for a list of graphs
- Gn = args[0]
- Kmatrix = np.zeros((len(Gn), len(Gn)))
-
- start_time = time.time()
-
- splist = [ get_shortest_paths(Gn[i], weight) for i in range(0, len(Gn)) ]
-
- for i in range(0, len(Gn)):
- for j in range(i, len(Gn)):
- Kmatrix[i][j] = _pathkernel_do(Gn[i], Gn[j], splist[i], splist[j], node_label, edge_label)
- Kmatrix[j][i] = Kmatrix[i][j]
-
- run_time = time.time() - start_time
- print("\n --- mean average path kernel matrix of size %d built in %s seconds ---" % (len(Gn), run_time))
-
- return Kmatrix, run_time
-
- else: # for only 2 graphs
- start_time = time.time()
-
- splist = get_shortest_paths(args[0], weight)
- splist = get_shortest_paths(args[1], weight)
-
- kernel = _pathkernel_do(args[0], args[1], sp1, sp2, node_label, edge_label)
-
- run_time = time.time() - start_time
- print("\n --- mean average path kernel built in %s seconds ---" % (run_time))
-
- return kernel, run_time
-
-
- def _pathkernel_do(G1, G2, sp1, sp2, node_label = 'atom', edge_label = 'bond_type'):
- """Calculate mean average path kernel between 2 graphs.
-
- Parameters
- ----------
- G1, G2 : NetworkX graphs
- 2 graphs between which the kernel is calculated.
- sp1, sp2 : list of list
- List of shortest paths of 2 graphs, where each path is represented by a list of nodes.
- node_label : string
- node attribute used as label. The default node label is atom.
- edge_label : string
- edge attribute used as label. The default edge label is bond_type.
-
- Return
- ------
- kernel : float
- Path Kernel between 2 graphs.
- """
- # calculate shortest paths for both graphs
-
- # calculate kernel
- kernel = 0
- for path1 in sp1:
- for path2 in sp2:
- if len(path1) == len(path2):
- kernel_path = (G1.node[path1[0]][node_label] == G2.node[path2[0]][node_label])
- if kernel_path:
- for i in range(1, len(path1)):
- # kernel = 1 if all corresponding nodes and edges in the 2 paths have same labels, otherwise 0
- kernel_path *= (G1[path1[i - 1]][path1[i]][edge_label] == G2[path2[i - 1]][path2[i]][edge_label]) \
- * (G1.node[path1[i]][node_label] == G2.node[path2[i]][node_label])
- if kernel_path == 0:
- break
- kernel += kernel_path # add up kernels of all paths
-
- # kernel = 0
- # for path1 in sp1:
- # for path2 in sp2:
- # if len(path1) == len(path2):
- # if (G1.node[path1[0]][node_label] == G2.node[path2[0]][node_label]):
- # for i in range(1, len(path1)):
- # # kernel = 1 if all corresponding nodes and edges in the 2 paths have same labels, otherwise 0
- # # kernel_path *= (G1[path1[i - 1]][path1[i]][edge_label] == G2[path2[i - 1]][path2[i]][edge_label]) \
- # # * (G1.node[path1[i]][node_label] == G2.node[path2[i]][node_label])
- # # if kernel_path == 0:
- # # break
- # # kernel += kernel_path # add up kernels of all paths
- # if (G1[path1[i - 1]][path1[i]][edge_label] != G2[path2[i - 1]][path2[i]][edge_label]) or \
- # (G1.node[path1[i]][node_label] != G2.node[path2[i]][node_label]):
- # break
- # else:
- # kernel += 1
-
- kernel = kernel / (len(sp1) * len(sp2)) # calculate mean average
-
- return kernel
-
- def get_shortest_paths(G, weight):
- """Get all shortest paths of a graph.
-
- Parameters
- ----------
- G : NetworkX graphs
- The graphs whose paths are calculated.
- weight : string/None
- edge attribute used as weight to calculate the shortest path.
-
- Return
- ------
- sp : list of list
- List of shortest paths of the graph, where each path is represented by a list of nodes.
- """
- sp = []
- num_nodes = G.number_of_nodes()
- for node1 in range(num_nodes):
- for node2 in range(node1 + 1, num_nodes):
- sp.append(nx.shortest_path(G, node1, node2, weight = weight))
- return sp
|