summaryrefslogtreecommitdiff
path: root/src/tests/integration-test
diff options
context:
space:
mode:
Diffstat (limited to 'src/tests/integration-test')
-rwxr-xr-xsrc/tests/integration-test270
1 files changed, 220 insertions, 50 deletions
diff --git a/src/tests/integration-test b/src/tests/integration-test
index 22bee38..aded71b 100755
--- a/src/tests/integration-test
+++ b/src/tests/integration-test
@@ -89,6 +89,7 @@ class UDisksTestCase(unittest.TestCase):
This provides static functions which are useful for all test cases.
'''
daemon = None
+ daemon_path = None
daemon_log = None
device = None
@@ -104,22 +105,22 @@ class UDisksTestCase(unittest.TestCase):
sys.exit(0)
# run from local build tree if we are in one, otherwise use system instance
- daemon_path = os.path.join(srcdir, 'src', 'udisksd')
- if (os.access (daemon_path, os.X_OK)):
+ klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd')
+ if (os.access (klass.daemon_path, os.X_OK)):
print('Testing binaries from local build tree')
klass.check_build_tree_config()
else:
print('Testing installed system binaries')
- daemon_path = None
+ klass.daemon_path = None
for l in open('/usr/share/dbus-1/system-services/org.freedesktop.UDisks2.service'):
if l.startswith('Exec='):
- daemon_path = l.split('=', 1)[1].split()[0]
+ klass.daemon_path = l.split('=', 1)[1].split()[0]
break
- assert daemon_path, 'could not determine daemon path from D-BUS .service file'
+ assert klass.daemon_path, 'could not determine daemon path from D-BUS .service file'
- print('daemon path: ' + daemon_path)
+ print('daemon path: ' + klass.daemon_path)
- klass.device = klass.setup_vdev()
+ (klass.device, klass.cd_device) = klass.setup_vdev()
# start polkit and udisks on a private DBus
klass.dbus = Gio.TestDBus()
@@ -135,14 +136,34 @@ class UDisksTestCase(unittest.TestCase):
klass.daemon_log = open(logfile, 'w')
else:
klass.daemon_log = tempfile.TemporaryFile()
- klass.daemon = subprocess.Popen([daemon_path],
+ atexit.register(klass.cleanup)
+
+ klass.start_daemon()
+
+ @classmethod
+ def cleanup(klass):
+ '''stop daemon again and clean up test environment'''
+
+ subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
+
+ klass.stop_daemon()
+
+ klass.teardown_vdev(klass.device)
+ klass.device = None
+
+ del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
+ klass.dbus.down()
+
+ @classmethod
+ def start_daemon(klass):
+ assert klass.daemon == None
+ klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'],
stdout=klass.daemon_log, stderr=subprocess.STDOUT)
assert klass.daemon.pid, 'daemon failed to start'
- atexit.register(klass.cleanup)
-
# wait until the daemon has started up
timeout = 10
+ klass.manager = None
while klass.manager is None and timeout > 0:
time.sleep(0.2)
klass.client = UDisks.Client.new_sync(None)
@@ -150,25 +171,17 @@ class UDisksTestCase(unittest.TestCase):
klass.manager = klass.client.get_manager()
timeout -= 1
assert klass.manager, 'daemon failed to start'
+ assert klass.daemon.pid, 'daemon failed to start'
klass.sync()
@classmethod
- def cleanup(klass):
- '''stop daemon again and clean up test environment'''
-
- subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed
-
+ def stop_daemon(klass):
+ assert klass.daemon
os.kill(klass.daemon.pid, signal.SIGTERM)
os.wait()
klass.daemon = None
- klass.teardown_vdev(klass.device)
- klass.device = None
-
- del os.environ['DBUS_SYSTEM_BUS_ADDRESS']
- klass.dbus.down()
-
@classmethod
def sync(klass):
'''Wait until pending events finished processing.
@@ -212,36 +225,46 @@ class UDisksTestCase(unittest.TestCase):
klass.sync()
@classmethod
- def devname(klass, partition=None):
- '''Get name of test device or one of its partitions'''
+ def devname(klass, partition=None, cd=False):
+ '''Get name of test device or one of its partitions
+ If cd is True, return the CD device, otherwise the hard disk device.
+ '''
+ if cd:
+ dev = klass.cd_device
+ else:
+ dev = klass.device
if partition:
- if klass.device[-1].isdigit():
- return klass.device + 'p' + str(partition)
+ if dev[-1].isdigit():
+ return dev + 'p' + str(partition)
else:
- return klass.device + str(partition)
+ return dev + str(partition)
else:
- return klass.device
+ return dev
@classmethod
- def udisks_block(klass, partition=None):
- '''Get UDisksBlock object for test device or partition'''
+ def udisks_block(klass, partition=None, cd=False):
+ '''Get UDisksBlock object for test device or partition
+ If cd is True, return the CD device, otherwise the hard disk device.
+ '''
assert klass.client
- devname = klass.devname(partition)
+ devname = klass.devname(partition, cd)
dev_t = os.stat(devname).st_rdev
block = klass.client.get_block_for_dev(dev_t)
assert block, 'did not find an UDisksBlock object for %s' % devname
return block
@classmethod
- def udisks_filesystem(klass, partition=None):
+ def udisks_filesystem(klass, partition=None, cd=False):
'''Get UDisksFilesystem object for test device or partition
Return None if there is no file system on that device.
+
+ If cd is True, return the CD device, otherwise the hard disk device.
'''
- block = klass.udisks_block(partition)
- return klass.client.get_object(block.get_object_path()).get_property('filesystem')
+ block = klass.udisks_block(partition, cd)
+ return klass.client.get_object(block.get_object_path()).get_filesystem()
@classmethod
def blkid(klass, partition=None, device=None):
@@ -364,38 +387,70 @@ class UDisksTestCase(unittest.TestCase):
@classmethod
def setup_vdev(klass):
- '''create virtual test device
+ '''create virtual test devices
It is zeroed out initially.
- Return the device path.
+ Return a pair (writable HD device path, readonly CD device path).
'''
# ensure that the scsi_debug module is loaded
if os.path.isdir('/sys/module/scsi_debug'):
sys.stderr.write('The scsi_debug module is already loaded; please remove before running this test.\n')
sys.exit(1)
+ # work around scsi_debug not implementing CD-ROM SCSI commands, so that
+ # udev's cdrom_id does not recognize tracks
+ scsi_debug_rules = '/run/udev/rules.d/60-persistent-storage-scsi_debug.rules'
+ if os.path.isdir('/run/udev/rules.d') and not os.path.exists(scsi_debug_rules):
+ with open(scsi_debug_rules, 'w') as f:
+ f.write('''KERNEL=="sr*", ENV{DISK_EJECT_REQUEST}!="?*", ATTRS{model}=="scsi_debug*", ENV{ID_CDROM_MEDIA}=="?*", IMPORT{program}="/sbin/blkid -o udev -p -u noraid $tempnode"
+''')
+ # reload udev
+ subprocess.call('sync; pkill --signal HUP udevd || pkill --signal HUP systemd-udevd',
+ shell=True)
+
+ # craete a fake SCSI hard drive
assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % (
VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug'
- # wait until all drives are created
- dirs = []
- while len(dirs) < 1:
- dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+ # wait until drive got created
+ rw_dirs = []
+ while len(rw_dirs) < 1:
+ rw_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
time.sleep(0.1)
- assert len(dirs) == 1
+ assert len(rw_dirs) == 1
+
+ # create a fake CD-ROM, too
+ with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
+ f.write('5') # henceforth, created devices will be CD drives
+ with open('/sys/bus/pseudo/drivers/scsi_debug/add_host', 'w') as f:
+ f.write('1') # generate a new drive
+ subprocess.call(['udevadm', 'settle'])
+ with open('/sys/bus/pseudo/drivers/scsi_debug/ptype', 'w') as f:
+ f.write('0')
+
+ ro_dirs = []
+ while len(ro_dirs) < 2:
+ ro_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block')
+ time.sleep(0.1)
+ ro_dirs.remove(rw_dirs[0])
+ assert len(ro_dirs) == 1
# determine the debug block devices
- devs = os.listdir(dirs[0])
+ devs = os.listdir(ro_dirs[0])
+ assert len(devs) == 1
+ ro_dev = '/dev/' + devs[0]
+ devs = os.listdir(rw_dirs[0])
assert len(devs) == 1
- dev = '/dev/' + devs[0]
- assert os.path.exists(dev)
+ rw_dev = '/dev/' + devs[0]
+ assert os.path.exists(ro_dev)
+ assert os.path.exists(rw_dev)
# let's be 100% sure that we pick a virtual one
- assert open('/sys/block/%s/device/model' % devs[0]).read().strip() == 'scsi_debug'
+ assert open('/sys/block/%s/device/model' % os.path.basename(rw_dev)).read().strip() == 'scsi_debug'
- print('Set up test device: ' + dev)
- return dev
+ print('Set up test device: r/w: %s, r/o: %s' % (rw_dev, ro_dev))
+ return (rw_dev, ro_dev)
@classmethod
def teardown_vdev(klass, device):
@@ -680,6 +735,63 @@ class FS(UDisksTestCase):
self.client.settle()
self.assertEqual(fs.get_property('mount-points'), [])
+ def test_existing_manual_mount_point(self):
+ '''fs: does not reuse existing manual mount point'''
+
+ self.mkfs('ext4', 'udiskstest')
+ fs = self.udisks_filesystem()
+
+ # mount it, determine mount path, and unmount again
+ mount_path = fs.call_mount_sync(no_options, None)
+ self.assertTrue(mount_path.endswith('udiskstest'))
+
+ self.retry_busy(fs.call_unmount_sync, no_options, None)
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), [])
+
+ # cleans up mountpoint
+ self.assertFalse(os.path.exists(mount_path))
+
+ # now manually create the mount point
+ os.mkdir(mount_path)
+
+ # now this should use mount_path + '1'
+ try:
+ new_mount_path = fs.call_mount_sync(no_options, None)
+ self.retry_busy(fs.call_unmount_sync, no_options, None)
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertEqual(new_mount_path, mount_path + '1')
+ finally:
+ os.rmdir(mount_path)
+
+ def test_existing_udisks_mount_point(self):
+ '''fs: reuses existing udisks mount point'''
+
+ self.mkfs('ext4', 'udiskstest')
+ fs = self.udisks_filesystem()
+
+ # mount it, determine mount path
+ mount_path = fs.call_mount_sync(no_options, None)
+ self.assertTrue(mount_path.endswith('udiskstest'))
+
+ # stop the daemon (happens during a package upgrade)
+ UDisksTestCase.stop_daemon()
+
+ # mount should still be there; unmount it manually
+ self.assertTrue(self.is_mountpoint(mount_path))
+ subprocess.check_call(['umount', mount_path])
+
+ # restart daemon, mount again; this should use the same mount point as
+ # before
+ UDisksTestCase.start_daemon()
+ fs = self.udisks_filesystem()
+ new_mount_path = fs.call_mount_sync(no_options, None)
+ self.retry_busy(fs.call_unmount_sync, no_options, None)
+ self.client.settle()
+ self.assertEqual(fs.get_property('mount-points'), [])
+ self.assertEqual(new_mount_path, mount_path)
+
def _do_fs_check(self, type):
'''Run checks for a particular file system.'''
@@ -876,6 +988,32 @@ class FS(UDisksTestCase):
# check fs - Not implemented in udisks yet
#self.assertEqual(iface.FilesystemCheck([]), True)
+ # check mounting of a read-only device
+ # this is known-broken for reiserfs and xfs right now:
+ # https://github.com/karelzak/util-linux/issues/17
+ # https://github.com/karelzak/util-linux/issues/18
+ if type not in ['reiserfs', 'xfs']:
+ # the scsi_debug CD drive content is the same as for the HD drive, but
+ # udev does not know about this; so give it a nudge to re-probe
+ subprocess.call(['udevadm', 'trigger', '--action=change',
+ '--sysname-match=' + os.path.basename(self.cd_device)])
+ self.sync()
+ self.sync()
+ cd_fs = self.udisks_filesystem(cd=True)
+
+ mount_path = cd_fs.call_mount_sync(no_options, None)
+ try:
+ self.assertTrue('/media/' in mount_path)
+ self.sync()
+ self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
+ self.assertTrue(self.is_mountpoint(mount_path))
+
+ self.assertEqual(self.udisks_block(cd=True).get_property('read-only'), True)
+ finally:
+ self.retry_busy(cd_fs.call_unmount_sync, no_options, None)
+ self.assertFalse(os.path.exists(mount_path), 'mount point was not removed')
+ self.assertEqual(cd_fs.get_property('mount-points'), [mount_path])
+
def _do_file_perms_checks(self, type, mount_point):
'''Check for permissions for data files and executables.
@@ -1090,12 +1228,24 @@ class Luks(UDisksTestCase):
self.assertTrue(mount_path.endswith('treasure'))
# removal should clean up mounts
- self.remove_device(self.device)
- self.assertFalse(os.path.exists(mount_path))
- self.assertEqual(self.client.get_object(path), None)
+ try:
+ self.remove_device(self.device)
+ self.assertFalse(os.path.exists(mount_path))
+ timeout = 50
+ while timeout > 0:
+ if self.client.get_object(path) is None:
+ break
+ timeout -= 1
+ # we do not have a main loop, and cannot currently use
+ # g_main_context_get_default() from introspection, so
+ # instead of refreshing self.client, get a new one
+ self.client = UDisks.Client.new_sync(None)
+ time.sleep(0.1)
+ self.assertGreater(timeout, 0, 'timeout waiting for object path %s to disappear' % path)
+ finally:
+ self.readd_devices()
# after putting it back, it should be mountable again
- self.readd_devices()
crypt_obj = self.client.get_object(self.udisks_block().get_object_path())
path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t',
no_options, None)
@@ -1150,6 +1300,26 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase):
self.assertEqual(block.get_property('id-type'), 'ext4')
self.assertEqual(block.get_property('id-label'), 'polkityes')
+ def test_removable_fs(self):
+ '''Create FS on removable drive (allowed)'''
+
+ self.start_polkitd(['org.freedesktop.udisks2.filesystem-mount'])
+
+ # the scsi_debug CD drive content is the same as for the HD drive, but
+ # udev does not know about this; so give it a nudge to re-probe
+ subprocess.call(['udevadm', 'trigger', '--action=change',
+ '--sysname-match=' + os.path.basename(self.cd_device)])
+ self.sync()
+ self.sync()
+
+ fs = self.udisks_filesystem(cd=True)
+ self.assertNotEqual(fs, None)
+ mount_path = fs.call_mount_sync(no_options, None)
+ self.assertTrue('/media/' in mount_path, mount_path)
+
+ self.retry_busy(fs.call_unmount_sync, no_options, None)
+ self.client.settle()
+
# ----------------------------------------------------------------------------
if __name__ == '__main__':