summaryrefslogtreecommitdiff
path: root/tests/unit/test_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit/test_utils.py')
-rw-r--r--tests/unit/test_utils.py106
1 files changed, 89 insertions, 17 deletions
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index f072aed..d82d2b8 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -14,10 +14,10 @@
# limitations under the License.
import testtools
-
import mock
import six
import tempfile
+from hashlib import md5
from swiftclient import utils as u
@@ -161,39 +161,111 @@ class TestTempURL(testtools.TestCase):
self.method)
+class TestReadableToIterable(testtools.TestCase):
+
+ def test_iter(self):
+ chunk_size = 4
+ write_data = tuple(x.encode() for x in ('a', 'b', 'c', 'd'))
+ actual_md5sum = md5()
+
+ with tempfile.TemporaryFile() as f:
+ for x in write_data:
+ f.write(x * chunk_size)
+ actual_md5sum.update(x * chunk_size)
+ f.seek(0)
+ data = u.ReadableToIterable(f, chunk_size, True)
+
+ for i, data_chunk in enumerate(data):
+ self.assertEquals(chunk_size, len(data_chunk))
+ self.assertEquals(data_chunk, write_data[i] * chunk_size)
+
+ self.assertEquals(actual_md5sum.hexdigest(), data.get_md5sum())
+
+ def test_md5_creation(self):
+ # Check creation with a real and noop md5 class
+ data = u.ReadableToIterable(None, None, md5=True)
+ self.assertEquals(md5().hexdigest(), data.get_md5sum())
+ self.assertTrue(isinstance(data.md5sum, type(md5())))
+
+ data = u.ReadableToIterable(None, None, md5=False)
+ self.assertEquals('', data.get_md5sum())
+ self.assertTrue(isinstance(data.md5sum, type(u.NoopMD5())))
+
+ def test_unicode(self):
+ # Check no errors are raised if unicode data is feed in.
+ unicode_data = u'abc'
+ actual_md5sum = md5(unicode_data.encode()).hexdigest()
+ chunk_size = 2
+
+ with tempfile.TemporaryFile(mode='w+') as f:
+ f.write(unicode_data)
+ f.seek(0)
+ data = u.ReadableToIterable(f, chunk_size, True)
+
+ x = next(data)
+ self.assertEquals(2, len(x))
+ self.assertEquals(unicode_data[:2], x)
+
+ x = next(data)
+ self.assertEquals(1, len(x))
+ self.assertEquals(unicode_data[2:], x)
+
+ self.assertEquals(actual_md5sum, data.get_md5sum())
+
+
class TestLengthWrapper(testtools.TestCase):
def test_stringio(self):
- contents = six.StringIO('a' * 100)
- data = u.LengthWrapper(contents, 42)
+ contents = six.StringIO(u'a' * 100)
+ data = u.LengthWrapper(contents, 42, True)
+ s = u'a' * 42
+ read_data = u''.join(iter(data.read, ''))
+
+ self.assertEqual(42, len(data))
+ self.assertEqual(42, len(read_data))
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum())
+
+ def test_bytesio(self):
+ contents = six.BytesIO(b'a' * 100)
+ data = u.LengthWrapper(contents, 42, True)
+ s = b'a' * 42
+ read_data = b''.join(iter(data.read, ''))
+
self.assertEqual(42, len(data))
- read_data = ''.join(iter(data.read, ''))
self.assertEqual(42, len(read_data))
- self.assertEqual('a' * 42, read_data)
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
def test_tempfile(self):
- with tempfile.NamedTemporaryFile(mode='w') as f:
- f.write('a' * 100)
+ with tempfile.NamedTemporaryFile(mode='wb') as f:
+ f.write(b'a' * 100)
f.flush()
- contents = open(f.name)
- data = u.LengthWrapper(contents, 42)
+ contents = open(f.name, 'rb')
+ data = u.LengthWrapper(contents, 42, True)
+ s = b'a' * 42
+ read_data = b''.join(iter(data.read, ''))
+
self.assertEqual(42, len(data))
- read_data = ''.join(iter(data.read, ''))
self.assertEqual(42, len(read_data))
- self.assertEqual('a' * 42, read_data)
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())
def test_segmented_file(self):
- with tempfile.NamedTemporaryFile(mode='w') as f:
+ with tempfile.NamedTemporaryFile(mode='wb') as f:
segment_length = 1024
segments = ('a', 'b', 'c', 'd')
for c in segments:
- f.write(c * segment_length)
+ f.write((c * segment_length).encode())
f.flush()
for i, c in enumerate(segments):
- contents = open(f.name)
+ contents = open(f.name, 'rb')
contents.seek(i * segment_length)
- data = u.LengthWrapper(contents, segment_length)
+ data = u.LengthWrapper(contents, segment_length, True)
+ read_data = b''.join(iter(data.read, ''))
+ s = (c * segment_length).encode()
+
self.assertEqual(segment_length, len(data))
- read_data = ''.join(iter(data.read, ''))
self.assertEqual(segment_length, len(read_data))
- self.assertEqual(c * segment_length, read_data)
+ self.assertEqual(s, read_data)
+ self.assertEqual(md5(s).hexdigest(), data.get_md5sum())