diff options
Diffstat (limited to 'src/tests/integration-test')
-rwxr-xr-x | src/tests/integration-test | 270 |
1 files changed, 220 insertions, 50 deletions
diff --git a/src/tests/integration-test b/src/tests/integration-test index 22bee38..aded71b 100755 --- a/src/tests/integration-test +++ b/src/tests/integration-test @@ -89,6 +89,7 @@ class UDisksTestCase(unittest.TestCase): This provides static functions which are useful for all test cases. ''' daemon = None + daemon_path = None daemon_log = None device = None @@ -104,22 +105,22 @@ class UDisksTestCase(unittest.TestCase): sys.exit(0) # run from local build tree if we are in one, otherwise use system instance - daemon_path = os.path.join(srcdir, 'src', 'udisksd') - if (os.access (daemon_path, os.X_OK)): + klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd') + if (os.access (klass.daemon_path, os.X_OK)): print('Testing binaries from local build tree') klass.check_build_tree_config() else: print('Testing installed system binaries') - daemon_path = None + klass.daemon_path = None for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'): if l.startswith('Exec='): - daemon_path = l.split('=', 1)[1].split()[0] + klass.daemon_path = l.split('=', 1)[1].split()[0] break - assert daemon_path, 'could not determine daemon path from D-BUS .service file' + assert klass.daemon_path, 'could not determine daemon path from D-BUS .service file' - print('daemon path: ' + daemon_path) + print('daemon path: ' + klass.daemon_path) - klass.device = klass.setup_vdev() + (klass.device, klass.cd_device) = klass.setup_vdev() # start polkit and udisks on a private DBus klass.dbus = Gio.TestDBus() @@ -135,14 +136,34 @@ class UDisksTestCase(unittest.TestCase): klass.daemon_log = open(logfile, 'w') else: klass.daemon_log = tempfile.TemporaryFile() - klass.daemon = subprocess.Popen([daemon_path], + atexit.register(klass.cleanup) + + klass.start_daemon() + + @classmethod + def cleanup(klass): + '''stop daemon again and clean up test environment''' + + subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed + + klass.stop_daemon() + + klass.teardown_vdev(klass.device) + klass.device = None + + del os.environ['DBUS_SYSTEM_BUS_ADDRESS'] + klass.dbus.down() + + @classmethod + def start_daemon(klass): + assert klass.daemon == None + klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'], stdout=klass.daemon_log, stderr=subprocess.STDOUT) assert klass.daemon.pid, 'daemon failed to start' - atexit.register(klass.cleanup) - # wait until the daemon has started up timeout = 10 + klass.manager = None while klass.manager is None and timeout > 0: time.sleep(0.2) klass.client = UDisks.Client.new_sync(None) @@ -150,25 +171,17 @@ class UDisksTestCase(unittest.TestCase): klass.manager = klass.client.get_manager() timeout -= 1 assert klass.manager, 'daemon failed to start' + assert klass.daemon.pid, 'daemon failed to start' klass.sync() @classmethod - def cleanup(klass): - '''stop daemon again and clean up test environment''' - - subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed - + def stop_daemon(klass): + assert klass.daemon os.kill(klass.daemon.pid, signal.SIGTERM) os.wait() klass.daemon = None - klass.teardown_vdev(klass.device) - klass.device = None - - del os.environ['DBUS_SYSTEM_BUS_ADDRESS'] - klass.dbus.down() - @classmethod def sync(klass): '''Wait until pending events finished processing. @@ -212,36 +225,46 @@ class UDisksTestCase(unittest.TestCase): klass.sync() @classmethod - def devname(klass, partition=None): - '''Get name of test device or one of its partitions''' + def devname(klass, partition=None, cd=False): + '''Get name of test device or one of its partitions + If cd is True, return the CD device, otherwise the hard disk device. + ''' + if cd: + dev = klass.cd_device + else: + dev = klass.device if partition: - if klass.device[-1].isdigit(): - return klass.device + 'p' + str(partition) + if dev[-1].isdigit(): + return dev + 'p' + str(partition) else: - return klass.device + str(partition) + return dev + str(partition) else: - return klass.device + return dev @classmethod - def udisks_block(klass, partition=None): - '''Get UDisksBlock object for test device or partition''' + def udisks_block(klass, partition=None, cd=False): + '''Get UDisksBlock object for test device or partition + If cd is True, return the CD device, otherwise the hard disk device. + ''' assert klass.client - devname = klass.devname(partition) + devname = klass.devname(partition, cd) dev_t = os.stat(devname).st_rdev block = klass.client.get_block_for_dev(dev_t) assert block, 'did not find an UDisksBlock object for %s' % devname return block @classmethod - def udisks_filesystem(klass, partition=None): + def udisks_filesystem(klass, partition=None, cd=False): '''Get UDisksFilesystem object for test device or partition Return None if there is no file system on that device. + + If cd is True, return the CD device, otherwise the hard disk device. ''' - block = klass.udisks_block(partition) - return klass.client.get_object(block.get_object_path()).get_property('filesystem') + block = klass.udisks_block(partition, cd) + return klass.client.get_object(block.get_object_path()).get_filesystem() @classmethod def blkid(klass, partition=None, device=None): @@ -364,38 +387,70 @@ class UDisksTestCase(unittest.TestCase): @classmethod def setup_vdev(klass): - '''create virtual test device + '''create virtual test devices It is zeroed out initially. - Return the device path. + Return a pair (writable HD device path, readonly CD device path). ''' # ensure that the scsi_debug module is loaded if os.path.isdir('/sys/module/scsi_debug'): sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n') sys.exit(1) + # work around scsi_debug not implementing CD-ROM SCSI commands, so that + # udev's cdrom_id does not recognize tracks + scsi_debug_rules = '/run/udev/rules.d/60-persistent-storage-scsi_debug.rules' + if os.path.isdir('/run/udev/rules.d') and not os.path.exists(scsi_debug_rules): + with open(scsi_debug_rules, 'w') as f: + f.write('''KERNEL=="sr*", ENV{DISK_EJECT_REQUEST}!="?*", ATTRS{model}=="scsi_debug*", ENV{ID_CDROM_MEDIA}=="?*", IMPORT{program}="/sbin/blkid -o udev -p -u noraid $tempnode" +''') + # reload udev + subprocess.call('sync; pkill --signal HUP udevd || pkill --signal HUP systemd-udevd', + shell=True) + + # craete a fake SCSI hard drive assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % ( VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug' - # wait until all drives are created - dirs = [] - while len(dirs) < 1: - dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') + # wait until drive got created + rw_dirs = [] + while len(rw_dirs) < 1: + rw_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') time.sleep(0.1) - assert len(dirs) == 1 + assert len(rw_dirs) == 1 + + # create a fake CD-ROM, too + with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f: + f.write('5') # henceforth, created devices will be CD drives + with open('/sys/bus/pseudo/drivers/scsi_debug/add_host', 'w') as f: + f.write('1') # generate a new drive + subprocess.call(['udevadm', 'settle']) + with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f: + f.write('0') + + ro_dirs = [] + while len(ro_dirs) < 2: + ro_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') + time.sleep(0.1) + ro_dirs.remove(rw_dirs[0]) + assert len(ro_dirs) == 1 # determine the debug block devices - devs = os.listdir(dirs[0]) + devs = os.listdir(ro_dirs[0]) + assert len(devs) == 1 + ro_dev = '/dev/' + devs[0] + devs = os.listdir(rw_dirs[0]) assert len(devs) == 1 - dev = '/dev/' + devs[0] - assert os.path.exists(dev) + rw_dev = '/dev/' + devs[0] + assert os.path.exists(ro_dev) + assert os.path.exists(rw_dev) # let's be 100% sure that we pick a virtual one - assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug' + assert open('/sys/block/%s/device/model' % os.path.basename(rw_dev)).read().strip() == 'scsi_debug' - print('Set up test device: ' + dev) - return dev + print('Set up test device: r/w: %s, r/o: %s' % (rw_dev, ro_dev)) + return (rw_dev, ro_dev) @classmethod def teardown_vdev(klass, device): @@ -680,6 +735,63 @@ class FS(UDisksTestCase): self.client.settle() self.assertEqual(fs.get_property('mount-points'), []) + def test_existing_manual_mount_point(self): + '''fs: does not reuse existing manual mount point''' + + self.mkfs('ext4', 'udiskstest') + fs = self.udisks_filesystem() + + # mount it, determine mount path, and unmount again + mount_path = fs.call_mount_sync(no_options, None) + self.assertTrue(mount_path.endswith('udiskstest')) + + self.retry_busy(fs.call_unmount_sync, no_options, None) + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), []) + + # cleans up mountpoint + self.assertFalse(os.path.exists(mount_path)) + + # now manually create the mount point + os.mkdir(mount_path) + + # now this should use mount_path + '1' + try: + new_mount_path = fs.call_mount_sync(no_options, None) + self.retry_busy(fs.call_unmount_sync, no_options, None) + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), []) + self.assertEqual(new_mount_path, mount_path + '1') + finally: + os.rmdir(mount_path) + + def test_existing_udisks_mount_point(self): + '''fs: reuses existing udisks mount point''' + + self.mkfs('ext4', 'udiskstest') + fs = self.udisks_filesystem() + + # mount it, determine mount path + mount_path = fs.call_mount_sync(no_options, None) + self.assertTrue(mount_path.endswith('udiskstest')) + + # stop the daemon (happens during a package upgrade) + UDisksTestCase.stop_daemon() + + # mount should still be there; unmount it manually + self.assertTrue(self.is_mountpoint(mount_path)) + subprocess.check_call(['umount', mount_path]) + + # restart daemon, mount again; this should use the same mount point as + # before + UDisksTestCase.start_daemon() + fs = self.udisks_filesystem() + new_mount_path = fs.call_mount_sync(no_options, None) + self.retry_busy(fs.call_unmount_sync, no_options, None) + self.client.settle() + self.assertEqual(fs.get_property('mount-points'), []) + self.assertEqual(new_mount_path, mount_path) + def _do_fs_check(self, type): '''Run checks for a particular file system.''' @@ -876,6 +988,32 @@ class FS(UDisksTestCase): # check fs - Not implemented in udisks yet #self.assertEqual(iface.FilesystemCheck([]), True) + # check mounting of a read-only device + # this is known-broken for reiserfs and xfs right now: + # https://github.com/karelzak/util-linux/issues/17 + # https://github.com/karelzak/util-linux/issues/18 + if type not in ['reiserfs', 'xfs']: + # the scsi_debug CD drive content is the same as for the HD drive, but + # udev does not know about this; so give it a nudge to re-probe + subprocess.call(['udevadm', 'trigger', '--action=change', + '--sysname-match=' + os.path.basename(self.cd_device)]) + self.sync() + self.sync() + cd_fs = self.udisks_filesystem(cd=True) + + mount_path = cd_fs.call_mount_sync(no_options, None) + try: + self.assertTrue('/media/' in mount_path) + self.sync() + self.assertEqual(cd_fs.get_property('mount-points'), [mount_path]) + self.assertTrue(self.is_mountpoint(mount_path)) + + self.assertEqual(self.udisks_block(cd=True).get_property('read-only'), True) + finally: + self.retry_busy(cd_fs.call_unmount_sync, no_options, None) + self.assertFalse(os.path.exists(mount_path), 'mount point was not removed') + self.assertEqual(cd_fs.get_property('mount-points'), [mount_path]) + def _do_file_perms_checks(self, type, mount_point): '''Check for permissions for data files and executables. @@ -1090,12 +1228,24 @@ class Luks(UDisksTestCase): self.assertTrue(mount_path.endswith('treasure')) # removal should clean up mounts - self.remove_device(self.device) - self.assertFalse(os.path.exists(mount_path)) - self.assertEqual(self.client.get_object(path), None) + try: + self.remove_device(self.device) + self.assertFalse(os.path.exists(mount_path)) + timeout = 50 + while timeout > 0: + if self.client.get_object(path) is None: + break + timeout -= 1 + # we do not have a main loop, and cannot currently use + # g_main_context_get_default() from introspection, so + # instead of refreshing self.client, get a new one + self.client = UDisks.Client.new_sync(None) + time.sleep(0.1) + self.assertGreater(timeout, 0, 'timeout waiting for object path %s to disappear' % path) + finally: + self.readd_devices() # after putting it back, it should be mountable again - self.readd_devices() crypt_obj = self.client.get_object(self.udisks_block().get_object_path()) path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', no_options, None) @@ -1150,6 +1300,26 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): self.assertEqual(block.get_property('id-type'), 'ext4') self.assertEqual(block.get_property('id-label'), 'polkityes') + def test_removable_fs(self): + '''Create FS on removable drive (allowed)''' + + self.start_polkitd(['org.freedesktop.udisks2.filesystem-mount']) + + # the scsi_debug CD drive content is the same as for the HD drive, but + # udev does not know about this; so give it a nudge to re-probe + subprocess.call(['udevadm', 'trigger', '--action=change', + '--sysname-match=' + os.path.basename(self.cd_device)]) + self.sync() + self.sync() + + fs = self.udisks_filesystem(cd=True) + self.assertNotEqual(fs, None) + mount_path = fs.call_mount_sync(no_options, None) + self.assertTrue('/media/' in mount_path, mount_path) + + self.retry_busy(fs.call_unmount_sync, no_options, None) + self.client.settle() + # ---------------------------------------------------------------------------- if __name__ == '__main__': |