diff options
Diffstat (limited to 'utils/trackertestutils/helpers.py')
-rw-r--r-- | utils/trackertestutils/helpers.py | 283 |
1 files changed, 97 insertions, 186 deletions
diff --git a/utils/trackertestutils/helpers.py b/utils/trackertestutils/helpers.py index e6219be3c..ef450ee05 100644 --- a/utils/trackertestutils/helpers.py +++ b/utils/trackertestutils/helpers.py @@ -24,9 +24,12 @@ from gi.repository import GLib import atexit import logging import os -import subprocess +import signal +from . import dbusdaemon from . import mainloop +from . import psutil_mini as psutil + log = logging.getLogger(__name__) @@ -54,172 +57,9 @@ def _cleanup_processes(): atexit.register(_cleanup_processes) -class Helper: +class StoreHelper(): """ - Abstract helper for Tracker processes. Launches the process - and waits for it to appear on the session bus. - - The helper will fail if the process is already running. Use - test-runner.sh to ensure the processes run inside a separate DBus - session bus. - - The process is watched using a timed GLib main loop source. If the process - exits with an error code, the test will abort the next time the main loop - is entered (or straight away if currently running the main loop). - """ - - STARTUP_TIMEOUT = 200 # milliseconds - SHUTDOWN_TIMEOUT = 200 # - - def __init__(self, helper_name, bus_name, process_path): - self.name = helper_name - self.bus_name = bus_name - self.process_path = process_path - - self.log = logging.getLogger(f'{__name__}.{self.name}') - - self.process = None - self.available = False - - self.loop = mainloop.MainLoop() - - self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) - - def _start_process(self, command_args=None, extra_env=None): - global _process_list - _process_list.append(self) - - command = [self.process_path] + (command_args or []) - self.log.debug("Starting %s.", ' '.join(command)) - - env = os.environ - if extra_env: - self.log.debug(" starting with extra environment: %s", extra_env) - env.update(extra_env) - - try: - return subprocess.Popen(command, env=env) - except OSError as e: - raise RuntimeError("Error starting %s: %s" % (self.process_path, e)) - - def _bus_name_appeared(self, connection, name, owner): - self.log.debug("%s appeared on the message bus, owned by %s", name, owner) - self.available = True - self.loop.quit() - - def _bus_name_vanished(self, connection, name): - self.log.debug("%s vanished from the message bus", name) - self.available = False - self.loop.quit() - - def _process_watch_cb(self): - if self.process_watch_timeout == 0: - # GLib seems to call the timeout after we've removed it - # sometimes, which causes errors unless we detect it. - return False - - status = self.process.poll() - - if status is None: - return True # continue - elif status == 0 and not self.abort_if_process_exits_with_status_0: - return True # continue - else: - self.process_watch_timeout = 0 - raise RuntimeError(f"{self.name} exited with status: {self.status}") - - def _process_startup_timeout_cb(self): - self.log.debug(f"Process timeout of {self.STARTUP_TIMEOUT}ms was called") - self.loop.quit() - self.timeout_id = None - return False - - def start(self, command_args=None, extra_env=None): - """ - Start an instance of process and wait for it to appear on the bus. - """ - if self.process is not None: - raise RuntimeError("%s: already started" % self.name) - - self._bus_name_watch_id = Gio.bus_watch_name_on_connection( - self.bus, self.bus_name, Gio.BusNameWatcherFlags.NONE, - self._bus_name_appeared, self._bus_name_vanished) - - # We expect the _bus_name_vanished callback to be called here, - # causing the loop to exit again. - self.loop.run_checked() - - if self.available: - # It's running, but we didn't start it... - raise RuntimeError("Unable to start test instance of %s: " - "already running" % self.name) - - self.process = self._start_process(command_args=command_args, - extra_env=extra_env) - self.log.debug('Started with PID %i', self.process.pid) - - self.process_startup_timeout = GLib.timeout_add( - self.STARTUP_TIMEOUT, self._process_startup_timeout_cb) - - self.abort_if_process_exits_with_status_0 = True - - # Run the loop until the bus name appears, or the process dies. - self.loop.run_checked() - - self.abort_if_process_exits_with_status_0 = False - - def stop(self): - global _process_list - - if self.process is None: - # Seems that it didn't even start... - return - - if self.process_startup_timeout != 0: - GLib.source_remove(self.process_startup_timeout) - self.process_startup_timeout = 0 - - if self.process.poll() == None: - self.process.terminate() - returncode = self.process.wait(timeout=self.SHUTDOWN_TIMEOUT * 1000) - if returncode is None: - self.log.debug("Process failed to terminate in time, sending kill!") - self.process.kill() - self.process.wait() - elif returncode > 0: - self.log.warn("Process returned error code %s", returncode) - - self.log.debug("Process stopped.") - - # Run the loop to handle the expected name_vanished signal. - self.loop.run_checked() - Gio.bus_unwatch_name(self._bus_name_watch_id) - - self.process = None - _process_list.remove(self) - - def kill(self): - global _process_list - - if self.process_watch_timeout != 0: - GLib.source_remove(self.process_watch_timeout) - self.process_watch_timeout = 0 - - self.process.kill() - - # Name owner changed callback should take us out from this loop - self.loop.run_checked() - Gio.bus_unwatch_name(self._bus_name_watch_id) - - self.process = None - _process_list.remove(self) - - self.log.debug("Process killed.") - - -class StoreHelper (Helper): - """ - Helper for starting and testing the tracker-store daemon. + Helper for testing the tracker-store daemon. """ TRACKER_BUSNAME = 'org.freedesktop.Tracker1' @@ -235,32 +75,41 @@ class StoreHelper (Helper): TRACKER_STATUS_OBJ_PATH = "/org/freedesktop/Tracker1/Status" STATUS_IFACE = "org.freedesktop.Tracker1.Status" - def __init__(self, process_path): - Helper.__init__(self, "tracker-store", self.TRACKER_BUSNAME, process_path) + def __init__(self, dbus_connection): + self.log = logging.getLogger(__name__) + self.loop = mainloop.MainLoop() - def start(self, command_args=None, extra_env=None): - Helper.start(self, command_args, extra_env) + self.bus = dbus_connection + self.graph_updated_handler_id = 0 self.resources = Gio.DBusProxy.new_sync( - self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, + self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None, self.TRACKER_BUSNAME, self.TRACKER_OBJ_PATH, self.RESOURCES_IFACE) self.backup_iface = Gio.DBusProxy.new_sync( - self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, + self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None, self.TRACKER_BUSNAME, self.TRACKER_BACKUP_OBJ_PATH, self.BACKUP_IFACE) self.stats_iface = Gio.DBusProxy.new_sync( - self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, + self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None, self.TRACKER_BUSNAME, self.TRACKER_STATS_OBJ_PATH, self.STATS_IFACE) self.status_iface = Gio.DBusProxy.new_sync( - self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, + self.bus, Gio.DBusProxyFlags.DO_NOT_AUTO_START_AT_CONSTRUCTION, None, self.TRACKER_BUSNAME, self.TRACKER_STATUS_OBJ_PATH, self.STATUS_IFACE) + def start_and_wait_for_ready(self): + # The daemon is autostarted as soon as a method is called. + # + # We set a big timeout to avoid interfering when a daemon is being + # interactively debugged. self.log.debug("Calling %s.Wait() method", self.STATUS_IFACE) - self.status_iface.Wait() + self.status_iface.call_sync('Wait', None, Gio.DBusCallFlags.NONE, 1000000, None) self.log.debug("Ready") + def start_watching_updates(self): + assert self.graph_updated_handler_id == 0 + self.reset_graph_updates_tracking() def signal_handler(proxy, sender_name, signal_name, parameters): @@ -269,12 +118,13 @@ class StoreHelper (Helper): self.graph_updated_handler_id = self.resources.connect( 'g-signal', signal_handler) + self.log.debug("Watching for updates from Resources interface") - def stop(self): - Helper.stop(self) - + def stop_watching_updates(self): if self.graph_updated_handler_id != 0: + self.log.debug("No longer watching for updates from Resources interface") self.resources.disconnect(self.graph_updated_handler_id) + self.graph_updated_handler_id = 0 # A system to follow GraphUpdated and make sure all changes are tracked. # This code saves every change notification received, and exposes methods @@ -329,6 +179,7 @@ class StoreHelper (Helper): """ assert (self.inserts_match_function == None) assert (self.class_to_track == None), "Already waiting for resource of type %s" % self.class_to_track + assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first." self.class_to_track = rdf_class @@ -413,6 +264,7 @@ class StoreHelper (Helper): """ assert (self.deletes_match_function == None) assert (self.class_to_track == None) + assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first." def find_resource_deletion(deletes_list): self.log.debug("find_resource_deletion: looking for %i in %s", id, deletes_list) @@ -444,8 +296,8 @@ class StoreHelper (Helper): # Run the event loop until the correct notification arrives try: self.loop.run_checked() - except GraphUpdateTimeoutException: - raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id) + except GraphUpdateTimeoutException as e: + raise GraphUpdateTimeoutException("Resource %i has not been deleted." % id) from e self.deletes_match_function = None self.class_to_track = None @@ -458,6 +310,7 @@ class StoreHelper (Helper): assert (self.inserts_match_function == None) assert (self.deletes_match_function == None) assert (self.class_to_track == None) + assert (self.graph_updated_handler_id != 0), "You must call start_watching_updates() first." self.log.debug("Await change to %i %s (%i, %i existing)", subject_id, property_uri, len(self.inserts_list), len(self.deletes_list)) @@ -505,14 +358,14 @@ class StoreHelper (Helper): # is useful for testing this API surface, but we recommand that all regular # applications use libtracker-sparql library to talk to the database. - def query(self, query, timeout=5000, **kwargs): - return self.resources.SparqlQuery('(s)', query, timeout=timeout, **kwargs) + def query(self, query, **kwargs): + return self.resources.SparqlQuery('(s)', query, **kwargs) - def update(self, update_sparql, timeout=5000, **kwargs): - return self.resources.SparqlUpdate('(s)', update_sparql, timeout=timeout, **kwargs) + def update(self, update_sparql, **kwargs): + return self.resources.SparqlUpdate('(s)', update_sparql, **kwargs) - def load(self, ttl_uri, timeout=5000, **kwargs): - return self.resources.Load('(s)', ttl_uri, timeout=timeout, **kwargs) + def load(self, ttl_uri, **kwargs): + return self.resources.Load('(s)', ttl_uri, **kwargs) def batch_update(self, update_sparql, **kwargs): return self.resources.BatchSparqlUpdate('(s)', update_sparql, **kwargs) @@ -582,3 +435,61 @@ class StoreHelper (Helper): return False else: raise Exception("Something fishy is going on") + + +class TrackerDBusSandbox: + """ + Private D-Bus session bus which executes a sandboxed Tracker instance. + + """ + def __init__(self, dbus_daemon_config_file, extra_env=None): + self.dbus_daemon_config_file = dbus_daemon_config_file + self.extra_env = extra_env or {} + + self.daemon = dbusdaemon.DBusDaemon() + + def start(self): + env = os.environ + env.update(self.extra_env) + env['G_MESSAGES_PREFIXED'] = 'all' + + # Precreate runtime dir, to avoid this warning from dbus-daemon: + # + # Unable to set up transient service directory: XDG_RUNTIME_DIR "/home/sam/tracker-tests/tmp_59i3ev1/run" not available: No such file or directory + # + xdg_runtime_dir = env.get('XDG_RUNTIME_DIR') + if xdg_runtime_dir: + os.makedirs(xdg_runtime_dir, exist_ok=True) + + log.info("Starting D-Bus daemon for sandbox.") + log.debug("Added environment variables: %s", self.extra_env) + self.daemon.start_if_needed(self.dbus_daemon_config_file, env=env) + + def stop(self): + tracker_processes = [] + + log.info("Looking for active Tracker processes on the bus") + for busname in self.daemon.list_names_sync(): + if busname.startswith('org.freedesktop.Tracker1'): + pid = self.daemon.get_connection_unix_process_id_sync(busname) + tracker_processes.append(pid) + + log.info("Terminating %i Tracker processes", len(tracker_processes)) + for pid in tracker_processes: + os.kill(pid, signal.SIGTERM) + + log.info("Waiting for %i Tracker processes", len(tracker_processes)) + for pid in tracker_processes: + psutil.wait_pid(pid) + + # We need to wait until Tracker processes have stopped before we + # terminate the D-Bus daemon, otherwise lots of criticals like this + # appear in the log output: + # + # (tracker-miner-fs:14955): GLib-GIO-CRITICAL **: 11:38:40.386: Error while sending AddMatch() message: The connection is closed + + log.info("Stopping D-Bus daemon for sandbox.") + self.daemon.stop() + + def get_connection(self): + return self.daemon.get_connection() |