#!/usr/bin/env python3

__author__ = 'Martin Pitt '
__copyright__ = '(C) 2012 Canonical Ltd.'
__license__ = 'LGPL v2 or later'

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General
# Public License along with this library; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.

import os
import os.path
import sys
import unittest
import subprocess
import tempfile
import tarfile
import zipfile
import time
import shutil
import fcntl
import re
import locale
import socket
from glob import glob

from gi.repository import GLib, Gio

os.environ['GIO_USE_VFS'] = 'gvfs'

try:
    import gi
    gi.require_version('UMockdev', '1.0')
    from gi.repository import UMockdev

    have_umockdev = subprocess.call(['which', 'umockdev-wrapper'], stdout=subprocess.PIPE) == 0
    # needs >= 0.2.10
    have_umockdev = have_umockdev and hasattr(UMockdev.Testbed, 'add_from_file')
except (ValueError, ImportError):
    have_umockdev = False

# umockdev environment for gphoto/MTP tests
umockdev_testbed = None

have_twistd = subprocess.call(['which', 'twistd'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0
have_smbd = subprocess.call(['which', 'smbd'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0


def find_alternative(cmds):
    '''Find command in cmds array and return the found alternative

    Return the first entry of cmds that "which" can locate, or None if none
    of them is installed.
    '''
    for cmd in cmds:
        if subprocess.call(['which', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0:
            return cmd
    return None


in_testbed = os.path.exists('/home/gvfs_sandbox_marker')
samba_running = subprocess.call(['pidof', 'smbd'], stdout=subprocess.PIPE) == 0
httpd_cmd = find_alternative(['apache2', 'httpd', 'apachectl'])
have_httpd = httpd_cmd is not None
sshd_path = subprocess.check_output(['which', 'sshd'], universal_newlines=True).strip()
# first non-loopback IPv4 address, or '' if there is none; strip the trailing
# newline that sed prints, as the value gets interpolated into URIs
local_ip = subprocess.check_output(r"ip -4 addr | sed -nr '/127\.0\.0/ n; "
                                   "/inet / { s/^.*inet ([0-9.]+).*$/\\1/; p; q }'",
                                   shell=True, universal_newlines=True).strip()
SMB_USER_PORT = 1445

# when running in the build tree, check whether Dav backend is enabled
have_dav_backend = True
if 'GVFS_MOUNTABLE_DIR' in os.environ:
    have_dav_backend = os.path.exists(os.path.join(os.environ['GVFS_MOUNTABLE_DIR'], 'dav.localmount'))

my_dir = os.path.dirname(os.path.abspath(__file__))

# we need this flag to check if we can test error messages
locale.setlocale(locale.LC_ALL, '')
lc = locale.getlocale(locale.LC_MESSAGES)[0]
english_messages = not lc or lc.startswith('en_')

# http://sg.danny.cz/sg/sdebug26.html
PTYPE_DISK = 0
PTYPE_CDROM = 5

# local D-BUS daemon
dbus_daemon = None


class GvfsTestCase(unittest.TestCase):
    '''Gvfs tests base class.

    Provide some utility functions and a temporary work dir.
    '''
    def setUp(self):
        '''Create a fresh temporary work dir in self.workdir'''
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        '''Remove the work dir and reset the umockdev testbed, if any'''
        shutil.rmtree(self.workdir)
        if umockdev_testbed:
            umockdev_testbed.clear()

    def run(self, result=None):
        '''Show dbus daemon output on failed tests'''

        if result:
            orig_err_fail = len(result.errors) + len(result.failures)
        super().run(result)
        # always read the logs, so that we only get the ones relevant to this
        # particular test case
        dbus_out = dbus_daemon.stdout.read()
        dbus_err = dbus_daemon.stderr.read()
        if result and len(result.errors) + len(result.failures) > orig_err_fail:
            print('\n----- dbus stdout -----\n%s\n----- dbus stderr -----\n%s\n' %
                  (dbus_out.decode('UTF-8') if dbus_out else '',
                   dbus_err.decode('UTF-8') if dbus_err else ''))

    def program_code_out_err(self, argv):
        '''Return (exitcode, stdout, stderr) from a program call.'''

        prog = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                universal_newlines=True)
        (out, err) = prog.communicate()
        return (prog.returncode, out, err)

    def program_out_err(self, argv):
        '''Return (stdout, stderr) from a program call.

        Fail the test if the program exits with a non-zero code.
        '''
        (code, out, err) = self.program_code_out_err(argv)
        self.assertEqual(code, 0, err)
        return (out, err)

    def program_out_success(self, argv):
        '''Return stdout from a successful program call.

        Fail the test if the program exits non-zero or writes to stderr.
        '''
        (out, err) = self.program_out_err(argv)
        self.assertEqual(err, '', err)
        return out

    @classmethod
    def root_command(klass, command):
        '''Run a shell command string as root.

        This only works when running under gvfs-testbed.

        Return (code, stdout, stderr).
        '''
        assert in_testbed, 'root_command() only works under gvfs-testbed'

        rootsh = subprocess.Popen(['./rootsh', '-p'], stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # set reasonable path that includes /sbin
        rootsh.stdin.write(b'export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin\n')

        (out, err) = rootsh.communicate(command.encode('UTF-8'))
        return (rootsh.returncode, out.decode('UTF-8'), err.decode('UTF-8'))

    @classmethod
    def root_command_success(klass, command):
        '''root_command() for commands that should succeed without errors.

        Raise SystemError if the command exits non-zero or writes to stderr.
        '''
        (code, out, err) = klass.root_command(command)
        if code != 0:
            raise SystemError('command "%s" failed with code %i:\n%s' % (command, code, err))
        if err:
            raise SystemError('command "%s" produced error:\n%s' % (command, err))

    def unmount(self, uri):
        '''Unmount uri with "gio mount -u" and wait until it is gone

        Poll "gio mount -li" for up to five seconds and fail the test if the
        mount is still listed afterwards.
        '''
        self.program_out_success(['gio', 'mount', '-u', uri])

        # match the URI literally; it may contain regex metacharacters
        mount_re = 'Mount.*' + re.escape(uri)
        timeout = 5
        while timeout > 0:
            (out, err) = self.program_out_err(['gio', 'mount', '-li'])
            if not re.search(mount_re, out):
                break
            timeout -= 1
            time.sleep(1)
        else:
            self.fail('gio mount -u %s failed' % uri)

    @classmethod
    def quote(klass, path):
        '''Quote a path for GIO URLs'''

        return path.replace('%', '%25').replace('/', '%2F').replace(':', '%3A')

    def _wait_for_stdout(self, popen, markers, timeout_msg):
        '''Poll popen's stdout until one read chunk contains any of markers

        markers is a sequence of bytes objects. stdout is switched to
        non-blocking mode while polling and its original flags are restored
        afterwards, also when the wait fails. Fail the test with timeout_msg
        after 50 unsuccessful polls (0.1 s apart).
        '''
        empty_timeout = 50

        # set stdout to nonblocking
        flags = fcntl.fcntl(popen.stdout, fcntl.F_GETFL)
        fcntl.fcntl(popen.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        try:
            while True:
                r = popen.stdout.read(1000)
                if r and any(marker in r for marker in markers):
                    break
                self.assertGreater(empty_timeout, 0, timeout_msg)
                empty_timeout -= 1
                time.sleep(0.1)
        finally:
            # restore flags
            fcntl.fcntl(popen.stdout, fcntl.F_SETFL, flags)

    def wait_for_gvfs_mount_user_prompt(self, popen):
        '''Wait for a gio mount Popen process to show an User auth prompt'''

        self._wait_for_stdout(popen, (b'User', b'Domain'),
                              'timed out waiting for auth prompt')

    def wait_for_question(self, popen):
        '''Wait for a gio mount Popen process to show an question prompt'''

        self._wait_for_stdout(popen, (b'Choice',),
                              'timed out waiting for question prompt')

    def mount_api(self, gfile, mount_op=None):
        '''Mount a Gio.File using the Gio API

        This times out after 30 seconds.

        Return True on success or a GLib.GError object from the mount call.
        '''
        self.cb_result = None

        def mount_done(obj, result, main_loop):
            ml.quit()
            try:
                success = obj.mount_enclosing_volume_finish(result)
                self.cb_result = (obj, success)
            except GLib.GError as e:
                self.cb_result = (obj, e)

        ml = GLib.MainLoop()

        gfile.mount_enclosing_volume(Gio.MountMountFlags.NONE, mount_op, None, mount_done, ml)
        # ensure we are timing out
        GLib.timeout_add_seconds(30, lambda data: ml.quit(), None)
        ml.run()

        self.assertNotEqual(self.cb_result, None, 'operation timed out')
        self.assertEqual(self.cb_result[0], gfile)
        return self.cb_result[1]

    def unmount_api(self, gfile):
        '''Umount a mounted Gio.File using the Gio API

        This times out after 5 seconds.
        '''
        self.cb_result = None

        def unmount_done(obj, result, main_loop):
            # stop the loop first, so that a failed unmount is reported right
            # away instead of only showing up as a timeout
            main_loop.quit()
            try:
                success = obj.unmount_with_operation_finish(result)
                self.cb_result = (obj, success)
            except GLib.GError as e:
                self.cb_result = (obj, e)

        mount = gfile.find_enclosing_mount(None)
        self.assertNotEqual(mount, None)

        ml = GLib.MainLoop()

        mount.unmount_with_operation(Gio.MountUnmountFlags.NONE, None, None, unmount_done, ml)
        # ensure we are timing out
        GLib.timeout_add_seconds(5, lambda data: ml.quit(), None)
        ml.run()

        self.assertNotEqual(self.cb_result, None, 'operation timed out')
        self.assertEqual(self.cb_result[0], mount)
        # compare against True; a stored GLib.GError object would be truthy
        self.assertEqual(self.cb_result[1], True)

    def make_mountop(self, user=None, password=None):
        '''Create a Gio.MountOperation from given credentials
        (anonymous is requested if credentials aren't given)

        On the first ask_password signal this sends the password, and aborts
        the second request (for tests that use wrong credentials).
        '''
        def pwd_cb(op, message, default_user, default_domain, flags, data):
            # reply exactly once per ask_password signal
            if op.get_username() or op.get_anonymous():
                # first call: send correct result
                op.reply(Gio.MountOperationResult.HANDLED)
                # forget the credentials, so that subsequent calls abort
                op.set_username('')
                mo.set_anonymous(False)
            else:
                # subsequent calls: abort
                op.reply(Gio.MountOperationResult.ABORTED)

        mo = Gio.MountOperation.new()
        if user is None and password is None:
            mo.set_anonymous(True)
        else:
            mo.set_username(user)
            mo.set_password(password)
        mo.connect('ask_password', pwd_cb, None)
        return mo


class ArchiveMounter(GvfsTestCase):
    '''archive:// mounts of archives and images created in the work dir'''

    def add_files(self, add_fn):
        '''Add test files to an archive

        add_fn is called as add_fn(path_on_disk, name_in_archive).
        '''
        p = os.path.join(self.workdir, 'hello.txt')
        with open(p, 'w') as f:
            f.write('hello\n')
        add_fn(p, 'hello.txt')

        p = os.path.join(self.workdir, 'bye.txt')
        with open(p, 'w') as f:
            f.write('bye\n')
        add_fn(p, 'stuff/bye.txt')

    def test_tar(self):
        '''archive:// for tar'''

        tar_path = os.path.join(self.workdir, 'stuff.tar')
        with tarfile.open(tar_path, 'w') as tf:
            self.add_files(tf.add)

        self.do_test_for_archive(tar_path)

    def test_tar_gz(self):
        '''archive:// for tar.gz'''

        tar_path = os.path.join(self.workdir, 'stuff.tar.gz')
        with tarfile.open(tar_path, 'w:gz') as tf:
            self.add_files(tf.add)

        self.do_test_for_archive(tar_path)

    def test_tar_bz2(self):
        '''archive:// for tar.bz2'''

        tar_path = os.path.join(self.workdir, 'stuff.tar.bz2')
        with tarfile.open(tar_path, 'w:bz2') as tf:
            self.add_files(tf.add)

        self.do_test_for_archive(tar_path)

    def test_zip(self):
        '''archive:// for .zip'''

        zip_path = os.path.join(self.workdir, 'stuff.zip')
        with zipfile.ZipFile(zip_path, 'w') as zf:
            self.add_files(zf.write)

        self.do_test_for_archive(zip_path)

    def test_iso_rr(self):
        '''archive:// for RockRidge .iso'''

        iso = os.path.join(self.workdir, 'bogus-cd.iso')
        with open(iso, 'wb') as f:
            subprocess.check_call(['bzip2', '-cd', os.path.join(my_dir, 'files', 'bogus-cd.iso.bz2')],
                                  stdout=f)
        self.do_test_for_archive(iso)

    def test_iso_joliet(self):
        '''archive:// for Joliet .iso'''

        iso = os.path.join(self.workdir, 'joliet.iso')
        with open(iso, 'wb') as f:
            subprocess.check_call(['bzip2', '-cd', os.path.join(my_dir, 'files', 'joliet.iso.bz2')],
                                  stdout=f)
        self.do_test_for_archive(iso)

    def do_test_for_archive(self, path):
        '''Mount the archive at path with the gio CLI, check it, unmount it

        The archive is expected to contain hello.txt and stuff/bye.txt as
        written by add_files().
        '''
        # mount it; yes, gvfs expects double quoting
        uri = 'archive://' + self.quote(self.quote('file://' + path))
        subprocess.check_call(['gio', 'mount', uri])

        # appears in gio mount list
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        try:
            self.assertTrue('Mount(0)' in out, out)
            self.assertTrue('%s -> %s' % (os.path.basename(path), uri) in out, out)

            # check gio info
            out = self.program_out_success(['gio', 'info', uri])
            self.assertTrue('standard::content-type: inode/directory' in out, out)
            self.assertTrue('access::can-read: TRUE' in out, out)

            # check gio cat
            out = self.program_out_success(['gio', 'cat', uri + '/hello.txt'])
            self.assertEqual(out, 'hello\n')
            out = self.program_out_success(['gio', 'cat', uri + '/stuff/bye.txt'])
            self.assertEqual(out, 'bye\n')
        finally:
            self.unmount(uri)

    def test_api(self):
        '''archive:// with Gio API'''

        tar_path = os.path.join(self.workdir, 'stuff.tar')
        with tarfile.open(tar_path, 'w') as tf:
            tf.add(__file__, 'gvfs-test.py')

        uri = 'archive://' + self.quote(self.quote('file://' + tar_path))
        gfile = Gio.File.new_for_uri(uri)
        # not mounted yet, should fail
        self.assertRaises(GLib.GError, gfile.query_info, '*', 0, None)

        self.assertEqual(self.mount_api(gfile), True)
        try:
            info = gfile.query_info('*', 0, None)
            self.assertEqual(info.get_content_type(), 'inode/directory')
            self.assertEqual(info.get_file_type(), Gio.FileType.DIRECTORY)
            self.assertTrue('stuff.tar' in info.get_display_name(), info.get_display_name())
            self.assertEqual(info.get_attribute_boolean('access::can-read'), True)
        finally:
            self.unmount_api(gfile)


@unittest.skipUnless(os.getenv('XDG_RUNTIME_DIR'), 'No $XDG_RUNTIME_DIR available')
@unittest.skipUnless(os.path.exists(os.path.expanduser('~/.ssh/id_rsa')),
                     'This test needs an existing ~/.ssh/id_rsa')
class Sftp(GvfsTestCase):
    '''sftp:// mounts against a private sshd listening on port 22222'''

    def setUp(self):
        '''Run ssh server'''

        super().setUp()

        # find sftp-server
        for libexec_dir in ['/usr/local/lib/openssh',
                            '/usr/lib/openssh',
                            '/usr/local/libexec/openssh',
                            '/usr/libexec/openssh',
                            '/usr/lib/misc',
                            '/usr/lib64/misc']:
            sftp_server = os.path.join(libexec_dir, 'sftp-server')
            if os.path.exists(sftp_server):
                break
        else:
            self.fail('Cannot locate OpenSSH sftp-server program, please update tests for your distribution')

        # look for authorized_keys in a temporary dir, to avoid having to mess
        # with the actual user files when not calling this through gvfs-testbed
        # (unfortunately ssh doesn't consider $HOME); NB we cannot use
        # self.workdir as ssh refuses files in /tmp.
        self.authorized_keys = os.path.join(os.environ['XDG_RUNTIME_DIR'], 'gvfs_test_authorized_keys')

        # generate sshd configuration; note that we must ensure that the
        # private key is not world-readable, so we need to copy it
        shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key'), self.workdir)
        os.chmod(os.path.join(self.workdir, 'ssh_host_rsa_key'), 0o600)
        shutil.copy(os.path.join(my_dir, 'files', 'ssh_host_rsa_key.pub'), self.workdir)
        self.sshd_config = os.path.join(self.workdir, 'sshd_config')
        with open(self.sshd_config, 'w') as f:
            f.write('''Port 22222
HostKey %(workdir)s/ssh_host_rsa_key
UsePrivilegeSeparation no
AuthorizedKeysFile %(authorized_keys)s
UsePam no
Subsystem sftp %(sftp_server)s
''' % {'workdir': self.workdir,
       'sftp_server': sftp_server,
       'authorized_keys': self.authorized_keys})

        self.sshd = subprocess.Popen([sshd_path, '-Dde', '-f', self.sshd_config],
                                     universal_newlines=True, stderr=subprocess.PIPE)

    def tearDown(self):
        '''Stop the ssh server'''
        if self.sshd.returncode is None:
            self.sshd.terminate()
            self.sshd.wait()
        super().tearDown()

    def run(self, result=None):
        '''Show sshd log output on failed tests'''

        if result:
            orig_err_fail = len(result.errors) + len(result.failures)
        super().run(result)
        if result and len(result.errors) + len(result.failures) > orig_err_fail and hasattr(self, 'sshd'):
            print('\n----- sshd log -----\n%s\n------\n' % self.sshd.stderr.read())

    def test_rsa(self):
        '''sftp://localhost with RSA authentication'''

        # accept our key for localhost logins
        shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys)

        # mount it
        uri = 'sftp://localhost:22222'
        subprocess.check_call(['gio', 'mount', uri])

        self.do_mount_check(uri)

    # if we are in the testbed, then ssh defaults to
    # "StrictHostKeyChecking ask", and a connection attempt should fail;
    # otherwise this is client-configurable behaviour which cannot be
    # temporarily overridden
    @unittest.skipUnless(in_testbed, 'not running under gvfs-testbed')
    @unittest.skipUnless(local_ip, 'not having any non-localhost IP')
    def test_unknown_host(self):
        '''sftp:// with RSA authentication for unknown host'''

        # accept our key for localhost logins
        shutil.copy(os.path.expanduser('~/.ssh/id_rsa.pub'), self.authorized_keys)

        # try to mount it; should fail as it's an unknown host
        uri = 'sftp://%s:22222' % local_ip
        (code, out, err) = self.program_code_out_err(['gio', 'mount', uri])

        self.assertNotEqual(code, 0)
        # there is nothing in our testbed which would show or answer the
        # dialog
        if english_messages:
            self.assertTrue('Login dialog cancelled' in err, err)

    def do_mount_check(self, uri):
        '''Check the mounted sftp uri with the gio CLI, then unmount it'''
        # appears in gio mount list
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        try:
            self.assertRegex(out, r'Mount\(\d+\):.*localhost -> %s' % uri)

            # check gio info
            out = self.program_out_success(['gio', 'info', uri])
            self.assertRegex(out, 'display name: / .* localhost')
            self.assertTrue('type: directory' in out, out)
            self.assertTrue('access::can-read: TRUE' in out, out)

            # check gio list
            out = self.program_out_success(['gio', 'list', uri + '/home'])
            self.assertTrue('%s\n' % os.environ['USER'] in out, out)

            # check gio cat
            out = self.program_out_success(['gio', 'cat', uri + '/etc/passwd'])
            self.assertTrue('root:' in out, out)
        finally:
            self.unmount(uri)


@unittest.skipUnless(have_twistd, 'Twisted twistd not installed')
class Ftp(GvfsTestCase):
    '''ftp:// mounts against a twistd FTP server listening on port 2121'''

    def setUp(self):
        '''Launch FTP server'''

        super().setUp()

        with open(os.path.join(self.workdir, 'myfile.txt'), 'w') as f:
            f.write('hello world\n')
        os.mkdir(os.path.join(self.workdir, 'mydir'))
        secret_path = os.path.join(self.workdir, 'mydir', 'onlyme.txt')
        with open(secret_path, 'w') as f:
            f.write('secret\n')
        os.chmod(secret_path, 0o600)

        self.ftpd = subprocess.Popen(['twistd', '-n', 'ftp', '-p', '2121',
                                      '-r', self.workdir,
                                      '--auth', 'memory:testuser:pwd1'],
                                     stdout=subprocess.PIPE)

        # wait until server is started up
        s = socket.socket()
        try:
            for timeout in range(50):
                try:
                    s.connect(('127.0.0.1', 2121))
                    break
                except ConnectionRefusedError:
                    time.sleep(0.1)
            else:
                # tearDown() does not run when setUp() fails, so stop the
                # server here
                self.ftpd.terminate()
                self.ftpd.wait()
                self.fail('timed out waiting for test FTP server')
        finally:
            s.close()

    def tearDown(self):
        '''Shut down FTP server'''

        self.ftpd.terminate()
        self.ftpd.wait()
        super().tearDown()

    def test_anonymous_cli_user(self):
        '''ftp:// anonymous (CLI with user)'''

        uri = 'ftp://anonymous@localhost:2121'
        subprocess.check_call(['gio', 'mount', uri])

        self.do_mount_check_cli(uri, True)

    def test_anonymous_cli_option(self):
        '''ftp:// anonymous (CLI with option)'''

        uri = 'ftp://localhost:2121'
        subprocess.check_call(['gio', 'mount', '-a', uri])

        self.do_mount_check_cli(uri, True)

    def test_authenticated_cli(self):
        '''ftp:// authenticated (CLI)'''

        uri = 'ftp://localhost:2121'
        mount = subprocess.Popen(['gio', 'mount', uri],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)

        # wrong user name
        self.wait_for_gvfs_mount_user_prompt(mount)
        mount.stdin.write(b'eve\nh4ck\n')
        mount.stdin.flush()

        # wrong password name
        self.wait_for_gvfs_mount_user_prompt(mount)
        mount.stdin.write(b'testuser\nh4ck\n')
        mount.stdin.flush()

        # correct credentials
        self.wait_for_gvfs_mount_user_prompt(mount)
        (out, err) = mount.communicate(b'testuser\npwd1\n')
        self.assertEqual(mount.returncode, 0)
        self.assertEqual(err, b'')

        # in test bed, there is nothing interesting in /home/testuser/, and
        # without the test bed we do not know what's in the folder, so skip
        # gio list check
        self.do_mount_check_cli(uri, False)

    def do_mount_check_cli(self, uri, check_contents):
        '''Check the mounted ftp uri with the gio CLI, then unmount it

        The directory listing and file contents are only verified if
        check_contents is true.
        '''
        # appears in gio mount list
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        try:
            self.assertRegex(out, r'Mount\(\d+\):.* -> ftp://([a-z0-9]+@)?localhost:2121')

            # check gio info
            out = self.program_out_success(['gio', 'info', uri])
            self.assertRegex(out, 'display name: / .* localhost', out)
            self.assertTrue('type: directory' in out, out)

            # check gio list
            if check_contents:
                out = self.program_out_success(['gio', 'list', uri])
                self.assertEqual(set(out.split()), set(['myfile.txt', 'mydir']))
                out = self.program_out_success(['gio', 'list', uri + '/mydir'])
                self.assertEqual(out, 'onlyme.txt\n')

                # check gio cat
                out = self.program_out_success(['gio', 'cat', uri + '/myfile.txt'])
                self.assertEqual(out, 'hello world\n')
        finally:
            self.unmount(uri)

    def test_anonymous_api_user(self):
        '''ftp:// anonymous (API with user)'''

        uri = 'ftp://anonymous@localhost:2121'
        gfile = Gio.File.new_for_uri(uri)
        self.assertEqual(self.mount_api(gfile), True)
        try:
            self.do_mount_check_api(gfile, True)
        finally:
            self.unmount_api(gfile)

    def test_anonymous_api_flag(self):
        '''ftp:// anonymous (API with flag)'''

        uri = 'ftp://localhost:2121'
        gfile = Gio.File.new_for_uri(uri)
        self.assertEqual(self.mount_api(gfile, self.make_mountop()), True)
        try:
            self.do_mount_check_api(gfile, True)
        finally:
            self.unmount_api(gfile)

    def test_authenticated_api(self):
        '''ftp:// authenticated (API)'''

        uri = 'ftp://localhost:2121'
        gfile = Gio.File.new_for_uri(uri)

        # no password supplied
        res = self.mount_api(gfile)
        self.assertTrue(isinstance(res, GLib.GError), res)

        # wrong username
        res = self.mount_api(gfile, self.make_mountop('eve', 'h4ck'))
        self.assertTrue(isinstance(res, GLib.GError))

        # wrong password
        res = self.mount_api(gfile, self.make_mountop('testuser', 'h4ck'))
        self.assertTrue(isinstance(res, GLib.GError))

        # correct credentials
        res = self.mount_api(gfile, self.make_mountop('testuser', 'pwd1'))
        self.assertEqual(res, True)
        try:
            self.do_mount_check_api(gfile, False)
        finally:
            self.unmount_api(gfile)

    def do_mount_check_api(self, gfile, check_contents):
        '''Check the mounted ftp gfile with the Gio API

        The directory listing and file contents are only verified if
        check_contents is true. This does not unmount.
        '''
        info = gfile.query_info('*', 0, None)
        self.assertEqual(info.get_content_type(), 'inode/directory')
        self.assertEqual(info.get_file_type(), Gio.FileType.DIRECTORY)
        self.assertTrue('localhost' in info.get_display_name(), info.get_display_name())
        # FIXME: this is actually supposed to be true!
        # self.assertEqual(info.get_attribute_boolean('access::can-read'), True)

        if check_contents:
            # check available files
            enum = gfile.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None)
            files = set()
            while True:
                info = enum.next_file(None)
                if info is None:
                    break
                files.add(info.get_name())
            self.assertEqual(files, set(['myfile.txt', 'mydir']))

            gfile_myfile = Gio.File.new_for_uri(gfile.get_uri() + '/myfile.txt')
            (success, contents, etags) = gfile_myfile.load_contents(None)
            self.assertTrue(success)
            self.assertEqual(contents, b'hello world\n')


@unittest.skipUnless(have_smbd, 'Samba smbd not installed')
class Smb(GvfsTestCase):
    '''smb:// mounts of a public (guest) and a private (authenticated) share'''

    def setUp(self):
        '''start local smbd as user if we are not in test bed'''

        super().setUp()

        # create a few test files
        if in_testbed:
            pubdir = os.path.expanduser('~/public')
            privdir = os.path.expanduser('~/private')
        else:
            pubdir = os.path.join(self.workdir, 'public')
            privdir = os.path.join(self.workdir, 'private')

        if not os.path.exists(pubdir):
            # only run this once
            os.mkdir(pubdir)
            os.makedirs(os.path.join(privdir, 'mydir'))
            with open(os.path.join(pubdir, 'myfile.txt'), 'w') as f:
                f.write('hello world\n')
            secret_path = os.path.join(privdir, 'mydir', 'onlyme.txt')
            with open(secret_path, 'w') as f:
                f.write('secret\n')
            os.chmod(secret_path, 0o600)

        if in_testbed:
            return

        # smbpasswd file with password "foo"
        # NOTE(review): the account flags field "[U ]" is kept exactly as
        # found; smbpasswd(5) describes it as space-padded to 13 characters
        # -- confirm against the pristine file
        smbpasswd = os.path.join(self.workdir, 'smbpasswd')
        with open(smbpasswd, 'w') as f:
            f.write(os.environ['USER'])
            f.write(':2:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX:AC8E657F83DF82BEEA5D43BDAF7800CC:[U ]:LCT-507C14C7:\n')

        # create local samba configuration
        smbdir = os.path.join(self.workdir, 'samba')
        os.mkdir(smbdir, 0o755)
        smbconf = os.path.join(self.workdir, 'smb.conf')
        with open(smbconf, 'w') as f:
            f.write('''[global]
workgroup = TESTGROUP
interfaces = lo 127.0.0.0/8
smb ports = %(port)d
log level = 2
map to guest = Bad User
passdb backend = smbpasswd
smb passwd file = %(workdir)s/smbpasswd
lock directory = %(workdir)s/samba
state directory = %(workdir)s/samba
cache directory = %(workdir)s/samba
pid directory = %(workdir)s/samba
private dir = %(workdir)s/samba
ncalrpc dir = %(workdir)s/samba

[public]
path = %(workdir)s/public
guest ok = yes

[private]
path = %(workdir)s/private
read only = no
''' % {'workdir': self.workdir,
       'port': SMB_USER_PORT})

        # start smbd
        self.smbd = subprocess.Popen(['smbd', '-FS', '-s', smbconf],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE)
        timeout = 50
        while timeout > 0:
            (out, err) = self.program_out_err(['ss', '-ltn'])
            if (':%d ' % SMB_USER_PORT) in out:
                break
            timeout -= 1
            time.sleep(0.1)
        else:
            self.smbd.terminate()
            self.smbd.wait()
            self.fail('starting smbd failed')

    def tearDown(self):
        '''Stop the local smbd, if setUp() started one'''
        # stop smbd
        if hasattr(self, 'smbd') and self.smbd.returncode is None:
            self.smbd.terminate()
            self.smbd.wait()
        super().tearDown()

    def run(self, result=None):
        '''Show smbd log output on failed tests'''

        if result:
            orig_err_fail = len(result.errors) + len(result.failures)
        super().run(result)
        if hasattr(self, 'smbd'):
            if result and len(result.errors) + len(result.failures) > orig_err_fail:
                print('\n----- smbd log -----\n%s\n------\n' % self.smbd.stdout.read())

    def test_anonymous(self):
        '''smb:// anonymous'''

        uri = 'smb://%s/public' % os.uname()[1]

        # ensure that this does not ask for any credentials
        mount = subprocess.Popen(['gio', 'mount', '-a', uri])
        timeout = 50
        while timeout > 0:
            time.sleep(0.1)
            timeout -= 1
            if mount.poll() is not None:
                self.assertEqual(mount.returncode, 0, 'gio mount -a %s failed' % uri)
                break
        else:
            mount.terminate()
            # reap the process before failing, so that it is not left behind
            mount.wait()
            self.fail('timed out waiting for gio mount -a %s' % uri)
        mount.wait()

        self.do_mount_check(uri, False)

    def test_authenticated(self):
        '''smb:// authenticated'''

        uri = 'smb://%s@%s/private' % (os.environ['USER'], os.uname()[1])
        mount = subprocess.Popen(['gio', 'mount', uri],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)

        # correct credentials
        self.wait_for_gvfs_mount_user_prompt(mount)
        # default domain, password
        (out, err) = mount.communicate(b'\nfoo\n')
        self.assertEqual(mount.returncode, 0, err)
        # self.assertEqual(err, b'') # we get some warnings

        self.do_mount_check(uri, True)

    def do_mount_check(self, uri, writable):
        '''Check the mounted smb uri with the gio CLI, then unmount it

        The expected contents are picked by the share name (last component of
        uri); writable states whether copying a file onto the share must
        succeed or must fail.
        '''
        sharename = uri.split('/')[-1]

        # appears in gio mount list
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        try:
            self.assertRegex(out, r'Mount\(0\): %s .* smb://.*/%s' % (sharename, sharename))

            # check gio info
            out = self.program_out_success(['gio', 'info', uri])
            self.assertTrue('display name: ' + sharename in out, out)
            self.assertTrue('type: directory' in out, out)

            # check gio list and gio cat
            out = self.program_out_success(['gio', 'list', uri])
            if sharename == 'public':
                self.assertEqual(out, 'myfile.txt\n')

                out = self.program_out_success(['gio', 'cat', uri + '/myfile.txt'])
                self.assertEqual(out, 'hello world\n')
            else:
                self.assertEqual(out, 'mydir\n')
                self.assertEqual(self.program_out_success(['gio', 'list', uri + '/mydir']),
                                 'onlyme.txt\n')

                out = self.program_out_success(['gio', 'cat', uri + '/mydir/onlyme.txt'])
                self.assertEqual(out, 'secret\n')

            if writable:
                # should be writable
                self.program_out_success(['gio', 'copy', '/etc/passwd', uri + '/newfile.txt'])
                out = self.program_out_success(['gio', 'cat', uri + '/newfile.txt'])
                with open('/etc/passwd') as f:
                    self.assertEqual(out, f.read())
            else:
                # should not be writable
                (code, out, err) = self.program_code_out_err(
                    ['gio', 'copy', '/etc/passwd', uri + '/newfile.txt'])
                self.assertNotEqual(code, 0)
                self.assertEqual(out, '')
                self.assertNotEqual(err, '')
        finally:
            self.unmount(uri)


@unittest.skipUnless(in_testbed, 'not running under gvfs-testbed')
@unittest.skipIf(os.path.exists('/sys/module/scsi_debug'), 'scsi_debug is already loaded')
class Drive(GvfsTestCase):
    @classmethod
    def setUpClass(klass):
        '''Load scsi_debug'''

        klass.root_command_success('modprobe scsi_debug add_host=0 dev_size_mb=64')

    @classmethod
    def tearDownClass(klass):
        '''Unload scsi_debug

        Raise SystemError if the module cannot be removed.
        '''
        # remove scsi_debug; might need a few tries while being busy
        timeout = 10
        while timeout > 0:
            (code, out, err) = klass.root_command('rmmod -v scsi_debug')
            if code == 0:
                break
            if 'in use' in err:
                # count the retry, so that a permanently busy module cannot
                # keep us here forever
                timeout -= 1
                time.sleep(0.2)
            else:
                break
        if code != 0:
            raise SystemError('cannot rmmod scsi_debug: ' + err)

    @classmethod
    def get_devices(klass):
        '''Return current set of device names from scsi_debug'''

        devs = []
        for block_dir in glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block'):
            try:
                devs += os.listdir(block_dir)
            except OSError:
                # TOCTOU, might change underneath us
                pass
        return set(devs)

    @classmethod
    def create_host(klass, ptype):
        '''Create a new SCSI host.

        Return device name.
        '''
        orig_devs = klass.get_devices()
        klass.root_command_success('echo %i > /sys/bus/pseudo/drivers/scsi_debug/ptype' % ptype)
        klass.root_command_success('echo 1 > /sys/bus/pseudo/drivers/scsi_debug/add_host')

        timeout = 1000
        while timeout >= 0:
            devs = klass.get_devices()
            if devs - orig_devs:
                break
            time.sleep(0.2)
            timeout -= 1
        else:
            raise SystemError('timed out waiting for new device')
        new_devs = devs - orig_devs
        assert len(new_devs) == 1
        return new_devs.pop()

    @classmethod
    def remove_device(klass, device):
        '''Remove given device name.'''

        klass.root_command_success('echo 1 > /sys/block/%s/device/delete' % device)

    def load_image(self, fname):
        '''Install a test image on the scsi_debug drive

        This must be a bzip2'ed file in test/files/.
        '''
        # we cannot write to a scsi_debug CD drive, so write it into it in hard
        # disk mode
        dev = self.create_host(PTYPE_DISK)

        # put test.iso onto disk
        img = os.path.join(my_dir, 'files', fname)
        self.root_command_success('bzip2 -cd %s > /dev/%s; sync' % (img, dev))

        # leave the actual device creation to the individual tests; all devices
        # created henceforth will default to the image contents
        self.remove_device(dev)
        while dev in self.get_devices():
            time.sleep(0.2)

        # flush volume monitor output
        ctx = GLib.MainContext().default()
        while ctx.iteration(False):
            pass
        self.monitor.stdout.read()

    def setUp(self):
        '''Start a "gio mount -oi" volume monitor and wait until it is ready'''
        super().setUp()
        self.mock_polkit = None

        # do not use the mocked /sys for these tests, but the real one
        # (GvfsTestCase.tearDown() puts it back via .clear())
        if umockdev_testbed:
            umockdev_testbed.disable()

        self.monitor = subprocess.Popen(['gio', 'mount', '-oi'], stdout=subprocess.PIPE)
        # set monitor stdout to non-blocking
        fl = fcntl.fcntl(self.monitor.stdout, fcntl.F_GETFL)
        fcntl.fcntl(self.monitor.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        # wait until monitor is ready
        while 'Monitoring events' not in self.get_monitor_output():
            time.sleep(0.1)

    def tearDown(self):
        '''Remove all scsi_debug devices, stop the monitor and polkit'''
        for dev in self.get_devices():
            self.remove_device(dev)

        self.monitor.terminate()
        self.monitor.wait()
        self.stop_polkit()

        super().tearDown()

    def test_cdrom(self):
        '''drive mount: cdrom'''

        self.load_image('bogus-cd.iso.bz2')
        dev = self.create_host(PTYPE_CDROM)

        # check that gvfs monitor picks up the new drive
        out = self.get_monitor_output()
        self.assertRegex(out, 'Drive connected:')
        self.assertRegex(out, r'\[drive-optical\]')
        self.assertRegex(out, r'\[media-optical-cd\]')
        self.assertRegex(out, 'unix-device:.*/dev/%s' % dev)
        self.assertRegex(out, 'has_media=1')

        self.assertRegex(out, r'Volume added:\s+.*bogus-cd')
        self.assertRegex(out, r"label:\s+'bogus-cd")
        self.assertRegex(out, 'can_mount=1')
        self.assertRegex(out, 'should_automount=1')
        self.assertRegex(out, 'themed icons:.*media-optical')

        # tell polkit to do allow removable (but not internal) storage
        self.start_polkit(['org.freedesktop.udisks2.filesystem-mount',
                           'org.freedesktop.udisks2.filesystem-mount-other-seat'])

        # now mounting should succeed
        (out, err) = self.program_out_err(['gio', 'mount', '-d', '/dev/' + dev])

        # should appear as Mount
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        self.assertEqual(err.strip(), '')
        match = re.search(r'Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out)
        self.assertTrue(match, 'no Mount found in gio mount -li output:\n' + out)

        # unmount it again
        self.unmount(match.group(1))

    def test_cdrom_api(self):
        '''drive mount: cdrom with Gio API'''

        self.load_image('bogus-cd.iso.bz2')
        self.start_polkit(['org.freedesktop.udisks2.filesystem-mount',
                           'org.freedesktop.udisks2.filesystem-mount-other-seat'])

        self.bogus_volume = None

        # add CD and wait for it to appear in the monitor
        def volume_added(vm, v, main_loop):
            if v.get_name() == 'bogus-cd':
                self.bogus_volume = v
                main_loop.quit()

        vm = Gio.VolumeMonitor.get()
        ml = GLib.MainLoop()
        vm.connect('volume-added', volume_added, ml)
        dev = self.create_host(PTYPE_CDROM)

        timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None)
        ml.run()
        self.assertNotEqual(self.bogus_volume, None,
                            'timed out waiting for bogus-cd volume')
        ml.get_context().find_source_by_id(timeout_id).destroy()

        # check properties
        ids = self.bogus_volume.enumerate_identifiers()
        self.assertTrue('unix-device' in ids, ids)
        self.assertTrue('label' in ids, ids)
        self.assertEqual(self.bogus_volume.get_identifier('unix-device'), '/dev/' + dev)
        self.assertEqual(self.bogus_volume.get_identifier('label'), 'bogus-cd')
        self.assertEqual(self.bogus_volume.get_mount(), None)

        # mount it
        self.cb_result = None

        def mount_done(obj, result, main_loop):
            main_loop.quit()
            try:
                success = obj.mount_finish(result)
                self.cb_result = (obj, success)
            except GLib.GError as e:
                self.cb_result = (obj, e)

        self.bogus_volume.mount(Gio.MountMountFlags.NONE, None, None, mount_done, ml)
        timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None)
        ml.run()
        self.assertNotEqual(self.cb_result, None,
                            'timed out waiting for bogus-cd mount')
        ml.get_context().find_source_by_id(timeout_id).destroy()

        self.assertEqual(self.cb_result[1], True)
        self.assertEqual(self.cb_result[0], self.bogus_volume)

        # get Mount object
        mount = self.bogus_volume.get_mount()
        self.assertNotEqual(mount, None)
        self.assertEqual(mount.get_name(), 'bogus-cd')
        p = mount.get_root().get_path()
        self.assertTrue(os.path.isdir(p), p)
        self.assertTrue(os.path.isfile(os.path.join(p, 'hello.txt')))
        self.assertTrue('/media/' in p, p)
        self.assertEqual(mount.get_volume(), self.bogus_volume)

        # unmount
        self.cb_result = None

        def unmount_done(obj, result, main_loop):
            main_loop.quit()
            try:
                success = obj.unmount_with_operation_finish(result)
                self.cb_result = (obj, success)
            except GLib.GError as e:
                self.cb_result = (obj, e)

        mount.unmount_with_operation(Gio.MountUnmountFlags.NONE, None, None, unmount_done, ml)
        timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None)
        ml.run()
        # report a timeout as such instead of failing on None[1] below
        self.assertNotEqual(self.cb_result, None,
                            'timed out waiting for bogus-cd unmount')
        self.assertEqual(self.cb_result[1], True)
        self.assertEqual(self.bogus_volume.get_mount(), None)

    def test_system_partition(self):
        '''drive mount: system partition'''

        self.load_image('vfat.img.bz2')
        dev = self.create_host(PTYPE_DISK)

        # check that gvfs monitor picks up the new drive
        out = self.get_monitor_output()
        self.assertRegex(out, 'Drive connected:')
        self.assertRegex(out, r'\[drive-harddisk\]')
        self.assertRegex(out, 'unix-device:.*/dev/%s' % dev)
        self.assertRegex(out, 'has_media=1')

        self.assertRegex(out, r'Volume added:\s+.*testvfat')
        self.assertRegex(out, r"label:\s+'testvfat")
        self.assertRegex(out, 'should_automount=0')
        self.assertRegex(out, 'themed icons:.*harddisk')

        # should fail with only allowing the user to mount removable storage
        self.start_polkit(['org.freedesktop.udisks2.filesystem-mount'])
        (code, out, err) = self.program_code_out_err(['gio', 'mount', '-d', '/dev/' + dev])
        self.assertNotEqual(code, 0)
        self.assertRegex(err, 'Not authorized')

        # should succeed with allowing the user to mount system storage
        self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system'])
        (out, err) = self.program_out_err(['gio', 'mount', '-d', '/dev/' + dev])

        # should appear as Mount
        (out, err) = self.program_out_err(['gio', 'mount', '-li'])
        self.assertEqual(err.strip(), '')
        match = re.search(r'Mount\(\d+\): testvfat -> (file://.*/media/.*/testvfat)', out)
        self.assertTrue(match, 'no Mount found in gio mount -li output:\n' + out)

        # unmount it again
        self.unmount(match.group(1))

    def test_system_partition_api(self):
        '''drive mount: system partition with Gio API'''

        self.load_image('vfat.img.bz2')
        self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system'])

        self.volume = None

        # add partition and wait for it to appear in the monitor
        def volume_added(vm, v, main_loop):
            if v.get_name() == 'testvfat':
                self.volume = v
                main_loop.quit()

        vm = Gio.VolumeMonitor.get()
        ml = GLib.MainLoop()
        vm.connect('volume-added', volume_added, ml)
        dev = self.create_host(PTYPE_DISK)

        timeout_id = GLib.timeout_add_seconds(5, lambda data: ml.quit(), None)
        ml.run()
        self.assertNotEqual(self.volume, None,
                            'timed out waiting for volume')
        ml.get_context().find_source_by_id(timeout_id).destroy()

        # check properties
        ids = self.volume.enumerate_identifiers()
        self.assertTrue('unix-device' in ids, ids)
        self.assertTrue('label' in ids, ids)
        self.assertTrue('uuid' in ids, ids)
        self.assertEqual(self.volume.get_identifier('unix-device'), '/dev/' + dev)
        self.assertEqual(self.volume.get_identifier('label'), 'testvfat')
        self.assertEqual(self.volume.get_identifier('uuid'), 'F3C1-6301')

    def test_media_player(self):
        '''drive mount: media player'''

        self.load_image('bogus-cd.iso.bz2')

        def cleanup():
            rootsh = subprocess.Popen(['./rootsh', '-p'], stdin=subprocess.PIPE)
            rootsh.communicate(b'''rm /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules
pkill --signal HUP udevd || pkill --signal HUP systemd-udevd
''')

        # create udev
rule to turn it into a music player self.addCleanup(cleanup) rootsh = subprocess.Popen(['./rootsh', '-p'], stdin=subprocess.PIPE) rootsh.communicate(b'''export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin mkdir -p /run/udev/rules.d echo 'SUBSYSTEM=="block", ATTRS{model}=="scsi_debug*", ENV{ID_MEDIA_PLAYER}="MockTune"' > /run/udev/rules.d/40-scsi_debug-fake-mediaplayer.rules sync pkill --signal HUP udevd || pkill --signal HUP systemd-udevd ''') dev = self.create_host(PTYPE_DISK) # check that gvfs monitor picks up the new volume out = self.get_monitor_output() self.assertRegex(out, 'Volume added:\s+.*bogus-cd') self.assertRegex(out, "label:\s+'bogus-cd") self.assertTrue('should_automount=0' in out, out) self.assertRegex(out, 'themed icons:.*harddisk') # mount it self.start_polkit(['org.freedesktop.udisks2.filesystem-mount-system']) (out, err) = self.program_out_err(['gio', 'mount', '-d', '/dev/' + dev]) # should appear as Mount (out, err) = self.program_out_err(['gio', 'mount', '-li']) self.assertEqual(err.strip(), '') match = re.search('Mount\(\d+\): bogus-cd -> (file://.*/media/.*/bogus-cd)', out) self.assertTrue(match, 'no Mount found in gio mount -li output:\n' + out) # should have media player content self.assertRegex(out, 'x_content_types:.*x-content/audio-player') # unmount it again self.unmount(match.group(1)) def get_monitor_output(self): '''Wait for gvfs monitor to output something, and return it''' empty_timeout = 50 while True: out = self.monitor.stdout.read() if out: break else: empty_timeout -= 1 self.assertGreater(empty_timeout, 0, 'timed out waiting for monitor output') time.sleep(0.1) # wait a bit more to see whether we catch some stragglers time.sleep(0.2) out2 = self.monitor.stdout.read() if out2: out += out2 return out.decode() def start_polkit(self, actions): '''Start mock polkit with list of allowed actions.''' self.stop_polkit() self.mock_polkit = subprocess.Popen(['./rootsh', '-p'], stdin=subprocess.PIPE) 
self.mock_polkit.stdin.write(('set -e\n/home/test_polkitd.py -r -a %s\n' % ','.join(actions)).encode('ASCII')) self.mock_polkit.stdin.flush() # wait until it started up if actions: timeout = 50 while timeout > 0: try: out = subprocess.check_output(['pkcheck', '--action-id', actions[0], '--process', '1'], stderr=subprocess.PIPE) if b'test=test' in out: break except subprocess.CalledProcessError: pass time.sleep(0.1) timeout -= 1 else: self.fail('timed out waiting for test_polkitd.py') else: # we can only cross fingers here, as we do not have an action to verify time.sleep(0.5) self.assertEqual(self.mock_polkit.poll(), None, 'mock polkitd unexpectedly terminated') def stop_polkit(self): '''Stop mock polkit, if it is running.''' if self.mock_polkit: # for some reason, terminating the shell doesn't terminate the # polkitd running in it, so kill that separately self.root_command('kill `pidof -x /home/test_polkitd.py`') self.mock_polkit.terminate() self.mock_polkit.wait() self.mock_polkit = None @unittest.skipUnless(have_httpd, 'Apache httpd not installed') @unittest.skipUnless(have_dav_backend, 'Dav backend not enabled') class Dav(GvfsTestCase): '''Test WebDAV backend''' @classmethod def setUpClass(klass): '''Set up Apache httpd sandbox''' klass.mod_dir = klass.get_httpd_module_dir() klass.httpd_sandbox = tempfile.mkdtemp() klass.public_dir = os.path.join(klass.httpd_sandbox, 'public') os.mkdir(klass.public_dir) with open(os.path.join(klass.public_dir, 'hello.txt'), 'w') as f: f.write('hi\n') klass.secret_dir = os.path.join(klass.httpd_sandbox, 'secret') os.mkdir(klass.secret_dir) with open(os.path.join(klass.secret_dir, 'restricted.txt'), 'w') as f: f.write('dont tell anyone\n') # test:s3kr1t with open(os.path.join(klass.httpd_sandbox, 'htpasswd'), 'w') as f: f.write('test:$apr1$t0B4mfkT$Tr8ip333/ZR/7xrRBuxjI.\n') # some distros have some extra modules which we need to load modules = '' for m in ['authn_core', 'authz_core', 'authz_user', 'auth_basic', 'authn_file', 
'mpm_prefork', 'unixd', 'dav', 'dav_fs', 'ssl']: if os.path.exists(os.path.join(klass.mod_dir, 'mod_%s.so' % m)): modules += 'LoadModule %s_module %s/mod_%s.so\n' % (m, klass.mod_dir, m) with open(os.path.join(klass.httpd_sandbox, 'apache2.conf'), 'w') as f: f.write('''Listen localhost:8088 Listen localhost:4443 %(modules)s DocumentRoot . ServerName localhost PidFile apache.pid LogLevel debug ErrorLog error_log DAVLockDB DAVLock SSLEngine on SSLCertificateFile %(mydir)s/files/testcert.pem SSLCertificateKeyFile %(mydir)s/files/testcert.pem Dav On Dav On AuthType Basic AuthName DAV AuthUserFile htpasswd Require valid-user ''' % {'mod_dir': klass.mod_dir, 'root': klass.httpd_sandbox, 'modules': modules, 'mydir': my_dir}) # start server try: subprocess.check_call([httpd_cmd, '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'start']) except subprocess.CalledProcessError: error_log = os.path.join(klass.httpd_sandbox, 'error_log') if os.path.exists(error_log): with open(error_log) as f: print('---- apache http error log ----\n%s\n---------\n' % f.read()) raise @classmethod def tearDownClass(klass): '''Stop httpd server and remove sandbox''' subprocess.call([httpd_cmd, '-d', klass.httpd_sandbox, '-f', 'apache2.conf', '-k', 'stop']) shutil.rmtree(klass.httpd_sandbox) @classmethod def get_httpd_module_dir(klass): '''Return module directory for Apache httpd. Unfortunately this is highly distro/platform specific, so try to determine it from apxs2 or apachectl/apache2. 
''' # if we have apxs2 installed, use this try: apxs2 = subprocess.Popen(['apxs2', '-q', 'LIBEXECDIR'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out = apxs2.communicate()[0].strip() assert apxs2.returncode == 0, 'apxs2 -V failed' return out except OSError: # Look for apxs instead try: apxs2 = subprocess.Popen(['apxs', '-q', 'LIBEXECDIR'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) out = apxs2.communicate()[0].strip() assert apxs2.returncode == 0, 'apxs2 -V failed' return out except OSError: print('[no apxs2, falling back]') pass # fall back to looking for modules in HTTPD_ROOT/modules/ ctl = subprocess.Popen([httpd_cmd, '-V'], stdout=subprocess.PIPE, universal_newlines=True) out = ctl.communicate()[0] assert ctl.returncode == 0, httpd_cmd + ' -V failed' m = re.search('\sHTTPD_ROOT="([^"]+)"\s', out) assert m, httpd_cmd + ' -V does not show HTTPD_ROOT' mod_dir = os.path.join(m.group(1), 'modules') assert os.path.isdir(mod_dir), \ '%s does not exist, cannot determine httpd module path' % mod_dir return mod_dir def test_http_noauth(self): '''dav://localhost without credentials''' uri = 'dav://localhost:8088/public' subprocess.check_call(['gio', 'mount', uri]) self.do_mount_check(uri, 'hello.txt', 'hi\n') def test_https_noauth(self): '''davs://localhost without credentials''' uri = 'davs://localhost:4443/public' mount = subprocess.Popen(['gio', 'mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # confirm unknown certificate self.wait_for_question (mount); mount.stdin.write(b'1\n') mount.stdin.flush() self.do_mount_check(uri, 'hello.txt', 'hi\n') def test_http_auth(self): '''dav://localhost with credentials''' uri = 'dav://localhost:8088/secret' mount = subprocess.Popen(['gio', 'mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # wrong password self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'test\nh4ck\n') 
mount.stdin.flush() # correct password (out, err) = mount.communicate(b's3kr1t\n') self.assertEqual(mount.returncode, 0) self.assertEqual(err, b'') self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') def test_https_auth(self): '''davs://localhost with credentials''' uri = 'davs://localhost:4443/secret' mount = subprocess.Popen(['gio', 'mount', uri], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # confirm unknown certificate self.wait_for_question (mount); mount.stdin.write(b'1\n') mount.stdin.flush() # wrong password self.wait_for_gvfs_mount_user_prompt(mount) mount.stdin.write(b'test\nh4ck\n') mount.stdin.flush() # correct password (out, err) = mount.communicate(b's3kr1t\n') self.assertEqual(mount.returncode, 0) self.assertEqual(err, b'') self.do_mount_check(uri, 'restricted.txt', 'dont tell anyone\n') def do_mount_check(self, uri, testfile, content): # appears in gio mount list (out, err) = self.program_out_err(['gio', 'mount', '-li']) try: self.assertRegex(out, 'Mount\(\d+\):.* -> davs?://([a-z0-9]+@)?localhost') # check gio info out = self.program_out_success(['gio', 'info', uri]) self.assertRegex(out, 'id::filesystem: dav') self.assertTrue('type: directory' in out, out) # check gio list out = self.program_out_success(['gio', 'list', uri]) self.assertEqual(out.strip(), testfile) # check gio cat out = self.program_out_success(['gio', 'cat', uri + '/' + testfile]) self.assertEqual(out, content) # create a new file self.program_out_success(['gio', 'copy', uri + '/' + testfile, uri + '/foo']) out = self.program_out_success(['gio', 'cat', uri + '/foo']) self.assertEqual(out, content) # remove it again self.program_out_success(['gio', 'remove', uri + '/foo']) out = self.program_out_success(['gio', 'list', uri]) self.assertFalse('foo' in out.split(), out) finally: self.unmount(uri) class Trash(GvfsTestCase): def setUp(self): super().setUp() self.gfile_trash = Gio.File.new_for_uri('trash://') # double-check that we are really 
running with a temporary # $XDG_DATA_HOME and that gvfs respects it, or under gvfs-testbed self.assertEqual(self.files_in_trash(), set()) self.my_file = None def tearDown(self): if self.my_file: if os.path.exists(self.my_file): os.unlink(self.my_file) # clean up the trash, for predictable test cases for f in self.files_in_trash(): # print('cleaning up trash:///' + f) subprocess.call(['gio', 'remove', 'trash:///' + f]) super().tearDown() def files_in_trash(self): files = set() enum = self.gfile_trash.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None) while True: info = enum.next_file(None) if info is None: break files.add(info.get_name()) return files def test_file_in_home_cli(self): '''trash:// deletion, attributes, restoring for a file in $HOME (CLI)''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') # trash it del_time = time.time() subprocess.check_call(['gio', 'trash', self.my_file]) # should now be gone self.assertFalse(os.path.exists(self.my_file)) # and be in the trash self.assertEqual(self.files_in_trash(), set(['hello_gvfs_tests.txt'])) out = self.program_out_success(['gio', 'info', 'trash:///hello_gvfs_tests.txt']) # has proper original path self.assertTrue('trash::orig-path: ' + self.my_file in out, out) # has proper deletion time m = re.search('trash::deletion-date: (.*)\n', out) self.assertNotEqual(m, None) recorded_time = time.mktime(time.strptime(m.group(1), '%Y-%m-%dT%H:%M:%S')) self.assertLess(abs(recorded_time - del_time), 2.0) # is saved in home trash, not by-device trash data_home = os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')) self.assertTrue('standard::target-uri: file://' + data_home in out, out) def test_file_in_home_api(self): '''trash:// deletion, attributes, restoring for a file in $HOME (API)''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: 
f.write('hello world\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) # should now be gone self.assertFalse(os.path.exists(self.my_file)) # and be in the trash self.assertEqual(self.files_in_trash(), set(['hello_gvfs_tests.txt'])) def test_deletion_with_same_path(self): '''trash:// deletion of two files with the same path''' # create test file self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) self.assertFalse(os.path.exists(self.my_file)) # and re-create/re-trash it again self.my_file = os.path.expanduser('~/hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('bye bye\n') gfile = Gio.File.new_for_path(self.my_file) self.assertTrue(gfile.trash(None)) self.assertFalse(os.path.exists(self.my_file)) # should have two trash entries now with tame original path enum = self.gfile_trash.enumerate_children('*', Gio.FileQueryInfoFlags.NONE, None) count = 0 while True: info = enum.next_file(None) if info is None: break count += 1 self.assertEqual(info.get_attribute_byte_string('trash::orig-path'), self.my_file) self.assertEqual(count, 2) def test_file_in_system(self): '''trash:// deletion for system location This either should work if /tmp/ is a partition on its own writable to the user (such as a tmpfs), or fail gracefully without deleting the file. 
''' # create test file self.my_file = os.path.join(self.workdir, 'hello_gvfs_tests.txt') with open(self.my_file, 'w') as f: f.write('hello world\n') # try to trash it trash = subprocess.Popen(['gio', 'trash', self.my_file], stderr=subprocess.PIPE) trash.communicate() if trash.returncode == 0: self.assertFalse(os.path.exists(self.my_file)) if os.stat('/tmp').st_dev == os.stat(os.environ['XDG_DATA_HOME']).st_dev: self.assertTrue(os.path.exists(os.path.join(os.environ['XDG_DATA_HOME'], 'Trash/files/hello_gvfs_tests.txt'))) else: self.assertTrue(os.path.exists('/tmp/.Trash-%i/files/hello_gvfs_tests.txt' % os.getuid())) else: # file should still be there self.assertTrue(os.path.exists(self.my_file)) @unittest.skipUnless(have_umockdev, 'umockdev not installed; get it from https://launchpad.net/umockdev') class GPhoto(GvfsTestCase): def test_mount_api(self): '''gphoto2:// mount with Gio API''' self.add_powershot() uri = 'gphoto2://[usb:001,015]' gfile_mount = Gio.File.new_for_uri(uri) self.assertEqual(self.mount_api(gfile_mount), True) try: # check top-level directory info = gfile_mount.query_info('*', 0, None) self.assertEqual(info.get_content_type(), 'inode/directory') self.assertEqual(info.get_file_type(), Gio.FileType.DIRECTORY) self.assertIn('camera', info.get_display_name().lower()) self.assertEqual(info.get_attribute_boolean('access::can-read'), True) # check a photo gfile = Gio.File.new_for_uri(uri + '/DCIM/100CANON/IMG_0001.JPG') # FIXME: The first call always fails (only with umockdev) try: info = gfile.query_info('*', 0, None) except GLib.GError: info = gfile.query_info('*', 0, None) self.assertEqual(info.get_content_type(), 'image/jpeg') self.assertEqual(info.get_file_type(), Gio.FileType.REGULAR) # we don't care about capitalization self.assertEqual(info.get_display_name().lower(), 'img_0001.jpg') self.assertEqual(info.get_attribute_boolean('access::can-read'), True) self.assertEqual(info.get_attribute_boolean('access::can-write'), True) # open photo stream = 
gfile.read(None) block = stream.read_bytes(20, None) self.assertIn(b'JFIF\x00', block.get_data()) stream.close(None) # nonexisting file gfile = Gio.File.new_for_uri(uri + '/DCIM/100CANON/IMG_9999.JPG') self.assertRaises(GLib.GError, gfile.query_info, '*', 0, None) finally: self.unmount_api(gfile_mount) def add_powershot(self): '''Add PowerShot device and ioctls to umockdev testbed''' with open(os.path.join(my_dir, 'files', 'powershot.umockdev')) as f: umockdev_testbed.add_from_string(f.read()) umockdev_testbed.load_ioctl('/dev/bus/usb/001/015', os.path.join(my_dir, 'files', 'powershot.ioctl')) # signal our monitor about the addition # umockdev_testbed.uevent('/sys/devices/pci0000:00/0000:00:1a.0/usb1/1-1/1-1.5/1-1.5.2/1-1.5.2.3', 'add'); def start_dbus(): '''Run a local D-BUS daemon under temporary XDG directories This also runs the D-BUS daemon under umockdev-wrapper (if available), so that it will see fake umockdev devices. Return temporary XDG home directory. ''' global dbus_daemon # use temporary config/data/runtime directories; NB that these need to be # in g_get_home_dir(), otherwise you can't trash files as this doesn't work # across fs boundaries # if/once https://bugzilla.gnome.org/show_bug.cgi?id=142568 gets fixed, we # can put it into a proper temp dir again temp_home = tempfile.mkdtemp(prefix='gvfs_test', dir=GLib.get_home_dir()) os.environ['XDG_CONFIG_HOME'] = os.path.join(temp_home, 'config') os.environ['XDG_DATA_HOME'] = os.path.join(temp_home, 'data') if os.path.exists('session.conf'): dbus_conf = 'session.conf' else: # for out-of-tree builds dbus_conf = os.path.join(os.path.dirname(__file__), 'session.conf') env = os.environ.copy() env['G_MESSAGES_DEBUG'] = 'all' env['GVFS_DEBUG'] = 'all' env['GVFS_SMB_DEBUG'] = '10' env['GVFS_HTTP_DEBUG'] = 'all' if not in_testbed: env['LIBSMB_PROG'] = "nc localhost %d" % SMB_USER_PORT # run local D-BUS; if we run this in a built tree, use our config to pick # up the built services, otherwise the standard session 
one if os.path.exists(dbus_conf): argv = ['dbus-daemon', '--config-file', dbus_conf, '--print-address=1'] else: argv = ['dbus-daemon', '--session', '--print-address=1'] if umockdev_testbed: argv.insert(0, 'umockdev-wrapper') # Python doesn't catch the setenv() from UMockdev.Testbed.new() env['UMOCKDEV_DIR'] = umockdev_testbed.get_root_dir() dbus_daemon = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) addr = dbus_daemon.stdout.readline().decode() os.environ['DBUS_SESSION_BUS_ADDRESS'] = addr # set dbus output to nonblocking flags = fcntl.fcntl(dbus_daemon.stdout, fcntl.F_GETFL) fcntl.fcntl(dbus_daemon.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) flags = fcntl.fcntl(dbus_daemon.stderr, fcntl.F_GETFL) fcntl.fcntl(dbus_daemon.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK) return temp_home if __name__ == '__main__': # do not break tests due to translations try: del os.environ['LANGUAGE'] except KeyError: pass os.environ['LC_ALL'] = 'C' # we need to create the umockdev testbed before launching D-BUS, so # that all spawned gvfs daemons see it if have_umockdev: umockdev_testbed = UMockdev.Testbed.new() temp_home = start_dbus() try: unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) finally: dbus_daemon.terminate() dbus_daemon.wait() # it might take a while until the child processes terminate and # release usage of the home dir, so try several times timeout = 20 while timeout > 0: try: shutil.rmtree(temp_home) break except OSError as e: timeout -= 1 time.sleep(0.1) if timeout <= 0: raise