You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

test_graph_kernels.py 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. """Tests of graph kernels.
  2. """
  3. import pytest
  4. import multiprocessing
  5. def chooseDataset(ds_name):
  6. """Choose dataset according to name.
  7. """
  8. from gklearn.utils import Dataset
  9. dataset = Dataset()
  10. # no node labels (and no edge labels).
  11. if ds_name == 'Alkane':
  12. dataset.load_predefined_dataset(ds_name)
  13. dataset.trim_dataset(edge_required=False)
  14. irrelevant_labels = {'node_attrs': ['x', 'y', 'z'], 'edge_labels': ['bond_stereo']}
  15. dataset.remove_labels(**irrelevant_labels)
  16. # node symbolic labels.
  17. elif ds_name == 'Acyclic':
  18. dataset.load_predefined_dataset(ds_name)
  19. dataset.trim_dataset(edge_required=False)
  20. irrelevant_labels = {'node_attrs': ['x', 'y', 'z'], 'edge_labels': ['bond_stereo']}
  21. dataset.remove_labels(**irrelevant_labels)
  22. # node non-symbolic labels.
  23. elif ds_name == 'Letter-med':
  24. dataset.load_predefined_dataset(ds_name)
  25. dataset.trim_dataset(edge_required=False)
  26. # node symbolic and non-symbolic labels (and edge symbolic labels).
  27. elif ds_name == 'AIDS':
  28. dataset.load_predefined_dataset(ds_name)
  29. dataset.trim_dataset(edge_required=False)
  30. # edge non-symbolic labels (no node labels).
  31. elif ds_name == 'Fingerprint_edge':
  32. dataset.load_predefined_dataset('Fingerprint')
  33. dataset.trim_dataset(edge_required=True)
  34. irrelevant_labels = {'edge_attrs': ['orient', 'angle']}
  35. dataset.remove_labels(**irrelevant_labels)
  36. # edge non-symbolic labels (and node non-symbolic labels).
  37. elif ds_name == 'Fingerprint':
  38. dataset.load_predefined_dataset(ds_name)
  39. dataset.trim_dataset(edge_required=True)
  40. # edge symbolic and non-symbolic labels (and node symbolic and non-symbolic labels).
  41. elif ds_name == 'Cuneiform':
  42. dataset.load_predefined_dataset(ds_name)
  43. dataset.trim_dataset(edge_required=True)
  44. dataset.cut_graphs(range(0, 3))
  45. return dataset
  46. @pytest.mark.parametrize('ds_name', ['Alkane', 'AIDS'])
  47. @pytest.mark.parametrize('weight,compute_method', [(0.01, 'geo'), (1, 'exp')])
  48. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  49. def test_CommonWalk(ds_name, parallel, weight, compute_method):
  50. """Test common walk kernel.
  51. """
  52. from gklearn.kernels import CommonWalk
  53. import networkx as nx
  54. dataset = chooseDataset(ds_name)
  55. dataset.load_graphs([g for g in dataset.graphs if nx.number_of_nodes(g) > 1])
  56. try:
  57. graph_kernel = CommonWalk(node_labels=dataset.node_labels,
  58. edge_labels=dataset.edge_labels,
  59. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  60. weight=weight,
  61. compute_method=compute_method)
  62. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  63. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  64. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  65. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  66. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  67. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  68. except Exception as exception:
  69. assert False, exception
  70. @pytest.mark.parametrize('ds_name', ['Alkane', 'AIDS'])
  71. @pytest.mark.parametrize('remove_totters', [False]) #[True, False])
  72. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  73. def test_Marginalized(ds_name, parallel, remove_totters):
  74. """Test marginalized kernel.
  75. """
  76. from gklearn.kernels import Marginalized
  77. dataset = chooseDataset(ds_name)
  78. try:
  79. graph_kernel = Marginalized(node_labels=dataset.node_labels,
  80. edge_labels=dataset.edge_labels,
  81. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  82. p_quit=0.5,
  83. n_iteration=2,
  84. remove_totters=remove_totters)
  85. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  86. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  87. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  88. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  89. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  90. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  91. except Exception as exception:
  92. assert False, exception
  93. # @pytest.mark.parametrize(
  94. # 'compute_method,ds_name,sub_kernel',
  95. # [
  96. # # ('sylvester', 'Alkane', None),
  97. # # ('conjugate', 'Alkane', None),
  98. # # ('conjugate', 'AIDS', None),
  99. # # ('fp', 'Alkane', None),
  100. # # ('fp', 'AIDS', None),
  101. # ('spectral', 'Alkane', 'exp'),
  102. # ('spectral', 'Alkane', 'geo'),
  103. # ]
  104. # )
  105. # #@pytest.mark.parametrize('parallel', ['imap_unordered', None])
  106. # def test_randomwalkkernel(ds_name, compute_method, sub_kernel):
