summaryrefslogtreecommitdiff
path: root/test/test_store/test_store_sparqlupdatestore.py
blob: c29a6ac6c472da0b46092883a37c0f18cdc57cb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# -*- coding: utf-8 -*-

import re
from test.data import bob, cheese, hates, likes, michel, pizza, tarek
from urllib.request import urlopen

import pytest

from rdflib import BNode, ConjunctiveGraph, Graph, Literal, URIRef

HOST = "http://localhost:3031"
DB = "/db/"

# These tests assume SPARQL 1.1 query/update endpoints running locally at
# http://localhost:3031/db/
#
# The ConjunctiveGraph tests below require that the SPARQL endpoint renders its
# default graph as the union of all known graphs! This is incompatible with the
# endpoint behavior required by our Dataset tests in test_dataset.py, so you
# need to run a second SPARQL endpoint on a non-standard port,
# e.g. fuseki started with:
# ./fuseki-server --port 3031 --memTDB --update --set tdb:unionDefaultGraph=true /db

# THIS WILL DELETE ALL DATA IN THE /db dataset

graphuri = URIRef("urn:example:graph")
othergraphuri = URIRef("urn:example:othergraph")

# Probe the endpoint once at import time and skip the whole module when it
# does not answer with a non-empty body. `except Exception` (rather than a
# bare `except`) lets KeyboardInterrupt/SystemExit propagate, and the `with`
# block closes the HTTP response instead of leaking it.
try:
    with urlopen(HOST) as response:
        _endpoint_available = len(response.read()) > 0
except Exception:
    _endpoint_available = False

if not _endpoint_available:
    pytest.skip(f"{HOST} is unavailable.", allow_module_level=True)


@pytest.fixture
def get_graph():
    """Yield a ConjunctiveGraph opened on the local SPARQLUpdateStore endpoint.

    The graph is opened against the ``sparql`` (query) and ``update`` URLs
    below ``HOST + DB``. Every context reported by the store is emptied before
    the graph is handed to the test, and the graph is closed afterwards.
    """
    graph = ConjunctiveGraph("SPARQLUpdateStore")

    root = HOST + DB
    graph.open((root + "sparql", root + "update"))

    try:
        # clean out the store
        for c in graph.contexts():
            c.remove((None, None, None))
            assert len(c) == 0

        yield graph
    finally:
        # Close the graph even when the clean-up above raises before the
        # yield, so a failed setup does not leave the store open.
        graph.close()


def test_simple_graph(get_graph):
    """Add, query and remove triples in two named graphs of the store."""
    store_graph = get_graph
    pizza_query = "SELECT * WHERE { ?s <urn:example:likes> <urn:example:pizza> . }"

    first = store_graph.get_context(graphuri)
    for triple in ((tarek, likes, pizza), (bob, likes, pizza), (bob, likes, cheese)):
        first.add(triple)

    second = store_graph.get_context(othergraphuri)
    second.add((michel, likes, pizza))

    assert len(first) == 3, "graph contains 3 triples"
    assert len(second) == 1, "other graph contains 1 triple"

    pizza_rows = list(first.query(pizza_query))
    assert len(pizza_rows) == 2, "two people like pizza"

    pizza_triples = list(first.triples((None, likes, pizza)))
    assert len(pizza_triples) == 2, "two people like pizza"

    # Same query again, this time pinning ?s through initBindings
    bound_rows = list(first.query(pizza_query, initBindings={"s": tarek}))
    assert len(bound_rows) == 1, "i was asking only about tarek"

    tarek_pizza = list(first.triples((tarek, likes, pizza)))
    assert len(tarek_pizza) == 1, "i was asking only about tarek"

    tarek_cheese = list(first.triples((tarek, likes, cheese)))
    assert len(tarek_cheese) == 0, "tarek doesn't like cheese"

    # Move tarek's statement to the other graph; only bob remains in the first
    second.add((tarek, likes, pizza))
    first.remove((tarek, likes, pizza))
    remaining_rows = list(first.query(pizza_query))
    assert len(remaining_rows) == 1, "only bob likes pizza"


