Browse Source

New translations common_walk.py (Chinese Simplified)

l10n_v0.2.x
linlin 4 years ago
parent
commit
95f39ecb0c
1 changed files with 293 additions and 0 deletions
  1. +293
    -0
      lang/zh/gklearn/kernels/common_walk.py

+ 293
- 0
lang/zh/gklearn/kernels/common_walk.py View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 18 11:21:31 2020

@author: ljia

@references:

[1] Thomas Gärtner, Peter Flach, and Stefan Wrobel. On graph kernels:
Hardness results and efficient alternatives. Learning Theory and Kernel
Machines, pages 129–143, 2003.
"""

import sys
from tqdm import tqdm
import numpy as np
import networkx as nx
from gklearn.utils import SpecialLabel
from gklearn.utils.parallel import parallel_gm, parallel_me
from gklearn.utils.utils import direct_product_graph
from gklearn.kernels import GraphKernel


class CommonWalk(GraphKernel):
def __init__(self, **kwargs):
GraphKernel.__init__(self)
self.__node_labels = kwargs.get('node_labels', [])
self.__edge_labels = kwargs.get('edge_labels', [])
self.__weight = kwargs.get('weight', 1)
self.__compute_method = kwargs.get('compute_method', None)
self.__ds_infos = kwargs.get('ds_infos', {})
self.__compute_method = self.__compute_method.lower()


def _compute_gm_series(self):
self.__check_graphs(self._graphs)
self.__add_dummy_labels(self._graphs)
if not self.__ds_infos['directed']: # convert
self._graphs = [G.to_directed() for G in self._graphs]
# compute Gram matrix.
gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
from itertools import combinations_with_replacement
itr = combinations_with_replacement(range(0, len(self._graphs)), 2)
if self._verbose >= 2:
iterator = tqdm(itr, desc='calculating kernels', file=sys.stdout)
else:
iterator = itr
# direct product graph method - exponential
if self.__compute_method == 'exp':
for i, j in iterator:
kernel = self.__kernel_do_exp(self._graphs[i], self._graphs[j], self.__weight)
gram_matrix[i][j] = kernel
gram_matrix[j][i] = kernel
# direct product graph method - geometric
elif self.__compute_method == 'geo':
for i, j in iterator:
kernel = self.__kernel_do_geo(self._graphs[i], self._graphs[j], self.__weight)
gram_matrix[i][j] = kernel
gram_matrix[j][i] = kernel
return gram_matrix
def _compute_gm_imap_unordered(self):
self.__check_graphs(self._graphs)
self.__add_dummy_labels(self._graphs)
if not self.__ds_infos['directed']: # convert
self._graphs = [G.to_directed() for G in self._graphs]
# compute Gram matrix.
gram_matrix = np.zeros((len(self._graphs), len(self._graphs)))
# def init_worker(gn_toshare):
# global G_gn
# G_gn = gn_toshare
# direct product graph method - exponential
if self.__compute_method == 'exp':
do_fun = self._wrapper_kernel_do_exp
# direct product graph method - geometric
elif self.__compute_method == 'geo':
do_fun = self._wrapper_kernel_do_geo
parallel_gm(do_fun, gram_matrix, self._graphs, init_worker=_init_worker_gm,
glbv=(self._graphs,), n_jobs=self._n_jobs, verbose=self._verbose)
return gram_matrix
def _compute_kernel_list_series(self, g1, g_list):
self.__check_graphs(g_list + [g1])
self.__add_dummy_labels(g_list + [g1])
if not self.__ds_infos['directed']: # convert
g1 = g1.to_directed()
g_list = [G.to_directed() for G in g_list]
# compute kernel list.
kernel_list = [None] * len(g_list)
if self._verbose >= 2:
iterator = tqdm(range(len(g_list)), desc='calculating kernels', file=sys.stdout)
else:
iterator = range(len(g_list))
# direct product graph method - exponential
if self.__compute_method == 'exp':
for i in iterator:
kernel = self.__kernel_do_exp(g1, g_list[i], self.__weight)
kernel_list[i] = kernel
# direct product graph method - geometric
elif self.__compute_method == 'geo':
for i in iterator:
kernel = self.__kernel_do_geo(g1, g_list[i], self.__weight)
kernel_list[i] = kernel
return kernel_list
def _compute_kernel_list_imap_unordered(self, g1, g_list):
self.__check_graphs(g_list + [g1])
self.__add_dummy_labels(g_list + [g1])
if not self.__ds_infos['directed']: # convert
g1 = g1.to_directed()
g_list = [G.to_directed() for G in g_list]
# compute kernel list.
kernel_list = [None] * len(g_list)

# def init_worker(g1_toshare, g_list_toshare):
# global G_g1, G_g_list
# G_g1 = g1_toshare
# G_g_list = g_list_toshare
# direct product graph method - exponential
if self.__compute_method == 'exp':
do_fun = self._wrapper_kernel_list_do_exp
# direct product graph method - geometric
elif self.__compute_method == 'geo':
do_fun = self._wrapper_kernel_list_do_geo
def func_assign(result, var_to_assign):
var_to_assign[result[0]] = result[1]
itr = range(len(g_list))
len_itr = len(g_list)
parallel_me(do_fun, func_assign, kernel_list, itr, len_itr=len_itr,
init_worker=_init_worker_list, glbv=(g1, g_list), method='imap_unordered',
n_jobs=self._n_jobs, itr_desc='calculating kernels', verbose=self._verbose)
return kernel_list
def _wrapper_kernel_list_do_exp(self, itr):
return itr, self.__kernel_do_exp(G_g1, G_g_list[itr], self.__weight)


def _wrapper_kernel_list_do_geo(self, itr):
return itr, self.__kernel_do_geo(G_g1, G_g_list[itr], self.__weight)
def _compute_single_kernel_series(self, g1, g2):
self.__check_graphs([g1] + [g2])
self.__add_dummy_labels([g1] + [g2])
if not self.__ds_infos['directed']: # convert
g1 = g1.to_directed()
g2 = g2.to_directed()
# direct product graph method - exponential
if self.__compute_method == 'exp':
kernel = self.__kernel_do_exp(g1, g2, self.__weight)
# direct product graph method - geometric
elif self.__compute_method == 'geo':
kernel = self.__kernel_do_geo(g1, g2, self.__weight)

return kernel
def __kernel_do_exp(self, g1, g2, beta):
"""Calculate common walk graph kernel between 2 graphs using exponential
series.
Parameters
----------
g1, g2 : NetworkX graphs
Graphs between which the kernels are calculated.
beta : integer
Weight.
Return
------
kernel : float
The common walk Kernel between 2 graphs.
"""
# get tensor product / direct product
gp = direct_product_graph(g1, g2, self.__node_labels, self.__edge_labels)
# return 0 if the direct product graph have no more than 1 node.
if nx.number_of_nodes(gp) < 2:
return 0
A = nx.adjacency_matrix(gp).todense()
ew, ev = np.linalg.eig(A)
# # remove imaginary part if possible.
# # @todo: don't know if it is necessary.
# for i in range(len(ew)):
# if np.abs(ew[i].imag) < 1e-9:
# ew[i] = ew[i].real
# for i in range(ev.shape[0]):
# for j in range(ev.shape[1]):
# if np.abs(ev[i, j].imag) < 1e-9:
# ev[i, j] = ev[i, j].real

D = np.zeros((len(ew), len(ew)), dtype=complex) # @todo: use complex?
for i in range(len(ew)):
D[i][i] = np.exp(beta * ew[i])

exp_D = ev * D * ev.T
kernel = exp_D.sum()
if (kernel.real == 0 and np.abs(kernel.imag) < 1e-9) or np.abs(kernel.imag / kernel.real) < 1e-9:
kernel = kernel.real
return kernel
def _wrapper_kernel_do_exp(self, itr):
i = itr[0]
j = itr[1]
return i, j, self.__kernel_do_exp(G_gn[i], G_gn[j], self.__weight)
def __kernel_do_geo(self, g1, g2, gamma):
"""Calculate common walk graph kernel between 2 graphs using geometric
series.
Parameters
----------
g1, g2 : NetworkX graphs
Graphs between which the kernels are calculated.
gamma : integer
Weight.
Return
------
kernel : float
The common walk Kernel between 2 graphs.
"""
# get tensor product / direct product
gp = direct_product_graph(g1, g2, self.__node_labels, self.__edge_labels)
# return 0 if the direct product graph have no more than 1 node.
if nx.number_of_nodes(gp) < 2:
return 0
A = nx.adjacency_matrix(gp).todense()
mat = np.identity(len(A)) - gamma * A
# try:
return mat.I.sum()
# except np.linalg.LinAlgError:
# return np.nan

def _wrapper_kernel_do_geo(self, itr):
i = itr[0]
j = itr[1]
return i, j, self.__kernel_do_geo(G_gn[i], G_gn[j], self.__weight)
def __check_graphs(self, Gn):
for g in Gn:
if nx.number_of_nodes(g) == 1:
raise Exception('Graphs must contain more than 1 nodes to construct adjacency matrices.')
def __add_dummy_labels(self, Gn):
if len(self.__node_labels) == 0 or (len(self.__node_labels) == 1 and self.__node_labels[0] == SpecialLabel.DUMMY):
for i in range(len(Gn)):
nx.set_node_attributes(Gn[i], '0', SpecialLabel.DUMMY)
self.__node_labels = [SpecialLabel.DUMMY]
if len(self.__edge_labels) == 0 or (len(self.__edge_labels) == 1 and self.__edge_labels[0] == SpecialLabel.DUMMY):
for i in range(len(Gn)):
nx.set_edge_attributes(Gn[i], '0', SpecialLabel.DUMMY)
self.__edge_labels = [SpecialLabel.DUMMY]
def _init_worker_gm(gn_toshare):
global G_gn
G_gn = gn_toshare
def _init_worker_list(g1_toshare, g_list_toshare):
global G_g1, G_g_list
G_g1 = g1_toshare
G_g_list = g_list_toshare

Loading…
Cancel
Save