summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2017-10-23 16:05:30 -0700
committerJeff Forcier <jeff@bitprophet.org>2018-09-17 14:44:07 -0700
commit8868305a76e84e2ee00e35b5131b7b98e45f92a1 (patch)
tree9caa303d099bef32f7a7e9dbdf6e4688f34e3ecb
parent903f21d99d9e47dfc519c1521f8dbb8b4ff5724a (diff)
downloadparamiko-8868305a76e84e2ee00e35b5131b7b98e45f92a1.tar.gz
Start overhauling regular SFTP suite.
Includes attempt to split out the longer-lived server component from the client component, in fixtures Also starts tweaking early tests so they're less bad. E.g. test_2_close had its docstring actively disagreeing with its code (which was super confusing since both were committed at the same time in 2005...)
-rw-r--r--tests/conftest.py37
-rwxr-xr-xtests/test_sftp.py479
-rw-r--r--tests/test_sftp_big.py83
3 files changed, 288 insertions, 311 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index 7d85fa99..ea10d612 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -39,13 +39,15 @@ def make_sftp_folder(client):
pass
-# TODO: apply at module or session level
-# TODO: roll in SFTP folder setup and teardown?
-# NOTE: This is defined here for use by both SFTP (normal & 'big') suites.
@pytest.fixture(scope='session')
-def sftp():
+def sftp_server():
"""
- Set up an in-memory SFTP server, returning its corresponding SFTPClient.
+ Set up an in-memory SFTP server thread. Yields the client Transport/socket.
+
+ The resulting client Transport (along with all the server components) will
+ be the same object throughout the test session; the `sftp` fixture then
+ creates new higher level client objects wrapped around the client
+ Transport, as necessary.
"""
# Sockets & transports
socks = LoopSocket()
@@ -56,14 +58,27 @@ def sftp():
# Auth
host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
ts.add_server_key(host_key)
- # Server & client setup
+ # Server setup
event = threading.Event()
server = StubServer()
ts.set_subsystem_handler('sftp', SFTPServer, StubSFTPServer)
ts.start_server(event, server)
- tc.connect(username='slowdive', password='pygmalion')
+ # Wait (so client has time to connect? Not sure. Old.)
event.wait(1.0)
- client = SFTP.from_transport(tc)
+ # Make & yield connection.
+ tc.connect(username='slowdive', password='pygmalion')
+ yield tc
+ # TODO: any need for shutdown? Why didn't old suite do so? Or was that the
+ # point of the "join all threads from threading module" crap in test.py?
+
+
+@pytest.fixture
+def sftp(sftp_server):
+ """
+ Yield an SFTP client connected to the global in-session SFTP server thread.
+ """
+ # Client setup
+ client = SFTP.from_transport(sftp_server)
# Work in 'remote' folder setup (as it wants to use the client)
# TODO: how cleanest to make this available to tests? Doing it this way is
# marginally less bad than the previous 'global'-using setup, but not by
@@ -72,4 +87,8 @@ def sftp():
# Yield client to caller
yield client
# Clean up
- client.rmdir(client.FOLDER)
+ # TODO: many tests like to close the client; to match old test suite
+ # behavior we'd need to recreate the entire client? Possibly better to just
+ # make the "it runs locally, dumbass" explicit & then just use stdlib to
+ # clean up?
+ #client.rmdir(client.FOLDER)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 9154a795..c1d4cfe0 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -84,143 +84,102 @@ decreased compared with chicken.
# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
NON_UTF8_DATA = b'\xC3\xC3'
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
-
-sftp = None
-tc = None
-g_big_file_test = True
unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de'
utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65'
-class SFTPTest(unittest.TestCase):
- @staticmethod
- def set_big_file_test(onoff):
- global g_big_file_test
- g_big_file_test = onoff
-
- def setUp(self):
- # TODO: I think this was created to allow working around litter from
- # broken previous test runs, and/or to allow concurrent testing? Either
- # way I hate it and it should die ASAP in favor of pytest fixtures or
- # similar things that do try/finally.
- # TODO: also, for god's sake remove the global in favor of self.
- global FOLDER
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- #sftp.chdir()
- sftp.rmdir(FOLDER)
-
- def test_1_file(self):
+class TestSFTP(object):
+ def test_1_file(self, sftp):
"""
verify that we can create a file.
"""
- f = sftp.open(FOLDER + '/test', 'w')
+ f = sftp.open(sftp.FOLDER + '/test', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test')
+ sftp.remove(sftp.FOLDER + '/test')
- def test_2_close(self):
+ def test_2_close(self, sftp):
"""
- verify that closing the sftp session doesn't do anything bad, and that
- a new one can be opened.
+ Verify that SFTP session close() causes a socket error on next action.
"""
- global sftp
sftp.close()
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except:
- pass
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_2_sftp_can_be_used_as_context_manager(self):
+ def test_2_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
- global sftp
with sftp:
pass
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except (EOFError, socket.error):
- pass
- finally:
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_3_write(self):
+ def test_3_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_3_sftp_file_can_be_used_as_context_manager(self):
+ def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_4_append(self):
+ def test_4_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f:
f.write('first line\nsecond line\n')
- self.assertEqual(f.tell(), 23)
+ assert f.tell() == 23
- with sftp.open(FOLDER + '/append.txt', 'a+') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f:
f.write('third line!!!\n')
- self.assertEqual(f.tell(), 37)
- self.assertEqual(f.stat().st_size, 37)
+ assert f.tell() == 37
+ assert f.stat().st_size == 37
f.seek(-26, f.SEEK_CUR)
- self.assertEqual(f.readline(), 'second line\n')
+ assert f.readline() == 'second line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_5_rename(self):
+ def test_5_rename(self, sftp):
"""
verify that renaming a file works.
"""
try:
- with sftp.open(FOLDER + '/first.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f:
f.write('content!\n')
- sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
+ sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt')
try:
- sftp.open(FOLDER + '/first.txt', 'r')
+ sftp.open(sftp.FOLDER + '/first.txt', 'r')
self.assertTrue(False, 'no exception on reading nonexistent file')
except IOError:
pass
- with sftp.open(FOLDER + '/second.txt', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f:
f.seek(-6, f.SEEK_END)
- self.assertEqual(u(f.read(4)), 'tent')
+ assert u(f.read(4)) == 'tent'
finally:
try:
- sftp.remove(FOLDER + '/first.txt')
+ sftp.remove(sftp.FOLDER + '/first.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/second.txt')
+ sftp.remove(sftp.FOLDER + '/second.txt')
except:
pass
@@ -229,69 +188,69 @@ class SFTPTest(unittest.TestCase):
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
"""
- sftp.mkdir(FOLDER + '/subfolder')
- sftp.open(FOLDER + '/subfolder/test', 'w').close()
- sftp.remove(FOLDER + '/subfolder/test')
- sftp.rmdir(FOLDER + '/subfolder')
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close()
+ sftp.remove(sftp.FOLDER + '/subfolder/test')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
try:
- sftp.open(FOLDER + '/subfolder/test')
+ sftp.open(sftp.FOLDER + '/subfolder/test')
# shouldn't be able to create that file
self.assertTrue(False, 'no exception at dummy file creation')
except IOError:
pass
- def test_7_listdir(self):
+ def test_7_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
- x = sftp.listdir(FOLDER)
- self.assertEqual(len(x), 3)
+ x = sftp.listdir(sftp.FOLDER)
+ assert len(x) == 3
self.assertTrue('duck.txt' in x)
self.assertTrue('fish.txt' in x)
self.assertTrue('tertiary.py' in x)
self.assertTrue('random' not in x)
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_7_5_listdir_iter(self):
+ def test_7_5_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
- x = [x.filename for x in sftp.listdir_iter(FOLDER)]
- self.assertEqual(len(x), 3)
+ x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)]
+ assert len(x) == 3
self.assertTrue('duck.txt' in x)
self.assertTrue('fish.txt' in x)
self.assertTrue('tertiary.py' in x)
self.assertTrue('random' not in x)
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_8_setstat(self):
+ def test_8_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
- stat = sftp.stat(FOLDER + '/special')
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600)
+ stat = sftp.stat(sftp.FOLDER + '/special')
expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
@@ -299,35 +258,35 @@ class SFTPTest(unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
- sftp.utime(FOLDER + '/special', (atime, mtime))
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_mtime, mtime)
+ sftp.utime(sftp.FOLDER + '/special', (atime, mtime))
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
- sftp.truncate(FOLDER + '/special', 512)
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_size, 512)
+ sftp.truncate(sftp.FOLDER + '/special', 512)
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_9_fsetstat(self):
+ def test_9_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- with sftp.open(FOLDER + '/special', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'r+') as f:
stat = f.stat()
f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat()
@@ -339,26 +298,26 @@ class SFTPTest(unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
f.utime((atime, mtime))
stat = f.stat()
- self.assertEqual(stat.st_mtime, mtime)
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
f.truncate(512)
stat = f.stat()
- self.assertEqual(stat.st_size, 512)
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_A_readline_seek(self):
+ def test_A_readline_seek(self, sftp):
"""
create a text file and write a bunch of text into it. then count the lines
in the file, and seek around to retrieve particular lines. this should
@@ -366,10 +325,10 @@ class SFTPTest(unittest.TestCase):
buffering is reset on 'seek'.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f:
line_number = 0
loc = 0
pos_list = []
@@ -379,33 +338,33 @@ class SFTPTest(unittest.TestCase):
loc = f.tell()
self.assertTrue(f.seekable())
f.seek(pos_list[6], f.SEEK_SET)
- self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ assert f.readline(), 'Nouzilly == France.\n'
f.seek(pos_list[17], f.SEEK_SET)
- self.assertEqual(f.readline()[:4], 'duck')
+ assert f.readline()[:4] == 'duck'
f.seek(pos_list[10], f.SEEK_SET)
- self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n'
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_B_write_seek(self):
+ def test_B_write_seek(self, sftp):
"""
create a text file, seek back and change part of it, and verify that the
changes worked.
"""
try:
- with sftp.open(FOLDER + '/testing.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f:
f.write('hello kitty.\n')
f.seek(-5, f.SEEK_CUR)
f.write('dd')
- self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
- with sftp.open(FOLDER + '/testing.txt', 'r') as f:
+ assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f:
data = f.read(20)
- self.assertEqual(data, b'hello kiddy.\n')
+ assert data == b'hello kiddy.\n'
finally:
- sftp.remove(FOLDER + '/testing.txt')
+ sftp.remove(sftp.FOLDER + '/testing.txt')
- def test_C_symlink(self):
+ def test_C_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -414,97 +373,97 @@ class SFTPTest(unittest.TestCase):
return
try:
- with sftp.open(FOLDER + '/original.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f:
f.write('original\n')
- sftp.symlink('original.txt', FOLDER + '/link.txt')
- self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+ sftp.symlink('original.txt', sftp.FOLDER + '/link.txt')
+ assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt'
- with sftp.open(FOLDER + '/link.txt', 'r') as f:
- self.assertEqual(f.readlines(), ['original\n'])
+ with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f:
+ assert f.readlines() == ['original\n']
cwd = sftp.normalize('.')
if cwd[-1] == '/':
cwd = cwd[:-1]
- abs_path = cwd + '/' + FOLDER + '/original.txt'
- sftp.symlink(abs_path, FOLDER + '/link2.txt')
- self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+ abs_path = cwd + '/' + sftp.FOLDER + '/original.txt'
+ sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt')
+ assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt')
- self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
- self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12
+ assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9
# the sftp server may be hiding extra path members from us, so the
# length may be longer than we expect:
- self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
- self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
- self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ self.assertTrue(sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path))
+ assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9
+ assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9
finally:
try:
- sftp.remove(FOLDER + '/link.txt')
+ sftp.remove(sftp.FOLDER + '/link.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/link2.txt')
+ sftp.remove(sftp.FOLDER + '/link2.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/original.txt')
+ sftp.remove(sftp.FOLDER + '/original.txt')
except:
pass
- def test_D_flush_seek(self):
+ def test_D_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
try:
- with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f:
f.write('full line.\n')
f.write('partial')
f.seek(9, f.SEEK_SET)
f.write('?\n')
- with sftp.open(FOLDER + '/happy.txt', 'r') as f:
- self.assertEqual(f.readline(), u('full line?\n'))
- self.assertEqual(f.read(7), b'partial')
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f:
+ assert f.readline() == u('full line?\n')
+ assert f.read(7) == b'partial'
finally:
try:
- sftp.remove(FOLDER + '/happy.txt')
+ sftp.remove(sftp.FOLDER + '/happy.txt')
except:
pass
- def test_E_realpath(self):
+ def test_E_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
"""
pwd = sftp.normalize('.')
self.assertTrue(len(pwd) > 0)
- f = sftp.normalize('./' + FOLDER)
+ f = sftp.normalize('./' + sftp.FOLDER)
self.assertTrue(len(f) > 0)
- self.assertEqual(os.path.join(pwd, FOLDER), f)
+ assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self):
+ def test_F_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
try:
- sftp.mkdir(FOLDER + '/subfolder')
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
except:
self.assertTrue(False, 'exception creating subfolder')
try:
- sftp.mkdir(FOLDER + '/subfolder')
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
self.assertTrue(False, 'no exception overwriting subfolder')
except IOError:
pass
try:
- sftp.rmdir(FOLDER + '/subfolder')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
except:
self.assertTrue(False, 'exception removing subfolder')
try:
- sftp.rmdir(FOLDER + '/subfolder')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
self.assertTrue(False, 'no exception removing nonexistent subfolder')
except IOError:
pass
- def test_G_chdir(self):
+ def test_G_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -512,35 +471,35 @@ class SFTPTest(unittest.TestCase):
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
sftp.mkdir('beta')
- self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd())
- self.assertEqual(['beta'], sftp.listdir('.'))
+ assert root + sftp.FOLDER + '/alpha' == sftp.getcwd()
+ assert ['beta'] == sftp.listdir('.')
sftp.chdir('beta')
with sftp.open('fish', 'w') as f:
f.write('hello\n')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('beta'))
+ assert ['fish'] == sftp.listdir('beta')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('alpha/beta'))
+ assert ['fish'] == sftp.listdir('alpha/beta')
finally:
sftp.chdir(root)
try:
- sftp.unlink(FOLDER + '/alpha/beta/fish')
+ sftp.unlink(sftp.FOLDER + '/alpha/beta/fish')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha/beta')
+ sftp.rmdir(sftp.FOLDER + '/alpha/beta')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def test_H_get_put(self):
+ def test_H_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -555,103 +514,103 @@ class SFTPTest(unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
+ sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback)
- with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f:
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
fd, localname = mkstemp()
os.close(fd)
saved_progress = []
- sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
+ sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback)
with open(localname, 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_I_check(self):
+ def test_I_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
support it.)
"""
- with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f:
f.write('here kitty kitty' * 64)
try:
- with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f:
sum = f.check('sha1')
- self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
+ assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 512)
- self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
+ assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 0, 510)
self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
u(hexlify(sum)).upper())
finally:
- sftp.unlink(FOLDER + '/kitty.txt')
+ sftp.unlink(sftp.FOLDER + '/kitty.txt')
- def test_J_x_flag(self):
+ def test_J_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
- sftp.open(FOLDER + '/unusual.txt', 'wx').close()
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close()
try:
try:
- sftp.open(FOLDER + '/unusual.txt', 'wx')
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
except IOError:
pass
finally:
- sftp.unlink(FOLDER + '/unusual.txt')
+ sftp.unlink(sftp.FOLDER + '/unusual.txt')
- def test_K_utf8(self):
+ def test_K_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
- with sftp.open(FOLDER + '/something', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/something', 'w') as f:
f.write('okay')
try:
- sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
- sftp.open(b(FOLDER) + utf8_folder, 'r')
+ sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder)
+ sftp.open(b(sftp.FOLDER) + utf8_folder, 'r')
except Exception as e:
self.fail('exception ' + str(e))
- sftp.unlink(b(FOLDER) + utf8_folder)
+ sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + '/' + unicode_folder)
+ def test_L_utf8_chdir(self, sftp):
+ sftp.mkdir(sftp.FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + '/' + unicode_folder)
+ sftp.chdir(sftp.FOLDER + '/' + unicode_folder)
with sftp.open('something', 'w') as f:
f.write('okay')
sftp.unlink('something')
finally:
sftp.chdir()
- sftp.rmdir(FOLDER + '/' + unicode_folder)
+ sftp.rmdir(sftp.FOLDER + '/' + unicode_folder)
- def test_M_bad_readv(self):
+ def test_M_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
- sftp.open(FOLDER + '/zero', 'w').close()
+ sftp.open(sftp.FOLDER + '/zero', 'w').close()
try:
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
f.readv([(0, 12)])
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
f.read(100)
finally:
- sftp.unlink(FOLDER + '/zero')
+ sftp.unlink(sftp.FOLDER + '/zero')
- def test_N_put_without_confirm(self):
+ def test_N_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -666,61 +625,61 @@ class SFTPTest(unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
+ res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False)
- self.assertEqual(SFTPAttributes().attr, res.attr)
+ assert SFTPAttributes().attr == res.attr
- with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f:
+ assert text == f.read(128)
+ assert (41, 41) == saved_progress[-1]
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_O_getcwd(self):
+ def test_O_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- self.assertEqual(None, sftp.getcwd())
+ assert sftp.getcwd() == None
root = sftp.normalize('.')
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
- self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd())
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
+ assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha'
finally:
sftp.chdir(root)
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def XXX_test_M_seek_append(self):
+ def XXX_test_M_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
does not work except through paramiko. :( openssh fails.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'a') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f:
f.write('first line\nsecond line\n')
f.seek(11, f.SEEK_SET)
f.write('third line\n')
- with sftp.open(FOLDER + '/append.txt', 'r') as f:
- self.assertEqual(f.stat().st_size, 34)
- self.assertEqual(f.readline(), 'first line\n')
- self.assertEqual(f.readline(), 'second line\n')
- self.assertEqual(f.readline(), 'third line\n')
+ with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f:
+ assert f.stat().st_size == 34
+ assert f.readline() == 'first line\n'
+ assert f.readline() == 'second line\n'
+ assert f.readline() == 'third line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_putfo_empty_file(self):
+ def test_putfo_empty_file(self, sftp):
"""
Send an empty file and confirm it is sent.
"""
- target = FOLDER + '/empty file.txt'
+ target = sftp.FOLDER + '/empty file.txt'
stream = StringIO()
try:
attrs = sftp.putfo(stream, target)
@@ -730,69 +689,71 @@ class SFTPTest(unittest.TestCase):
sftp.remove(target)
- def test_N_file_with_percent(self):
+ def test_N_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
"""
- self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
- f = sftp.open(FOLDER + '/test%file', 'w')
+ # TODO: how best to enable this only for the one test? & how to make it
+ # not log to actual file? lol??
+ paramiko.util.log_to_file('test_sftp.log')
+ f = sftp.open(sftp.FOLDER + '/test%file', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test%file')
+ sftp.remove(sftp.FOLDER + '/test%file')
- def test_O_non_utf8_data(self):
+ def test_O_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
- with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f:
+ assert data == NON_UTF8_DATA
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
+ assert data == NON_UTF8_DATA
finally:
- sftp.remove('%s/nonutf8data' % FOLDER)
+ sftp.remove('%s/nonutf8data' % sftp.FOLDER)
- def test_sftp_attributes_empty_str(self):
+ def test_sftp_attributes_empty_str(self, sftp):
sftp_attributes = SFTPAttributes()
- self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?")
+ assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?"
@needs_builtin('buffer')
- def test_write_buffer(self):
+ def test_write_buffer(self, sftp):
"""Test write() using a buffer instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f:
for offset in range(0, len(data), 8):
f.write(buffer(data, offset, 8))
- with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_buffer' % FOLDER)
+ sftp.remove('%s/write_buffer' % sftp.FOLDER)
@needs_builtin('memoryview')
- def test_write_memoryview(self):
+ def test_write_memoryview(self, sftp):
"""Test write() using a memoryview instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f:
view = memoryview(data)
for offset in range(0, len(data), 8):
f.write(view[offset:offset+8])
- with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_memoryview' % FOLDER)
+ sftp.remove('%s/write_memoryview' % sftp.FOLDER)
if __name__ == '__main__':
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index c58a7912..e5708312 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -33,9 +33,6 @@ import unittest
from paramiko.common import o660
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
-
-
class TestBigSFTP(object):
def test_1_lots_of_files(self, sftp):
"""
@@ -44,22 +41,22 @@ class TestBigSFTP(object):
numfiles = 100
try:
for i in range(numfiles):
- with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f:
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, i), 'w', 1) as f:
f.write('this is file #%d.\n' % i)
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
+ sftp.chmod('%s/file%d.txt' % (sftp.FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes
# and reading them in random order.
numlist = list(range(numfiles))
while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)]
- with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f:
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, r)) as f:
assert f.readline() == 'this is file #%d.\n' % r
numlist.remove(r)
finally:
for i in range(numfiles):
try:
- sftp.remove('%s/file%d.txt' % (FOLDER, i))
+ sftp.remove('%s/file%d.txt' % (sftp.FOLDER, i))
except:
pass
@@ -70,19 +67,19 @@ class TestBigSFTP(object):
kblob = (1024 * b'x')
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
for n in range(1024):
data = f.read(1024)
assert data == kblob
@@ -90,7 +87,7 @@ class TestBigSFTP(object):
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_3_big_file_pipelined(self, sftp):
"""
@@ -99,7 +96,7 @@ class TestBigSFTP(object):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -107,12 +104,12 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -132,12 +129,12 @@ class TestBigSFTP(object):
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_4_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -145,13 +142,13 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
@@ -168,12 +165,12 @@ class TestBigSFTP(object):
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_5_readv_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -181,13 +178,13 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
# make a bunch of offsets and put them in random order
offsets = [base_offset + j * chunk for j in range(100)]
@@ -204,7 +201,7 @@ class TestBigSFTP(object):
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_6_lots_of_prefetching(self, sftp):
"""
@@ -213,7 +210,7 @@ class TestBigSFTP(object):
"""
kblob = (1024 * b'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -221,13 +218,13 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
for n in range(1024):
@@ -237,7 +234,7 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_7_prefetch_readv(self, sftp):
"""
@@ -245,7 +242,7 @@ class TestBigSFTP(object):
"""
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -253,9 +250,9 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
data = f.read(1024)
@@ -273,7 +270,7 @@ class TestBigSFTP(object):
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_8_large_readv(self, sftp):
"""
@@ -282,7 +279,7 @@ class TestBigSFTP(object):
"""
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -290,9 +287,9 @@ class TestBigSFTP(object):
sys.stderr.write('.')
sys.stderr.write(' ')
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
data = list(f.readv([(23 * 1024, 128 * 1024)]))
assert len(data) == 1
data = data[0]
@@ -300,7 +297,7 @@ class TestBigSFTP(object):
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_9_big_file_big_buffer(self, sftp):
"""
@@ -308,12 +305,12 @@ class TestBigSFTP(object):
"""
mblob = (1024 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
f.write(mblob)
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
def test_A_big_file_renegotiate(self, sftp):
"""
@@ -323,20 +320,20 @@ class TestBigSFTP(object):
t.packetizer.REKEY_BYTES = 512 * 1024
k32blob = (32 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
for i in range(32):
f.write(k32blob)
- assert sftp.stat('%s/hongry.txt' % FOLDER).st_size == 1024 * 1024
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
assert t.H != t.session_id
# try to read it too.
- with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r', 128 * 1024) as f:
file_size = f.stat().st_size
f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30)