def test_conjunctive_default(get_graph):
    """Read through the ConjunctiveGraph itself after writing to two contexts."""
    conjunctive = get_graph
    pizza_query = "SELECT * WHERE { ?s <urn:example:likes> <urn:example:pizza> . }"

    named = conjunctive.get_context(graphuri)
    named.add((tarek, likes, pizza))
    other = conjunctive.get_context(othergraphuri)
    other.add((bob, likes, pizza))
    named.add((tarek, hates, cheese))

    assert len(named) == 2, "graph contains 2 triples"

    # The checks below are endpoint-dependent (and in that sense bad tests),
    # as noted in the sparqlstore.py code:
    #
    # For ConjunctiveGraphs, reading is done from the "default graph". Exactly
    # what this means depends on your endpoint, because SPARQL does not offer a
    # simple way to query the union of all graphs as would be expected for a
    # ConjunctiveGraph.
    #
    # Fuseki/TDB has a flag for specifying that the default graph
    # is the union of all graphs (tdb:unionDefaultGraph in the Fuseki config).
    assert len(conjunctive) == 3, (
        "default union graph should contain three triples but contains:\n%s"
        % list(conjunctive)
    )

    union_rows = list(conjunctive.query(pizza_query))
    assert len(union_rows) == 2, "two people like pizza"

    bound_rows = list(conjunctive.query(pizza_query, initBindings={"s": tarek}))
    assert len(bound_rows) == 1, "i was asking only about tarek"

    tarek_pizza = list(conjunctive.triples((tarek, likes, pizza)))
    assert len(tarek_pizza) == 1, "i was asking only about tarek"

    tarek_cheese = list(conjunctive.triples((tarek, likes, cheese)))
    assert len(tarek_cheese) == 0, "tarek doesn't like cheese"

    other.remove((bob, likes, pizza))

    after_removal = list(conjunctive.query(pizza_query))
    assert len(after_removal) == 1, "only tarek likes pizza"


def testU_update(get_graph):
    """An INSERT DATA sent via ``update()`` shows up in the addressed graph."""
    store_graph = get_graph
    insert_request = (
        "INSERT DATA { GRAPH <urn:example:graph> { <urn:example:michel> <urn:example:likes> <urn:example:pizza> . } }"
    )
    store_graph.update(insert_request)

    target = store_graph.get_context(graphuri)
    assert len(target) == 1, "graph contains 1 triples"


def testU_update_with_initns(get_graph):
    """Prefixed names in an update are resolved through ``initNs``."""
    store_graph = get_graph
    prefixes = {"ns": URIRef("urn:example:")}
    store_graph.update(
        "INSERT DATA { GRAPH ns:graph { ns:michel ns:likes ns:pizza . } }",
        initNs=prefixes,
    )

    target = store_graph.get_context(graphuri)
    found = set(target.triples((None, None, None)))
    assert found == {(michel, likes, pizza)}, "only michel likes pizza"


def test_update_with_init_bindings(get_graph):
    """Variables in an INSERT template are filled in from ``initBindings``."""
    store_graph = get_graph
    bindings = dict(
        a=URIRef("urn:example:michel"),
        b=URIRef("urn:example:likes"),
        c=URIRef("urn:example:pizza"),
    )
    store_graph.update(
        "INSERT { GRAPH <urn:example:graph> { ?a ?b ?c . } } WherE { }",
        initBindings=bindings,
    )

    target = store_graph.get_context(graphuri)
    found = set(target.triples((None, None, None)))
    assert found == {(michel, likes, pizza)}, "only michel likes pizza"


