diff options
Diffstat (limited to 'bzrlib/tests/test_sftp_transport.py')
-rw-r--r-- | bzrlib/tests/test_sftp_transport.py | 499 |
1 files changed, 499 insertions, 0 deletions
diff --git a/bzrlib/tests/test_sftp_transport.py b/bzrlib/tests/test_sftp_transport.py new file mode 100644 index 0000000..1c4d04b --- /dev/null +++ b/bzrlib/tests/test_sftp_transport.py @@ -0,0 +1,499 @@ +# Copyright (C) 2005-2011 Robey Pointer <robey@lag.net> +# Copyright (C) 2005, 2006, 2007 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +import os +import socket +import sys +import time + +from bzrlib import ( + config, + controldir, + errors, + tests, + transport as _mod_transport, + ui, + ) +from bzrlib.osutils import ( + lexists, + ) +from bzrlib.tests import ( + features, + TestCaseWithTransport, + TestCase, + TestSkipped, + ) +from bzrlib.tests.http_server import HttpServer +import bzrlib.transport.http + +if features.paramiko.available(): + from bzrlib.transport import sftp as _mod_sftp + from bzrlib.tests import stub_sftp + + +def set_test_transport_to_sftp(testcase): + """A helper to set transports on test case instances.""" + if getattr(testcase, '_get_remote_is_absolute', None) is None: + testcase._get_remote_is_absolute = True + if testcase._get_remote_is_absolute: + testcase.transport_server = stub_sftp.SFTPAbsoluteServer + else: + testcase.transport_server = stub_sftp.SFTPHomeDirServer + testcase.transport_readonly_server = HttpServer + + +class TestCaseWithSFTPServer(TestCaseWithTransport): + """A test case base class that provides a sftp server on localhost.""" + + def setUp(self): + super(TestCaseWithSFTPServer, self).setUp() + self.requireFeature(features.paramiko) + set_test_transport_to_sftp(self) + + +class SFTPLockTests(TestCaseWithSFTPServer): + + def test_sftp_locks(self): + from bzrlib.errors import LockError + t = self.get_transport() + + l = t.lock_write('bogus') + self.assertPathExists('bogus.write-lock') + + # Don't wait for the lock, locking an already locked + # file should raise an assert + self.assertRaises(LockError, t.lock_write, 'bogus') + + l.unlock() + self.assertFalse(lexists('bogus.write-lock')) + + with open('something.write-lock', 'wb') as f: f.write('fake lock\n') + self.assertRaises(LockError, t.lock_write, 'something') + os.remove('something.write-lock') + + l = t.lock_write('something') + + l2 = t.lock_write('bogus') + + l.unlock() + l2.unlock() + + +class SFTPTransportTestRelative(TestCaseWithSFTPServer): + """Test the SFTP transport with homedir based relative paths.""" + + def test__remote_path(self): + if sys.platform == 'darwin': + # This test is about sftp absolute path handling. There is already + # (in this test) a TODO about windows needing an absolute path + # without drive letter. To me, using self.test_dir is a trick to + # get an absolute path for comparison purposes. That fails for OSX + # because the sftp server doesn't resolve the links (and it doesn't + # have to). --vila 20070924 + self.knownFailure('Mac OSX symlinks /tmp to /private/tmp,' + ' testing against self.test_dir' + ' is not appropriate') + t = self.get_transport() + # This test require unix-like absolute path + test_dir = self.test_dir + if sys.platform == 'win32': + # using hack suggested by John Meinel. + # TODO: write another mock server for this test + # and use absolute path without drive letter + test_dir = '/' + test_dir + # try what is currently used: + # remote path = self._abspath(relpath) + self.assertIsSameRealPath(test_dir + '/relative', + t._remote_path('relative')) + # we dont os.path.join because windows gives us the wrong path + root_segments = test_dir.split('/') + root_parent = '/'.join(root_segments[:-1]) + # .. should be honoured + self.assertIsSameRealPath(root_parent + '/sibling', + t._remote_path('../sibling')) + # / should be illegal ? + ### FIXME decide and then test for all transports. RBC20051208 + + +class SFTPTransportTestRelativeRoot(TestCaseWithSFTPServer): + """Test the SFTP transport with homedir based relative paths.""" + + def setUp(self): + # Only SFTPHomeDirServer is tested here + self._get_remote_is_absolute = False + super(SFTPTransportTestRelativeRoot, self).setUp() + + def test__remote_path_relative_root(self): + # relative paths are preserved + t = self.get_transport('') + self.assertEqual('/~/', t._parsed_url.path) + # the remote path should be relative to home dir + # (i.e. not begining with a '/') + self.assertEqual('a', t._remote_path('a')) + + +class SFTPNonServerTest(TestCase): + def setUp(self): + TestCase.setUp(self) + self.requireFeature(features.paramiko) + + def test_parse_url_with_home_dir(self): + s = _mod_sftp.SFTPTransport( + 'sftp://ro%62ey:h%40t@example.com:2222/~/relative') + self.assertEquals(s._parsed_url.host, 'example.com') + self.assertEquals(s._parsed_url.port, 2222) + self.assertEquals(s._parsed_url.user, 'robey') + self.assertEquals(s._parsed_url.password, 'h@t') + self.assertEquals(s._parsed_url.path, '/~/relative/') + + def test_relpath(self): + s = _mod_sftp.SFTPTransport('sftp://user@host.com/abs/path') + self.assertRaises(errors.PathNotChild, s.relpath, + 'sftp://user@host.com/~/rel/path/sub') + + def test_get_paramiko_vendor(self): + """Test that if no 'ssh' is available we get builtin paramiko""" + from bzrlib.transport import ssh + # set '.' as the only location in the path, forcing no 'ssh' to exist + self.overrideAttr(ssh, '_ssh_vendor_manager') + self.overrideEnv('PATH', '.') + ssh._ssh_vendor_manager.clear_cache() + vendor = ssh._get_ssh_vendor() + self.assertIsInstance(vendor, ssh.ParamikoVendor) + + def test_abspath_root_sibling_server(self): + server = stub_sftp.SFTPSiblingAbsoluteServer() + server.start_server() + self.addCleanup(server.stop_server) + + transport = _mod_transport.get_transport_from_url(server.get_url()) + self.assertFalse(transport.abspath('/').endswith('/~/')) + self.assertTrue(transport.abspath('/').endswith('/')) + del transport + + +class SFTPBranchTest(TestCaseWithSFTPServer): + """Test some stuff when accessing a bzr Branch over sftp""" + + def test_push_support(self): + self.build_tree(['a/', 'a/foo']) + t = controldir.ControlDir.create_standalone_workingtree('a') + b = t.branch + t.add('foo') + t.commit('foo', rev_id='a1') + + b2 = controldir.ControlDir.create_branch_and_repo(self.get_url('/b')) + b2.pull(b) + + self.assertEquals(b2.last_revision(), 'a1') + + with open('a/foo', 'wt') as f: f.write('something new in foo\n') + t.commit('new', rev_id='a2') + b2.pull(b) + + self.assertEquals(b2.last_revision(), 'a2') + + +class SSHVendorConnection(TestCaseWithSFTPServer): + """Test that the ssh vendors can all connect. + + Verify that a full-handshake (SSH over loopback TCP) sftp connection works. + + We have 3 sftp implementations in the test suite: + 'loopback': Doesn't use ssh, just uses a local socket. Most tests are + done this way to save the handshaking time, so it is not + tested again here + 'none': This uses paramiko's built-in ssh client and server, and layers + sftp on top of it. + None: If 'ssh' exists on the machine, then it will be spawned as a + child process. + """ + + def setUp(self): + super(SSHVendorConnection, self).setUp() + + def create_server(): + """Just a wrapper so that when created, it will set _vendor""" + # SFTPFullAbsoluteServer can handle any vendor, + # it just needs to be set between the time it is instantiated + # and the time .setUp() is called + server = stub_sftp.SFTPFullAbsoluteServer() + server._vendor = self._test_vendor + return server + self._test_vendor = 'loopback' + self.vfs_transport_server = create_server + f = open('a_file', 'wb') + try: + f.write('foobar\n') + finally: + f.close() + + def set_vendor(self, vendor): + self._test_vendor = vendor + + def test_connection_paramiko(self): + from bzrlib.transport import ssh + self.set_vendor(ssh.ParamikoVendor()) + t = self.get_transport() + self.assertEqual('foobar\n', t.get('a_file').read()) + + def test_connection_vendor(self): + raise TestSkipped("We don't test spawning real ssh," + " because it prompts for a password." + " Enable this test if we figure out" + " how to prevent this.") + self.set_vendor(None) + t = self.get_transport() + self.assertEqual('foobar\n', t.get('a_file').read()) + + +class SSHVendorBadConnection(TestCaseWithTransport): + """Test that the ssh vendors handle bad connection properly + + We don't subclass TestCaseWithSFTPServer, because we don't actually + need an SFTP connection. + """ + + def setUp(self): + self.requireFeature(features.paramiko) + super(SSHVendorBadConnection, self).setUp() + + # open a random port, so we know nobody else is using it + # but don't actually listen on the port. + s = socket.socket() + s.bind(('localhost', 0)) + self.addCleanup(s.close) + self.bogus_url = 'sftp://%s:%s/' % s.getsockname() + + def set_vendor(self, vendor, subprocess_stderr=None): + from bzrlib.transport import ssh + self.overrideAttr(ssh._ssh_vendor_manager, '_cached_ssh_vendor', vendor) + if subprocess_stderr is not None: + self.overrideAttr(ssh.SubprocessVendor, "_stderr_target", + subprocess_stderr) + + def test_bad_connection_paramiko(self): + """Test that a real connection attempt raises the right error""" + from bzrlib.transport import ssh + self.set_vendor(ssh.ParamikoVendor()) + t = _mod_transport.get_transport_from_url(self.bogus_url) + self.assertRaises(errors.ConnectionError, t.get, 'foobar') + + def test_bad_connection_ssh(self): + """None => auto-detect vendor""" + f = file(os.devnull, "wb") + self.addCleanup(f.close) + self.set_vendor(None, f) + t = _mod_transport.get_transport_from_url(self.bogus_url) + try: + self.assertRaises(errors.ConnectionError, t.get, 'foobar') + except NameError, e: + if "global name 'SSHException'" in str(e): + self.knownFailure('Known NameError bug in paramiko 1.6.1') + raise + + +class SFTPLatencyKnob(TestCaseWithSFTPServer): + """Test that the testing SFTPServer's latency knob works.""" + + def test_latency_knob_slows_transport(self): + # change the latency knob to 500ms. We take about 40ms for a + # loopback connection ordinarily. + start_time = time.time() + self.get_server().add_latency = 0.5 + transport = self.get_transport() + transport.has('not me') # Force connection by issuing a request + with_latency_knob_time = time.time() - start_time + self.assertTrue(with_latency_knob_time > 0.4) + + def test_default(self): + # This test is potentially brittle: under extremely high machine load + # it could fail, but that is quite unlikely + raise TestSkipped('Timing-sensitive test') + start_time = time.time() + transport = self.get_transport() + transport.has('not me') # Force connection by issuing a request + regular_time = time.time() - start_time + self.assertTrue(regular_time < 0.5) + + +class FakeSocket(object): + """Fake socket object used to test the SocketDelay wrapper without + using a real socket. + """ + + def __init__(self): + self._data = "" + + def send(self, data, flags=0): + self._data += data + return len(data) + + def sendall(self, data, flags=0): + self._data += data + return len(data) + + def recv(self, size, flags=0): + if size < len(self._data): + result = self._data[:size] + self._data = self._data[size:] + return result + else: + result = self._data + self._data = "" + return result + + +class TestSocketDelay(TestCase): + + def setUp(self): + TestCase.setUp(self) + self.requireFeature(features.paramiko) + + def test_delay(self): + sending = FakeSocket() + receiving = stub_sftp.SocketDelay(sending, 0.1, bandwidth=1000000, + really_sleep=False) + # check that simulated time is charged only per round-trip: + t1 = stub_sftp.SocketDelay.simulated_time + receiving.send("connect1") + self.assertEqual(sending.recv(1024), "connect1") + t2 = stub_sftp.SocketDelay.simulated_time + self.assertAlmostEqual(t2 - t1, 0.1) + receiving.send("connect2") + self.assertEqual(sending.recv(1024), "connect2") + sending.send("hello") + self.assertEqual(receiving.recv(1024), "hello") + t3 = stub_sftp.SocketDelay.simulated_time + self.assertAlmostEqual(t3 - t2, 0.1) + sending.send("hello") + self.assertEqual(receiving.recv(1024), "hello") + sending.send("hello") + self.assertEqual(receiving.recv(1024), "hello") + sending.send("hello") + self.assertEqual(receiving.recv(1024), "hello") + t4 = stub_sftp.SocketDelay.simulated_time + self.assertAlmostEqual(t4, t3) + + def test_bandwidth(self): + sending = FakeSocket() + receiving = stub_sftp.SocketDelay(sending, 0, bandwidth=8.0/(1024*1024), + really_sleep=False) + # check that simulated time is charged only per round-trip: + t1 = stub_sftp.SocketDelay.simulated_time + receiving.send("connect") + self.assertEqual(sending.recv(1024), "connect") + sending.send("a" * 100) + self.assertEqual(receiving.recv(1024), "a" * 100) + t2 = stub_sftp.SocketDelay.simulated_time + self.assertAlmostEqual(t2 - t1, 100 + 7) + + +class ReadvFile(object): + """An object that acts like Paramiko's SFTPFile when readv() is used""" + + def __init__(self, data): + self._data = data + + def readv(self, requests): + for start, length in requests: + yield self._data[start:start+length] + + def close(self): + pass + + +def _null_report_activity(*a, **k): + pass + + +class Test_SFTPReadvHelper(tests.TestCase): + + def checkGetRequests(self, expected_requests, offsets): + self.requireFeature(features.paramiko) + helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test', + _null_report_activity) + self.assertEqual(expected_requests, helper._get_requests()) + + def test__get_requests(self): + # Small single requests become a single readv request + self.checkGetRequests([(0, 100)], + [(0, 20), (30, 50), (20, 10), (80, 20)]) + # Non-contiguous ranges are given as multiple requests + self.checkGetRequests([(0, 20), (30, 50)], + [(10, 10), (30, 20), (0, 10), (50, 30)]) + # Ranges larger than _max_request_size (32kB) are broken up into + # multiple requests, even if it actually spans multiple logical + # requests + self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)], + [(0, 40000), (40000, 100), (40100, 1900), + (42000, 24000)]) + + def checkRequestAndYield(self, expected, data, offsets): + self.requireFeature(features.paramiko) + helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test', + _null_report_activity) + data_f = ReadvFile(data) + result = list(helper.request_and_yield_offsets(data_f)) + self.assertEqual(expected, result) + + def test_request_and_yield_offsets(self): + data = 'abcdefghijklmnopqrstuvwxyz' + self.checkRequestAndYield([(0, 'a'), (5, 'f'), (10, 'klm')], data, + [(0, 1), (5, 1), (10, 3)]) + # Should combine requests, and split them again + self.checkRequestAndYield([(0, 'a'), (1, 'b'), (10, 'klm')], data, + [(0, 1), (1, 1), (10, 3)]) + # Out of order requests. The requests should get combined, but then be + # yielded out-of-order. We also need one that is at the end of a + # previous range. See bug #293746 + self.checkRequestAndYield([(0, 'a'), (10, 'k'), (4, 'efg'), (1, 'bcd')], + data, [(0, 1), (10, 1), (4, 3), (1, 3)]) + + +class TestUsesAuthConfig(TestCaseWithSFTPServer): + """Test that AuthenticationConfig can supply default usernames.""" + + def get_transport_for_connection(self, set_config): + port = self.get_server().port + if set_config: + conf = config.AuthenticationConfig() + conf._get_config().update( + {'sftptest': {'scheme': 'ssh', 'port': port, 'user': 'bar'}}) + conf._save() + t = _mod_transport.get_transport_from_url( + 'sftp://localhost:%d' % port) + # force a connection to be performed. + t.has('foo') + return t + + def test_sftp_uses_config(self): + t = self.get_transport_for_connection(set_config=True) + self.assertEqual('bar', t._get_credentials()[0]) + + def test_sftp_is_none_if_no_config(self): + t = self.get_transport_for_connection(set_config=False) + self.assertIs(None, t._get_credentials()[0]) + + def test_sftp_doesnt_prompt_username(self): + stdout = tests.StringIOWrapper() + ui.ui_factory = tests.TestUIFactory(stdin='joe\nfoo\n', stdout=stdout) + t = self.get_transport_for_connection(set_config=False) + self.assertIs(None, t._get_credentials()[0]) + # No prompts should've been printed, stdin shouldn't have been read + self.assertEquals("", stdout.getvalue()) + self.assertEquals(0, ui.ui_factory.stdin.tell()) |