diff options
Diffstat (limited to 'bzrlib/tests/test_transport.py')
-rw-r--r-- | bzrlib/tests/test_transport.py | 1147 |
1 files changed, 1147 insertions, 0 deletions
diff --git a/bzrlib/tests/test_transport.py b/bzrlib/tests/test_transport.py new file mode 100644 index 0000000..7f1e4fa --- /dev/null +++ b/bzrlib/tests/test_transport.py @@ -0,0 +1,1147 @@ +# Copyright (C) 2005-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + + +from cStringIO import StringIO +import errno +import os +import subprocess +import sys +import threading + +from bzrlib import ( + errors, + osutils, + tests, + transport, + urlutils, + ) +from bzrlib.directory_service import directories +from bzrlib.transport import ( + chroot, + fakenfs, + http, + local, + location_to_url, + memory, + pathfilter, + readonly, + ) +import bzrlib.transport.trace +from bzrlib.tests import ( + features, + test_server, + ) + + +# TODO: Should possibly split transport-specific tests into their own files. + + +class TestTransport(tests.TestCase): + """Test the non transport-concrete class functionality.""" + + def test__get_set_protocol_handlers(self): + handlers = transport._get_protocol_handlers() + self.assertNotEqual([], handlers.keys()) + transport._clear_protocol_handlers() + self.addCleanup(transport._set_protocol_handlers, handlers) + self.assertEqual([], transport._get_protocol_handlers().keys()) + + def test_get_transport_modules(self): + handlers = transport._get_protocol_handlers() + self.addCleanup(transport._set_protocol_handlers, handlers) + # don't pollute the current handlers + transport._clear_protocol_handlers() + + class SampleHandler(object): + """I exist, isnt that enough?""" + transport._clear_protocol_handlers() + transport.register_transport_proto('foo') + transport.register_lazy_transport('foo', + 'bzrlib.tests.test_transport', + 'TestTransport.SampleHandler') + transport.register_transport_proto('bar') + transport.register_lazy_transport('bar', + 'bzrlib.tests.test_transport', + 'TestTransport.SampleHandler') + self.assertEqual([SampleHandler.__module__, + 'bzrlib.transport.chroot', + 'bzrlib.transport.pathfilter'], + transport._get_transport_modules()) + + def test_transport_dependency(self): + """Transport with missing dependency causes no error""" + saved_handlers = transport._get_protocol_handlers() + self.addCleanup(transport._set_protocol_handlers, saved_handlers) + # don't pollute the current handlers + transport._clear_protocol_handlers() + transport.register_transport_proto('foo') + transport.register_lazy_transport( + 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler') + try: + transport.get_transport_from_url('foo://fooserver/foo') + except errors.UnsupportedProtocol, e: + e_str = str(e) + self.assertEquals('Unsupported protocol' + ' for url "foo://fooserver/foo":' + ' Unable to import library "some_lib":' + ' testing missing dependency', str(e)) + else: + self.fail('Did not raise UnsupportedProtocol') + + def test_transport_fallback(self): + """Transport with missing dependency causes no error""" + saved_handlers = transport._get_protocol_handlers() + self.addCleanup(transport._set_protocol_handlers, saved_handlers) + transport._clear_protocol_handlers() + transport.register_transport_proto('foo') + transport.register_lazy_transport( + 'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler') + transport.register_lazy_transport( + 'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler') + t = transport.get_transport_from_url('foo://fooserver/foo') + self.assertTrue(isinstance(t, BackupTransportHandler)) + + def test_ssh_hints(self): + """Transport ssh:// should raise an error pointing out bzr+ssh://""" + try: + transport.get_transport_from_url('ssh://fooserver/foo') + except errors.UnsupportedProtocol, e: + e_str = str(e) + self.assertEquals('Unsupported protocol' + ' for url "ssh://fooserver/foo":' + ' bzr supports bzr+ssh to operate over ssh,' + ' use "bzr+ssh://fooserver/foo".', + str(e)) + else: + self.fail('Did not raise UnsupportedProtocol') + + def test_LateReadError(self): + """The LateReadError helper should raise on read().""" + a_file = transport.LateReadError('a path') + try: + a_file.read() + except errors.ReadError, error: + self.assertEqual('a path', error.path) + self.assertRaises(errors.ReadError, a_file.read, 40) + a_file.close() + + def test_local_abspath_non_local_transport(self): + # the base implementation should throw + t = memory.MemoryTransport() + e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't') + self.assertEqual('memory:///t is not a local path.', str(e)) + + +class TestCoalesceOffsets(tests.TestCase): + + def check(self, expected, offsets, limit=0, max_size=0, fudge=0): + coalesce = transport.Transport._coalesce_offsets + exp = [transport._CoalescedOffset(*x) for x in expected] + out = list(coalesce(offsets, limit=limit, fudge_factor=fudge, + max_size=max_size)) + self.assertEqual(exp, out) + + def test_coalesce_empty(self): + self.check([], []) + + def test_coalesce_simple(self): + self.check([(0, 10, [(0, 10)])], [(0, 10)]) + + def test_coalesce_unrelated(self): + self.check([(0, 10, [(0, 10)]), + (20, 10, [(0, 10)]), + ], [(0, 10), (20, 10)]) + + def test_coalesce_unsorted(self): + self.check([(20, 10, [(0, 10)]), + (0, 10, [(0, 10)]), + ], [(20, 10), (0, 10)]) + + def test_coalesce_nearby(self): + self.check([(0, 20, [(0, 10), (10, 10)])], + [(0, 10), (10, 10)]) + + def test_coalesce_overlapped(self): + self.assertRaises(ValueError, + self.check, [(0, 15, [(0, 10), (5, 10)])], + [(0, 10), (5, 10)]) + + def test_coalesce_limit(self): + self.check([(10, 50, [(0, 10), (10, 10), (20, 10), + (30, 10), (40, 10)]), + (60, 50, [(0, 10), (10, 10), (20, 10), + (30, 10), (40, 10)]), + ], [(10, 10), (20, 10), (30, 10), (40, 10), + (50, 10), (60, 10), (70, 10), (80, 10), + (90, 10), (100, 10)], + limit=5) + + def test_coalesce_no_limit(self): + self.check([(10, 100, [(0, 10), (10, 10), (20, 10), + (30, 10), (40, 10), (50, 10), + (60, 10), (70, 10), (80, 10), + (90, 10)]), + ], [(10, 10), (20, 10), (30, 10), (40, 10), + (50, 10), (60, 10), (70, 10), (80, 10), + (90, 10), (100, 10)]) + + def test_coalesce_fudge(self): + self.check([(10, 30, [(0, 10), (20, 10)]), + (100, 10, [(0, 10)]), + ], [(10, 10), (30, 10), (100, 10)], + fudge=10) + + def test_coalesce_max_size(self): + self.check([(10, 20, [(0, 10), (10, 10)]), + (30, 50, [(0, 50)]), + # If one range is above max_size, it gets its own coalesced + # offset + (100, 80, [(0, 80)]),], + [(10, 10), (20, 10), (30, 50), (100, 80)], + max_size=50) + + def test_coalesce_no_max_size(self): + self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])], + [(10, 10), (20, 10), (30, 50), (80, 100)], + ) + + def test_coalesce_default_limit(self): + # By default we use a 100MB max size. + ten_mb = 10 * 1024 * 1024 + self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]), + (10*ten_mb, ten_mb, [(0, ten_mb)])], + [(i*ten_mb, ten_mb) for i in range(11)]) + self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])], + [(i * ten_mb, ten_mb) for i in range(11)], + max_size=1*1024*1024*1024) + + +class TestMemoryServer(tests.TestCase): + + def test_create_server(self): + server = memory.MemoryServer() + server.start_server() + url = server.get_url() + self.assertTrue(url in transport.transport_list_registry) + t = transport.get_transport_from_url(url) + del t + server.stop_server() + self.assertFalse(url in transport.transport_list_registry) + self.assertRaises(errors.UnsupportedProtocol, + transport.get_transport, url) + + +class TestMemoryTransport(tests.TestCase): + + def test_get_transport(self): + memory.MemoryTransport() + + def test_clone(self): + t = memory.MemoryTransport() + self.assertTrue(isinstance(t, memory.MemoryTransport)) + self.assertEqual("memory:///", t.clone("/").base) + + def test_abspath(self): + t = memory.MemoryTransport() + self.assertEqual("memory:///relpath", t.abspath('relpath')) + + def test_abspath_of_root(self): + t = memory.MemoryTransport() + self.assertEqual("memory:///", t.base) + self.assertEqual("memory:///", t.abspath('/')) + + def test_abspath_of_relpath_starting_at_root(self): + t = memory.MemoryTransport() + self.assertEqual("memory:///foo", t.abspath('/foo')) + + def test_append_and_get(self): + t = memory.MemoryTransport() + t.append_bytes('path', 'content') + self.assertEqual(t.get('path').read(), 'content') + t.append_file('path', StringIO('content')) + self.assertEqual(t.get('path').read(), 'contentcontent') + + def test_put_and_get(self): + t = memory.MemoryTransport() + t.put_file('path', StringIO('content')) + self.assertEqual(t.get('path').read(), 'content') + t.put_bytes('path', 'content') + self.assertEqual(t.get('path').read(), 'content') + + def test_append_without_dir_fails(self): + t = memory.MemoryTransport() + self.assertRaises(errors.NoSuchFile, + t.append_bytes, 'dir/path', 'content') + + def test_put_without_dir_fails(self): + t = memory.MemoryTransport() + self.assertRaises(errors.NoSuchFile, + t.put_file, 'dir/path', StringIO('content')) + + def test_get_missing(self): + transport = memory.MemoryTransport() + self.assertRaises(errors.NoSuchFile, transport.get, 'foo') + + def test_has_missing(self): + t = memory.MemoryTransport() + self.assertEquals(False, t.has('foo')) + + def test_has_present(self): + t = memory.MemoryTransport() + t.append_bytes('foo', 'content') + self.assertEquals(True, t.has('foo')) + + def test_list_dir(self): + t = memory.MemoryTransport() + t.put_bytes('foo', 'content') + t.mkdir('dir') + t.put_bytes('dir/subfoo', 'content') + t.put_bytes('dirlike', 'content') + + self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.'))) + self.assertEquals(['subfoo'], sorted(t.list_dir('dir'))) + + def test_mkdir(self): + t = memory.MemoryTransport() + t.mkdir('dir') + t.append_bytes('dir/path', 'content') + self.assertEqual(t.get('dir/path').read(), 'content') + + def test_mkdir_missing_parent(self): + t = memory.MemoryTransport() + self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir') + + def test_mkdir_twice(self): + t = memory.MemoryTransport() + t.mkdir('dir') + self.assertRaises(errors.FileExists, t.mkdir, 'dir') + + def test_parameters(self): + t = memory.MemoryTransport() + self.assertEqual(True, t.listable()) + self.assertEqual(False, t.is_readonly()) + + def test_iter_files_recursive(self): + t = memory.MemoryTransport() + t.mkdir('dir') + t.put_bytes('dir/foo', 'content') + t.put_bytes('dir/bar', 'content') + t.put_bytes('bar', 'content') + paths = set(t.iter_files_recursive()) + self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths) + + def test_stat(self): + t = memory.MemoryTransport() + t.put_bytes('foo', 'content') + t.put_bytes('bar', 'phowar') + self.assertEqual(7, t.stat('foo').st_size) + self.assertEqual(6, t.stat('bar').st_size) + + +class ChrootDecoratorTransportTest(tests.TestCase): + """Chroot decoration specific tests.""" + + def test_abspath(self): + # The abspath is always relative to the chroot_url. + server = chroot.ChrootServer( + transport.get_transport_from_url('memory:///foo/bar/')) + self.start_server(server) + t = transport.get_transport_from_url(server.get_url()) + self.assertEqual(server.get_url(), t.abspath('/')) + + subdir_t = t.clone('subdir') + self.assertEqual(server.get_url(), subdir_t.abspath('/')) + + def test_clone(self): + server = chroot.ChrootServer( + transport.get_transport_from_url('memory:///foo/bar/')) + self.start_server(server) + t = transport.get_transport_from_url(server.get_url()) + # relpath from root and root path are the same + relpath_cloned = t.clone('foo') + abspath_cloned = t.clone('/foo') + self.assertEqual(server, relpath_cloned.server) + self.assertEqual(server, abspath_cloned.server) + + def test_chroot_url_preserves_chroot(self): + """Calling get_transport on a chroot transport's base should produce a + transport with exactly the same behaviour as the original chroot + transport. + + This is so that it is not possible to escape a chroot by doing:: + url = chroot_transport.base + parent_url = urlutils.join(url, '..') + new_t = transport.get_transport_from_url(parent_url) + """ + server = chroot.ChrootServer( + transport.get_transport_from_url('memory:///path/subpath')) + self.start_server(server) + t = transport.get_transport_from_url(server.get_url()) + new_t = transport.get_transport_from_url(t.base) + self.assertEqual(t.server, new_t.server) + self.assertEqual(t.base, new_t.base) + + def test_urljoin_preserves_chroot(self): + """Using urlutils.join(url, '..') on a chroot URL should not produce a + URL that escapes the intended chroot. + + This is so that it is not possible to escape a chroot by doing:: + url = chroot_transport.base + parent_url = urlutils.join(url, '..') + new_t = transport.get_transport_from_url(parent_url) + """ + server = chroot.ChrootServer( + transport.get_transport_from_url('memory:///path/')) + self.start_server(server) + t = transport.get_transport_from_url(server.get_url()) + self.assertRaises( + errors.InvalidURLJoin, urlutils.join, t.base, '..') + + +class TestChrootServer(tests.TestCase): + + def test_construct(self): + backing_transport = memory.MemoryTransport() + server = chroot.ChrootServer(backing_transport) + self.assertEqual(backing_transport, server.backing_transport) + + def test_setUp(self): + backing_transport = memory.MemoryTransport() + server = chroot.ChrootServer(backing_transport) + server.start_server() + self.addCleanup(server.stop_server) + self.assertTrue(server.scheme + in transport._get_protocol_handlers().keys()) + + def test_stop_server(self): + backing_transport = memory.MemoryTransport() + server = chroot.ChrootServer(backing_transport) + server.start_server() + server.stop_server() + self.assertFalse(server.scheme + in transport._get_protocol_handlers().keys()) + + def test_get_url(self): + backing_transport = memory.MemoryTransport() + server = chroot.ChrootServer(backing_transport) + server.start_server() + self.addCleanup(server.stop_server) + self.assertEqual('chroot-%d:///' % id(server), server.get_url()) + + +class TestHooks(tests.TestCase): + """Basic tests for transport hooks""" + + def _get_connected_transport(self): + return transport.ConnectedTransport("bogus:nowhere") + + def test_transporthooks_initialisation(self): + """Check all expected transport hook points are set up""" + hookpoint = transport.TransportHooks() + self.assertTrue("post_connect" in hookpoint, + "post_connect not in %s" % (hookpoint,)) + + def test_post_connect(self): + """Ensure the post_connect hook is called when _set_transport is""" + calls = [] + transport.Transport.hooks.install_named_hook("post_connect", + calls.append, None) + t = self._get_connected_transport() + self.assertLength(0, calls) + t._set_connection("connection", "auth") + self.assertEqual(calls, [t]) + + +class PathFilteringDecoratorTransportTest(tests.TestCase): + """Pathfilter decoration specific tests.""" + + def test_abspath(self): + # The abspath is always relative to the base of the backing transport. + server = pathfilter.PathFilteringServer( + transport.get_transport_from_url('memory:///foo/bar/'), + lambda x: x) + server.start_server() + t = transport.get_transport_from_url(server.get_url()) + self.assertEqual(server.get_url(), t.abspath('/')) + + subdir_t = t.clone('subdir') + self.assertEqual(server.get_url(), subdir_t.abspath('/')) + server.stop_server() + + def make_pf_transport(self, filter_func=None): + """Make a PathFilteringTransport backed by a MemoryTransport. + + :param filter_func: by default this will be a no-op function. Use this + parameter to override it.""" + if filter_func is None: + filter_func = lambda x: x + server = pathfilter.PathFilteringServer( + transport.get_transport_from_url('memory:///foo/bar/'), filter_func) + server.start_server() + self.addCleanup(server.stop_server) + return transport.get_transport_from_url(server.get_url()) + + def test__filter(self): + # _filter (with an identity func as filter_func) always returns + # paths relative to the base of the backing transport. + t = self.make_pf_transport() + self.assertEqual('foo', t._filter('foo')) + self.assertEqual('foo/bar', t._filter('foo/bar')) + self.assertEqual('', t._filter('..')) + self.assertEqual('', t._filter('/')) + # The base of the pathfiltering transport is taken into account too. + t = t.clone('subdir1/subdir2') + self.assertEqual('subdir1/subdir2/foo', t._filter('foo')) + self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar')) + self.assertEqual('subdir1', t._filter('..')) + self.assertEqual('', t._filter('/')) + + def test_filter_invocation(self): + filter_log = [] + + def filter(path): + filter_log.append(path) + return path + t = self.make_pf_transport(filter) + t.has('abc') + self.assertEqual(['abc'], filter_log) + del filter_log[:] + t.clone('abc').has('xyz') + self.assertEqual(['abc/xyz'], filter_log) + del filter_log[:] + t.has('/abc') + self.assertEqual(['abc'], filter_log) + + def test_clone(self): + t = self.make_pf_transport() + # relpath from root and root path are the same + relpath_cloned = t.clone('foo') + abspath_cloned = t.clone('/foo') + self.assertEqual(t.server, relpath_cloned.server) + self.assertEqual(t.server, abspath_cloned.server) + + def test_url_preserves_pathfiltering(self): + """Calling get_transport on a pathfiltered transport's base should + produce a transport with exactly the same behaviour as the original + pathfiltered transport. + + This is so that it is not possible to escape (accidentally or + otherwise) the filtering by doing:: + url = filtered_transport.base + parent_url = urlutils.join(url, '..') + new_t = transport.get_transport_from_url(parent_url) + """ + t = self.make_pf_transport() + new_t = transport.get_transport_from_url(t.base) + self.assertEqual(t.server, new_t.server) + self.assertEqual(t.base, new_t.base) + + +class ReadonlyDecoratorTransportTest(tests.TestCase): + """Readonly decoration specific tests.""" + + def test_local_parameters(self): + # connect to . in readonly mode + t = readonly.ReadonlyTransportDecorator('readonly+.') + self.assertEqual(True, t.listable()) + self.assertEqual(True, t.is_readonly()) + + def test_http_parameters(self): + from bzrlib.tests.http_server import HttpServer + # connect to '.' via http which is not listable + server = HttpServer() + self.start_server(server) + t = transport.get_transport_from_url('readonly+' + server.get_url()) + self.assertIsInstance(t, readonly.ReadonlyTransportDecorator) + self.assertEqual(False, t.listable()) + self.assertEqual(True, t.is_readonly()) + + +class FakeNFSDecoratorTests(tests.TestCaseInTempDir): + """NFS decorator specific tests.""" + + def get_nfs_transport(self, url): + # connect to url with nfs decoration + return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url) + + def test_local_parameters(self): + # the listable and is_readonly parameters + # are not changed by the fakenfs decorator + t = self.get_nfs_transport('.') + self.assertEqual(True, t.listable()) + self.assertEqual(False, t.is_readonly()) + + def test_http_parameters(self): + # the listable and is_readonly parameters + # are not changed by the fakenfs decorator + from bzrlib.tests.http_server import HttpServer + # connect to '.' via http which is not listable + server = HttpServer() + self.start_server(server) + t = self.get_nfs_transport(server.get_url()) + self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator) + self.assertEqual(False, t.listable()) + self.assertEqual(True, t.is_readonly()) + + def test_fakenfs_server_default(self): + # a FakeNFSServer() should bring up a local relpath server for itself + server = test_server.FakeNFSServer() + self.start_server(server) + # the url should be decorated appropriately + self.assertStartsWith(server.get_url(), 'fakenfs+') + # and we should be able to get a transport for it + t = transport.get_transport_from_url(server.get_url()) + # which must be a FakeNFSTransportDecorator instance. + self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator) + + def test_fakenfs_rename_semantics(self): + # a FakeNFS transport must mangle the way rename errors occur to + # look like NFS problems. + t = self.get_nfs_transport('.') + self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'], + transport=t) + self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to') + + +class FakeVFATDecoratorTests(tests.TestCaseInTempDir): + """Tests for simulation of VFAT restrictions""" + + def get_vfat_transport(self, url): + """Return vfat-backed transport for test directory""" + from bzrlib.transport.fakevfat import FakeVFATTransportDecorator + return FakeVFATTransportDecorator('vfat+' + url) + + def test_transport_creation(self): + from bzrlib.transport.fakevfat import FakeVFATTransportDecorator + t = self.get_vfat_transport('.') + self.assertIsInstance(t, FakeVFATTransportDecorator) + + def test_transport_mkdir(self): + t = self.get_vfat_transport('.') + t.mkdir('HELLO') + self.assertTrue(t.has('hello')) + self.assertTrue(t.has('Hello')) + + def test_forbidden_chars(self): + t = self.get_vfat_transport('.') + self.assertRaises(ValueError, t.has, "<NU>") + + +class BadTransportHandler(transport.Transport): + def __init__(self, base_url): + raise errors.DependencyNotPresent('some_lib', + 'testing missing dependency') + + +class BackupTransportHandler(transport.Transport): + """Test transport that works as a backup for the BadTransportHandler""" + pass + + +class TestTransportImplementation(tests.TestCaseInTempDir): + """Implementation verification for transports. + + To verify a transport we need a server factory, which is a callable + that accepts no parameters and returns an implementation of + bzrlib.transport.Server. + + That Server is then used to construct transport instances and test + the transport via loopback activity. + + Currently this assumes that the Transport object is connected to the + current working directory. So that whatever is done + through the transport, should show up in the working + directory, and vice-versa. This is a bug, because its possible to have + URL schemes which provide access to something that may not be + result in storage on the local disk, i.e. due to file system limits, or + due to it being a database or some other non-filesystem tool. + + This also tests to make sure that the functions work with both + generators and lists (assuming iter(list) is effectively a generator) + """ + + def setUp(self): + super(TestTransportImplementation, self).setUp() + self._server = self.transport_server() + self.start_server(self._server) + + def get_transport(self, relpath=None): + """Return a connected transport to the local directory. + + :param relpath: a path relative to the base url. + """ + base_url = self._server.get_url() + url = self._adjust_url(base_url, relpath) + # try getting the transport via the regular interface: + t = transport.get_transport_from_url(url) + # vila--20070607 if the following are commented out the test suite + # still pass. Is this really still needed or was it a forgotten + # temporary fix ? + if not isinstance(t, self.transport_class): + # we did not get the correct transport class type. Override the + # regular connection behaviour by direct construction. + t = self.transport_class(url) + return t + + +class TestTransportFromPath(tests.TestCaseInTempDir): + + def test_with_path(self): + t = transport.get_transport_from_path(self.test_dir) + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base.rstrip("/"), + urlutils.local_path_to_url(self.test_dir)) + + def test_with_url(self): + t = transport.get_transport_from_path("file:") + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base.rstrip("/"), + urlutils.local_path_to_url(os.path.join(self.test_dir, "file:"))) + + +class TestTransportFromUrl(tests.TestCaseInTempDir): + + def test_with_path(self): + self.assertRaises(errors.InvalidURL, transport.get_transport_from_url, + self.test_dir) + + def test_with_url(self): + url = urlutils.local_path_to_url(self.test_dir) + t = transport.get_transport_from_url(url) + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base.rstrip("/"), url) + + def test_with_url_and_segment_parameters(self): + url = urlutils.local_path_to_url(self.test_dir)+",branch=foo" + t = transport.get_transport_from_url(url) + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base.rstrip("/"), url) + with open(os.path.join(self.test_dir, "afile"), 'w') as f: + f.write("data") + self.assertTrue(t.has("afile")) + + +class TestLocalTransports(tests.TestCase): + + def test_get_transport_from_abspath(self): + here = osutils.abspath('.') + t = transport.get_transport(here) + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/') + + def test_get_transport_from_relpath(self): + here = osutils.abspath('.') + t = transport.get_transport('.') + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/') + + def test_get_transport_from_local_url(self): + here = osutils.abspath('.') + here_url = urlutils.local_path_to_url(here) + '/' + t = transport.get_transport(here_url) + self.assertIsInstance(t, local.LocalTransport) + self.assertEquals(t.base, here_url) + + def test_local_abspath(self): + here = osutils.abspath('.') + t = transport.get_transport(here) + self.assertEquals(t.local_abspath(''), here) + + +class TestLocalTransportMutation(tests.TestCaseInTempDir): + + def test_local_transport_mkdir(self): + here = osutils.abspath('.') + t = transport.get_transport(here) + t.mkdir('test') + self.assertTrue(os.path.exists('test')) + + def test_local_transport_mkdir_permission_denied(self): + # See https://bugs.launchpad.net/bzr/+bug/606537 + here = osutils.abspath('.') + t = transport.get_transport(here) + def fake_chmod(path, mode): + e = OSError('permission denied') + e.errno = errno.EPERM + raise e + self.overrideAttr(os, 'chmod', fake_chmod) + t.mkdir('test') + t.mkdir('test2', mode=0707) + self.assertTrue(os.path.exists('test')) + self.assertTrue(os.path.exists('test2')) + + +class TestLocalTransportWriteStream(tests.TestCaseWithTransport): + + def test_local_fdatasync_calls_fdatasync(self): + """Check fdatasync on a stream tries to flush the data to the OS. + + We can't easily observe the external effect but we can at least see + it's called. + """ + sentinel = object() + fdatasync = getattr(os, 'fdatasync', sentinel) + if fdatasync is sentinel: + raise tests.TestNotApplicable('fdatasync not supported') + t = self.get_transport('.') + calls = self.recordCalls(os, 'fdatasync') + w = t.open_write_stream('out') + w.write('foo') + w.fdatasync() + with open('out', 'rb') as f: + # Should have been flushed. + self.assertEquals(f.read(), 'foo') + self.assertEquals(len(calls), 1, calls) + + def test_missing_directory(self): + t = self.get_transport('.') + self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo') + + +class TestWin32LocalTransport(tests.TestCase): + + def test_unc_clone_to_root(self): + # Win32 UNC path like \\HOST\path + # clone to root should stop at least at \\HOST part + # not on \\ + t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/') + for i in xrange(4): + t = t.clone('..') + self.assertEquals(t.base, 'file://HOST/') + # make sure we reach the root + t = t.clone('..') + self.assertEquals(t.base, 'file://HOST/') + + +class TestConnectedTransport(tests.TestCase): + """Tests for connected to remote server transports""" + + def test_parse_url(self): + t = transport.ConnectedTransport( + 'http://simple.example.com/home/source') + self.assertEquals(t._parsed_url.host, 'simple.example.com') + self.assertEquals(t._parsed_url.port, None) + self.assertEquals(t._parsed_url.path, '/home/source/') + self.assertTrue(t._parsed_url.user is None) + self.assertTrue(t._parsed_url.password is None) + + self.assertEquals(t.base, 'http://simple.example.com/home/source/') + + def test_parse_url_with_at_in_user(self): + # Bug 228058 + t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/') + self.assertEquals(t._parsed_url.user, 'user@host.com') + + def test_parse_quoted_url(self): + t = transport.ConnectedTransport( + 'http://ro%62ey:h%40t@ex%41mple.com:2222/path') + self.assertEquals(t._parsed_url.host, 'exAmple.com') + self.assertEquals(t._parsed_url.port, 2222) + self.assertEquals(t._parsed_url.user, 'robey') + self.assertEquals(t._parsed_url.password, 'h@t') + self.assertEquals(t._parsed_url.path, '/path/') + + # Base should not keep track of the password + self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/') + + def test_parse_invalid_url(self): + self.assertRaises(errors.InvalidURL, + transport.ConnectedTransport, + 'sftp://lily.org:~janneke/public/bzr/gub') + + def test_relpath(self): + t = transport.ConnectedTransport('sftp://user@host.com/abs/path') + + self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), + 'sub') + self.assertRaises(errors.PathNotChild, t.relpath, + 'http://user@host.com/abs/path/sub') + self.assertRaises(errors.PathNotChild, t.relpath, + 'sftp://user2@host.com/abs/path/sub') + self.assertRaises(errors.PathNotChild, t.relpath, + 'sftp://user@otherhost.com/abs/path/sub') + self.assertRaises(errors.PathNotChild, t.relpath, + 'sftp://user@host.com:33/abs/path/sub') + # Make sure it works when we don't supply a username + t = transport.ConnectedTransport('sftp://host.com/abs/path') + self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub') + + # Make sure it works when parts of the path will be url encoded + t = transport.ConnectedTransport('sftp://host.com/dev/%path') + self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub') + + def test_connection_sharing_propagate_credentials(self): + t = transport.ConnectedTransport('ftp://user@host.com/abs/path') + self.assertEquals('user', t._parsed_url.user) + self.assertEquals('host.com', t._parsed_url.host) + self.assertIs(None, t._get_connection()) + self.assertIs(None, t._parsed_url.password) + c = t.clone('subdir') + self.assertIs(None, c._get_connection()) + self.assertIs(None, t._parsed_url.password) + + # Simulate the user entering a password + password = 'secret' + connection = object() + t._set_connection(connection, password) + self.assertIs(connection, t._get_connection()) + self.assertIs(password, t._get_credentials()) + self.assertIs(connection, c._get_connection()) + self.assertIs(password, c._get_credentials()) + + # credentials can be updated + new_password = 'even more secret' + c._update_credentials(new_password) + self.assertIs(connection, t._get_connection()) + self.assertIs(new_password, t._get_credentials()) + self.assertIs(connection, c._get_connection()) + self.assertIs(new_password, c._get_credentials()) + + +class TestReusedTransports(tests.TestCase): + """Tests for transport reuse""" + + def test_reuse_same_transport(self): + possible_transports = [] + t1 = transport.get_transport_from_url('http://foo/', + possible_transports=possible_transports) + self.assertEqual([t1], possible_transports) + t2 = transport.get_transport_from_url('http://foo/', + possible_transports=[t1]) + self.assertIs(t1, t2) + + # Also check that final '/' are handled correctly + t3 = transport.get_transport_from_url('http://foo/path/') + t4 = transport.get_transport_from_url('http://foo/path', + possible_transports=[t3]) + self.assertIs(t3, t4) + + t5 = transport.get_transport_from_url('http://foo/path') + t6 = transport.get_transport_from_url('http://foo/path/', + possible_transports=[t5]) + self.assertIs(t5, t6) + + def test_don_t_reuse_different_transport(self): + t1 = transport.get_transport_from_url('http://foo/path') + t2 = transport.get_transport_from_url('http://bar/path', + possible_transports=[t1]) + self.assertIsNot(t1, t2) + + +class TestTransportTrace(tests.TestCase): + + def test_decorator(self): + t = transport.get_transport_from_url('trace+memory://') + self.assertIsInstance( + t, bzrlib.transport.trace.TransportTraceDecorator) + + def test_clone_preserves_activity(self): + t = transport.get_transport_from_url('trace+memory://') + t2 = t.clone('.') + self.assertTrue(t is not t2) + self.assertTrue(t._activity is t2._activity) + + # the following specific tests are for the operations that have made use of + # logging in tests; we could test every single operation but doing that + # still won't cause a test failure when the top level Transport API + # changes; so there is little return doing that. + def test_get(self): + t = transport.get_transport_from_url('trace+memory:///') + t.put_bytes('foo', 'barish') + t.get('foo') + expected_result = [] + # put_bytes records the bytes, not the content to avoid memory + # pressure. + expected_result.append(('put_bytes', 'foo', 6, None)) + # get records the file name only. + expected_result.append(('get', 'foo')) + self.assertEqual(expected_result, t._activity) + + def test_readv(self): + t = transport.get_transport_from_url('trace+memory:///') + t.put_bytes('foo', 'barish') + list(t.readv('foo', [(0, 1), (3, 2)], + adjust_for_latency=True, upper_limit=6)) + expected_result = [] + # put_bytes records the bytes, not the content to avoid memory + # pressure. + expected_result.append(('put_bytes', 'foo', 6, None)) + # readv records the supplied offset request + expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6)) + self.assertEqual(expected_result, t._activity) + + +class TestSSHConnections(tests.TestCaseWithTransport): + + def test_bzr_connect_to_bzr_ssh(self): + """get_transport of a bzr+ssh:// behaves correctly. + + bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH. + """ + # This test actually causes a bzr instance to be invoked, which is very + # expensive: it should be the only such test in the test suite. + # A reasonable evolution for this would be to simply check inside + # check_channel_exec_request that the command is appropriate, and then + # satisfy requests in-process. + self.requireFeature(features.paramiko) + # SFTPFullAbsoluteServer has a get_url method, and doesn't + # override the interface (doesn't change self._vendor). + # Note that this does encryption, so can be slow. + from bzrlib.tests import stub_sftp + + # Start an SSH server + self.command_executed = [] + # XXX: This is horrible -- we define a really dumb SSH server that + # executes commands, and manage the hooking up of stdin/out/err to the + # SSH channel ourselves. Surely this has already been implemented + # elsewhere? + started = [] + + class StubSSHServer(stub_sftp.StubServer): + + test = self + + def check_channel_exec_request(self, channel, command): + self.test.command_executed.append(command) + proc = subprocess.Popen( + command, shell=True, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # XXX: horribly inefficient, not to mention ugly. + # Start a thread for each of stdin/out/err, and relay bytes + # from the subprocess to channel and vice versa. + def ferry_bytes(read, write, close): + while True: + bytes = read(1) + if bytes == '': + close() + break + write(bytes) + + file_functions = [ + (channel.recv, proc.stdin.write, proc.stdin.close), + (proc.stdout.read, channel.sendall, channel.close), + (proc.stderr.read, channel.sendall_stderr, channel.close)] + started.append(proc) + for read, write, close in file_functions: + t = threading.Thread( + target=ferry_bytes, args=(read, write, close)) + t.start() + started.append(t) + + return True + + ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer) + # We *don't* want to override the default SSH vendor: the detected one + # is the one to use. + + # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer + # inherits from SFTPServer which forces the SSH vendor to + # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623 + self.start_server(ssh_server) + port = ssh_server.port + + if sys.platform == 'win32': + bzr_remote_path = sys.executable + ' ' + self.get_bzr_path() + else: + bzr_remote_path = self.get_bzr_path() + self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path) + + # Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment + # variable is used to tell bzr what command to run on the remote end. + path_to_branch = osutils.abspath('.') + if sys.platform == 'win32': + # On Windows, we export all drives as '/C:/, etc. So we need to + # prefix a '/' to get the right path. + path_to_branch = '/' + path_to_branch + url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch) + t = transport.get_transport(url) + self.permit_url(t.base) + t.mkdir('foo') + + self.assertEqual( + ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path], + self.command_executed) + # Make sure to disconnect, so that the remote process can stop, and we + # can cleanup. Then pause the test until everything is shutdown + t._client._medium.disconnect() + if not started: + return + # First wait for the subprocess + started[0].wait() + # And the rest are threads + for t in started[1:]: + t.join() + + +class TestUnhtml(tests.TestCase): + + """Tests for unhtml_roughly""" + + def test_truncation(self): + fake_html = "<p>something!\n" * 1000 + result = http.unhtml_roughly(fake_html) + self.assertEquals(len(result), 1000) + self.assertStartsWith(result, " something!") + + +class SomeDirectory(object): + + def look_up(self, name, url): + return "http://bar" + + +class TestLocationToUrl(tests.TestCase): + + def get_base_location(self): + path = osutils.abspath('/foo/bar') + if path.startswith('/'): + url = 'file://%s' % (path,) + else: + # On Windows, abspaths start with the drive letter, so we have to + # add in the extra '/' + url = 'file:///%s' % (path,) + return path, url + + def test_regular_url(self): + self.assertEquals("file://foo", location_to_url("file://foo")) + + def test_directory(self): + directories.register("bar:", SomeDirectory, "Dummy directory") + self.addCleanup(directories.remove, "bar:") + self.assertEquals("http://bar", location_to_url("bar:")) + + def test_unicode_url(self): + self.assertRaises(errors.InvalidURL, location_to_url, + "http://fo/\xc3\xaf".decode("utf-8")) + + def test_unicode_path(self): + path, url = self.get_base_location() + location = path + "\xc3\xaf".decode("utf-8") + url += '%C3%AF' + self.assertEquals(url, location_to_url(location)) + + def test_path(self): + path, url = self.get_base_location() + self.assertEquals(url, location_to_url(path)) + + def test_relative_file_url(self): + self.assertEquals(urlutils.local_path_to_url(".") + "/bar", + location_to_url("file:bar")) + + def test_absolute_file_url(self): + self.assertEquals("file:///bar", location_to_url("file:/bar")) |