1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
"""
A command-line tool for testing whether RDF graphs are isomorphic, i.e. equal
if BNode labels are ignored.
"""
from rdflib import Graph
from rdflib import BNode
from itertools import combinations
class IsomorphicTestableGraph(Graph):
    """
    Graph whose ``==`` / ``!=`` operators test for isomorphism, i.e. equality
    that ignores the labels of blank nodes (BNodes).

    Ported from:
    http://www.w3.org/2001/sw/DataAccess/proto-tests/tools/rdfdiff.py
    (Sean B Palmer's RDF Graph Isomorphism Tester)
    """

    # A class that defines __eq__ without __hash__ gets __hash__ = None on
    # Python 3, which makes its instances unusable as dict keys or set
    # members.  Explicitly keep the parent's hash so instances stay hashable.
    # NOTE(review): the inherited hash is not derived from internal_hash(), so
    # two isomorphic graphs are not guaranteed to share a hash value; do not
    # rely on hash-based containers to de-duplicate isomorphic graphs.
    __hash__ = Graph.__hash__

    def __init__(self, **kargs):
        """Create the graph, forwarding all keyword arguments to ``Graph``."""
        super(IsomorphicTestableGraph, self).__init__(**kargs)
        # Not read anywhere in this class; kept because it is a public
        # attribute that external code may inspect.
        self.hash = None

    def internal_hash(self):
        """
        Return a hash of the graph that does not depend on BNode labels.

        This is defined instead of __hash__ to avoid a circular recursion
        scenario with the Memory store for rdflib which requires a hash
        lookup in order to return a generator of triples
        """
        return hash(tuple(sorted(self.hashtriples())))

    def hashtriples(self):
        """
        Yield one integer hash per triple in the graph.

        Every BNode in a triple is replaced by its label-independent
        signature (see ``vhash``) before the triple is hashed; other terms
        are hashed as they are.
        """
        for triple in self:
            # A BNode taken from a triple of this graph always occurs in at
            # least one triple, so its signature is never empty.
            signature = tuple(
                self.vhash(t) if isinstance(t, BNode) else t for t in triple
            )
            yield hash(signature)

    def vhash(self, term, done=False):
        """
        Return the signature of ``term``: the sorted tuple of the
        label-independent forms of every triple that contains it.

        ``done`` is forwarded to ``vhashtriple``; when true, other BNodes are
        not expanded any further.
        """
        # NOTE(review): sorting relies on the yielded tuples (which mix
        # terms, ints and nested tuples) being mutually orderable -- confirm
        # for the term types in use.
        return tuple(sorted(self.vhashtriples(term, done)))

    def vhashtriples(self, term, done):
        """Yield the label-independent form of each triple containing ``term``."""
        for t in self:
            if term in t:
                yield tuple(self.vhashtriple(t, term, done))

    def vhashtriple(self, triple, term, done):
        """
        Yield the three components of ``triple`` with BNodes abstracted away.

        Non-BNode components are yielded unchanged.  A BNode is yielded as
        its position (0, 1 or 2) when it is ``term`` itself or when ``done``
        is true; otherwise it is replaced by its own signature computed with
        ``done=True``, which limits the expansion to a single level.
        """
        for p in range(3):
            if not isinstance(triple[p], BNode):
                yield triple[p]
            elif done or (triple[p] == term):
                yield p
            else:
                yield self.vhash(triple[p], done=True)

    def __eq__(self, G):
        """Graph isomorphism testing."""
        if not isinstance(G, IsomorphicTestableGraph):
            return False
        if len(self) != len(G):
            return False
        # Cheap shortcut: identical triples in identical iteration order.
        if list(self) == list(G):
            return True  # @@
        return self.internal_hash() == G.internal_hash()

    def __ne__(self, G):
        """Negative graph isomorphism testing."""
        return not self.__eq__(G)
def main():
    """
    Command-line entry point: check that all given RDF documents are
    isomorphic to one another.

    Every file named on the command line (and STDIN first, when ``-s`` /
    ``--stdin`` is given) is parsed with the format selected by ``--format``
    (default ``xml``), and each unordered pair of the resulting graphs is
    compared with ``==``.

    Raises:
        AssertionError: for the first pair of graphs that do not compare
            equal; the message names the two sources.
    """
    import sys
    from optparse import OptionParser

    usage = """usage: %prog [options] file1 file2 ... fileN"""
    op = OptionParser(usage=usage)
    op.add_option(
        "-s",
        "--stdin",
        action="store_true",
        default=False,
        help="Load from STDIN as well",
    )
    op.add_option(
        "--format",
        default="xml",
        dest="inputFormat",
        metavar="RDF_FORMAT",
        choices=["xml", "trix", "n3", "nt", "rdfa"],
        help="The format of the RDF document(s) to compare. "
        + "One of 'xml','n3','trix', 'nt', "
        + "or 'rdfa'. The default is %default",
    )

    (options, args) = op.parse_args()

    # (display name, parse source) for every input, STDIN first.
    sources = []
    if options.stdin:
        sources.append(("(STDIN)", sys.stdin))
    sources.extend((fn, fn) for fn in args)

    # Keep each graph next to its name in a list rather than using graphs as
    # dict keys: IsomorphicTestableGraph overrides __eq__, so its instances
    # must not be assumed to be hashable.
    named_graphs = []
    for name, source in sources:
        graph = IsomorphicTestableGraph().parse(source, format=options.inputFormat)
        named_graphs.append((graph, name))

    # combinations() already yields every unordered pair exactly once, so no
    # extra bookkeeping of checked pairs is needed.
    for (graph1, name1), (graph2, name2) in combinations(named_graphs, 2):
        # Raise explicitly instead of using ``assert`` so the comparison is
        # not silently skipped when Python runs with -O.
        if not (graph1 == graph2):
            raise AssertionError("%s != %s" % (name1, name2))
# Run the command-line comparison only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|