summaryrefslogtreecommitdiff
path: root/utils
diff options
context:
space:
mode:
authorSam Thursfield <sam@afuera.me.uk>2020-05-01 19:05:23 +0200
committerSam Thursfield <sam@afuera.me.uk>2020-05-02 01:22:23 +0200
commitb2f922a2cee81fc6d219f2788501a10535a3817e (patch)
tree3f1c7ce88216052626bdf2ddd673d4b974fdaf37 /utils
parent68fe90a7b8d0d6b5c3d6f2237e4ddfc8123f7e5a (diff)
downloadtracker-b2f922a2cee81fc6d219f2788501a10535a3817e.tar.gz
trackertestutils: Make the graph parameter explicit
Database queries should always specify the graph(s) they want to query from. This is how we enforce data isolation and it's also faster than running a query against the union of all graphs.
Diffstat (limited to 'utils')
-rw-r--r--utils/trackertestutils/helpers.py73
1 files changed, 41 insertions, 32 deletions
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index afcee7e35..4de36d522 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -86,8 +86,8 @@ class await_insert():
Use like this:
- expected = 'a nfo:Document; nie:url <test://url>'
- with self.tracker.await_update(expected) as resource:
+ expected = 'a nfo:Document; nie:isStoredAs <test://url>'
+ with self.tracker.await_update(DOCUMENTS_GRAPH, expected) as resource:
# Create or update a file that's indexed by tracker-miner-fs.
#
# The context manager will not exit from the 'with' block until the
@@ -100,9 +100,10 @@ class await_insert():
ensure_resource() if you just want to ensure that some data is present.
"""
- def __init__(self, conn, predicates, timeout=DEFAULT_TIMEOUT,
+ def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT,
_check_inserted=True):
self.conn = conn
+ self.graph = graph
self.predicates = predicates
self.timeout = timeout
self._check_inserted = _check_inserted
@@ -117,8 +118,9 @@ class await_insert():
if self._check_inserted:
query_check = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- ' WHERE { '
+ 'SELECT ?urn tracker:id(?urn) ',
+ f' FROM NAMED <{self.graph}> ',
+ ' WHERE { ',
' ?urn a rdfs:Resource ; ',
self.predicates,
'}'
@@ -128,8 +130,9 @@ class await_insert():
raise AwaitException("Expected data is already present in the store.")
query_filtered = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- ' WHERE { '
+ 'SELECT ?urn tracker:id(?urn) ',
+ ' FROM NAMED <%s> ',
+ ' WHERE { ',
' ?urn a rdfs:Resource ; ',
self.predicates,
#' FILTER (tracker:id(?urn) = ~id) '
@@ -144,10 +147,10 @@ class await_insert():
for event in events:
if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
Tracker.NotifierEventType.UPDATE]:
- log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+ log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
#stmt.bind_int('~id', event.get_id())
#cursor = stmt.execute(None)
- stmt = query_filtered % event.get_id()
+ stmt = query_filtered % (self.graph, event.get_id())
log.debug("Running %s", stmt)
cursor = self.conn.query(stmt)
@@ -187,15 +190,16 @@ class await_update():
Use like this:
- before = 'nie:url <test://url1>'
- after = 'nie:url <test://url2>'
+ before = 'nie:isStoredAs <test://url1>'
+ after = 'nie:isStoredAs <test://url2>'
with self.tracker.await_update(resource_id, before, after):
# Trigger an update of the data.
"""
- def __init__(self, conn, resource_id, before_predicates, after_predicates,
+ def __init__(self, conn, graph, resource_id, before_predicates, after_predicates,
timeout=DEFAULT_TIMEOUT):
self.conn = conn
+ self.graph = graph
self.resource_id = resource_id
self.before_predicates = before_predicates
self.after_predicates = after_predicates
@@ -210,21 +214,23 @@ class await_update():
query_before = ' '.join([
'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
' WHERE { '
' ?urn a rdfs:Resource ; ',
self.before_predicates,
' . FILTER (tracker:id(?urn) = %s) '
'}'
- ]) % self.resource_id
+ ]) % (self.graph, self.resource_id)
cursor = self.conn.query(query_before)
if not cursor.next():
raise AwaitException("Expected data is not present in the store.")
self.stored_as = cursor.get_string(0)[0]
- query_on_create = 'SELECT nie:isStoredAs(tracker:uri(%s)) { }'
+ query_on_create = 'SELECT FROM NAMED <%s> nie:isStoredAs(tracker:uri(%s)) { }'
query_after = ' '.join([
'SELECT ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
' WHERE { '
' ?urn a rdfs:Resource ; ',
self.after_predicates,
@@ -234,7 +240,7 @@ class await_update():
def match_cb(notifier, service, graph, events):
for event in events:
- log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+ log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
if event.get_event_type() == Tracker.NotifierEventType.DELETE and event.get_id() == self.resource_id:
self.resource_deleted = True
elif event.get_event_type() in [Tracker.NotifierEventType.CREATE,
@@ -242,12 +248,12 @@ class await_update():
if event.get_event_type() == Tracker.NotifierEventType.CREATE:
if not self.resource_deleted:
raise AwaitException("Received insert with no prior delete")
- cursor = self.conn.query(query_on_create % event.get_id())
+ cursor = self.conn.query(query_on_create % (self.graph, event.get_id()))
if cursor.next() and cursor.get_string(0)[0] == self.stored_as:
self.resource_id = event.get_id()
- log.debug("Running %s", query_after % event.get_id())
- cursor = self.conn.query(query_after % event.get_id())
+ log.debug("Running %s", query_after % (self.graph, event.get_id()))
+ cursor = self.conn.query(query_after % (self.graph, event.get_id()))
if cursor.next():
log.debug("Query matched!")
@@ -279,8 +285,9 @@ class await_update():
class await_delete():
"""Context manager to await removal of a resource."""
- def __init__(self, conn, resource_id, timeout=DEFAULT_TIMEOUT):
+ def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT):
self.conn = conn
+ self.graph = graph
self.resource_id = resource_id
self.timeout = timeout
@@ -292,13 +299,14 @@ class await_delete():
log.info("Awaiting deletion of resource id %s", self.resource_id)
query_check = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- ' WHERE { '
+ 'SELECT ?urn tracker:id(?urn) ',
+ 'FROM NAMED <%s> ',
+ ' WHERE { ',
' ?urn a rdfs:Resource ; ',
' . FILTER (tracker:id(?urn) = %s) '
'}'
])
- cursor = self.conn.query(query_check % self.resource_id)
+ cursor = self.conn.query(query_check % (self.graph, self.resource_id))
if not cursor.next():
raise AwaitException(
"Resource with id %i isn't present in the store.", self.resource_id)
@@ -348,31 +356,32 @@ class StoreHelper():
self.conn = conn
- def await_insert(self, predicates, timeout=DEFAULT_TIMEOUT):
+ def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
"""Context manager that blocks until a resource is inserted."""
- return await_insert(self.conn, predicates, timeout)
+ return await_insert(self.conn, graph, predicates, timeout)
- def await_update(self, resource_id, before_predicates, after_predicates,
+ def await_update(self, graph, resource_id, before_predicates, after_predicates,
timeout=DEFAULT_TIMEOUT):
"""Context manager that blocks until a resource is updated."""
- return await_update(self.conn, resource_id, before_predicates,
+ return await_update(self.conn, graph, resource_id, before_predicates,
after_predicates, timeout)
- def await_delete(self, resource_id, timeout=DEFAULT_TIMEOUT):
+ def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
"""Context manager that blocks until a resource is deleted."""
- return await_delete(self.conn, resource_id, timeout)
+ return await_delete(self.conn, graph, resource_id, timeout)
- def ensure_resource(self, predicates, timeout=DEFAULT_TIMEOUT):
- """Ensure that a resource matching 'predicates' exists.
+ def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
+ """Ensure that a resource matching 'predicates' exists in 'graph'.
This function will block if the resource is not yet created.
"""
- await_ctx_mgr = await_insert(self.conn, predicates, timeout, _check_inserted=False)
+ await_ctx_mgr = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False)
with await_ctx_mgr as resource:
# Check if the data was committed *before* the function was called.
query_initial = ' '.join([
'SELECT ?urn tracker:id(?urn) '
+ f' FROM NAMED <{graph}>',
' WHERE { '
' ?urn a rdfs:Resource ; ',
predicates,
@@ -432,7 +441,7 @@ class StoreHelper():
Get the internal ID for a given resource, identified by URL.
"""
result = self.query(
- 'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs/nie:url "%s" }' % url)
+ 'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs "%s" }' % url)
if len(result) == 1:
return int(result[0][0])
elif len(result) == 0: