From b2f922a2cee81fc6d219f2788501a10535a3817e Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Fri, 1 May 2020 19:05:23 +0200 Subject: trackertestutils: Make the graph parameter explicit Database queries should always specify the graph(s) they want to query from. This is how we enforce data isolation and it's also faster than running a query against the union of all graphs. --- utils/trackertestutils/helpers.py | 73 ++++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 32 deletions(-) (limited to 'utils') diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py index afcee7e35..4de36d522 100644 --- a/utils/trackertestutils/helpers.py +++ b/utils/trackertestutils/helpers.py @@ -86,8 +86,8 @@ class await_insert(): Use like this: - expected = 'a nfo:Document; nie:url ' - with self.tracker.await_update(expected) as resource: + expected = 'a nfo:Document; nie:isStoredAs ' + with self.tracker.await_update(DOCUMENTS_GRAPH, expected) as resource: # Create or update a file that's indexed by tracker-miner-fs. # # The context manager will not exit from the 'with' block until the @@ -100,9 +100,10 @@ class await_insert(): ensure_resource() if you just want to ensure that some data is present. """ - def __init__(self, conn, predicates, timeout=DEFAULT_TIMEOUT, + def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT, _check_inserted=True): self.conn = conn + self.graph = graph self.predicates = predicates self.timeout = timeout self._check_inserted = _check_inserted @@ -117,8 +118,9 @@ class await_insert(): if self._check_inserted: query_check = ' '.join([ - 'SELECT ?urn tracker:id(?urn) ' - ' WHERE { ' + 'SELECT ?urn tracker:id(?urn) ', + f' FROM NAMED <{self.graph}> ', + ' WHERE { ', ' ?urn a rdfs:Resource ; ', self.predicates, '}' @@ -128,8 +130,9 @@ class await_insert(): raise AwaitException("Expected data is already present in the store.") query_filtered = ' '.join([ - 'SELECT ?urn tracker:id(?urn) ' - ' WHERE { ' + 'SELECT ?urn tracker:id(?urn) ', + ' FROM NAMED <%s> ', + ' WHERE { ', ' ?urn a rdfs:Resource ; ', self.predicates, #' FILTER (tracker:id(?urn) = ~id) ' @@ -144,10 +147,10 @@ class await_insert(): for event in events: if event.get_event_type() in [Tracker.NotifierEventType.CREATE, Tracker.NotifierEventType.UPDATE]: - log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id()) + log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id()) #stmt.bind_int('~id', event.get_id()) #cursor = stmt.execute(None) - stmt = query_filtered % event.get_id() + stmt = query_filtered % (self.graph, event.get_id()) log.debug("Running %s", stmt) cursor = self.conn.query(stmt) @@ -187,15 +190,16 @@ class await_update(): Use like this: - before = 'nie:url ' - after = 'nie:url ' + before = 'nie:isStoredAs ' + after = 'nie:isStoredAs ' with self.tracker.await_update(resource_id, before, after): # Trigger an update of the data. """ - def __init__(self, conn, resource_id, before_predicates, after_predicates, + def __init__(self, conn, graph, resource_id, before_predicates, after_predicates, timeout=DEFAULT_TIMEOUT): self.conn = conn + self.graph = graph self.resource_id = resource_id self.before_predicates = before_predicates self.after_predicates = after_predicates @@ -210,21 +214,23 @@ class await_update(): query_before = ' '.join([ 'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) ' + ' FROM NAMED <%s> ', ' WHERE { ' ' ?urn a rdfs:Resource ; ', self.before_predicates, ' . FILTER (tracker:id(?urn) = %s) ' '}' - ]) % self.resource_id + ]) % (self.graph, self.resource_id) cursor = self.conn.query(query_before) if not cursor.next(): raise AwaitException("Expected data is not present in the store.") self.stored_as = cursor.get_string(0)[0] - query_on_create = 'SELECT nie:isStoredAs(tracker:uri(%s)) { }' + query_on_create = 'SELECT FROM NAMED <%s> nie:isStoredAs(tracker:uri(%s)) { }' query_after = ' '.join([ 'SELECT ?urn tracker:id(?urn) ' + ' FROM NAMED <%s> ', ' WHERE { ' ' ?urn a rdfs:Resource ; ', self.after_predicates, @@ -234,7 +240,7 @@ class await_update(): def match_cb(notifier, service, graph, events): for event in events: - log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id()) + log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id()) if event.get_event_type() == Tracker.NotifierEventType.DELETE and event.get_id() == self.resource_id: self.resource_deleted = True elif event.get_event_type() in [Tracker.NotifierEventType.CREATE, @@ -242,12 +248,12 @@ class await_update(): if event.get_event_type() == Tracker.NotifierEventType.CREATE: if not self.resource_deleted: raise AwaitException("Received insert with no prior delete") - cursor = self.conn.query(query_on_create % event.get_id()) + cursor = self.conn.query(query_on_create % (self.graph, event.get_id())) if cursor.next() and cursor.get_string(0)[0] == self.stored_as: self.resource_id = event.get_id() - log.debug("Running %s", query_after % event.get_id()) - cursor = self.conn.query(query_after % event.get_id()) + log.debug("Running %s", query_after % (self.graph, event.get_id())) + cursor = self.conn.query(query_after % (self.graph, event.get_id())) if cursor.next(): log.debug("Query matched!") @@ -279,8 +285,9 @@ class await_update(): class await_delete(): """Context manager to await removal of a resource.""" - def __init__(self, conn, resource_id, timeout=DEFAULT_TIMEOUT): + def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT): self.conn = conn + self.graph = graph self.resource_id = resource_id self.timeout = timeout @@ -292,13 +299,14 @@ class await_delete(): log.info("Awaiting deletion of resource id %s", self.resource_id) query_check = ' '.join([ - 'SELECT ?urn tracker:id(?urn) ' - ' WHERE { ' + 'SELECT ?urn tracker:id(?urn) ', + 'FROM NAMED <%s> ', + ' WHERE { ', ' ?urn a rdfs:Resource ; ', ' . FILTER (tracker:id(?urn) = %s) ' '}' ]) - cursor = self.conn.query(query_check % self.resource_id) + cursor = self.conn.query(query_check % (self.graph, self.resource_id)) if not cursor.next(): raise AwaitException( "Resource with id %i isn't present in the store.", self.resource_id) @@ -348,31 +356,32 @@ class StoreHelper(): self.conn = conn - def await_insert(self, predicates, timeout=DEFAULT_TIMEOUT): + def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT): """Context manager that blocks until a resource is inserted.""" - return await_insert(self.conn, predicates, timeout) + return await_insert(self.conn, graph, predicates, timeout) - def await_update(self, resource_id, before_predicates, after_predicates, + def await_update(self, graph, resource_id, before_predicates, after_predicates, timeout=DEFAULT_TIMEOUT): """Context manager that blocks until a resource is updated.""" - return await_update(self.conn, resource_id, before_predicates, + return await_update(self.conn, graph, resource_id, before_predicates, after_predicates, timeout) - def await_delete(self, resource_id, timeout=DEFAULT_TIMEOUT): + def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT): """Context manager that blocks until a resource is deleted.""" - return await_delete(self.conn, resource_id, timeout) + return await_delete(self.conn, graph, resource_id, timeout) - def ensure_resource(self, predicates, timeout=DEFAULT_TIMEOUT): - """Ensure that a resource matching 'predicates' exists. + def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT): + """Ensure that a resource matching 'predicates' exists in 'graph'. This function will block if the resource is not yet created. """ - await_ctx_mgr = await_insert(self.conn, predicates, timeout, _check_inserted=False) + await_ctx_mgr = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False) with await_ctx_mgr as resource: # Check if the data was committed *before* the function was called. query_initial = ' '.join([ 'SELECT ?urn tracker:id(?urn) ' + f' FROM NAMED <{graph}>', ' WHERE { ' ' ?urn a rdfs:Resource ; ', predicates, @@ -432,7 +441,7 @@ class StoreHelper(): Get the internal ID for a given resource, identified by URL. """ result = self.query( - 'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs/nie:url "%s" }' % url) + 'SELECT tracker:id(?r) WHERE { ?r nie:isStoredAs "%s" }' % url) if len(result) == 1: return int(result[0][0]) elif len(result) == 0: -- cgit v1.2.1