#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 11 11:03:01 2020

@author: ljia
"""
import numpy as np
from sklearn.model_selection import ShuffleSplit
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from gklearn.utils.utils import get_graph_kernel_by_name
# from gklearn.preimage.utils import get_same_item_indices

def sum_squares(a, b):
    """
    Return the sum of squared differences between a and b (the sum of squared
    errors; dividing by len(a) gives the MSE).
    """
    return np.sum([(a[i] - b[i]) ** 2 for i in range(len(a))])


def euclid_d(x, y):
    """
    1D Euclidean distance.
    """
    return np.sqrt((x - y) ** 2)


def man_d(x, y):
    """
    1D Manhattan distance.
    """
    return np.abs(x - y)


def knn_regression(D_app, D_test, y_app, y_test, n_neighbors, verbose=True, text=None):
    """
    kNN regression with precomputed distances. D_app is the (n_app, n_app)
    distance matrix between training samples; D_test is the (n_app, n_test)
    matrix of distances between training and test samples, transposed before
    prediction since scikit-learn expects (n_queries, n_app). Returns the RMSE
    on the training and test sets.
    """
    from sklearn.neighbors import KNeighborsRegressor
    knn = KNeighborsRegressor(n_neighbors=n_neighbors, metric='precomputed')
    knn.fit(D_app, y_app)
    y_pred = knn.predict(D_app)
    y_pred_test = knn.predict(D_test.T)
    perf_app = np.sqrt(sum_squares(y_pred, y_app) / len(y_app))
    perf_test = np.sqrt(sum_squares(y_pred_test, y_test) / len(y_test))

    if verbose:
        print('Learning error with {} train examples: {}'.format(text, perf_app))
        print('Test error with {} train examples: {}'.format(text, perf_test))

    return perf_app, perf_test


def knn_classification(d_app, d_test, y_app, y_test, n_neighbors, verbose=True, text=None):
    """
    kNN classification with precomputed distances. d_app is the (n_app, n_app)
    distance matrix between training samples; d_test is the (n_app, n_test)
    matrix of distances between training and test samples. Returns the
    accuracy on the training and test sets.
    """
    knn = KNeighborsClassifier(n_neighbors=n_neighbors, metric='precomputed')
    knn.fit(d_app, y_app)
    y_pred = knn.predict(d_app)
    y_pred_test = knn.predict(d_test.T)
    perf_app = accuracy_score(y_app, y_pred)
    perf_test = accuracy_score(y_test, y_pred_test)

    if verbose:
        print('Learning accuracy with {} costs: {}'.format(text, perf_app))
        print('Test accuracy with {} costs: {}'.format(text, perf_test))

    return perf_app, perf_test


def knn_cv(dataset, kernel_options, trainset=None, n_neighbors=1, n_splits=50, test_size=0.9, verbose=True):
    """
    Perform a kNN classification cross-validation on the given dataset, using
    kernel-induced distances as the precomputed metric.
    """
    # Gn = dataset.graphs
    y_all = dataset.targets

    # Compute pairwise kernel distances between all graphs.
    dis_mat = _compute_kernel_distances(dataset, kernel_options, trainset=trainset)


    rs = ShuffleSplit(n_splits=n_splits, test_size=test_size, random_state=0)
    # train_indices = [[] for _ in range(n_splits)]
    # test_indices = [[] for _ in range(n_splits)]
    # idx_targets = get_same_item_indices(y_all)
    # for key, item in idx_targets.items():
    #     i = 0
    #     for train_i, test_i in rs.split(item):  # @todo: careful when parallel.
    #         train_indices[i] += [item[idx] for idx in train_i]
    #         test_indices[i] += [item[idx] for idx in test_i]
    #         i += 1

    accuracies = []
    # for trial in range(len(train_indices)):
    #     train_index = train_indices[trial]
    #     test_index = test_indices[trial]
    for train_index, test_index in rs.split(y_all):
        # print(train_index, test_index)
        # G_app = [Gn[i] for i in train_index]
        # G_test = [Gn[i] for i in test_index]
        y_app = [y_all[i] for i in train_index]
        y_test = [y_all[i] for i in test_index]

        N = len(train_index)

        # Training-vs-training distance submatrix, shape (N, N).
        d_app = dis_mat.copy()
        d_app = d_app[train_index, :]
        d_app = d_app[:, train_index]

        # Training-vs-test distances, shape (N, n_test).
        d_test = np.zeros((N, len(test_index)))
        for i in range(N):
            for j in range(len(test_index)):
                d_test[i, j] = dis_mat[train_index[i], test_index[j]]

        accuracies.append(knn_classification(d_app, d_test, y_app, y_test, n_neighbors, verbose=verbose, text=''))

    results = {}
    results['ave_perf_train'] = np.mean([i[0] for i in accuracies], axis=0)
    results['std_perf_train'] = np.std([i[0] for i in accuracies], axis=0, ddof=1)
    results['ave_perf_test'] = np.mean([i[1] for i in accuracies], axis=0)
    results['std_perf_test'] = np.std([i[1] for i in accuracies], axis=0, ddof=1)

    return results


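# Usage sketch (illustrative only; the kernel name and options below are
# assumptions, not values prescribed by this module):
#
#   kernel_options = {'name': 'ShortestPath', 'normalize': True}
#   results = knn_cv(dataset, kernel_options, n_neighbors=1, n_splits=30, test_size=0.9)
#   print(results['ave_perf_test'], results['std_perf_test'])
#
# Here `dataset` is a gklearn Dataset providing .graphs, .targets and the
# label/attribute names consumed by _compute_kernel_distances below.

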
def _compute_kernel_distances(dataset, kernel_options, trainset=None):
    graph_kernel = get_graph_kernel_by_name(kernel_options['name'],
                                            node_labels=dataset.node_labels,
                                            edge_labels=dataset.edge_labels,
                                            node_attrs=dataset.node_attrs,
                                            edge_attrs=dataset.edge_attrs,
                                            ds_infos=dataset.get_dataset_infos(keys=['directed']),
                                            kernel_options=kernel_options)

    # Compute the Gram matrix, then derive the kernel-induced distance matrix.
    gram_matrix, run_time = graph_kernel.compute(dataset.graphs, **kernel_options)
    dis_mat, _, _, _ = graph_kernel.compute_distance_matrix()

    if trainset is not None:
        # @todo: the unnormalized Gram matrix is retrieved here but not yet used.
        gram_matrix_unnorm = graph_kernel.gram_matrix_unnorm

    return dis_mat
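

# Minimal self-contained sketch (not part of the original module): a toy run of
# knn_classification on made-up precomputed distances, to illustrate the shape
# convention used throughout this file.
if __name__ == '__main__':
    d_app = np.array([[0.0, 1.0, 5.0],
                      [1.0, 0.0, 4.0],
                      [5.0, 4.0, 0.0]])   # (n_app, n_app) training distances
    d_test = np.array([[0.5, 4.5],
                       [1.5, 3.5],
                       [4.5, 0.5]])       # (n_app, n_test) train-vs-test distances
    y_app = [0, 0, 1]
    y_test = [0, 1]
    knn_classification(d_app, d_test, y_app, y_test, n_neighbors=1, text='toy')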