def test_update_with_blank_node(get_graph):
    """A blank-node subject inserted via SPARQL update is read back as a BNode."""
    graph = get_graph
    graph.update(
        "INSERT DATA { GRAPH <urn:example:graph> { _:blankA <urn:example:type> <urn:example:Blank> } }"
    )
    g = graph.get_context(graphuri)
    triples = list(g.triples((None, None, None)))
    # Without this check the loop below would pass vacuously if the update
    # had silently inserted nothing.
    assert len(triples) == 1, "graph contains exactly the inserted triple"
    for t in triples:
        assert isinstance(t[0], BNode)
        assert t[1].n3() == "<urn:example:type>"
        assert t[2].n3() == "<urn:example:Blank>"


def test_updateW_with_blank_node_serialize_and_parse(get_graph):
    """N-Triples serialized from a graph holding a blank node must parse again."""
    graph = get_graph
    graph.update(
        "INSERT DATA { GRAPH <urn:example:graph> { _:blankA <urn:example:type> <urn:example:Blank> } }"
    )
    g = graph.get_context(graphuri)
    string = g.serialize(format="ntriples")
    try:
        Graph().parse(data=string, format="ntriples")
    except Exception as exc:
        # Fail with the same message as before, but chain the parser error so
        # its type and traceback appear in the test report.
        raise AssertionError("Exception raised when parsing: " + string) from exc


def test_multiple_update_with_init_bindings(get_graph):
    """``initBindings`` apply to both operations of a ';'-separated request."""
    store_graph = get_graph
    request = (
        "INSERT { GRAPH <urn:example:graph> { ?a ?b ?c . } } WHERE { };"
        "INSERT { GRAPH <urn:example:graph> { ?d ?b ?c . } } WHERE { }"
    )
    bindings = dict(
        a=URIRef("urn:example:michel"),
        b=URIRef("urn:example:likes"),
        c=URIRef("urn:example:pizza"),
        d=URIRef("urn:example:bob"),
    )
    store_graph.update(request, initBindings=bindings)

    target = store_graph.get_context(graphuri)
    found = set(target.triples((None, None, None)))
    expected = {(michel, likes, pizza), (bob, likes, pizza)}
    assert found == expected, "michel and bob like pizza"


