summaryrefslogtreecommitdiff
path: root/fs/tests/test_remote.py
diff options
context:
space:
mode:
Diffstat (limited to 'fs/tests/test_remote.py')
-rw-r--r--fs/tests/test_remote.py80
1 files changed, 42 insertions, 38 deletions
diff --git a/fs/tests/test_remote.py b/fs/tests/test_remote.py
index a5cc8c5..9a4db5e 100644
--- a/fs/tests/test_remote.py
+++ b/fs/tests/test_remote.py
@@ -24,23 +24,27 @@ from fs.local_functools import wraps
from six import PY3, b
+
class RemoteTempFS(TempFS):
"""
Simple filesystem implementing setfilecontents
for RemoteFileBuffer tests
"""
- def open(self, path, mode='rb', write_on_flush=True):
+ def open(self, path, mode='rb', write_on_flush=True, **kwargs):
if 'a' in mode or 'r' in mode or '+' in mode:
- f = super(RemoteTempFS, self).open(path, 'rb')
+ f = super(RemoteTempFS, self).open(path, mode='rb', **kwargs)
f = TellAfterCloseFile(f)
else:
f = None
-
- return RemoteFileBuffer(self, path, mode, f,
- write_on_flush=write_on_flush)
-
- def setcontents(self, path, data, chunk_size=64*1024):
- f = super(RemoteTempFS, self).open(path, 'wb')
+
+ return RemoteFileBuffer(self,
+ path,
+ mode,
+ f,
+ write_on_flush=write_on_flush)
+
+ def setcontents(self, path, data, encoding=None, errors=None, chunk_size=64*1024):
+ f = super(RemoteTempFS, self).open(path, 'wb', encoding=encoding, errors=errors, chunk_size=chunk_size)
if getattr(data, 'read', False):
f.write(data.read())
else:
@@ -51,7 +55,7 @@ class RemoteTempFS(TempFS):
class TellAfterCloseFile(object):
"""File-like object that allows calling tell() after it's been closed."""
- def __init__(self,file):
+ def __init__(self, file):
self._finalpos = None
self.file = file
@@ -65,49 +69,49 @@ class TellAfterCloseFile(object):
return self._finalpos
return self.file.tell()
- def __getattr__(self,attr):
- return getattr(self.file,attr)
+ def __getattr__(self, attr):
+ return getattr(self.file, attr)
class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
class FakeException(Exception): pass
-
+
def setUp(self):
self.fs = RemoteTempFS()
self.original_setcontents = self.fs.setcontents
- def tearDown(self):
+ def tearDown(self):
self.fs.close()
self.fakeOff()
def fake_setcontents(self, path, content=b(''), chunk_size=16*1024):
''' Fake replacement for RemoteTempFS setcontents() '''
raise self.FakeException("setcontents should not be called here!")
-
+
def fakeOn(self):
'''
Turn on fake_setcontents(). When setcontents on RemoteTempFS
is called, FakeException is raised and nothing is stored.
'''
self.fs.setcontents = self.fake_setcontents
-
+
def fakeOff(self):
''' Switch off fake_setcontents(). '''
self.fs.setcontents = self.original_setcontents
-
+
def test_ondemand(self):
'''
Tests on-demand loading of remote content in RemoteFileBuffer
- '''
+ '''
contents = b("Tristatricettri stribrnych strikacek strikalo") + \
b("pres tristatricettri stribrnych strech.")
f = self.fs.open('test.txt', 'wb')
f.write(contents)
f.close()
-
+
# During following tests, no setcontents() should be called.
self.fakeOn()
-
+
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(10), contents[:10])
f.wrapped_file.seek(0, SEEK_END)
@@ -118,18 +122,18 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
f.seek(0, SEEK_END)
self.assertEquals(f._rfile.tell(), len(contents))
f.close()
-
+
f = self.fs.open('test.txt', 'ab')
self.assertEquals(f.tell(), len(contents))
f.close()
-
+
self.fakeOff()
-
+
# Writing over the rfile edge
f = self.fs.open('test.txt', 'wb+')
self.assertEquals(f.tell(), 0)
f.seek(len(contents) - 5)
- # Last 5 characters not loaded from remote file
+ # Last 5 characters not loaded from remote file
self.assertEquals(f._rfile.tell(), len(contents) - 5)
# Confirm that last 5 characters are still in rfile buffer
self.assertEquals(f._rfile.read(), contents[-5:])
@@ -141,9 +145,9 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
# We are on the end of file (and buffer not serve anything anymore)
self.assertEquals(f.read(), b(''))
f.close()
-
+
self.fakeOn()
-
+
# Check if we wrote everything OK from
# previous writing over the remote buffer edge
f = self.fs.open('test.txt', 'rb')
@@ -151,7 +155,7 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
f.close()
self.fakeOff()
-
+
def test_writeonflush(self):
'''
Test 'write_on_flush' switch of RemoteFileBuffer.
@@ -168,7 +172,7 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
self.fakeOff()
f.close()
self.fakeOn()
-
+
f = self.fs.open('test.txt', 'wb', write_on_flush=False)
f.write(b('Sample text'))
# FakeException is not raised, because setcontents is not called
@@ -176,16 +180,16 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
f.write(b('Second sample text'))
self.assertRaises(self.FakeException, f.close)
self.fakeOff()
-
+
def test_flush_and_continue(self):
'''
This tests if partially loaded remote buffer can be flushed
back to remote destination and opened file is still
- in good condition.
+ in good condition.
'''
contents = b("Zlutoucky kun upel dabelske ody.")
contents2 = b('Ententyky dva spaliky cert vyletel z elektriky')
-
+
f = self.fs.open('test.txt', 'wb')
f.write(contents)
f.close()
@@ -202,12 +206,12 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
# Try if we have unocrrupted file locally...
self.assertEquals(f.read(), contents[:10] + b('x') + contents[11:])
f.close()
-
+
# And if we have uncorrupted file also on storage
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(), contents[:10] + b('x') + contents[11:])
f.close()
-
+
# Now try it again, but write garbage behind edge of remote file
f = self.fs.open('test.txt', 'rb+')
self.assertEquals(f.read(10), contents[:10])
@@ -218,12 +222,12 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
# Try if we have unocrrupted file locally...
self.assertEquals(f.read(), contents[:10] + contents2)
f.close()
-
+
# And if we have uncorrupted file also on storage
f = self.fs.open('test.txt', 'rb')
self.assertEquals(f.read(), contents[:10] + contents2)
f.close()
-
+
class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of CacheFS"""
@@ -267,7 +271,7 @@ class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
self.assertFalse(self.fs.isfile("hello"))
finally:
self.fs.cache_timeout = old_timeout
-
+
class TestConnectionManagerFS(unittest.TestCase,FSTestCases):#,ThreadingTestCases):
@@ -286,7 +290,7 @@ class TestConnectionManagerFS(unittest.TestCase,FSTestCases):#,ThreadingTestCase
class DisconnectingFS(WrapFS):
"""FS subclass that raises lots of RemoteConnectionErrors."""
- def __init__(self,fs=None):
+ def __init__(self,fs=None):
if fs is None:
fs = TempFS()
self._connected = True
@@ -315,8 +319,8 @@ class DisconnectingFS(WrapFS):
time.sleep(random.random()*0.1)
self._connected = not self._connected
- def setcontents(self, path, contents=b(''), chunk_size=64*1024):
- return self.wrapped_fs.setcontents(path, contents)
+ def setcontents(self, path, data=b(''), encoding=None, errors=None, chunk_size=64*1024):
+ return self.wrapped_fs.setcontents(path, data, encoding=encoding, errors=errors, chunk_size=chunk_size)
def close(self):
if not self.closed: