diff options
author | Martin Pitt <martinpitt@gnome.org> | 2013-01-08 11:08:02 +0100 |
---|---|---|
committer | Martin Pitt <martinpitt@gnome.org> | 2013-02-12 05:48:34 +0100 |
commit | 7b331d0c8331f577311a3d94a4651e8fdfa7a19f (patch) | |
tree | 4dc6cc6661294c533f561d779019595c8223192b /test/test_polkitd.py | |
parent | 26d731dccfa4732b8b4b94027cbd65929e1e1a03 (diff) | |
download | gvfs-7b331d0c8331f577311a3d94a4651e8fdfa7a19f.tar.gz |
Add gvfs-testbed to enable tests which need root
The Drive test requires root privileges as it uses the scsi_debug kernel module
and running some commands as root, such as injecting a temporary udev rule for
working around some scsi_debug limitations and running udisksd under a mock
polkit daemon.
Add a "gvfs-testbed" script which sets up some unshared tmpdir overlays as a
sandbox (to ensure that the tests don't destroy anything in the real system),
set up a temporary user etc.
This also enables the Sftp.test_unknown_host, as this depends on a particular
client-side configuration and ssh does not allow using a temporary $HOME.
Integrate this into "make installcheck", so that this uses gvfs-testbed when
being called as root.
https://bugzilla.gnome.org/show_bug.cgi?id=691336
Diffstat (limited to 'test/test_polkitd.py')
-rwxr-xr-x | test/test_polkitd.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/test/test_polkitd.py b/test/test_polkitd.py new file mode 100755 index 00000000..a66c4266 --- /dev/null +++ b/test/test_polkitd.py @@ -0,0 +1,196 @@ +#!/usr/bin/python3 +# (C) 2011 Sebastian Heinlein +# (C) 2012 Canonical Ltd. +# Authors: +# Sebastian Heinlein <sebi@glatzor.de> +# Martin Pitt <martin.pitt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. + +'''Simple mock polkit daemon for test suites. + +This also provides some convenience API for launching the daemon and for +writing unittest test cases involving polkit operations. +''' + +import sys +import os +import argparse +import unittest +import signal +import time + +import dbus +import dbus.service +from gi.repository import GLib, Gio + +# ---------------------------------------------------------------------------- + +class TestPolicyKitDaemon(dbus.service.Object): + def __init__(self, allowed_actions, on_bus=None, replace=False): + '''Initialize test polkit daemon. + + @allowed_actions is a list of PolicyKit action IDs which will be + allowed (active/inactive sessions or user IDs will not be considered); + all actions not in that list will be denied. If 'all' is an element of + @allowed_actions, all actions will be allowed. + + When @on_bus string is given, the daemon will run on that D-BUS + address, otherwise on the system D-BUS. + + If @replace is True, this will replace an already running polkit daemon + on the D-BUS. + ''' + self.allowed_actions = allowed_actions + if on_bus: + bus = dbus.bus.BusConnection(on_bus) + else: + bus = dbus.SystemBus() + bus_name = dbus.service.BusName('org.freedesktop.PolicyKit1', + bus, do_not_queue=True, + replace_existing=replace, + allow_replacement=True) + bus.add_signal_receiver(self.on_disconnected, signal_name='Disconnected') + + dbus.service.Object.__init__(self, bus_name, + '/org/freedesktop/PolicyKit1/Authority') + self.loop = GLib.MainLoop() + + def run(self): + self.loop.run() + + @dbus.service.method('org.freedesktop.PolicyKit1.Authority', + in_signature='(sa{sv})sa{ss}us', + out_signature='(bba{ss})') + def CheckAuthorization(self, subject, action_id, details, flags, + cancellation_id): + if 'all' in self.allowed_actions: + allowed = True + else: + allowed = action_id in self.allowed_actions + challenged = False + details = {'test': 'test'} + return (allowed, challenged, details) + + @dbus.service.method('org.freedesktop.PolicyKit1.Authority', + in_signature='', out_signature='') + def Quit(self): + GLib.idle_add(self.loop.quit) + + def on_disconnected(self): + print('disconnected from D-BUS, terminating') + self.Quit() + +# ---------------------------------------------------------------------------- + +class PolkitTestCase(unittest.TestCase): + '''Convenient test cases involving polkit. + + Call start_polkitd() with the list of allowed actions in your test cases. + The daemon will be automatically terminated when the test case exits. + ''' + + def __init__(self, methodName='runTest'): + unittest.TestCase.__init__(self, methodName) + self.polkit_pid = None + + def start_polkitd(self, allowed_actions, on_bus=None): + '''Start test polkitd. + + This should be called in your test cases before the exercised code + makes any polkit query. The daemon will be stopped automatically when + the test case ends (regardless of whether its successful or failed). If + you want to test multiple different action sets in one test case, you + have to call stop_polkitd() before starting a new one. + + @allowed_actions is a list of PolicyKit action IDs which will be + allowed (active/inactive sessions or user IDs will not be considered); + all actions not in that list will be denied. If 'all' is an element of + @allowed_actions, all actions will be allowed. + + When @on_bus string is given, the daemon will run on that D-BUS + address, otherwise on the system D-BUS. + ''' + assert self.polkit_pid is None, \ + 'can only launch one polkitd at a time; write a separate test case or call stop_polkitd()' + self.polkit_pid = spawn(allowed_actions, on_bus) + self.addCleanup(self.stop_polkitd) + + def stop_polkitd(self): + '''Stop test polkitd. + + This happens automatically when a test case ends, but is required when + you want to test multiple different action sets in one test case. + ''' + assert self.polkit_pid is not None, 'polkitd is not running' + os.kill(self.polkit_pid, signal.SIGTERM) + os.waitpid(self.polkit_pid, 0) + self.polkit_pid = None + +# ---------------------------------------------------------------------------- + +def _run(allowed_actions, bus_address, replace=False): + # Set up the DBus main loop + import dbus.mainloop.glib + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + polkitd = TestPolicyKitDaemon(allowed_actions, bus_address, replace) + polkitd.run() + +def spawn(allowed_actions, on_bus=None): + '''Run a TestPolicyKitDaemon instance in a separate process. + + @allowed_actions is a list of PolicyKit action IDs which will be + allowed (active/inactive sessions or user IDs will not be considered); + all actions not in that list will be denied. If 'all' is an element of + @allowed_actions, all actions will be allowed. + + When @on_bus string is given, the daemon will run on that D-BUS address, + otherwise on the system D-BUS. + + The daemon will terminate automatically when the @on_bus D-BUS goes down. + If that does not happen (e. g. you test on the actual system/session bus), + you need to kill it manually. + + Returns the process ID of the spawned daemon. + ''' + pid = os.fork() + if pid == 0: + # child + _run(allowed_actions, on_bus) + os._exit(0) + + # wait until the daemon is up on the bus + if on_bus: + bus = dbus.bus.BusConnection(on_bus) + elif 'DBUS_SYSTEM_BUS_ADDRESS' in os.environ: + # dbus.SystemBus() does not recognize this env var, so we have to + # handle that manually + bus = dbus.bus.BusConnection(os.environ['DBUS_SYSTEM_BUS_ADDRESS']) + else: + bus = dbus.SystemBus() + timeout = 50 + while timeout > 0 and not bus.name_has_owner('org.freedesktop.PolicyKit1'): + timeout -= 1 + time.sleep(0.1) + assert timeout > 0, 'test polkitd failed to start up' + + return pid + +def main(): + parser = argparse.ArgumentParser(description='Simple mock polkit daemon for test suites') + parser.add_argument('-a', '--allowed-actions', metavar='ACTION[,ACTION,...]', + default='', help='Comma separated list of allowed action ids') + parser.add_argument('-b', '--bus-address', + help='D-BUS address to listen on (if not given, listen on system D-BUS)') + parser.add_argument('-r', '--replace', action='store_true', + help='Replace existing polkit daemon on the bus') + args = parser.parse_args() + + _run(args.allowed_actions.split(','), args.bus_address, args.replace) + +if __name__ == '__main__': + main() |