def test_named_graph_update(get_graph):
    """Send updates through a named-graph context, including awkward requests.

    The requests carry no GRAPH clause; the assertions read the results back
    from the same context. Later sections feed in literals with unbalanced
    braces, nested/escaped quotes, IRIs containing ``'`` and ``#``, and
    comments containing braces.
    """
    store_graph = get_graph
    named = store_graph.get_context(graphuri)
    everything = (None, None, None)

    insert_michel = (
        "INSERT DATA { <urn:example:michel> <urn:example:likes> <urn:example:pizza> }"
    )
    named.update(insert_michel)
    after_insert = set(named.triples(everything))
    assert after_insert == {(michel, likes, pizza)}, "only michel likes pizza"

    swap_michel_for_bob = (
        "DELETE { <urn:example:michel> <urn:example:likes> <urn:example:pizza> } "
        "INSERT { <urn:example:bob> <urn:example:likes> <urn:example:pizza> } WHERE {}"
    )
    named.update(swap_michel_for_bob)
    after_swap = set(named.triples(everything))
    assert after_swap == {(bob, likes, pizza)}, "only bob likes pizza"

    says = URIRef("urn:says")

    # Literals holding a single, unbalanced curly brace
    unbalanced = ["With an unbalanced curly brace %s " % ch for ch in ("{", "}")]
    for text in unbalanced:
        named.update(
            """INSERT { ?b <urn:says> "%s" }
        WHERE { ?b <urn:example:likes> <urn:example:pizza>} """
            % text
        )

    bob_says = {str(obj) for obj in named.objects(bob, says)}
    assert bob_says == set(unbalanced)

    # Literals in all four SPARQL quoting styles, with escapes, braces,
    # angle brackets, '#' and newlines inside them
    quoted_literals = [
        r'''"1: adfk { ' \\\" \" { "''',
        r'''"2: adfk } <foo> #éï \\"''',
        r"""'3: adfk { " \\\' \' { '""",
        r"""'4: adfk } <foo> #éï \\'""",
        r'''"""5: adfk { ' \\\" \" { """''',
        r'''"""6: adfk } <foo> #éï \\"""''',
        '"""7: ad adsfj \n { \n sadfj"""',
        r"""'''8: adfk { " \\\' \' { '''""",
        r"""'''9: adfk } <foo> #éï \\'''""",
        "'''10: ad adsfj \n { \n sadfj'''",
    ]

    batch_request = "\n".join(
        "INSERT DATA { <urn:example:michel> <urn:says> %s } ;" % lit
        for lit in quoted_literals
    )
    named.update(batch_request)
    michel_says = {str(obj) for obj in named.objects(michel, says)}

    # Expected lexical values: strip the surrounding quotes, then drop the
    # backslash from every escape sequence
    surrounding_quotes = r"^'''|'''$|^'|'$|" + r'^"""|"""$|^"|"$'
    expected_says = set()
    for lit in quoted_literals:
        unquoted = re.sub(surrounding_quotes, r"", lit)
        expected_says.add(re.sub(r"\\(.)", r"\1", unquoted))
    assert michel_says == expected_says

    # IRI containing ' or #
    # The '#' of the fragment identifier must not be taken for the start of a
    # comment (which would comment out the end of the block).
    # The ' must not be taken for the start of a string, which would make the }
    # inside the literal look like the end of the block.
    iri_request = """INSERT DATA { <urn:example:michel> <urn:example:hates> <urn:example:foo'bar?baz;a=1&b=2#fragment>, "'}" }"""

    named.update(iri_request)
    michel_hates = {str(obj) for obj in named.objects(michel, hates)}
    assert michel_hates == {"urn:example:foo'bar?baz;a=1&b=2#fragment", "'}"}

    # Comments that contain braces
    commented_request = """
        INSERT DATA {
            <urn:example:bob> <urn:example:hates> <urn:example:bob> . # No closing brace: }
            <urn:example:bob> <urn:example:hates> <urn:example:michel>.
        }
    #Final { } comment"""

    named.update(commented_request)
    bob_hates = set(named.objects(bob, hates))
    assert bob_hates == {bob, michel}


def test_named_graph_update_with_init_bindings(get_graph):
    """``initBindings`` are honoured for updates sent through a named graph."""
    store_graph = get_graph
    named = store_graph.get_context(graphuri)
    bindings = dict(a=michel, b=likes, c=pizza)
    named.update("INSERT { ?a ?b ?c } WHERE {}", initBindings=bindings)

    found = set(named.triples((None, None, None)))
    assert found == {(michel, likes, pizza)}, "only michel likes pizza"


def test_empty_named_graph(get_graph):
    """An empty named graph the endpoint reports must appear in ``contexts()``."""
    store_graph = get_graph
    empty_graph_iri = "urn:empty-graph-1"
    store_graph.update("CREATE GRAPH <%s>" % empty_graph_iri)

    rows = store_graph.query("SELECT ?name WHERE { GRAPH ?name {} }")
    reported_names = [str(row[0]) for row in rows]

    # Some SPARQL endpoint backends (like TDB) are not able to find empty named
    # graphs (at least with this query); there is nothing to compare then.
    if empty_graph_iri not in reported_names:
        return

    context_ids = [str(ctx.identifier) for ctx in store_graph.contexts()]
    assert empty_graph_iri in context_ids


def test_empty_literal(get_graph):
    """An empty-string literal is read back from the store unchanged."""
    store_graph = get_graph
    # test for https://github.com/RDFLib/rdflib/issues/457
    # also see test_issue457.py which is sparql store independent!
    named = store_graph.get_context(graphuri)
    subject = URIRef("http://example.com/s")
    predicate = URIRef("http://example.com/p")
    named.add((subject, predicate, Literal("")))

    stored_triples = tuple(named)
    o = stored_triples[0][2]
    assert o == Literal(""), repr(o)