diff options
author | Alexander Larsson <alexl@redhat.com> | 2012-10-12 00:18:02 +0200 |
---|---|---|
committer | Alexander Larsson <alexl@redhat.com> | 2012-10-12 00:21:26 +0200 |
commit | 8e999efb4f617d01b876f9b9d41cea11385cf3f9 (patch) | |
tree | f1f8d74acf763756ba75083198d657ed02dacb49 /test | |
parent | 7dfcc1d1fda1a9bd388ea0823e9b24807782b59e (diff) | |
download | gvfs-8e999efb4f617d01b876f9b9d41cea11385cf3f9.tar.gz |
Initial version of testing framework
This is an initial import of the gvfs-test test frameworks
from Martin Pitt, integrated into the gvfs tree.
For now its only run if you make test in the tests subdir
as some tests are failing. It also doesn't use the gvfs-testbed
script to launch a fuller test environment
Diffstat (limited to 'test')
-rw-r--r-- | test/.gitignore | 3 | ||||
-rw-r--r-- | test/Makefile.am | 46 | ||||
-rwxr-xr-x | test/gvfs-test | 988 | ||||
-rwxr-xr-x | test/run-in-tree.sh | 22 | ||||
-rw-r--r-- | test/session.conf.in | 53 |
5 files changed, 1111 insertions, 1 deletions
diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 00000000..61baad1e --- /dev/null +++ b/test/.gitignore @@ -0,0 +1,3 @@ +*.service +session.conf + diff --git a/test/Makefile.am b/test/Makefile.am index fb6c4b21..bf9ca893 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -16,4 +16,48 @@ noinst_PROGRAMS = \ benchmark-posix-big-files \ $(NULL) -EXTRA_DIST = benchmark-common.c +session.conf: session.conf.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@testdir\@|$(abs_builddir)|" $< > $@ + +gvfs-daemon.service: $(top_srcdir)/daemon/gvfs-daemon.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/daemon|" $< > $@ + +gvfs-metadata.service: $(top_srcdir)/metadata/gvfs-metadata.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/metadata|" $< > $@ + +noinst_DATA= session.conf gvfs-daemon.service gvfs-metadata.service + +if USE_AFC +org.gtk.Private.AfcVolumeMonitor.service: $(top_srcdir)/monitor/afc/org.gtk.Private.AfcVolumeMonitor.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/monitor//afc|" $< > $@ +noinst_DATA+=org.gtk.Private.AfcVolumeMonitor.service +endif + +if USE_GDU +org.gtk.Private.GduVolumeMonitor.service: $(top_srcdir)/monitor/gdu/org.gtk.Private.GduVolumeMonitor.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/monitor//gdu|" $< > $@ +noinst_DATA+=org.gtk.Private.GduVolumeMonitor.service +endif + +if USE_GPHOTO2 +org.gtk.Private.GPhoto2VolumeMonitor.service: $(top_srcdir)/monitor/gphoto2/org.gtk.Private.GPhoto2VolumeMonitor.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/monitor//gphoto2|" $< > $@ +noinst_DATA+=org.gtk.Private.GPhoto2VolumeMonitor.service +endif + +if USE_HAL +org.gtk.Private.HalVolumeMonitor.service: $(top_srcdir)/monitor/hal/org.gtk.Private.HalVolumeMonitor.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/monitor//hal|" $< > $@ +noinst_DATA+=org.gtk.Private.HalVolumeMonitor.service +endif + +if USE_UDISKS2 +org.gtk.Private.UDisks2VolumeMonitor.service: $(top_srcdir)/monitor/udisks2/org.gtk.Private.UDisks2VolumeMonitor.service.in ../config.log + $(AM_V_GEN) $(SED) -e "s|\@libexecdir\@|$(abs_top_builddir)/monitor//udisks2|" $< > $@ +noinst_DATA+=org.gtk.Private.UDisks2VolumeMonitor.service +endif + +test: session.conf gvfs-daemon.service gvfs-metadata.service + ./run-in-tree.sh ./gvfs-test + +EXTRA_DIST = benchmark-common.c session.conf.in gvfs-test diff --git a/test/gvfs-test b/test/gvfs-test new file mode 100755 index 00000000..a55a675d --- /dev/null +++ b/test/gvfs-test @@ -0,0 +1,988 @@ +#!/usr/bin/python3 +import os +import os.path +import sys +import unittest +import subprocess +import tempfile +import tarfile +import zipfile +import time +import shutil +import fcntl +import re +from glob import glob + +in_testbed = os.path.exists('/home/gvfs_sandbox_marker') +samba_running = subprocess.call(['pidof', 'smbd'], stdout=subprocess.PIPE) == 0 + +local_ip = subprocess.check_output("ip -4 addr | sed -nr '/127\.0\.0/ n; " + "/inet / { s/^.*inet ([0-9.]+).*$/\\1/; p; q }'", + shell=True, universal_newlines=True) + +# http://sg.danny.cz/sg/sdebug26.html +PTYPE_DISK = 0 +PTYPE_CDROM = 5 + +class GvfsTestCase(unittest.TestCase): + '''Gvfs tests base class. + + Provide some utility functions and a temporary work dir. + ''' + def setUp(self): + self.workdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.workdir) + + def program_code_out_err(self, argv): + '''Return (exitcode, stdout, stderr) from a program call.''' + + prog = subprocess.Popen(argv, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, universal_newlines=True) + (out, err) = prog.communicate() + return (prog.returncode, out, err) + + def program_out_err(self, argv): + '''Return (stdout, stderr) from a program call.''' + + (code, out, err) = self.program_code_out_err(argv) + self.assertEqual(code, 0, err) + return (out, err) + + def program_out_success(self, argv): + '''Return stdout from a successful program call.''' + + (out, err) = self.program_out_err(argv) + self.assertEqual(err, '', err) + return out + + @classmethod + def root_command(klass, command): + '''Run a shell command string as root. + + This only works when running under gvfs-testbed. + + Return (code, stdout, stderr). + ''' + assert in_testbed, 'root_command() only works under gvfs-testbed' + + rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + # set reasonable path that includes /sbin + rootsh.stdin.write('export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\n') + + (out, err) = rootsh.communicate(command.encode('UTF-8')) + return (rootsh.returncode, out, err) + + @classmethod + def root_command_success(klass, command): + '''root_command() for commands that should succeed without errors.''' + + (code, out, err) = klass.root_command(command) + if code != 0: + raise SystemError('command "%s" failed with code %i:\n%s' % (code, err)) + if err: + raise SystemError('command "%s" produced error:\n%s' % err) + + def unmount(self, uri): + self.program_out_success(['gvfs-mount', '-u', uri]) + + timeout = 50 + while timeout > 0: + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + if 'Mount(0)' not in out: + break + timeout -= 1 + else: + self.fail('gvfs-mount -u %s failed' % uri) + + @classmethod + def quote(klass, path): + '''Quote a path for GIO URLs''' + + return path.replace('%', '%25').replace('/', '%2F').replace(':', '%3A') + + def wait_for_gvfs_mount_user_prompt(self, popen): + '''Wait for a gvfs-mount Popen process to show an User auth prompt''' + + empty_timeout = 50 + while True: + r = popen.stdout.read(1000) + if not r: + empty_timeout -= 1 + self.assertGreater(empty_timeout, 0, + 'timed out waiting for auth prompt') + + if b'User' in r: + break + time.sleep(0.1) + +class Programs(GvfsTestCase): + '''Test gvfs-* programs''' + + def test_gvfs_info_filesystem(self): + '''gvfs-info --filesystem''' + + out = self.program_out_success(['gvfs-info', '-f', '/']) + self.assertTrue('filesystem::size:' in out, out) + self.assertTrue('filesystem::type:' in out, out) + +class ArchiveMounter(GvfsTestCase): + def test_tar(self): + '''archive:// for tar''' + + tar_path = os.path.join(self.workdir, 'stuff.tar') + tf = tarfile.open(tar_path, 'w') + tf.add(__file__, 'gvfs-test.py') + tf.close() + + self.do_test_for_archive(tar_path) + + def test_tar_gz(self): + '''archive:// for tar.gz''' + + tar_path = os.path.join(self.workdir, 'stuff.tar.gz') + tf = tarfile.open(tar_path, 'w:gz') + tf.add(__file__, 'gvfs-test.py') + tf.close() + + self.do_test_for_archive(tar_path) + + def test_tar_bz2(self): + '''archive:// for tar.bz2''' + + tar_path = os.path.join(self.workdir, 'stuff.tar.bz2') + tf = tarfile.open(tar_path, 'w:bz2') + tf.add(__file__, 'gvfs-test.py') + tf.close() + + self.do_test_for_archive(tar_path) + + def test_zip(self): + '''archive:// for .zip''' + + zip_path = os.path.join(self.workdir, 'stuff.zip') + zf = zipfile.ZipFile(zip_path, 'w') + zf.write(__file__, 'gvfs-test.py') + zf.close() + + self.do_test_for_archive(zip_path) + + def test_iso_rr(self): + '''archive:// for RockRidge .iso''' + + shutil.copy(__file__, os.path.join(self.workdir, 'gvfs-test.py')) + subprocess.check_call(['genisoimage', '-R', '-quiet', '-o', 'stuff.iso', 'gvfs-test.py'], + cwd=self.workdir) + self.do_test_for_archive(os.path.join(self.workdir, 'stuff.iso')) + + def test_iso_joliet(self): + '''archive:// for Joliet .iso''' + + shutil.copy(__file__, os.path.join(self.workdir, 'gvfs-test.py')) + iso_path = os.path.join(self.workdir, 'stuff.iso') + subprocess.check_call(['genisoimage', '-JR', '-quiet', '-o', 'stuff.iso', 'gvfs-test.py'], + cwd=self.workdir) + self.do_test_for_archive(os.path.join(self.workdir, 'stuff.iso')) + + def do_test_for_archive(self, path): + # mount it; yes, gvfs expects double quoting + uri = 'archive://' + self.quote(self.quote('file://' + path)) + subprocess.check_call(['gvfs-mount', uri]) + + # appears in gvfs-mount list + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + try: + self.assertTrue('Mount(0)' in out, out) + self.assertTrue('%s -> %s' % (os.path.basename(path), uri) in out, out) + + # check gvfs-info + out = self.program_out_success(['gvfs-info', uri]) + self.assertTrue('standard::content-type: inode/directory' in out, out) + self.assertTrue('access::can-read: TRUE' in out, out) + + # check gvfs-cat + out = self.program_out_success(['gvfs-cat', uri + '/gvfs-test.py']) + with open(__file__) as f: + self.assertEqual(out, f.read()) + finally: + self.unmount(uri) + +@unittest.skipUnless(in_testbed, 'not running under gvfs-testbed') +class Sftp(GvfsTestCase): + @classmethod + def setUpClass(klass): + # triple-check that we are in the testbed + assert not os.path.exists('.ssh') + # generate ssh key for test user + os.mkdir('.ssh') + subprocess.check_call(['ssh-keygen', '-q', '-f', '.ssh/id_rsa', '-N', '']) + + def setUp(self): + '''Run ssh server''' + + super().setUp() + + self.sshd_log = open('/var/log/sshd.log', 'ab') + self.sshd = subprocess.Popen([os.environ['SSHD'], '-Dde', '-p', '2222', + '-o', 'UsePrivilegeSeparation no', + '-o', 'UsePam no'], + stderr=self.sshd_log) + + def tearDown(self): + if os.path.exists('.ssh/authorized_keys'): + os.unlink('.ssh/authorized_keys') + + if self.sshd.returncode is None: + self.sshd.terminate() + self.sshd.wait() + self.sshd_log.close() + super().tearDown() + + def test_rsa(self): + '''sftp://localhost with RSA authentication''' + + # accept our key for localhost logins + shutil.copy('.ssh/id_rsa.pub', '.ssh/authorized_keys') + + # mount it + uri = 'sftp://localhost:2222' + subprocess.check_call(['gvfs-mount', uri]) + + self.do_mount_check(uri) + + @unittest.skipUnless(local_ip, 'not having any non-localhost IP') + def test_unknown_host(self): + '''sftp:// with RSA authentication for unknown host''' + + # accept our RSA key + shutil.copy('.ssh/id_rsa.pub', '.ssh/authorized_keys') + + # try to mount it; should fail as it's an unknown host + uri = 'sftp://%s:2222' % local_ip + (code, out, err) = self.program_code_out_err(['gvfs-mount', uri]) + + self.assertNotEqual(code, 0) + # there is nothing in our testbed which would show or answer the + # dialog + self.assertTrue('Login dialog cancelled' in err, err) + + def do_mount_check(self, uri): + with open('stuff.txt', 'w') as f: + f.write('moo!') + + # appears in gvfs-mount list + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + try: + self.assertRegex(out, 'Mount\(\d+\):.*localhost -> %s' % uri) + + # check gvfs-info + out = self.program_out_success(['gvfs-info', uri]) + self.assertTrue('display name: / on localhost' in out, out) + self.assertTrue('type: directory' in out, out) + self.assertTrue('access::can-read: TRUE' in out, out) + + # check gvfs-ls + out = self.program_out_success(['gvfs-ls', uri + '/home']) + self.assertTrue('gvfs_sandbox_marker' in out, out) + + # check gvfs-cat + out = self.program_out_success(['gvfs-cat', uri + '/home/%s/stuff.txt' % os.environ['USER']]) + self.assertEqual(out, 'moo!') + finally: + self.unmount(uri) + +class Ftp(GvfsTestCase): + def setUp(self): + '''Launch FTP server''' + + super().setUp() + with open(os.path.join(self.workdir, 'myfile.txt'), 'w') as f: + f.write('hello world\n') + os.mkdir(os.path.join(self.workdir, 'mydir')) + secret_path = os.path.join(self.workdir, 'mydir', 'onlyme.txt') + with open(secret_path, 'w') as f: + f.write('secret\n') + os.chmod(secret_path, 0o600) + + self.ftpd = subprocess.Popen(['twistd', '-n', 'ftp', '-p', '2121', + '-r', self.workdir, + '--auth', 'memory:testuser:pwd1'], + stdout=subprocess.PIPE) + # give ftp server some time to start up + time.sleep(0.5) + + def tearDown(self): + '''Shut down FTP server''' + + self.ftpd.terminate() + self.ftpd.wait() + super().tearDown() + + def test_anonymous(self): + '''ftp:// anonymous''' + + uri = 'ftp://anonymous@localhost:2121' + subprocess.check_call(['gvfs-mount', uri]) + + self.do_mount_check(uri, True) + + def test_authenticated(self): + '''ftp:// authenticated''' + + uri = 'ftp://localhost:2121' + mount = subprocess.Popen(['gvfs-mount', uri], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # wrong user name + self.wait_for_gvfs_mount_user_prompt(mount) + mount.stdin.write(b'eve\nh4ck\n') + + # wrong password name + self.wait_for_gvfs_mount_user_prompt(mount) + mount.stdin.write(b'testuser\nh4ck\n') + + # correct credentials + self.wait_for_gvfs_mount_user_prompt(mount) + (out, err) = mount.communicate(b'testuser\npwd1\n') + self.assertEqual(mount.returncode, 0) + self.assertEqual(err, b'') + + # in test bed, there is nothing interesting in /home/testuser/, and + # without the test bed we do not know what's in the folder, so skip + # gvfs-ls check + self.do_mount_check(uri, False) + + def do_mount_check(self, uri, check_contents): + # appears in gvfs-mount list + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + try: + self.assertRegex(out, 'Mount\(\d+\):.* -> ftp://([a-z0-9]+@)?localhost:2121') + + # check gvfs-info + out = self.program_out_success(['gvfs-info', uri]) + self.assertRegex(out, 'display name: / .* localhost', out) + self.assertTrue('type: directory' in out, out) + + # check gvfs-ls + if check_contents: + out = self.program_out_success(['gvfs-ls', uri]) + self.assertEqual(set(out.split()), set(['myfile.txt', 'mydir'])) + out = self.program_out_success(['gvfs-ls', uri + '/mydir']) + self.assertEqual(out, 'onlyme.txt\n') + + # check gvfs-cat + out = self.program_out_success(['gvfs-cat', uri + '/myfile.txt']) + self.assertEqual(out, 'hello world\n') + finally: + self.unmount(uri) + +@unittest.skipUnless(samba_running, 'smbd is not running') +class Smb(GvfsTestCase): + + def setUp(self): + super().setUp() + + # create a few test files + with open(os.path.join(self.workdir, 'myfile.txt'), 'w') as f: + f.write('hello world\n') + os.mkdir(os.path.join(self.workdir, 'mydir')) + secret_path = os.path.join(self.workdir, 'mydir', 'onlyme.txt') + with open(secret_path, 'w') as f: + f.write('secret\n') + os.chmod(secret_path, 0o600) + + def test_anonymous(self): + '''smb:// anonymous''' + + os.chmod(self.workdir, 0o755) + subprocess.check_call(['net', 'usershare', 'add', 'myfiles', + self.workdir, 'My Files', + '%s:F,Everyone:R' % os.environ['USER'], + 'guest_ok=y']) + try: + uri = 'smb://%s/myfiles' % os.uname()[1] + subprocess.check_call(['gvfs-mount', uri]) + self.do_mount_check(uri, False) + finally: + subprocess.check_call(['net', 'usershare', 'delete', 'myfiles']) + + # needs predictable password + @unittest.skipUnless(in_testbed, 'not running under gvfs-testbed') + def test_authenticated(self): + '''smb:// authenticated''' + + subprocess.check_call(['net', 'usershare', 'add', 'myfiles', + self.workdir, 'My Files', + '%s:F' % os.environ['USER']]) + try: + uri = 'smb://%s/myfiles' % os.uname()[1] + mount = subprocess.Popen(['gvfs-mount', uri], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # correct credentials + self.wait_for_gvfs_mount_user_prompt(mount) + # default user, default domain, password + (out, err) = mount.communicate(b'\n\nfoo\n') + self.assertEqual(mount.returncode, 0, err) + #self.assertEqual(err, b'') # we get some warnings + + self.do_mount_check(uri, True) + finally: + subprocess.check_call(['net', 'usershare', 'delete', 'myfiles']) + + def do_mount_check(self, uri, auth): + # appears in gvfs-mount list + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + try: + self.assertRegex(out, 'Mount\(0\): myfiles .* smb://.*/myfiles') + + # check gvfs-info + out = self.program_out_success(['gvfs-info', uri]) + self.assertTrue('display name: myfiles' in out, out) + self.assertTrue('type: directory' in out, out) + + # check gvfs-ls + out = self.program_out_success(['gvfs-ls', uri]) + self.assertEqual(set(out.split()), set(['myfile.txt', 'mydir'])) + out = self.program_out_success(['gvfs-ls', uri + '/mydir']) + self.assertEqual(out, 'onlyme.txt\n') + + # check gvfs-cat + out = self.program_out_success(['gvfs-cat', uri + '/myfile.txt']) + self.assertEqual(out, 'hello world\n') + + if auth: + out = self.program_out_success(['gvfs-cat', uri + '/mydir/onlyme.txt']) + self.assertEqual(out, 'secret\n') + + # should be writable + self.program_out_success(['gvfs-copy', uri + '/myfile.txt', + uri + '/mycopy.txt']) + out = self.program_out_success(['gvfs-cat', uri + '/mycopy.txt']) + self.assertEqual(out, 'hello world\n') + else: + (code, out, err) = self.program_code_out_err(['gvfs-cat', uri + '/mydir/onlyme.txt']) + self.assertNotEqual(code, 0) + self.assertEqual(out, '') + self.assertTrue('onlyme.txt' in err) + + # should be read-only + (code, out, err) = self.program_code_out_err(['gvfs-copy', uri + '/myfile.txt', + uri + '/mycopy.txt']) + self.assertNotEqual(code, 0) + self.assertEqual(out, '') + self.assertTrue('myfile.txt' in err, err) + finally: + self.unmount(uri) + +@unittest.skipUnless(in_testbed, 'not running under gvfs-testbed') +@unittest.skipIf(os.path.exists('/sys/module/scsi_debug'), 'scsi_debug is already loaded') +class Drive(GvfsTestCase): + @classmethod + def setUpClass(klass): + '''Load scsi_debug and put a simple .iso into it''' + + # generate a test .iso + test_iso = 'test.iso' + subprocess.check_call(['genisoimage', '-R', '-quiet', '-V', 'bogus-cd', '-o', + test_iso, '/etc/passwd', __file__]) + + # we cannot write to a scsi_debug CD drive, so write it into it in hard + # disk mode + klass.root_command_success('modprobe scsi_debug add_host=0 dev_size_mb=64') + dev = klass.create_host(PTYPE_DISK) + + # put test.iso onto disk + klass.root_command_success('cat %s > /dev/%s; sync' % (test_iso, dev)) + + # leave the actual device creation to the individual tests; all devices + # created henceforth will default to the test.iso contents + klass.remove_device(dev) + + while klass.get_devices(): + time.sleep(0.2) + + @classmethod + def tearDownClass(klass): + for dev in klass.get_devices(): + klass.remove_device(dev) + + # remove scsi_debug; might need a few tries while being busy + timeout = 10 + while timeout > 0: + (code, out, err) = klass.root_command('rmmod -v scsi_debug') + if code == 0: + break + if 'in use' in err: + time.sleep(0.2) + else: + break + if code != 0: + raise SystemError('cannot rmmod scsi_debug: ' + err) + + @classmethod + def get_devices(klass): + '''Return current set of device names from scsi_debug''' + + devs = [] + for dir in glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block'): + try: + devs += os.listdir(dir) + except OSError: + # TOCTOU, might change underneath us + pass + return set(devs) + + @classmethod + def create_host(klass, ptype): + '''Create a new SCSI host. + + Return device name. + ''' + orig_devs = klass.get_devices() + klass.root_command_success('echo %i > /sys/bus/pseudo/drivers/scsi_debug/ptype' % ptype) + klass.root_command_success('echo 1 > /sys/bus/pseudo/drivers/scsi_debug/add_host') + + timeout = 1000 + while timeout >= 0: + devs = klass.get_devices() + if devs - orig_devs: + break + time.sleep(0.2) + timeout -= 1 + else: + raise SystemError('timed out waiting for new device') + + new_devs = devs - orig_devs + assert len(new_devs) == 1 + return new_devs.pop() + + @classmethod + def remove_device(klass, device): + '''Remove given device name.''' + + klass.root_command_success('echo 1 > /sys/block/%s/device/delete' % device) + + def setUp(self): + self.mock_polkit = None + + self.monitor = subprocess.Popen(['gvfs-mount', '-oi'], + stdout=subprocess.PIPE) + # set monitor stdout to non-blocking + fl = fcntl.fcntl(self.monitor.stdout, fcntl.F_GETFL) + fcntl.fcntl(self.monitor.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + # wait until monitor is ready + while 'Monitoring events' not in self.get_monitor_output(): + time.sleep(0.1) + + def tearDown(self): + self.monitor.terminate() + self.monitor.wait() + self.stop_polkit() + + def test_cdrom(self): + '''drive mount: cdrom''' + + dev = self.create_host(PTYPE_CDROM) + + # check that gvfs monitor picks up the new drive + out = self.get_monitor_output() + self.assertRegex(out, 'Drive connected:\s+.*CD') + self.assertRegex(out, 'unix-device:.*/dev/%s' % dev) + self.assertTrue('has_media=1' in out, out) + + self.assertRegex(out, 'Volume added:\s+.*bogus-cd') + self.assertRegex(out, "label:\s+'bogus-cd") + self.assertTrue('can_mount=1' in out, out) + self.assertTrue('should_automount=1' in out, out) + self.assertRegex(out, 'themed icons:.*media-optical') + + # user is not on any local session in the sandbox, so mounting ought to + # fail + (code, out, err) = self.program_code_out_err(['gvfs-mount', '-d', '/dev/' + dev]) + self.assertNotEqual(code, 0) + self.assertRegex(err, 'Not authorized') + + # tell polkit to do allow removable (but not internal) storage + self.start_polkit(['org.freedesktop.udisks2.filesystem-mount']) + + # now mounting should succeed + (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) + + # should appear as Mount + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + self.assertEqual(err.strip(), '') + match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) + self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) + + # unmount it again + self.unmount(match.group(1)) + + def test_system_partition(self): + '''drive mount: system partition''' + + dev = self.create_host(PTYPE_DISK) + + # check that gvfs monitor picks up the new drive + out = self.get_monitor_output() + self.assertRegex(out, 'Drive connected:\s+.*Disk') + self.assertRegex(out, 'unix-device:.*/dev/%s' % dev) + self.assertTrue('has_media=1' in out, out) + + self.assertRegex(out, 'Volume added:\s+.*bogus-cd') + self.assertRegex(out, "label:\s+'bogus-cd") + self.assertTrue('should_automount=0' in out, out) + self.assertRegex(out, 'themed icons:.*harddisk') + + # should fail with only allowing the user to mount removable storage + self.start_polkit(['org.freedesktop.udisks2.filesystem-mount']) + (code, out, err) = self.program_code_out_err(['gvfs-mount', '-d', '/dev/' + dev]) + self.assertNotEqual(code, 0) + self.assertRegex(err, 'Not authorized') + + # should succeed with allowing the user to mount system storage + self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) + (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) + + # should appear as Mount + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + self.assertEqual(err.strip(), '') + match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) + self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) + + # unmount it again + self.unmount(match.group(1)) + + def test_media_player(self): + '''drive mount: media player''' + + def cleanup(): + rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE) + rootsh.communicate(b'''rm /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules +pkill --signal HUP udevd || pkill --signal HUP systemd-udevd +''') + + # create udev rule to turn it into a music player + self.addCleanup(cleanup) + rootsh = subprocess.Popen(['./rootsh'], stdin=subprocess.PIPE) + rootsh.communicate(b'''export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin +echo 'SUBSYSTEM=="block", ATTRS{model}=="scsi_debug*", ENV{ID_MEDIA_PLAYER}="MockTune"' > /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules +sync +pkill --signal HUP udevd || pkill --signal HUP systemd-udevd +''') + + dev = self.create_host(PTYPE_DISK) + + # check that gvfs monitor picks up the new volume + out = self.get_monitor_output() + self.assertRegex(out, 'Volume added:\s+.*bogus-cd') + self.assertRegex(out, "label:\s+'bogus-cd") + self.assertTrue('should_automount=0' in out, out) + self.assertRegex(out, 'themed icons:.*harddisk') + + # mount it + self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) + (out, err) = self.program_out_err(['gvfs-mount', '-d', '/dev/' + dev]) + + # should appear as Mount + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + self.assertEqual(err.strip(), '') + match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) + self.assertTrue(match, 'no Mount found in gvfs-mount -li output:\n' + out) + + # should have media player content + self.assertRegex(out, 'x_content_types:.*x-content/audio-player') + + # unmount it again + self.unmount(match.group(1)) + + def get_monitor_output(self): + '''Wait for gvfs monitor to output something, and return it''' + + empty_timeout = 50 + while True: + out = self.monitor.stdout.readall() + if out: + break + else: + empty_timeout -= 1 + self.assertGreater(empty_timeout, 0, + 'timed out waiting for monitor output') + + time.sleep(0.1) + + # wait a bit more to see whether we catch some stragglers + time.sleep(0.2) + out2 = self.monitor.stdout.readall() + if out2: + out += out2 + + return out.decode() + + def start_polkit(self, actions): + '''Start mock polkit with list of allowed actions.''' + + self.stop_polkit() + self.mock_polkit = subprocess.Popen(['./rootsh'], + stdin=subprocess.PIPE) + self.mock_polkit.stdin.write(('set -e\n/home/test_polkitd.py -r -a %s\n' + % ','.join(actions)).encode('ASCII')) + # wait until it started up + if actions: + timeout = 50 + while timeout > 0: + try: + out = subprocess.check_output(['pkcheck', '--action-id', actions[0], '--process', '1'], + stderr=subprocess.PIPE) + if b'test=test' in out: + break + except subprocess.CalledProcessError: + pass + + time.sleep(0.1) + timeout -= 1 + else: + self.fail('timed out waiting for test_polkitd.py') + else: + # we can only cross fingers here, as we do not have an action to verify + time.sleep(0.5) + + self.assertEqual(self.mock_polkit.poll(), None, + 'mock polkitd unexpectedly terminated') + + def stop_polkit(self): + '''Stop mock polkit, if it is running.''' + + if self.mock_polkit: + # for some reason, terminating the shell doesn't terminate the + # polkitd running in it, so kill that separately + self.root_command('kill `pidof -x /home/test_polkitd.py`') + self.mock_polkit.terminate() + self.mock_polkit.wait() + self.mock_polkit = None + +class Dav(GvfsTestCase): + '''Test WebDAV backend''' + + @classmethod + def setUpClass(klass): + '''Set up Apache httpd sandbox''' + + klass.mod_dir = klass.get_httpd_module_dir() + klass.httpd_sandbox = tempfile.mkdtemp() + + # create SSL certificate + openssl = subprocess.check_call(['openssl', 'req', '-x509', '-nodes', + '-days', '1', '-newkey', 'rsa:1024', '-subj', + '/CN=localhost', '-keyout', 'mycert.pem', + '-out', 'mycert.pem'], + stderr=subprocess.PIPE, + cwd=klass.httpd_sandbox) + + klass.public_dir = os.path.join(klass.httpd_sandbox, 'public') + os.mkdir(klass.public_dir) + with open(os.path.join(klass.public_dir, 'hello.txt'), 'w') as f: + f.write('hi\n') + + klass.secret_dir = os.path.join(klass.httpd_sandbox, 'secret') + os.mkdir(klass.secret_dir) + with open(os.path.join(klass.secret_dir, 'restricted.txt'), 'w') as f: + f.write('dont tell anyone\n') + + # test:s3kr1t + with open(os.path.join(klass.httpd_sandbox, 'htpasswd'), 'w') as f: + f.write('test:$apr1$t0B4mfkT$Tr8ip333/ZR/7xrRBuxjI.\n') + + with open(os.path.join(klass.httpd_sandbox, 'apache2.conf'), 'w') as f: + f.write('''Listen localhost:8088 +Listen localhost:4443 +LoadModule dav_module %(mod_dir)s/mod_dav.so +LoadModule dav_fs_module %(mod_dir)s/mod_dav_fs.so +LoadModule ssl_module %(mod_dir)s/mod_ssl.so +LoadModule auth_basic_module %(mod_dir)s/mod_auth_basic.so +LoadModule authn_file_module %(mod_dir)s/mod_authn_file.so +LoadModule authz_user_module %(mod_dir)s/mod_authz_user.so + +DocumentRoot . +PidFile apache.pid +LogLevel debug +ErrorLog error_log +DAVLockDB DAVLock + +<VirtualHost localhost:4443> + ServerName localhost + SSLEngine on + SSLCertificateFile mycert.pem + SSLCertificateKeyFile mycert.pem +</VirtualHost> + +<Directory %(root)s/public> + Dav On +</Directory> + +<Directory %(root)s/secret> + Dav On + AuthType Basic + AuthName DAV + AuthUserFile htpasswd + Require valid-user +</Directory> +''' % {'mod_dir': klass.mod_dir, 'root': klass.httpd_sandbox}) + + # start server + subprocess.check_call(['apachectl', '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'start']) + + @classmethod + def tearDownClass(klass): + '''Stop httpd server and remove sandbox''' + + subprocess.call(['apachectl', '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'stop']) + shutil.rmtree(klass.httpd_sandbox) + + @classmethod + def get_httpd_module_dir(klass): + '''Return module directory for Apache httpd. + + Unfortunately this is highly distro/platform specific, so try to + determine it from apxs2 or apachectl. + ''' + # if we have apxs2 installed, use this + try: + apxs2 = subprocess.Popen(['apxs2', '-q', 'LIBEXECDIR'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True) + out = apxs2.communicate()[0].strip() + assert apxs2.returncode == 0, 'apxs2 -V failed' + return out + except OSError: + # Look for apxs instead + try: + apxs2 = subprocess.Popen(['apxs', '-q', 'LIBEXECDIR'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True) + out = apxs2.communicate()[0].strip() + assert apxs2.returncode == 0, 'apxs2 -V failed' + return out + except OSError: + print('[no apxs2, falling back]') + pass + + # fall back to looking for modules in HTTPD_ROOT/modules/ + ctl = subprocess.Popen(['apachectl', '-V'], + stdout=subprocess.PIPE, + universal_newlines=True) + out = ctl.communicate()[0] + assert ctl.returncode == 0, 'apachectl -V failed' + m = re.search('\sHTTPD_ROOT="([^"]+)"\s', out) + assert m, 'apachectl -V does not show HTTPD_ROOT' + mod_dir = os.path.join(m.group(1), 'modules') + assert os.path.isdir(mod_dir), \ + '%s does not exist, cannot determine httpd module path' % mod_dir + return mod_dir + + def test_http_noauth(self): + '''dav://localhost without credentials''' + + uri = 'dav://localhost:8088/public' + subprocess.check_call(['gvfs-mount', uri]) + self.do_mount_check(uri, 'hello.txt', 'hi\n') + + def test_https_noauth(self): + '''davs://localhost without credentials''' + + uri = 'davs://localhost:4443/public' + subprocess.check_call(['gvfs-mount', uri]) + self.do_mount_check(uri, 'hello.txt', 'hi\n') + + def test_http_auth(self): + '''dav://localhost with credentials''' + + uri = 'dav://localhost:8088/secret' + + mount = subprocess.Popen(['gvfs-mount', uri], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # wrong password + self.wait_for_gvfs_mount_user_prompt(mount) + mount.stdin.write(b'test\nh4ck\n') + + # correct password + (out, err) = mount.communicate(b's3kr1t\n') + self.assertEqual(mount.returncode, 0) + self.assertEqual(err, b'') + + self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') + + def test_https_auth(self): + '''davs://localhost with credentials''' + + uri = 'davs://localhost:4443/secret' + + mount = subprocess.Popen(['gvfs-mount', uri], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # wrong password + self.wait_for_gvfs_mount_user_prompt(mount) + mount.stdin.write(b'test\nh4ck\n') + + # correct password + (out, err) = mount.communicate(b's3kr1t\n') + self.assertEqual(mount.returncode, 0) + self.assertEqual(err, b'') + + self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') + + def do_mount_check(self, uri, testfile, content): + # appears in gvfs-mount list + (out, err) = self.program_out_err(['gvfs-mount', '-li']) + try: + self.assertRegex(out, 'Mount\(\d+\):.* -> davs?://([a-z0-9]+@)?localhost') + + # check gvfs-info + out = self.program_out_success(['gvfs-info', uri]) + self.assertRegex(out, 'id::filesystem: dav') + self.assertTrue('type: directory' in out, out) + + # check gvfs-ls + out = self.program_out_success(['gvfs-ls', uri]) + self.assertEqual(out.strip(), testfile) + + # check gvfs-cat + out = self.program_out_success(['gvfs-cat', uri + '/' + testfile]) + self.assertEqual(out, content) + + # create a new file + self.program_out_success(['gvfs-copy', uri + '/' + testfile, uri + '/foo']) + out = self.program_out_success(['gvfs-cat', uri + '/foo']) + self.assertEqual(out, content) + + # remove it again + self.program_out_success(['gvfs-rm', uri + '/foo']) + out = self.program_out_success(['gvfs-ls', uri]) + self.assertFalse('foo' in out.split(), out) + finally: + self.unmount(uri) + + +if __name__ == '__main__': + # do not break tests due to translations + try: + del os.environ['LANGUAGE'] + except KeyError: + pass + os.environ['LC_ALL'] = 'C' + unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) diff --git a/test/run-in-tree.sh b/test/run-in-tree.sh new file mode 100755 index 00000000..9cb638f2 --- /dev/null +++ b/test/run-in-tree.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +if [ $# -lt 1 ]; then + echo missing argument + exit +fi + +# Set up env vars to make gvfs read mounts from the build tree +export GVFS_MOUNTABLE_EXTENSION=".localmount" +export GVFS_MOUNTABLE_DIR=`pwd`/../daemon +export PATH=`pwd`/../programs:$PATH + +# Start a custom session dbus +PIDFILE=`mktemp` +export DBUS_SESSION_BUS_ADDRESS=`dbus-daemon --config-file=session.conf --fork --print-address=1 --print-pid=3 3>${PIDFILE}` +DBUS_SESSION_BUS_PID=`cat $PIDFILE` +rm $PIDFILE + +trap "kill -9 $DBUS_SESSION_BUS_PID" SIGINT SIGTERM EXIT + +$@ + diff --git a/test/session.conf.in b/test/session.conf.in new file mode 100644 index 00000000..611811f3 --- /dev/null +++ b/test/session.conf.in @@ -0,0 +1,53 @@ +<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-Bus Bus Configuration 1.0//EN" + "http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd"> +<busconfig> + <!-- Our well-known bus type, don't change this --> + <type>session</type> + + <!-- If we fork, keep the user's original umask to avoid affecting + the behavior of child processes. --> + <keep_umask/> + + <listen>unix:tmpdir=/tmp</listen> + + <servicedir>@testdir@</servicedir> + + <standard_session_servicedirs /> + + <policy context="default"> + <!-- Allow everything to be sent --> + <allow send_destination="*" eavesdrop="true"/> + <!-- Allow everything to be received --> + <allow eavesdrop="true"/> + <!-- Allow anyone to own anything --> + <allow own="*"/> + </policy> + + <include if_selinux_enabled="yes" selinux_root_relative="yes">contexts/dbus_contexts</include> + + <!-- For the session bus, override the default relatively-low limits + with essentially infinite limits, since the bus is just running + as the user anyway, using up bus resources is not something we need + to worry about. In some cases, we do set the limits lower than + "all available memory" if exceeding the limit is almost certainly a bug, + having the bus enforce a limit is nicer than a huge memory leak. But the + intent is that these limits should never be hit. --> + + <!-- the memory limits are 1G instead of say 4G because they can't exceed 32-bit signed int max --> + <limit name="max_incoming_bytes">1000000000</limit> + <limit name="max_incoming_unix_fds">250000000</limit> + <limit name="max_outgoing_bytes">1000000000</limit> + <limit name="max_outgoing_unix_fds">250000000</limit> + <limit name="max_message_size">1000000000</limit> + <limit name="max_message_unix_fds">4096</limit> + <limit name="service_start_timeout">120000</limit> + <limit name="auth_timeout">240000</limit> + <limit name="max_completed_connections">100000</limit> + <limit name="max_incomplete_connections">10000</limit> + <limit name="max_connections_per_user">100000</limit> + <limit name="max_pending_service_starts">10000</limit> + <limit name="max_names_per_connection">50000</limit> + <limit name="max_match_rules_per_connection">50000</limit> + <limit name="max_replies_per_connection">50000</limit> + +</busconfig> |