#!/usr/bin/env python3
#
# Copyright © 2018 Tomasz Miąsko
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the licence, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see <http://www.gnu.org/licenses/>.
"""Integration tests for the dconf command line tool.

Usage: <script> path-to-dconf path-to-dconf-service [unittest options]

Every test runs against a private dbus-daemon and private XDG directories
created in ``DBusTest.setUp``.
"""

import os
import subprocess
import sys
import tempfile
import time
import unittest

from textwrap import dedent

# Configuration for the private session bus started for each test.
# NOTE(review): the XML markup of this template was missing from the copy
# this file was restored from (only the element text survived); confirm the
# element set below against the canonical version of this test.
DAEMON_CONFIG = '''
<!DOCTYPE busconfig PUBLIC
 "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN"
 "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
  <type>session</type>
  <keep_umask/>
  <listen>unix:tmpdir=/tmp</listen>
  <servicedir>{servicedir}</servicedir>
  <auth>EXTERNAL</auth>
  <policy context="default">
    <allow send_destination="*" eavesdrop="true"/>
    <allow eavesdrop="true"/>
    <allow own="*"/>
  </policy>
</busconfig>
'''

# D-Bus service activation file template.
SERVICE_CONFIG = '''\
[D-BUS Service]
Name={name}
Exec={exec}
'''


def dconf(*args, **kwargs):
    """Run the dconf tool with ``args`` and return the CompletedProcess.

    Keyword arguments are forwarded to ``subprocess.run``.  Unless
    overridden, the call raises ``subprocess.CalledProcessError`` on a
    non-zero exit status, captures stdout and uses text mode.

    Relies on the module global ``dconf_exe`` assigned by the ``__main__``
    section of this script.
    """
    argv = [dconf_exe]
    argv.extend(args)

    # Setup convenient defaults:
    kwargs.setdefault('check', True)
    kwargs.setdefault('stdout', subprocess.PIPE)
    kwargs.setdefault('universal_newlines', True)

    return subprocess.run(argv, **kwargs)


def dconf_read(key):
    """Return the output of ``dconf read key`` without trailing newlines."""
    return dconf('read', key).stdout.rstrip('\n')


def dconf_write(key, value):
    """Run ``dconf write key value``."""
    dconf('write', key, value)


def dconf_list(key):
    """Return the output lines of ``dconf list key``."""
    return dconf('list', key).stdout.splitlines()


def dconf_complete(suffix, prefix):
    """Return the sorted output lines of ``dconf _complete suffix prefix``."""
    return sorted(dconf('_complete', suffix, prefix).stdout.splitlines())


def dconf_watch(path):
    """Start ``dconf watch path`` and return its ``Popen`` object.

    The process stdout is a text-mode pipe.  The caller is responsible for
    terminating the process and closing the pipe.
    """
    args = [dconf_exe, 'watch', path]
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)


def _reap_process(process):
    """Make sure ``process`` has exited and its stdout pipe is closed."""
    if process.poll() is None:
        process.kill()
    process.wait()
    if process.stdout is not None:
        process.stdout.close()


