author    Sam Thursfield <sam@afuera.me.uk>    2020-06-13 13:40:52 +0000
committer Sam Thursfield <sam@afuera.me.uk>    2020-06-13 13:40:52 +0000
commit    f8239858a753a1d8f68d9fca4aa3dbca89c56dc1 (patch)
tree      ef115650baadb37fd2dc151b87cec73b35439605
parent    39a32edc988c1e8f288bf85468f40892bc1bb5d8 (diff)
parent    032e7baeffc02d9d8f6e285e11f5fb2973ba59a2 (diff)
download  tracker-f8239858a753a1d8f68d9fca4aa3dbca89c56dc1.tar.gz

Merge branch 'sam/umockdev' into 'master'

trackertestutils: Add support for system bus processes

See merge request GNOME/tracker!254
-rw-r--r--  tests/functional-tests/fixtures.py    |   4
-rw-r--r--  utils/trackertestutils/__main__.py    |   2
-rw-r--r--  utils/trackertestutils/dbusdaemon.py  |  64
-rw-r--r--  utils/trackertestutils/helpers.py     | 630
-rw-r--r--  utils/trackertestutils/sandbox.py     | 182
-rw-r--r--  utils/trackertestutils/storehelper.py | 533
6 files changed, 777 insertions(+), 638 deletions(-)
diff --git a/tests/functional-tests/fixtures.py b/tests/functional-tests/fixtures.py
index b42d742c4..08d16da17 100644
--- a/tests/functional-tests/fixtures.py
+++ b/tests/functional-tests/fixtures.py
@@ -55,9 +55,9 @@ def tracker_test_main():
# only errors and warnings should be output here unless the environment
# contains G_MESSAGES_DEBUG=.
handler_stderr = logging.StreamHandler(stream=sys.stderr)
- handler_stderr.addFilter(logging.Filter('trackertestutils.dbusdaemon.stderr'))
+ handler_stderr.addFilter(logging.Filter('sandbox-session-bus.stderr'))
handler_stdout = logging.StreamHandler(stream=sys.stderr)
- handler_stdout.addFilter(logging.Filter('trackertestutils.dbusdaemon.stdout'))
+ handler_stdout.addFilter(logging.Filter('sandbox-session-bus.stdout'))
logging.basicConfig(level=logging.INFO,
handlers=[handler_stderr, handler_stdout],
format='%(message)s')
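
Note: the renamed filters correspond to the per-daemon loggers that
dbusdaemon.py (below) now creates from each DBusDaemon's name. A minimal
sketch of the same filtering pattern, using the new logger name (the example
messages are hypothetical):

    import logging
    import sys

    # A name-based Filter only passes records from that logger or its children.
    handler_stderr = logging.StreamHandler(stream=sys.stderr)
    handler_stderr.addFilter(logging.Filter('sandbox-session-bus.stderr'))
    logging.basicConfig(level=logging.INFO, handlers=[handler_stderr],
                        format='%(message)s')

    logging.getLogger('sandbox-session-bus.stderr').info('passes the filter')
    logging.getLogger('some.other.logger').info('dropped by the filter')
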
diff --git a/utils/trackertestutils/__main__.py b/utils/trackertestutils/__main__.py
index ba64eb4d7..991da79bf 100644
--- a/utils/trackertestutils/__main__.py
+++ b/utils/trackertestutils/__main__.py
@@ -297,7 +297,7 @@ class MinerStatusWatch():
def setup(self):
log.debug(f"Set up status watch on {self.dbus_name}")
self._proxy = Gio.DBusProxy.new_sync(
- self._sandbox.get_connection(),
+ self._sandbox.get_session_bus_connection(),
Gio.DBusProxyFlags.NONE, None,
self.dbus_name, self.object_path, 'org.freedesktop.Tracker3.Miner',
None)
diff --git a/utils/trackertestutils/dbusdaemon.py b/utils/trackertestutils/dbusdaemon.py
index e55c03595..67c947af5 100644
--- a/utils/trackertestutils/dbusdaemon.py
+++ b/utils/trackertestutils/dbusdaemon.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018,2019, Sam Thursfield <sam@afuera.me.uk>
+# Copyright (C) 2018-2020, Sam Thursfield <sam@afuera.me.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -26,19 +26,49 @@ import signal
import subprocess
import threading
+from . import mainloop
+
+DEFAULT_TIMEOUT = 10
+
log = logging.getLogger(__name__)
-dbus_stderr_log = logging.getLogger(__name__ + '.stderr')
-dbus_stdout_log = logging.getLogger(__name__ + '.stdout')
class DaemonNotStartedError(Exception):
pass
+def await_bus_name(conn, bus_name, timeout=DEFAULT_TIMEOUT):
+ """Blocks until 'bus_name' has an owner."""
+
+ log.info("Blocking until name %s has owner", bus_name)
+ loop = mainloop.MainLoop()
+
+ def name_appeared_cb(connection, name, name_owner):
+ log.info("Name %s appeared (owned by %s)", name, name_owner)
+ loop.quit()
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting bus name '{bus_name}'")
+
+ watch_id = Gio.bus_watch_name_on_connection(
+ conn, bus_name, Gio.BusNameWatcherFlags.NONE, name_appeared_cb, None)
+ timeout_id = GLib.timeout_add_seconds(timeout, timeout_cb)
+
+ loop.run_checked()
+
+ Gio.bus_unwatch_name(watch_id)
+ GLib.source_remove(timeout_id)
+
+
class DBusDaemon:
- """The private D-Bus instance that provides the sandbox's session bus."""
+ """A private D-Bus daemon instance."""
+
+ def __init__(self, config_file=None, name='dbus-daemon'):
+ self.name = name
+ self.config_file = config_file
- def __init__(self):
self.process = None
self.address = None
@@ -77,12 +107,13 @@ class DBusDaemon:
return dbus_daemon
- def start(self, config_file=None, env=None, new_session=False):
+ def start(self, env=None, new_session=False):
dbus_command = [self._dbus_daemon_path(), '--print-address=1', '--print-pid=1']
- if config_file:
- dbus_command += ['--config-file=' + config_file]
+ if self.config_file:
+ dbus_command += ['--config-file=' + self.config_file]
else:
dbus_command += ['--session']
+
log.debug("Running: %s", dbus_command)
self.process = subprocess.Popen(
dbus_command, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -101,10 +132,13 @@ class DBusDaemon:
log.debug("Using new D-Bus session with address '%s' with PID %d",
self.address, self.pid)
+ stderr_log = logging.getLogger(self.name + '.stderr')
+ stdout_log = logging.getLogger(self.name + '.stdout')
+
# We must read from the pipes continuously, otherwise the daemon
# process will block.
- self._threads=[threading.Thread(target=self.pipe_to_log, args=(self.process.stdout, dbus_stdout_log), daemon=True),
- threading.Thread(target=self.pipe_to_log, args=(self.process.stderr, dbus_stdout_log), daemon=True)]
+ self._threads=[threading.Thread(target=self.pipe_to_log, args=(self.process.stdout, stdout_log), daemon=True),
+ threading.Thread(target=self.pipe_to_log, args=(self.process.stderr, stderr_log), daemon=True)]
self._threads[0].start()
self._threads[1].start()
@@ -199,3 +233,13 @@ class DBusDaemon:
return None
else:
raise
+
+ def activate_service(self, bus_name, object_path):
+ GDBUS_DEFAULT_TIMEOUT = -1
+ self.get_connection().call_sync(
+ bus_name, object_path, 'org.freedesktop.DBus.Peer', 'Ping',
+ None, None, Gio.DBusCallFlags.NONE, GDBUS_DEFAULT_TIMEOUT, None)
+ self.await_bus_name(bus_name)
+
+ def await_bus_name(self, bus_name, timeout=DEFAULT_TIMEOUT):
+ await_bus_name(self.get_connection(), bus_name, timeout)
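
Note: the reworked DBusDaemon takes its config file and log name in the
constructor, and gains activate_service()/await_bus_name() for starting
D-Bus activatable services. A minimal usage sketch, assuming a hypothetical
config file path:

    from trackertestutils import dbusdaemon

    # Hypothetical dbus-daemon XML config; any valid config file works here.
    bus = dbusdaemon.DBusDaemon(config_file='/tmp/session.conf',
                                name='sandbox-session-bus')
    bus.start()

    # Ping an activatable service, then block until it owns its bus name.
    bus.activate_service('org.freedesktop.Tracker3.Miner.Files',
                         '/org/freedesktop/Tracker3/Miner/Files')

    bus.stop()
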
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py
index f806fef6a..91086618f 100644
--- a/utils/trackertestutils/helpers.py
+++ b/utils/trackertestutils/helpers.py
@@ -1,6 +1,4 @@
-#
-# Copyright (C) 2010, Nokia <jean-luc.lamadon@nokia.com>
-# Copyright (C) 2019, Sam Thursfield <sam@afuera.me.uk>
+# Copyright (C) 2020, Sam Thursfield <sam@afuera.me.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -16,626 +14,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
-#
-
-import gi
-gi.require_version('Tracker', '3.0')
-from gi.repository import Tracker
-from gi.repository import GLib
-from gi.repository import GObject
-
-import atexit
-import dataclasses
-import logging
-import os
-import signal
-
-from . import dbusdaemon
-from . import mainloop
-from . import psutil_mini as psutil
-
-
-log = logging.getLogger(__name__)
-
-
-TRACKER_DBUS_PREFIX = 'org.freedesktop.Tracker3'
-TRACKER_MINER_FS_BUSNAME = 'org.freedesktop.Tracker3.Miner.Files'
-
-
-class AwaitException(RuntimeError):
- pass
-
-
-class AwaitTimeoutException(AwaitException):
- pass
-
-
-class NoMetadataException (Exception):
- pass
-
-
-DEFAULT_TIMEOUT = 10
-
-
-_process_list = []
-
-
-def _cleanup_processes():
- for process in _process_list:
- log.debug("helpers._cleanup_processes: stopping %s", process)
- process.stop()
-
-
-atexit.register(_cleanup_processes)
-
-
-@dataclasses.dataclass
-class InsertedResource():
- """Wraps the 'urn' value returned by await_insert context manager.
-
- We can't return the value directly as we don't know it until the context
- manager exits.
-
- """
- urn: str
- id: int
-
-
-class await_insert():
- """Context manager to await data insertion by Tracker miners & extractors.
-
- Use like this:
-
- expected = 'a nfo:Document; nie:isStoredAs <test://url>'
- with self.tracker.await_update(DOCUMENTS_GRAPH, expected) as resource:
- # Create or update a file that's indexed by tracker-miner-fs.
- #
- # The context manager will not exit from the 'with' block until the
- # data has been inserted in the store.
-
- print(f"Inserted resource with urn: {resource.urn}")
-
- The function expects an insertion to happen, and will raise an error if the
- expected data is already present in the store. You can use
- ensure_resource() if you just want to ensure that some data is present.
-
- """
- def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT,
- _check_inserted=True):
- self.conn = conn
- self.graph = graph
- self.predicates = predicates
- self.timeout = timeout
- self._check_inserted = _check_inserted
-
- self.loop = mainloop.MainLoop()
- self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
-
- self.result = InsertedResource(None, 0)
-
- def __enter__(self):
- log.info("Awaiting insertion of resource with data %s", self.predicates)
-
- if self._check_inserted:
- query_check = ' '.join([
- 'SELECT ?urn tracker:id(?urn) ',
- f' FROM NAMED <{self.graph}> ',
- ' WHERE { ',
- ' ?urn a rdfs:Resource ; ',
- self.predicates,
- '}'
- ])
- cursor = self.conn.query(query_check)
- if cursor.next():
- raise AwaitException("Expected data is already present in the store.")
-
- query_filtered = ' '.join([
- 'SELECT ?urn tracker:id(?urn) ',
- ' FROM NAMED <%s> ',
- ' WHERE { ',
- ' ?urn a rdfs:Resource ; ',
- self.predicates,
- #' FILTER (tracker:id(?urn) = ~id) '
- ' . FILTER (tracker:id(?urn) = %s) '
- '}'
- ])
-
- # FIXME: doesn't work with bus backend: https://gitlab.gnome.org/GNOME/tracker/issues/179
- #stmt = self.conn.query_statement(query, None)
-
- def match_cb(notifier, service, graph, events):
- for event in events:
- if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
- Tracker.NotifierEventType.UPDATE]:
- log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
- #stmt.bind_int('~id', event.get_id())
- #cursor = stmt.execute(None)
- stmt = query_filtered % (self.graph, event.get_id())
- cursor = self.conn.query(stmt)
-
- if cursor.next():
- self.result.urn = cursor.get_string(0)[0]
- self.result.id = cursor.get_integer(1)
- log.debug("Query matched! Got urn %s", self.result.urn)
-
- self.loop.quit()
-
- def timeout_cb():
- log.info("Timeout fired after %s seconds", self.timeout)
- raise AwaitTimeoutException(
- f"Timeout awaiting insert of resource matching: {self.predicates}")
-
- self.signal_id = self.notifier.connect('events', match_cb)
- self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
- return self.result
-
- def __exit__(self, etype, evalue, etraceback):
- if etype is not None:
- return False
-
- while self.result.urn is None:
- self.loop.run_checked()
- log.debug("Got urn %s", self.result.urn)
-
- GLib.source_remove(self.timeout_id)
- GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
- return True
-
-
-class await_property_update():
- """Context manager to await data updates by Tracker miners & extractors.
-
- Use like this:
-
- before = 'nie:isStoredAs <test://url1>'
- after = 'nie:isStoredAs <test://url2>'
- with self.tracker.await_property_update(resource_id, before, after):
- # Trigger an update of the data.
-
- """
- def __init__(self, conn, graph, resource_id, before_predicates, after_predicates,
- timeout=DEFAULT_TIMEOUT):
- self.conn = conn
- self.graph = graph
- self.resource_id = resource_id
- self.before_predicates = before_predicates
- self.after_predicates = after_predicates
- self.timeout = timeout
-
- self.loop = mainloop.MainLoop()
- self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
- self.matched = False
-
- def __enter__(self):
- log.info("Awaiting update of resource id %s", self.resource_id)
-
- query_before = ' '.join([
- 'SELECT ?urn tracker:id(?urn) ',
- ' FROM NAMED <%s> ',
- ' WHERE { ',
- ' ?urn a rdfs:Resource ; ',
- self.before_predicates,
- ' . FILTER (tracker:id(?urn) = %s) '
- '}'
- ]) % (self.graph, self.resource_id)
-
- cursor = self.conn.query(query_before)
- if not cursor.next():
- raise AwaitException("Expected data is not present in the store.")
-
- query_after = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- ' FROM NAMED <%s> ',
- ' WHERE { '
- ' ?urn a rdfs:Resource ; ',
- self.after_predicates,
- ' . FILTER (tracker:id(?urn) = %s) '
- '}'
- ]) % (self.graph, self.resource_id)
-
- def match_cb(notifier, service, graph, events):
- for event in events:
- if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() == self.resource_id:
- log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
- cursor = self.conn.query(query_after)
-
- if cursor.next():
- log.debug("Query matched!")
- self.matched = True
- self.loop.quit()
-
- def timeout_cb():
- log.info("Timeout fired after %s seconds", self.timeout)
- raise AwaitTimeoutException(
- f"Timeout awaiting update of resource {self.resource_id} "
- f"matching: {self.after_predicates}")
-
- self.signal_id = self.notifier.connect('events', match_cb)
- self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
- def __exit__(self, etype, evalue, etraceback):
- if etype is not None:
- return False
- while not self.matched:
- self.loop.run_checked()
-
- GLib.source_remove(self.timeout_id)
- GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
- return True
-
-
-class await_content_update():
- """Context manager to await updates to file contents.
-
- When a file is updated, the old information it contained is deleted from
- the store, and the new information is inserted as a new resource.
-
- """
- def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
- timeout=DEFAULT_TIMEOUT):
- self.conn = conn
- self.graph = graph
- self.before_resource_id = before_resource_id
- self.before_predicates = before_predicates
- self.after_predicates = after_predicates
- self.timeout = timeout
-
- self.loop = mainloop.MainLoop()
- self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
- self.matched = False
-
- self.result = InsertedResource(None, 0)
-
- def __enter__(self):
- log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
-
- query_before = ' '.join([
- 'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
- ' FROM NAMED <%s> ',
- ' WHERE { '
- ' ?urn a rdfs:Resource ; ',
- self.before_predicates,
- ' . FILTER (tracker:id(?urn) = %s) '
- '}'
- ]) % (self.graph, self.before_resource_id)
- cursor = self.conn.query(query_before)
- if not cursor.next():
- raise AwaitException("Expected data is not present in the store.")
- file_url = cursor.get_string(0)[0]
-
- query_after = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- ' FROM NAMED <%s> ',
- ' WHERE { '
- ' ?urn a rdfs:Resource ; ',
- ' nie:isStoredAs <%s> ; ',
- self.after_predicates,
- '}'
- ]) % (self.graph, file_url)
-
- # When a file is updated, the DataObject representing the file gets
- # an UPDATE notification. The InformationElement representing the
- # content gets a DELETE and CREATE notification, because it is
- # deleted and recreated. We detect the latter situation.
-
- self.matched_delete = False
- def match_cb(notifier, service, graph, events):
- for event in events:
- log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
- if event.get_id() == self.before_resource_id and event.get_event_type() == Tracker.NotifierEventType.DELETE:
- log.debug(" Matched delete")
- self.matched_delete = True
-
- # The after_predicates may match after the miner-fs creates
- # the new resource, or they may only match once the extractor
- # processes the resource. The latter will be an UPDATE event.
- elif self.matched_delete and event.get_event_type() in [Tracker.NotifierEventType.CREATE, Tracker.NotifierEventType.UPDATE]:
- cursor = self.conn.query(query_after)
-
- if cursor.next():
- self.result.urn = cursor.get_string(0)[0]
- self.result.id = cursor.get_integer(1)
- log.debug("Query matched! Got new urn %s", self.result.urn)
-
- self.matched = True
- self.loop.quit()
-
- def timeout_cb():
- log.info("Timeout fired after %s seconds", self.timeout)
- raise AwaitTimeoutException(
- f"Timeout awaiting update of resource {self.before_resource_id} "
- f"with URL {file_url} matching: {self.after_predicates}")
-
- self.signal_id = self.notifier.connect('events', match_cb)
- self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
- return self.result
-
- def __exit__(self, etype, evalue, etraceback):
- if etype is not None:
- return False
-
- while not self.matched:
- self.loop.run_checked()
-
- GLib.source_remove(self.timeout_id)
- GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
- return True
-
-
-class await_delete():
- """Context manager to await removal of a resource."""
-
- def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT):
- self.conn = conn
- self.graph = graph
- self.resource_id = resource_id
- self.timeout = timeout
-
- self.loop = mainloop.MainLoop()
- self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
- self.matched = False
-
- def __enter__(self):
- log.info("Awaiting deletion of resource id %s", self.resource_id)
-
- query_check = ' '.join([
- 'SELECT ?urn tracker:id(?urn) ',
- 'FROM NAMED <%s> ',
- ' WHERE { ',
- ' ?urn a rdfs:Resource ; ',
- ' . FILTER (tracker:id(?urn) = %s) '
- '}'
- ])
- cursor = self.conn.query(query_check % (self.graph, self.resource_id))
- if not cursor.next():
- raise AwaitException(
- "Resource with id %i isn't present in the store.", self.resource_id)
-
- def match_cb(notifier, service, graph, events):
- for event in events:
- if event.get_event_type() == Tracker.NotifierEventType.DELETE:
- log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
- if event.get_id() == self.resource_id:
- log.debug("Matched expected id %s", self.resource_id)
- self.matched = True
- self.loop.quit()
- else:
- log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
-
- def timeout_cb():
- log.info("Timeout fired after %s seconds", self.timeout)
- raise AwaitTimeoutException(
- f"Timeout awaiting removal of resource {self.resource_id} ")
-
- self.signal_id = self.notifier.connect('events', match_cb)
- self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
-
- return None
-
- def __exit__(self, etype, evalue, etraceback):
- if etype is not None:
- return False
-
- while not self.matched:
- self.loop.run_checked()
-
- GLib.source_remove(self.timeout_id)
- GObject.signal_handler_disconnect(self.notifier, self.signal_id)
-
- return True
-
-
-class StoreHelper():
- """
- Helper for testing database access with libtracker-sparql.
- """
-
- def __init__(self, conn):
- self.log = logging.getLogger(__name__)
- self.loop = mainloop.MainLoop()
-
- self.conn = conn
-
- def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
- """Context manager that blocks until a resource is inserted."""
- return await_insert(self.conn, graph, predicates, timeout)
-
- def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
- timeout=DEFAULT_TIMEOUT):
- """Context manager that blocks until a resource property is updated."""
- return await_property_update(self.conn, graph, resource_id, before_predicates,
- after_predicates, timeout)
-
- def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
- timeout=DEFAULT_TIMEOUT):
- """Context manager that blocks until a resource is deleted and recreated."""
- return await_content_update(self.conn, graph, before_resource_id, before_predicates,
- after_predicates, timeout)
-
- def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
- """Context manager that blocks until a resource is deleted."""
- return await_delete(self.conn, graph, resource_id, timeout)
-
- def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
- """Ensure that a resource matching 'predicates' exists in 'graph'.
-
- This function will block if the resource is not yet created.
-
- """
- await_ctx_mgr = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False)
- with await_ctx_mgr as resource:
- # Check if the data was committed *before* the function was called.
- query_initial = ' '.join([
- 'SELECT ?urn tracker:id(?urn) '
- f' FROM NAMED <{graph}>',
- ' WHERE { '
- ' ?urn a rdfs:Resource ; ',
- predicates,
- '}'
- ])
-
- cursor = self.conn.query(query_initial)
- if cursor.next():
- resource.urn = cursor.get_string(0)[0]
- resource.id = cursor.get_integer(1)
- return resource
- return resource
-
- def query(self, query):
- cursor = self.conn.query(query, None)
- result = []
- while cursor.next():
- row = []
- for i in range(0, cursor.get_n_columns()):
- row.append(cursor.get_string(i)[0])
- result.append(row)
- return result
-
- def update(self, update_sparql):
- self.conn.update(update_sparql, 0, None)
-
- def count_instances(self, ontology_class):
- QUERY = """
- SELECT COUNT(?u) WHERE {
- ?u a %s .
- }
- """
- result = self.query(QUERY % ontology_class)
-
- if (len(result) == 1):
- return int(result[0][0])
- else:
- return -1
-
- def get_resource_id_by_uri(self, uri):
- """
- Get the internal ID for a given resource, identified by URI.
- """
- result = self.query(
- 'SELECT tracker:id(<%s>) WHERE { }' % uri)
- if len(result) == 1:
- return int(result[0][0])
- elif len(result) == 0:
- raise Exception("No entry for resource %s" % uri)
- else:
- raise Exception("Multiple entries for resource %s" % uri)
-
- def get_content_resource_id(self, url):
- """
- Gets the internal ID for an nie:InformationElement resource.
-
- The InformationElement represents data stored in a file, not
- the file itself. The 'url' parameter is the URL of the file
- that stores the given content.
-
- """
- result = self.query(
- 'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
- if len(result) == 1:
- return int(result[0][0])
- elif len(result) == 0:
- raise Exception("No entry for resource %s" % url)
- else:
- raise Exception("Multiple entries for resource %s" % url)
-
- def ask(self, ask_query):
- assert ask_query.strip().startswith("ASK")
- result = self.query(ask_query)
- assert len(result) == 1
- if result[0][0] == "true":
- return True
- elif result[0][0] == "false":
- return False
- else:
- raise Exception("Something fishy is going on")
-
-
-class TrackerDBusSandbox:
- """
- Private D-Bus session bus which executes a sandboxed Tracker instance.
-
- """
- def __init__(self, dbus_daemon_config_file, extra_env=None):
- self.dbus_daemon_config_file = dbus_daemon_config_file
- self.extra_env = extra_env or {}
-
- self.daemon = dbusdaemon.DBusDaemon()
-
- def start(self, new_session=False):
- env = os.environ
- env.update(self.extra_env)
- env['G_MESSAGES_PREFIXED'] = 'all'
-
- # This avoids an issue where gvfsd-fuse can start up while the bus is
- # shutting down. If it fails to connect to the bus, it continues to
- # run anyway which leads to our dbus-daemon failing to shut down.
- #
- # Since https://gitlab.gnome.org/GNOME/gvfs/issues/323 was implemented
- # in GVFS 1.42 this problem may have gone away.
- env['GVFS_DISABLE_FUSE'] = '1'
-
- # Precreate runtime dir, to avoid this warning from dbus-daemon:
- #
- # Unable to set up transient service directory: XDG_RUNTIME_DIR "/home/sam/tracker-tests/tmp_59i3ev1/run" not available: No such file or directory
- #
- xdg_runtime_dir = env.get('XDG_RUNTIME_DIR')
- if xdg_runtime_dir:
- os.makedirs(xdg_runtime_dir, exist_ok=True)
-
- log.info("Starting D-Bus daemon for sandbox.")
- log.debug("Added environment variables: %s", self.extra_env)
- self.daemon.start(self.dbus_daemon_config_file, env=env, new_session=new_session)
-
- def stop(self):
- tracker_processes = []
-
- log.info("Looking for active Tracker processes on the bus")
- for busname in self.daemon.list_names_sync():
- if busname.startswith(TRACKER_DBUS_PREFIX):
- pid = self.daemon.get_connection_unix_process_id_sync(busname)
- if pid is not None:
- tracker_processes.append(pid)
-
- log.info("Terminating %i Tracker processes", len(tracker_processes))
- for pid in tracker_processes:
- os.kill(pid, signal.SIGTERM)
-
- log.info("Waiting for %i Tracker processes", len(tracker_processes))
- for pid in tracker_processes:
- psutil.wait_pid(pid)
-
- # We need to wait until Tracker processes have stopped before we
- # terminate the D-Bus daemon, otherwise lots of criticals like this
- # appear in the log output:
- #
- # (tracker-miner-fs:14955): GLib-GIO-CRITICAL **: 11:38:40.386: Error while sending AddMatch() message: The connection is closed
-
- log.info("Stopping D-Bus daemon for sandbox.")
- self.daemon.stop()
-
- def stop_daemon(self, busname):
- """Stops the daemon that owns 'busname'.
-
- This can be used if you want to force the miner-fs to exit, for
- example.
-
- """
- log.info("Stopping daemon process that owns %s.", busname)
- pid = self.daemon.get_connection_unix_process_id_sync(busname)
- if pid:
- os.kill(pid, signal.SIGTERM)
- psutil.wait_pid(pid)
- log.info("Process %i has stopped.", pid)
- else:
- log.info("Couldn't find a process owning %s.", busname)
-
- def get_connection(self):
- return self.daemon.get_connection()
- def get_session_bus_address(self):
- return self.daemon.get_address()
+# FIXME: Compatibility module due to recent API breaks.
+# Remove this before 3.0.
+from .sandbox import TrackerSandbox as TrackerDBusSandbox
+from .storehelper import StoreHelper
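
Note: with helpers.py reduced to a compatibility shim, the old import path
resolves to the same class as the new one, as this sketch shows:

    from trackertestutils.helpers import TrackerDBusSandbox  # old name, via shim
    from trackertestutils.sandbox import TrackerSandbox      # new location

    assert TrackerDBusSandbox is TrackerSandbox
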
diff --git a/utils/trackertestutils/sandbox.py b/utils/trackertestutils/sandbox.py
new file mode 100644
index 000000000..55897b184
--- /dev/null
+++ b/utils/trackertestutils/sandbox.py
@@ -0,0 +1,182 @@
+# Copyright (C) 2010, Nokia <jean-luc.lamadon@nokia.com>
+# Copyright (C) 2019, Sam Thursfield <sam@afuera.me.uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""
+Sandbox environment for running tests.
+
+The sandbox is essentially a private D-Bus daemon.
+"""
+
+from gi.repository import Gio
+
+import atexit
+import logging
+import os
+import signal
+import subprocess
+
+from . import dbusdaemon
+from . import dconf
+from . import psutil_mini as psutil
+
+log = logging.getLogger(__name__)
+
+TRACKER_DBUS_PREFIX = 'org.freedesktop.Tracker3'
+TRACKER_MINER_FS_BUSNAME = 'org.freedesktop.Tracker3.Miner.Files'
+
+_process_list = []
+
+
+def _cleanup_processes():
+ for process in _process_list:
+ log.debug("helpers._cleanup_processes: stopping %s", process)
+ process.stop()
+
+
+atexit.register(_cleanup_processes)
+
+
+class TrackerSandbox:
+ """
+ Run Tracker daemons isolated from the real user session.
+
+ The primary method of sandboxing is running one or more private D-Bus
+ daemons, which take place of the host's session and system bus.
+
+ """
+ def __init__(self, session_bus_config_file, system_bus_config_file=None,
+ extra_env=None):
+ self.extra_env = extra_env or {}
+
+ self.session_bus = dbusdaemon.DBusDaemon(
+ name='sandbox-session-bus', config_file=session_bus_config_file)
+ if system_bus_config_file:
+ self.system_bus = dbusdaemon.DBusDaemon(
+ name='sandbox-system-bus', config_file=system_bus_config_file)
+ else:
+ self.system_bus = None
+
+ def get_environment(self):
+ env = os.environ
+ env.update(self.extra_env)
+ env['G_MESSAGES_PREFIXED'] = 'all'
+
+ # This avoids an issue where gvfsd-fuse can start up while the bus is
+ # shutting down. If it fails to connect to the bus, it continues to
+ # run anyway which leads to our dbus-daemon failing to shut down.
+ #
+ # Since https://gitlab.gnome.org/GNOME/gvfs/issues/323 was implemented
+ # in GVFS 1.42 this problem may have gone away.
+ env['GVFS_DISABLE_FUSE'] = '1'
+
+ # Precreate runtime dir, to avoid this warning from dbus-daemon:
+ #
+ # Unable to set up transient service directory: XDG_RUNTIME_DIR "/home/sam/tracker-tests/tmp_59i3ev1/run" not available: No such file or directory
+ #
+ xdg_runtime_dir = env.get('XDG_RUNTIME_DIR')
+ if xdg_runtime_dir:
+            os.makedirs(xdg_runtime_dir, exist_ok=True)
+
+        return env
+
+ def start(self, new_session=False):
+ if self.system_bus:
+ log.info("Starting D-Bus system bus for sandbox.")
+ log.debug("Added environment variables: %s", self.extra_env)
+ self.system_bus.start(env=self.get_environment(), new_session=new_session)
+
+ self.extra_env['DBUS_SYSTEM_BUS_ADDRESS'] = self.system_bus.get_address()
+
+ log.info("Starting D-Bus session bus for sandbox.")
+ log.debug("Added environment variables: %s", self.extra_env)
+ self.session_bus.start(env=self.get_environment(), new_session=new_session)
+
+ def stop(self):
+ tracker_processes = []
+
+ log.info("Looking for active Tracker processes on the session bus")
+ for busname in self.session_bus.list_names_sync():
+ if busname.startswith(TRACKER_DBUS_PREFIX):
+ pid = self.session_bus.get_connection_unix_process_id_sync(busname)
+ if pid is not None:
+ tracker_processes.append(pid)
+
+ log.info("Terminating %i Tracker processes", len(tracker_processes))
+ for pid in tracker_processes:
+ os.kill(pid, signal.SIGTERM)
+
+ log.info("Waiting for %i Tracker processes", len(tracker_processes))
+ for pid in tracker_processes:
+ psutil.wait_pid(pid)
+
+ # We need to wait until Tracker processes have stopped before we
+ # terminate the D-Bus daemon, otherwise lots of criticals like this
+ # appear in the log output:
+ #
+ # (tracker-miner-fs:14955): GLib-GIO-CRITICAL **: 11:38:40.386: Error while sending AddMatch() message: The connection is closed
+
+ log.info("Stopping D-Bus session bus for sandbox.")
+ self.session_bus.stop()
+
+ if self.system_bus:
+ log.info("Stopping D-Bus system bus for sandbox.")
+ self.system_bus.stop()
+
+ def stop_daemon(self, busname):
+ """Stops the daemon that owns 'busname'.
+
+ This can be used if you want to force the miner-fs to exit, for
+ example.
+
+ """
+ log.info("Stopping daemon process that owns %s.", busname)
+        pid = self.session_bus.get_connection_unix_process_id_sync(busname)
+ if pid:
+ os.kill(pid, signal.SIGTERM)
+ psutil.wait_pid(pid)
+ log.info("Process %i has stopped.", pid)
+ else:
+ log.info("Couldn't find a process owning %s.", busname)
+
+ def get_session_bus_connection(self):
+ """Return a Gio.BusConnection to the sandbox D-Bus session bus."""
+ return self.session_bus.get_connection()
+
+ def get_system_bus_connection(self):
+ """Return a Gio.BusConnection to the sandbox D-Bus system bus."""
+ return self.system_bus.get_connection()
+
+ def get_session_bus_address(self):
+ return self.session_bus.get_address()
+
+ def set_config(self, schema_config_dict):
+ """Set config values in multiple GSettings schemas.
+
+ Example input:
+
+        set_config({
+ 'org.freedesktop.Tracker3.Miner.Files': {
+ 'enable-writeback': GLib.Variant.new_boolean(False),
+ }
+ })
+
+ """
+
+ for schema_name, contents in schema_config_dict.items():
+ dconfclient = dconf.DConfClient(self)
+ for key, value in contents.items():
+ dconfclient.write(schema_name, key, value)
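
Note: when a system bus config file is given, TrackerSandbox starts a second
private dbus-daemon and exports DBUS_SYSTEM_BUS_ADDRESS to the sandboxed
processes. A minimal sketch, assuming hypothetical config paths:

    from trackertestutils.sandbox import TrackerSandbox

    sandbox = TrackerSandbox('/tmp/session.conf',
                             system_bus_config_file='/tmp/system.conf')
    sandbox.start()
    try:
        session_conn = sandbox.get_session_bus_connection()
        system_conn = sandbox.get_system_bus_connection()
        # ... exercise Tracker daemons on the private buses ...
    finally:
        sandbox.stop()
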
diff --git a/utils/trackertestutils/storehelper.py b/utils/trackertestutils/storehelper.py
new file mode 100644
index 000000000..19a212861
--- /dev/null
+++ b/utils/trackertestutils/storehelper.py
@@ -0,0 +1,533 @@
+#
+# Copyright (C) 2010, Nokia <jean-luc.lamadon@nokia.com>
+# Copyright (C) 2019, Sam Thursfield <sam@afuera.me.uk>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+#
+
+"""Test helpers for libtracker-sparql data stores."""
+
+import gi
+gi.require_version('Tracker', '3.0')
+from gi.repository import GLib
+from gi.repository import GObject
+from gi.repository import Tracker
+
+import dataclasses
+import logging
+
+from . import mainloop
+
+log = logging.getLogger(__name__)
+
+DEFAULT_TIMEOUT = 10
+
+
+class AwaitException(RuntimeError):
+ pass
+
+
+class AwaitTimeoutException(AwaitException):
+ pass
+
+
+class NoMetadataException (Exception):
+ pass
+
+@dataclasses.dataclass
+class InsertedResource():
+ """Wraps the 'urn' value returned by await_insert context manager.
+
+ We can't return the value directly as we don't know it until the context
+ manager exits.
+
+ """
+ urn: str
+ id: int
+
+
+class await_insert():
+ """Context manager to await data insertion by Tracker miners & extractors.
+
+ Use like this:
+
+ expected = 'a nfo:Document; nie:isStoredAs <test://url>'
+    with self.tracker.await_insert(DOCUMENTS_GRAPH, expected) as resource:
+ # Create or update a file that's indexed by tracker-miner-fs.
+ #
+ # The context manager will not exit from the 'with' block until the
+ # data has been inserted in the store.
+
+ print(f"Inserted resource with urn: {resource.urn}")
+
+ The function expects an insertion to happen, and will raise an error if the
+ expected data is already present in the store. You can use
+ ensure_resource() if you just want to ensure that some data is present.
+
+ """
+ def __init__(self, conn, graph, predicates, timeout=DEFAULT_TIMEOUT,
+ _check_inserted=True):
+ self.conn = conn
+ self.graph = graph
+ self.predicates = predicates
+ self.timeout = timeout
+ self._check_inserted = _check_inserted
+
+ self.loop = mainloop.MainLoop()
+ self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+
+ self.result = InsertedResource(None, 0)
+
+ def __enter__(self):
+ log.info("Awaiting insertion of resource with data %s", self.predicates)
+
+ if self._check_inserted:
+ query_check = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) ',
+ f' FROM NAMED <{self.graph}> ',
+ ' WHERE { ',
+ ' ?urn a rdfs:Resource ; ',
+ self.predicates,
+ '}'
+ ])
+ cursor = self.conn.query(query_check)
+ if cursor.next():
+ raise AwaitException("Expected data is already present in the store.")
+
+ query_filtered = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) ',
+ ' FROM NAMED <%s> ',
+ ' WHERE { ',
+ ' ?urn a rdfs:Resource ; ',
+ self.predicates,
+ #' FILTER (tracker:id(?urn) = ~id) '
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ])
+
+ # FIXME: doesn't work with bus backend: https://gitlab.gnome.org/GNOME/tracker/issues/179
+ #stmt = self.conn.query_statement(query, None)
+
+ def match_cb(notifier, service, graph, events):
+ for event in events:
+ if event.get_event_type() in [Tracker.NotifierEventType.CREATE,
+ Tracker.NotifierEventType.UPDATE]:
+ log.debug("Processing %s event for id %s", event.get_event_type().value_nick, event.get_id())
+ #stmt.bind_int('~id', event.get_id())
+ #cursor = stmt.execute(None)
+ stmt = query_filtered % (self.graph, event.get_id())
+ cursor = self.conn.query(stmt)
+
+ if cursor.next():
+ self.result.urn = cursor.get_string(0)[0]
+ self.result.id = cursor.get_integer(1)
+ log.debug("Query matched! Got urn %s", self.result.urn)
+
+ self.loop.quit()
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", self.timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting insert of resource matching: {self.predicates}")
+
+ self.signal_id = self.notifier.connect('events', match_cb)
+ self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+ return self.result
+
+ def __exit__(self, etype, evalue, etraceback):
+ if etype is not None:
+ return False
+
+ while self.result.urn is None:
+ self.loop.run_checked()
+ log.debug("Got urn %s", self.result.urn)
+
+ GLib.source_remove(self.timeout_id)
+ GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+ return True
+
+
+class await_property_update():
+ """Context manager to await data updates by Tracker miners & extractors.
+
+ Use like this:
+
+ before = 'nie:isStoredAs <test://url1>'
+ after = 'nie:isStoredAs <test://url2>'
+ with self.tracker.await_property_update(resource_id, before, after):
+ # Trigger an update of the data.
+
+ """
+ def __init__(self, conn, graph, resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ self.conn = conn
+ self.graph = graph
+ self.resource_id = resource_id
+ self.before_predicates = before_predicates
+ self.after_predicates = after_predicates
+ self.timeout = timeout
+
+ self.loop = mainloop.MainLoop()
+ self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+ self.matched = False
+
+ def __enter__(self):
+ log.info("Awaiting update of resource id %s", self.resource_id)
+
+ query_before = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) ',
+ ' FROM NAMED <%s> ',
+ ' WHERE { ',
+ ' ?urn a rdfs:Resource ; ',
+ self.before_predicates,
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ]) % (self.graph, self.resource_id)
+
+ cursor = self.conn.query(query_before)
+ if not cursor.next():
+ raise AwaitException("Expected data is not present in the store.")
+
+ query_after = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ self.after_predicates,
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ]) % (self.graph, self.resource_id)
+
+ def match_cb(notifier, service, graph, events):
+ for event in events:
+ if event.get_event_type() == Tracker.NotifierEventType.UPDATE and event.get_id() == self.resource_id:
+ log.debug("Processing %s event for id %s", event.get_event_type(), event.get_id())
+ cursor = self.conn.query(query_after)
+
+ if cursor.next():
+ log.debug("Query matched!")
+ self.matched = True
+ self.loop.quit()
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", self.timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting update of resource {self.resource_id} "
+ f"matching: {self.after_predicates}")
+
+ self.signal_id = self.notifier.connect('events', match_cb)
+ self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+ def __exit__(self, etype, evalue, etraceback):
+ if etype is not None:
+ return False
+ while not self.matched:
+ self.loop.run_checked()
+
+ GLib.source_remove(self.timeout_id)
+ GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+ return True
+
+
+class await_content_update():
+ """Context manager to await updates to file contents.
+
+ When a file is updated, the old information it contained is deleted from
+ the store, and the new information is inserted as a new resource.
+
+ """
+ def __init__(self, conn, graph, before_resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ self.conn = conn
+ self.graph = graph
+ self.before_resource_id = before_resource_id
+ self.before_predicates = before_predicates
+ self.after_predicates = after_predicates
+ self.timeout = timeout
+
+ self.loop = mainloop.MainLoop()
+ self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+ self.matched = False
+
+ self.result = InsertedResource(None, 0)
+
+ def __enter__(self):
+ log.info("Awaiting delete of resource id %s and creation of a new one", self.before_resource_id)
+
+ query_before = ' '.join([
+ 'SELECT nie:isStoredAs(?urn) ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ self.before_predicates,
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ]) % (self.graph, self.before_resource_id)
+ cursor = self.conn.query(query_before)
+ if not cursor.next():
+ raise AwaitException("Expected data is not present in the store.")
+ file_url = cursor.get_string(0)[0]
+
+ query_after = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) '
+ ' FROM NAMED <%s> ',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ ' nie:isStoredAs <%s> ; ',
+ self.after_predicates,
+ '}'
+ ]) % (self.graph, file_url)
+
+ # When a file is updated, the DataObject representing the file gets
+ # an UPDATE notification. The InformationElement representing the
+ # content gets a DELETE and CREATE notification, because it is
+ # deleted and recreated. We detect the latter situation.
+
+ self.matched_delete = False
+ def match_cb(notifier, service, graph, events):
+ for event in events:
+ log.debug("Received %s event for id %s", event.get_event_type().value_nick, event.get_id())
+ if event.get_id() == self.before_resource_id and event.get_event_type() == Tracker.NotifierEventType.DELETE:
+ log.debug(" Matched delete")
+ self.matched_delete = True
+
+ # The after_predicates may match after the miner-fs creates
+ # the new resource, or they may only match once the extractor
+ # processes the resource. The latter will be an UPDATE event.
+ elif self.matched_delete and event.get_event_type() in [Tracker.NotifierEventType.CREATE, Tracker.NotifierEventType.UPDATE]:
+ cursor = self.conn.query(query_after)
+
+ if cursor.next():
+ self.result.urn = cursor.get_string(0)[0]
+ self.result.id = cursor.get_integer(1)
+ log.debug("Query matched! Got new urn %s", self.result.urn)
+
+ self.matched = True
+ self.loop.quit()
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", self.timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting update of resource {self.before_resource_id} "
+ f"with URL {file_url} matching: {self.after_predicates}")
+
+ self.signal_id = self.notifier.connect('events', match_cb)
+ self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+ return self.result
+
+ def __exit__(self, etype, evalue, etraceback):
+ if etype is not None:
+ return False
+
+ while not self.matched:
+ self.loop.run_checked()
+
+ GLib.source_remove(self.timeout_id)
+ GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+ return True
+
+
+class await_delete():
+ """Context manager to await removal of a resource."""
+
+ def __init__(self, conn, graph, resource_id, timeout=DEFAULT_TIMEOUT):
+ self.conn = conn
+ self.graph = graph
+ self.resource_id = resource_id
+ self.timeout = timeout
+
+ self.loop = mainloop.MainLoop()
+ self.notifier = self.conn.create_notifier(Tracker.NotifierFlags.NONE)
+ self.matched = False
+
+ def __enter__(self):
+ log.info("Awaiting deletion of resource id %s", self.resource_id)
+
+ query_check = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) ',
+ 'FROM NAMED <%s> ',
+ ' WHERE { ',
+ ' ?urn a rdfs:Resource ; ',
+ ' . FILTER (tracker:id(?urn) = %s) '
+ '}'
+ ])
+ cursor = self.conn.query(query_check % (self.graph, self.resource_id))
+ if not cursor.next():
+            raise AwaitException(
+                "Resource with id %i isn't present in the store." % self.resource_id)
+
+ def match_cb(notifier, service, graph, events):
+ for event in events:
+ if event.get_event_type() == Tracker.NotifierEventType.DELETE:
+ log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
+ if event.get_id() == self.resource_id:
+ log.debug("Matched expected id %s", self.resource_id)
+ self.matched = True
+ self.loop.quit()
+ else:
+ log.debug("Received %s event for id %s", event.get_event_type(), event.get_id())
+
+ def timeout_cb():
+ log.info("Timeout fired after %s seconds", self.timeout)
+ raise AwaitTimeoutException(
+ f"Timeout awaiting removal of resource {self.resource_id} ")
+
+ self.signal_id = self.notifier.connect('events', match_cb)
+ self.timeout_id = GLib.timeout_add_seconds(self.timeout, timeout_cb)
+
+ return None
+
+ def __exit__(self, etype, evalue, etraceback):
+ if etype is not None:
+ return False
+
+ while not self.matched:
+ self.loop.run_checked()
+
+ GLib.source_remove(self.timeout_id)
+ GObject.signal_handler_disconnect(self.notifier, self.signal_id)
+
+ return True
+
+
+class StoreHelper():
+ """
+ Helper for testing database access with libtracker-sparql.
+ """
+
+ def __init__(self, conn):
+ self.log = logging.getLogger(__name__)
+ self.loop = mainloop.MainLoop()
+
+ self.conn = conn
+
+ def await_insert(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
+ """Context manager that blocks until a resource is inserted."""
+ return await_insert(self.conn, graph, predicates, timeout)
+
+ def await_property_update(self, graph, resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ """Context manager that blocks until a resource property is updated."""
+ return await_property_update(self.conn, graph, resource_id, before_predicates,
+ after_predicates, timeout)
+
+ def await_content_update(self, graph, before_resource_id, before_predicates, after_predicates,
+ timeout=DEFAULT_TIMEOUT):
+ """Context manager that blocks until a resource is deleted and recreated."""
+ return await_content_update(self.conn, graph, before_resource_id, before_predicates,
+ after_predicates, timeout)
+
+ def await_delete(self, graph, resource_id, timeout=DEFAULT_TIMEOUT):
+ """Context manager that blocks until a resource is deleted."""
+ return await_delete(self.conn, graph, resource_id, timeout)
+
+ def ensure_resource(self, graph, predicates, timeout=DEFAULT_TIMEOUT):
+ """Ensure that a resource matching 'predicates' exists in 'graph'.
+
+ This function will block if the resource is not yet created.
+
+ """
+ await_ctx_mgr = await_insert(self.conn, graph, predicates, timeout, _check_inserted=False)
+ with await_ctx_mgr as resource:
+ # Check if the data was committed *before* the function was called.
+ query_initial = ' '.join([
+ 'SELECT ?urn tracker:id(?urn) '
+ f' FROM NAMED <{graph}>',
+ ' WHERE { '
+ ' ?urn a rdfs:Resource ; ',
+ predicates,
+ '}'
+ ])
+
+ cursor = self.conn.query(query_initial)
+ if cursor.next():
+ resource.urn = cursor.get_string(0)[0]
+ resource.id = cursor.get_integer(1)
+ return resource
+ return resource
+
+ def query(self, query):
+ cursor = self.conn.query(query, None)
+ result = []
+ while cursor.next():
+ row = []
+ for i in range(0, cursor.get_n_columns()):
+ row.append(cursor.get_string(i)[0])
+ result.append(row)
+ return result
+
+ def update(self, update_sparql):
+ self.conn.update(update_sparql, 0, None)
+
+ def count_instances(self, ontology_class):
+ QUERY = """
+ SELECT COUNT(?u) WHERE {
+ ?u a %s .
+ }
+ """
+ result = self.query(QUERY % ontology_class)
+
+ if (len(result) == 1):
+ return int(result[0][0])
+ else:
+ return -1
+
+ def get_resource_id_by_uri(self, uri):
+ """
+ Get the internal ID for a given resource, identified by URI.
+ """
+ result = self.query(
+ 'SELECT tracker:id(<%s>) WHERE { }' % uri)
+ if len(result) == 1:
+ return int(result[0][0])
+ elif len(result) == 0:
+ raise Exception("No entry for resource %s" % uri)
+ else:
+ raise Exception("Multiple entries for resource %s" % uri)
+
+ def get_content_resource_id(self, url):
+ """
+ Gets the internal ID for an nie:InformationElement resource.
+
+ The InformationElement represents data stored in a file, not
+ the file itself. The 'url' parameter is the URL of the file
+ that stores the given content.
+
+ """
+ result = self.query(
+ 'SELECT tracker:id(?r) WHERE { ?r a nie:InformationElement; nie:isStoredAs "%s" }' % url)
+ if len(result) == 1:
+ return int(result[0][0])
+ elif len(result) == 0:
+ raise Exception("No entry for resource %s" % url)
+ else:
+ raise Exception("Multiple entries for resource %s" % url)
+
+ def ask(self, ask_query):
+ assert ask_query.strip().startswith("ASK")
+ result = self.query(ask_query)
+ assert len(result) == 1
+ if result[0][0] == "true":
+ return True
+ elif result[0][0] == "false":
+ return False
+ else:
+ raise Exception("Something fishy is going on")