#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Mar 30 11:52:47 2020 @author: ljia """ import numpy as np import networkx as nx import multiprocessing import time class GraphKernel(object): def __init__(self): self._graphs = None self._parallel = '' self._n_jobs = 0 self._verbose = None self._normalize = True self._run_time = 0 self._gram_matrix = None self._gram_matrix_unnorm = None def compute(self, *graphs, **kwargs): self._parallel = kwargs.get('parallel', 'imap_unordered') self._n_jobs = kwargs.get('n_jobs', multiprocessing.cpu_count()) self._normalize = kwargs.get('normalize', True) self._verbose = kwargs.get('verbose', 2) if len(graphs) == 1: if not isinstance(graphs[0], list): raise Exception('Cannot detect graphs.') elif len(graphs[0]) == 0: raise Exception('The graph list given is empty. No computation was performed.') else: self._graphs = [g.copy() for g in graphs[0]] self._gram_matrix = self.__compute_gram_matrix() self._gram_matrix_unnorm = np.copy(self._gram_matrix) if self._normalize: self._gram_matrix = self.normalize_gm(self._gram_matrix) return self._gram_matrix, self._run_time elif len(graphs) == 2: if self.is_graph(graphs[0]) and self.is_graph(graphs[1]): kernel = self.__compute_single_kernel(graphs[0].copy(), graphs[1].copy()) return kernel, self._run_time elif self.is_graph(graphs[0]) and isinstance(graphs[1], list): g1 = graphs[0].copy() g_list = [g.copy() for g in graphs[1]] kernel_list = self.__compute_kernel_list(g1, g_list) return kernel_list, self._run_time elif isinstance(graphs[0], list) and self.is_graph(graphs[1]): g1 = graphs[1].copy() g_list = [g.copy() for g in graphs[0]] kernel_list = self.__compute_kernel_list(g1, g_list) return kernel_list, self._run_time else: raise Exception('Cannot detect graphs.') elif len(graphs) == 0 and self._graphs is None: raise Exception('Please add graphs before computing.') else: raise Exception('Cannot detect graphs.') def normalize_gm(self, gram_matrix): import warnings warnings.warn('gklearn.kernels.graph_kernel.normalize_gm will be deprecated, use gklearn.utils.normalize_gram_matrix instead', DeprecationWarning) diag = gram_matrix.diagonal().copy() for i in range(len(gram_matrix)): for j in range(i, len(gram_matrix)): gram_matrix[i][j] /= np.sqrt(diag[i] * diag[j]) gram_matrix[j][i] = gram_matrix[i][j] return gram_matrix def compute_distance_matrix(self): if self._gram_matrix is None: raise Exception('Please compute the Gram matrix before computing distance matrix.') dis_mat = np.empty((len(self._gram_matrix), len(self._gram_matrix))) for i in range(len(self._gram_matrix)): for j in range(i, len(self._gram_matrix)): dis = self._gram_matrix[i, i] + self._gram_matrix[j, j] - 2 * self._gram_matrix[i, j] if dis < 0: if dis > -1e-10: dis = 0 else: raise ValueError('The distance is negative.') dis_mat[i, j] = np.sqrt(dis) dis_mat[j, i] = dis_mat[i, j] dis_max = np.max(np.max(dis_mat)) dis_min = np.min(np.min(dis_mat[dis_mat != 0])) dis_mean = np.mean(np.mean(dis_mat)) return dis_mat, dis_max, dis_min, dis_mean def __compute_gram_matrix(self): start_time = time.time() if self._parallel == 'imap_unordered': gram_matrix = self._compute_gm_imap_unordered() elif self._parallel is None: gram_matrix = self._compute_gm_series() else: raise Exception('Parallel mode is not set correctly.') self._run_time = time.time() - start_time if self._verbose: print('Gram matrix of size %d built in %s seconds.' % (len(self._graphs), self._run_time)) return gram_matrix def _compute_gm_series(self): pass def _compute_gm_imap_unordered(self): pass def __compute_kernel_list(self, g1, g_list): start_time = time.time() if self._parallel == 'imap_unordered': kernel_list = self._compute_kernel_list_imap_unordered(g1, g_list) elif self._parallel is None: kernel_list = self._compute_kernel_list_series(g1, g_list) else: raise Exception('Parallel mode is not set correctly.') self._run_time = time.time() - start_time if self._verbose: print('Graph kernel bewteen a graph and a list of %d graphs built in %s seconds.' % (len(g_list), self._run_time)) return kernel_list def _compute_kernel_list_series(self, g1, g_list): pass def _compute_kernel_list_imap_unordered(self, g1, g_list): pass def __compute_single_kernel(self, g1, g2): start_time = time.time() kernel = self._compute_single_kernel_series(g1, g2) self._run_time = time.time() - start_time if self._verbose: print('Graph kernel bewteen two graphs built in %s seconds.' % (self._run_time)) return kernel def _compute_single_kernel_series(self, g1, g2): pass def is_graph(self, graph): if isinstance(graph, nx.Graph): return True if isinstance(graph, nx.DiGraph): return True if isinstance(graph, nx.MultiGraph): return True if isinstance(graph, nx.MultiDiGraph): return True return False @property def graphs(self): return self._graphs @property def parallel(self): return self._parallel @property def n_jobs(self): return self._n_jobs @property def verbose(self): return self._verbose @property def normalize(self): return self._normalize @property def run_time(self): return self._run_time @property def gram_matrix(self): return self._gram_matrix @gram_matrix.setter def gram_matrix(self, value): self._gram_matrix = value @property def gram_matrix_unnorm(self): return self._gram_matrix_unnorm @gram_matrix_unnorm.setter def gram_matrix_unnorm(self, value): self._gram_matrix_unnorm = value