class DBusTest(unittest.TestCase):
    """Tests of the dconf tool running against a private session bus."""

    def setUp(self):
        """Create private XDG directories and start a private dbus-daemon."""
        self.temporary_dir = tempfile.TemporaryDirectory()

        self.runtime_dir = os.path.join(self.temporary_dir.name, 'run')
        self.config_home = os.path.join(self.temporary_dir.name, 'config')
        self.dbus_dir = os.path.join(self.temporary_dir.name, 'dbus-1')

        os.mkdir(self.runtime_dir, mode=0o700)
        os.mkdir(self.config_home, mode=0o700)
        os.mkdir(self.dbus_dir, mode=0o700)

        os.environ['XDG_RUNTIME_DIR'] = self.runtime_dir
        os.environ['XDG_CONFIG_HOME'] = self.config_home

        # Prepare dbus-daemon config.
        dbus_daemon_config = os.path.join(self.dbus_dir, 'session.conf')
        with open(dbus_daemon_config, 'w') as file:
            file.write(DAEMON_CONFIG.format(servicedir=self.dbus_dir))

        # Prepare service config.
        name = 'ca.desrt.dconf'
        path = os.path.join(self.dbus_dir, '{}.service'.format(name))
        with open(path, 'w') as file:
            config = SERVICE_CONFIG.format(name=name, exec=dconf_service_exe)
            file.write(config)

        # Pipe where daemon will write its address.
        read_fd, write_fd = os.pipe2(0)

        args = ['dbus-daemon',
                '--config-file={}'.format(dbus_daemon_config),
                '--nofork',
                '--print-address={}'.format(write_fd)]

        # Wrapping the read end first guarantees that both descriptors get
        # closed even when starting the daemon fails.
        with os.fdopen(read_fd) as address_pipe:
            try:
                # Start daemon
                self.dbus_daemon_process = subprocess.Popen(
                    args, pass_fds=[write_fd])
            finally:
                # Close our writing end of pipe. Daemon closes its own
                # writing end of pipe after printing address, so subsequent
                # reads shouldn't block.
                os.close(write_fd)

            dbus_address = address_pipe.read().rstrip()

        # Prepare environment
        os.environ['DBUS_SESSION_BUS_ADDRESS'] = dbus_address

    def tearDown(self):
        """Stop the private dbus-daemon and remove temporary files."""
        # Terminate dbus-daemon.
        p = self.dbus_daemon_process
        try:
            p.terminate()
            p.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()

        self.temporary_dir.cleanup()

    def _start_watch(self, path):
        """Start ``dconf watch path``; the process is reaped on test exit."""
        process = dconf_watch(path)
        self.addCleanup(_reap_process, process)
        return process

    def test_read_nonexisiting(self):
        """Reading missing key produces no output."""
        self.assertEqual('', dconf_read('/key'))

    def test_write_read(self):
        """Read returns previously written value."""
        dconf('write', '/key', '0')
        self.assertEqual('0', dconf_read('/key'))

        dconf('write', '/key', '"hello there"')
        self.assertEqual("'hello there'", dconf_read('/key'))

    def test_list(self):
        """List returns a list of names inside given directory.

        Results include both keys and subdirectories.
        """
        dconf('write', '/org/gnome/app/fullscreen', 'true')
        dconf('write', '/org/gnome/terminal/profile', '"default"')
        dconf('write', '/key', '42')

        self.assertEqual(['key', 'org/'], dconf_list('/'))

        self.assertEqual(['gnome/'], dconf_list('/org/'))

    def test_list_missing(self):
        """List can be used successfully with non existing directories."""
        self.assertEqual([], dconf_list('/no-existing/directory/'))

    def test_reset_key(self):
        """Reset can be used to reset a value of a single key."""
        dconf('write', '/app/width', '1024')
        dconf('write', '/app/height', '768')
        dconf('write', '/app/fullscreen', 'true')

        # Sanity check.
        self.assertEqual(['fullscreen', 'height', 'width'],
                         dconf_list('/app/'))

        # Reset one key after another:
        dconf('reset', '/app/fullscreen')
        self.assertEqual(['height', 'width'], dconf_list('/app/'))
        dconf('reset', '/app/width')
        self.assertEqual(['height'], dconf_list('/app/'))
        dconf('reset', '/app/height')
        self.assertEqual([], dconf_list('/app/'))

    def test_reset_dir(self):
        """Resetting whole directory is possible with -f option.

        It is an error not to use -f when resetting a dir.
        """
        dconf('write', '/app/a', '1')
        dconf('write', '/app/b', '2')
        dconf('write', '/app/c/d', '3')
        dconf('write', '/x', '4')
        dconf('write', '/y/z', '5')

        with self.assertRaises(subprocess.CalledProcessError) as cm:
            dconf('reset', '/app/', stderr=subprocess.PIPE)
        self.assertRegex(cm.exception.stderr, '-f must be given')
        self.assertRegex(cm.exception.stderr, 'Usage:')

        # Nothing should be removed just yet.  The subdirectory is listed
        # with a trailing slash, just like in test_list.
        self.assertEqual(['a', 'b', 'c/'], dconf_list('/app/'))

        # Try again with -f.
        dconf('reset', '-f', '/app/')

        # /app/ should be gone now:
        self.assertEqual(['x', 'y/'], dconf_list('/'))

    def test_watch(self):
        """Watch reports changes made using write command.

        Only changes made inside given subdirectory should be reported.
        """
        watch_root = self._start_watch('/')
        watch_org = self._start_watch('/org/')

        # Arbitrary delay to give "dconf watch" time to set-up the watch.
        # In the case this turns out to be problematic, dconf tool could be
        # changed to produce debug message after `dconf_client_watch_sync`,
        # so that we could synchronize on its output.
        time.sleep(0.2)

        dconf('write', '/com/a', '1')
        dconf('write', '/org/b', '2')
        dconf('write', '/organ/c', '3')
        dconf('write', '/c', '4')

        # Again, give "dconf watch" some time to pick-up changes.
        time.sleep(0.2)

        watch_root.terminate()
        watch_org.terminate()

        # communicate() drains stdout, waits for exit and closes the pipe.
        root_output, _ = watch_root.communicate()
        org_output, _ = watch_org.communicate()

        # Watch for '/' should capture all writes.
        expected = '''\
        /com/a
          1

        /org/b
          2

        /organ/c
          3

        /c
          4

        '''
        self.assertEqual(dedent(expected), root_output)

        # Watch for '/org/' should capture only a subset of all writes:
        expected = '''\
        /org/b
          2

        '''
        self.assertEqual(dedent(expected), org_output)

    def test_dump_load(self):
        """Checks that output produced with dump can be used with load and
        vice versa.
        """
        keyfile = dedent('''\
        [/]
        password='secret'

        [org/editor]
        window-fullscreen=true
        window-size=(1024, 768)

        [org/editor/language/c-sharp]
        tab-width=8

        [org/editor/language/c]
        tab-width=2
        ''')

        # Load and dump is identity.
        dconf('load', '/', input=keyfile)
        self.assertEqual(dconf('dump', '/').stdout, keyfile)

        # Copy /org/ directory to /com/.
        keyfile = dconf('dump', '/org/').stdout
        dconf('load', '/com/', input=keyfile)

        # Verify that /org/ and /com/ are now exactly the same.
        keyfile_org = dconf('dump', '/org/').stdout
        keyfile_com = dconf('dump', '/com/').stdout
        self.assertEqual(keyfile_org, keyfile_com)

    def test_complete(self):
        """Tests _complete command used internally to implement bash
        completion.

        Runs completion queries after loading a sample database from
        key-file.
        """
        keyfile = dedent('''\
        [org]
        calamity=false

        [org/calculator]
        window-position=(0, 0)

        [org/calendar]
        window-position=(0, 0)

        [org/history]
        file0='/tmp/a'
        file1='/tmp/b'
        file2='/tmp/c'
        ''')

        dconf('load', '/', input=keyfile)

        # Empty string is completed to '/'.
        completions = dconf_complete('', '')
        self.assertEqual(completions, ['/'])
        completions = dconf_complete('/', '')
        self.assertEqual(completions, ['/'])

        # Invalid paths don't return any completions.
        completions = dconf_complete('', 'foo/')
        self.assertEqual(completions, [])
        completions = dconf_complete('/', 'foo/')
        self.assertEqual(completions, [])

        # Key completions include trailing whitespace,
        # directory completions do not.
        completions = dconf_complete('', '/org/')
        self.assertEqual(completions,
                         ['/org/calamity ',
                          '/org/calculator/',
                          '/org/calendar/',
                          '/org/history/'])

        # Only matches with given prefix are returned.
        completions = dconf_complete('', '/org/cal')
        self.assertEqual(completions,
                         ['/org/calamity ',
                          '/org/calculator/',
                          '/org/calendar/'])

        # Only matches with given suffix are returned.
        completions = dconf_complete('/', '/org/cal')
        self.assertEqual(completions,
                         ['/org/calculator/',
                          '/org/calendar/'])

    def test_compile_precedence(self):
        """Compile processes key-files in reverse lexicographical order.

        When key occurs in multiple files, the value from file processed
        first is preferred.

        Test that by preparing four key-files each with a different value
        for '/org/file'. Compiling it directly into user database, and
        performing read to check which value had been selected.
        """
        # Prepare key file database directory.
        user_d = os.path.join(self.temporary_dir.name, 'user.d')
        os.mkdir(user_d, mode=0o700)

        # Required from compile utility specifically.
        os.mkdir(os.path.join(self.config_home, 'dconf'), mode=0o700)

        def write_config_d(name):
            """Write a key-file called ``name`` setting /org/file to name."""
            keyfile = dedent('''
            [org]
            file = {name}
            '''.format(name=name))

            with open(os.path.join(user_d, name), 'w') as file:
                file.write(keyfile)

        write_config_d('00')
        write_config_d('25')
        write_config_d('50')
        write_config_d('99')

        # Compile directly into user configuration file.
        dconf('compile',
              os.path.join(self.config_home, 'dconf', 'user'),
              user_d)

        # Lexicographically last value should win:
        self.assertEqual(dconf_read('/org/file'), '99')

    @unittest.expectedFailure
    def test_redundant_disk_writes(self):
        """Redundant disk writes are avoided.

        When write or reset operations don't modify actual contents of the
        database, the database file shouldn't be needlessly rewritten. Check
        mtime after each redundant operation to verify that.
        """
        config = os.path.join(self.config_home, 'dconf', 'user')

        def move_time_back(path):
            """Moves file mtime 60 seconds back and returns its new value.

            Used to avoid false positives during comparison checks in the
            case that mtime is stored with low precision.
            """
            atime = os.path.getatime(path)
            mtime = os.path.getmtime(path)

            os.utime(path, times=(atime, mtime - 60))

            return os.path.getmtime(path)

        # Activate service to trigger initial database write.
        dconf_write('/prime', '5')

        # Sanity check that database is rewritten when necessary.
        saved_mtime = move_time_back(config)
        dconf_write('/prime', '13')
        self.assertLess(saved_mtime, os.path.getmtime(config))

        # Write the same value as one already in the database.
        saved_mtime = move_time_back(config)
        dconf('write', '/prime', '13')
        self.assertEqual(saved_mtime, os.path.getmtime(config))

        # Reset a directory which is not present in the database.
        saved_mtime = move_time_back(config)
        dconf('reset', '-f', '/non-existing/directory/')
        self.assertEqual(saved_mtime, os.path.getmtime(config))


if __name__ == '__main__':
    # Make sure we don't pick up mandatory profile.  Raised explicitly
    # rather than asserted, so the check also runs under "python -O".
    mandatory_profile = '/run/dconf/user/{}'.format(os.getuid())
    if os.path.isfile(mandatory_profile):
        raise RuntimeError(
            'Mandatory profile {} is present'.format(mandatory_profile))

    # Avoid profile sourced from environment
    os.environ.pop('DCONF_PROFILE', None)

    if len(sys.argv) < 3:
        message = 'Usage: {} path-to-dconf path-to-dconf-service'.format(
            sys.argv[0])
        raise RuntimeError(message)

    # Remove our own arguments so that unittest.main() only sees its own.
    dconf_exe, dconf_service_exe = sys.argv[1:3]
    del sys.argv[1:3]

    # Run tests!
    unittest.main()