# """Test random walk kernel.
  108. # """
  109. # from gklearn.kernels.randomWalkKernel import randomwalkkernel
  110. # from gklearn.utils.kernels import deltakernel, gaussiankernel, kernelproduct
  111. # import functools
  112. # Gn, y = chooseDataset(ds_name)
  113. # mixkernel = functools.partial(kernelproduct, deltakernel, gaussiankernel)
  114. # sub_kernels = [{'symb': deltakernel, 'nsymb': gaussiankernel, 'mix': mixkernel}]
  115. # try:
  116. # Kmatrix, run_time, idx = randomwalkkernel(Gn,
  117. # compute_method=compute_method,
  118. # weight=1e-3,
  119. # p=None,
  120. # q=None,
  121. # edge_weight=None,
  122. # node_kernels=sub_kernels,
  123. # edge_kernels=sub_kernels,
  124. # node_label='atom',
  125. # edge_label='bond_type',
  126. # sub_kernel=sub_kernel,
  127. # # parallel=parallel,
  128. # n_jobs=multiprocessing.cpu_count(),
  129. # verbose=True)
  130. # except Exception as exception:
  131. # assert False, exception
  132. @pytest.mark.parametrize('ds_name', ['Alkane', 'Acyclic', 'Letter-med', 'AIDS', 'Fingerprint'])
  133. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  134. def test_ShortestPath(ds_name, parallel):
  135. """Test shortest path kernel.
  136. """
  137. from gklearn.kernels import ShortestPath
  138. from gklearn.utils.kernels import deltakernel, gaussiankernel, kernelproduct
  139. import functools
  140. dataset = chooseDataset(ds_name)
  141. mixkernel = functools.partial(kernelproduct, deltakernel, gaussiankernel)
  142. sub_kernels = {'symb': deltakernel, 'nsymb': gaussiankernel, 'mix': mixkernel}
  143. try:
  144. graph_kernel = ShortestPath(node_labels=dataset.node_labels,
  145. node_attrs=dataset.node_attrs,
  146. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  147. node_kernels=sub_kernels)
  148. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  149. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  150. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  151. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  152. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  153. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  154. except Exception as exception:
  155. assert False, exception
  156. #@pytest.mark.parametrize('ds_name', ['Alkane', 'Acyclic', 'Letter-med', 'AIDS', 'Fingerprint'])
  157. @pytest.mark.parametrize('ds_name', ['Alkane', 'Acyclic', 'Letter-med', 'AIDS', 'Fingerprint', 'Fingerprint_edge', 'Cuneiform'])
  158. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  159. def test_StructuralSP(ds_name, parallel):
  160. """Test structural shortest path kernel.
  161. """
  162. from gklearn.kernels import StructuralSP
  163. from gklearn.utils.kernels import deltakernel, gaussiankernel, kernelproduct
  164. import functools
  165. dataset = chooseDataset(ds_name)
  166. mixkernel = functools.partial(kernelproduct, deltakernel, gaussiankernel)
  167. sub_kernels = {'symb': deltakernel, 'nsymb': gaussiankernel, 'mix': mixkernel}
  168. try:
  169. graph_kernel = StructuralSP(node_labels=dataset.node_labels,
  170. edge_labels=dataset.edge_labels,
  171. node_attrs=dataset.node_attrs,
  172. edge_attrs=dataset.edge_attrs,
  173. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  174. node_kernels=sub_kernels,
  175. edge_kernels=sub_kernels)
  176. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  177. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  178. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  179. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  180. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  181. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  182. except Exception as exception:
  183. assert False, exception
  184. @pytest.mark.parametrize('ds_name', ['Alkane', 'AIDS'])
  185. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  186. #@pytest.mark.parametrize('k_func', ['MinMax', 'tanimoto', None])
  187. @pytest.mark.parametrize('k_func', ['MinMax', 'tanimoto'])
  188. @pytest.mark.parametrize('compute_method', ['trie', 'naive'])
  189. def test_PathUpToH(ds_name, parallel, k_func, compute_method):
  190. """Test path kernel up to length $h$.
  191. """
  192. from gklearn.kernels import PathUpToH
  193. dataset = chooseDataset(ds_name)
  194. try:
  195. graph_kernel = PathUpToH(node_labels=dataset.node_labels,
  196. edge_labels=dataset.edge_labels,
  197. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  198. depth=2, k_func=k_func, compute_method=compute_method)
  199. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  200. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  201. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  202. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  203. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  204. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  205. except Exception as exception:
  206. assert False, exception
  207. @pytest.mark.parametrize('ds_name', ['Alkane', 'AIDS'])
  208. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  209. def test_Treelet(ds_name, parallel):
  210. """Test treelet kernel.
  211. """
  212. from gklearn.kernels import Treelet
  213. from gklearn.utils.kernels import polynomialkernel
  214. import functools
  215. dataset = chooseDataset(ds_name)
  216. pkernel = functools.partial(polynomialkernel, d=2, c=1e5)
  217. try:
  218. graph_kernel = Treelet(node_labels=dataset.node_labels,
  219. edge_labels=dataset.edge_labels,
  220. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  221. sub_kernel=pkernel)
  222. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  223. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  224. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  225. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  226. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  227. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  228. except Exception as exception:
  229. assert False, exception
  230. @pytest.mark.parametrize('ds_name', ['Acyclic'])
  231. #@pytest.mark.parametrize('base_kernel', ['subtree', 'sp', 'edge'])
  232. # @pytest.mark.parametrize('base_kernel', ['subtree'])
  233. @pytest.mark.parametrize('parallel', ['imap_unordered', None])
  234. def test_WLSubtree(ds_name, parallel):
  235. """Test Weisfeiler-Lehman subtree kernel.
  236. """
  237. from gklearn.kernels import WLSubtree
  238. dataset = chooseDataset(ds_name)
  239. try:
  240. graph_kernel = WLSubtree(node_labels=dataset.node_labels,
  241. edge_labels=dataset.edge_labels,
  242. ds_infos=dataset.get_dataset_infos(keys=['directed']),
  243. height=2)
  244. gram_matrix, run_time = graph_kernel.compute(dataset.graphs,
  245. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  246. kernel_list, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1:],
  247. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  248. kernel, run_time = graph_kernel.compute(dataset.graphs[0], dataset.graphs[1],
  249. parallel=parallel, n_jobs=multiprocessing.cpu_count(), verbose=True)
  250. except Exception as exception:
  251. assert False, exception
  252. if __name__ == "__main__":
  253. # test_spkernel('Alkane', 'imap_unordered')
  254. test_StructuralSP('Fingerprint_edge', 'imap_unordered')

A Python package for graph kernels, graph edit distances and graph pre-image problem.