summaryrefslogtreecommitdiff
path: root/tests/functional-tests/fixtures.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional-tests/fixtures.py')
-rw-r--r--tests/functional-tests/fixtures.py115
1 files changed, 115 insertions, 0 deletions
diff --git a/tests/functional-tests/fixtures.py b/tests/functional-tests/fixtures.py
index 08d16da17..8c16b40b4 100644
--- a/tests/functional-tests/fixtures.py
+++ b/tests/functional-tests/fixtures.py
@@ -32,6 +32,7 @@ import logging
import os
import pathlib
import multiprocessing
+import threading
import shutil
import subprocess
import sys
@@ -148,6 +149,120 @@ class TrackerSparqlBusTest (ut.TestCase):
shutil.rmtree(self.tmpdir, ignore_errors=True)
+class TrackerPortalTest(ut.TestCase):
+ @classmethod
+ def database_process_fn(self, service_name, in_queue, out_queue, dbus_address):
+ # This runs in a separate process and provides a clean Tracker database
+ # exported over D-Bus to the main test process.
+
+ log.info("Started database thread")
+
+ bus = Gio.DBusConnection.new_for_address_sync(
+ dbus_address,
+ Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT |
+ Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION, None, None)
+
+ conn = Tracker.SparqlConnection.new(
+ Tracker.SparqlConnectionFlags.NONE,
+ None,
+ Gio.File.new_for_path(cfg.ontologies_dir()),
+ None)
+
+ endpoint = Tracker.EndpointDBus.new(conn, bus, None, None)
+
+ bus.call_sync(
+ 'org.freedesktop.DBus',
+ '/org/freedesktop/DBus',
+ 'org.freedesktop.DBus',
+ 'RequestName',
+ GLib.Variant('(su)', (service_name, 0x4)),
+ None, 0, -1, None)
+
+ loop = GLib.MainLoop.new(None, False)
+
+ def pop_update(message_queue):
+ try:
+ sparql = message_queue.get_nowait()
+ if sparql is None:
+ loop.quit()
+ conn.update(sparql, None)
+ out_queue.put(None)
+ except Exception:
+ pass
+ return GLib.SOURCE_CONTINUE
+
+ GLib.timeout_add (50, pop_update, in_queue)
+ out_queue.put(None)
+ loop.run()
+
+ bus.close(None)
+
+ def setUp(self):
+ extra_env = {}
+ extra_env['TRACKER_TEST_PORTAL_FLATPAK_INFO'] = cfg.TEST_PORTAL_FLATPAK_INFO
+
+ self.message_queues = {}
+ self.connections = {}
+ self.sandbox = trackertestutils.helpers.TrackerDBusSandbox(
+ session_bus_config_file=cfg.TEST_DBUS_DAEMON_CONFIG_FILE, extra_env=extra_env)
+
+ self.sandbox.start()
+
+ self.bus = self.sandbox.get_session_bus_connection()
+ self.dbus_address = self.sandbox.get_session_bus_address()
+
+ try:
+ log.info("Starting portal")
+ self._portal_proxy = Gio.DBusProxy.new_sync(
+ self.bus,
+ Gio.DBusProxyFlags.NONE, None,
+ 'org.freedesktop.portal.Tracker',
+ '/org/freedesktop/portal/Tracker',
+ 'org.freedesktop.portal.Tracker',
+ None)
+
+ except Exception:
+ self.sandbox.stop()
+ raise
+
+ def tearDown(self):
+ for service in self.message_queues:
+ self.stop_service(service)
+ self.sandbox.stop()
+
+ def start_service(self, service_name):
+ in_queue = multiprocessing.Queue()
+ out_queue = multiprocessing.Queue()
+ thread = threading.Thread(
+ target=self.database_process_fn,
+ args=(service_name, out_queue, in_queue, self.dbus_address))
+ thread.start()
+ in_queue.get()
+ self.message_queues[service_name] = [ in_queue, out_queue ]
+
+ def stop_service(self, service_name):
+ queues = self.message_queues[service_name]
+ if queues is not None:
+ queues[1].put(None)
+
+ def update(self, service_name, sparql):
+ if sparql is not None:
+ # Updates go through the message queue, bypassing the sandbox
+ queues = self.message_queues[service_name]
+ if queues is not None:
+ queues[1].put(sparql)
+ queues[0].get()
+
+ def query(self, service_name, sparql):
+ if service_name not in self.connections:
+ conn = Tracker.SparqlConnection.bus_new(service_name, None, self.bus)
+ store = trackertestutils.helpers.StoreHelper(conn)
+ self.connections[service_name] = store
+ else:
+ store = self.connections[service_name]
+
+ return store.query(sparql)
+
class CliError(Exception):
pass