diff options
Diffstat (limited to 'test/unit/test_utils.py')
-rw-r--r-- | test/unit/test_utils.py | 195 |
1 files changed, 119 insertions, 76 deletions
diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py index cbee82b..129208d 100644 --- a/test/unit/test_utils.py +++ b/test/unit/test_utils.py @@ -14,13 +14,13 @@ # limitations under the License. import gzip +import io import json import unittest -import mock -import six +from unittest import mock import tempfile from time import gmtime, localtime, mktime, strftime, strptime -from hashlib import md5, sha1 +import hashlib from swiftclient import utils as u @@ -127,27 +127,75 @@ class TestTempURL(unittest.TestCase): seconds = 3600 key = 'correcthorsebatterystaple' method = 'GET' - expected_url = url + ('?temp_url_sig=temp_url_signature' - '&temp_url_expires=1400003600') expected_body = '\n'.join([ method, '1400003600', url, ]).encode('utf-8') + @property + def expected_url(self): + if isinstance(self.url, bytes): + return self.url + (b'?temp_url_sig=temp_url_signature' + b'&temp_url_expires=1400003600') + return self.url + (u'?temp_url_sig=temp_url_signature' + u'&temp_url_expires=1400003600') + + @property + def expected_sha512_url(self): + if isinstance(self.url, bytes): + return self.url + (b'?temp_url_sig=sha512:dGVtcF91cmxfc2lnbmF0dXJl' + b'&temp_url_expires=1400003600') + return self.url + (u'?temp_url_sig=sha512:dGVtcF91cmxfc2lnbmF0dXJl' + u'&temp_url_expires=1400003600') + @mock.patch('hmac.HMAC') @mock.patch('time.time', return_value=1400000000) - def test_generate_temp_url(self, time_mock, hmac_mock): + def test_generate_sha1_temp_url(self, time_mock, hmac_mock): + hmac_mock().hexdigest.return_value = 'temp_url_signature' + url = u.generate_temp_url(self.url, self.seconds, + self.key, self.method, digest='sha1') + key = self.key + if not isinstance(key, bytes): + key = key.encode('utf-8') + self.assertEqual(url, self.expected_url) + self.assertEqual(hmac_mock.mock_calls, [ + mock.call(), + mock.call(key, self.expected_body, hashlib.sha1), + mock.call().hexdigest(), + ]) + self.assertIsInstance(url, type(self.url)) + + @mock.patch('hmac.HMAC') + @mock.patch('time.time', return_value=1400000000) + def test_generate_sha512_temp_url(self, time_mock, hmac_mock): + hmac_mock().digest.return_value = b'temp_url_signature' + url = u.generate_temp_url(self.url, self.seconds, + self.key, self.method, digest=hashlib.sha512) + key = self.key + if not isinstance(key, bytes): + key = key.encode('utf-8') + self.assertEqual(url, self.expected_sha512_url) + self.assertEqual(hmac_mock.mock_calls, [ + mock.call(), + mock.call(key, self.expected_body, hashlib.sha512), + mock.call().digest(), + ]) + self.assertIsInstance(url, type(self.url)) + + @mock.patch('hmac.HMAC') + @mock.patch('time.time', return_value=1400000000) + def test_generate_sha256_temp_url_by_default(self, time_mock, hmac_mock): hmac_mock().hexdigest.return_value = 'temp_url_signature' url = u.generate_temp_url(self.url, self.seconds, self.key, self.method) key = self.key - if not isinstance(key, six.binary_type): + if not isinstance(key, bytes): key = key.encode('utf-8') self.assertEqual(url, self.expected_url) self.assertEqual(hmac_mock.mock_calls, [ mock.call(), - mock.call(key, self.expected_body, sha1), + mock.call(key, self.expected_body, hashlib.sha256), mock.call().hexdigest(), ]) self.assertIsInstance(url, type(self.url)) @@ -170,10 +218,10 @@ class TestTempURL(unittest.TestCase): self.key, self.method, ip_range=ip_range) key = self.key - if not isinstance(key, six.binary_type): + if not isinstance(key, bytes): key = key.encode('utf-8') - if isinstance(ip_range, six.binary_type): + if isinstance(ip_range, bytes): ip_range_expected_url = ( expected_url + ip_range.decode('utf-8') ) @@ -195,7 +243,7 @@ class TestTempURL(unittest.TestCase): self.assertEqual(url, ip_range_expected_url) self.assertEqual(hmac_mock.mock_calls, [ - mock.call(key, expected_body, sha1), + mock.call(key, expected_body, hashlib.sha256), mock.call().hexdigest(), ]) self.assertIsInstance(url, type(path)) @@ -215,7 +263,7 @@ class TestTempURL(unittest.TestCase): lt = localtime() expires = strftime(u.EXPIRES_ISO8601_FORMAT[:-1], lt) - if not isinstance(self.expected_url, six.string_types): + if not isinstance(self.expected_url, str): expected_url = self.expected_url.replace( b'1400003600', bytes(str(int(mktime(lt))), encoding='ascii')) else: @@ -228,7 +276,7 @@ class TestTempURL(unittest.TestCase): expires = strftime(u.SHORT_EXPIRES_ISO8601_FORMAT, lt) lt = strptime(expires, u.SHORT_EXPIRES_ISO8601_FORMAT) - if not isinstance(self.expected_url, six.string_types): + if not isinstance(self.expected_url, str): expected_url = self.expected_url.replace( b'1400003600', bytes(str(int(mktime(lt))), encoding='ascii')) else: @@ -246,17 +294,17 @@ class TestTempURL(unittest.TestCase): self.key, self.method, iso8601=True) key = self.key - if not isinstance(key, six.binary_type): + if not isinstance(key, bytes): key = key.encode('utf-8') expires = strftime(u.EXPIRES_ISO8601_FORMAT, gmtime(1400003600)) - if not isinstance(self.url, six.string_types): + if not isinstance(self.url, str): self.assertTrue(url.endswith(bytes(expires, 'utf-8'))) else: self.assertTrue(url.endswith(expires)) self.assertEqual(hmac_mock.mock_calls, [ mock.call(), - mock.call(key, self.expected_body, sha1), + mock.call(key, self.expected_body, hashlib.sha256), mock.call().hexdigest(), ]) self.assertIsInstance(url, type(self.url)) @@ -280,11 +328,11 @@ class TestTempURL(unittest.TestCase): url = u.generate_temp_url(path, self.seconds, self.key, self.method, prefix=True) key = self.key - if not isinstance(key, six.binary_type): + if not isinstance(key, bytes): key = key.encode('utf-8') self.assertEqual(url, expected_url) self.assertEqual(hmac_mock.mock_calls, [ - mock.call(key, expected_body, sha1), + mock.call(key, expected_body, hashlib.sha256), mock.call().hexdigest(), ]) @@ -299,12 +347,12 @@ class TestTempURL(unittest.TestCase): @mock.patch('hmac.HMAC.hexdigest', return_value="temp_url_signature") def test_generate_absolute_expiry_temp_url(self, hmac_mock): - if isinstance(self.expected_url, six.binary_type): + if isinstance(self.expected_url, bytes): expected_url = self.expected_url.replace( b'1400003600', b'2146636800') else: expected_url = self.expected_url.replace( - u'1400003600', u'2146636800') + '1400003600', '2146636800') url = u.generate_temp_url(self.url, 2146636800, self.key, self.method, absolute=True) self.assertEqual(url, expected_url) @@ -372,34 +420,28 @@ class TestTempURL(unittest.TestCase): class TestTempURLUnicodePathAndKey(TestTempURL): - url = u'/v1/\u00e4/c/\u00f3' - key = u'k\u00e9y' - expected_url = (u'%s?temp_url_sig=temp_url_signature' - u'&temp_url_expires=1400003600') % url - expected_body = u'\n'.join([ - u'GET', - u'1400003600', + url = '/v1/\u00e4/c/\u00f3' + key = 'k\u00e9y' + expected_body = '\n'.join([ + 'GET', + '1400003600', url, ]).encode('utf-8') class TestTempURLUnicodePathBytesKey(TestTempURL): - url = u'/v1/\u00e4/c/\u00f3' - key = u'k\u00e9y'.encode('utf-8') - expected_url = (u'%s?temp_url_sig=temp_url_signature' - u'&temp_url_expires=1400003600') % url + url = '/v1/\u00e4/c/\u00f3' + key = 'k\u00e9y'.encode('utf-8') expected_body = '\n'.join([ - u'GET', - u'1400003600', + 'GET', + '1400003600', url, ]).encode('utf-8') class TestTempURLBytesPathUnicodeKey(TestTempURL): - url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8') - key = u'k\u00e9y' - expected_url = url + (b'?temp_url_sig=temp_url_signature' - b'&temp_url_expires=1400003600') + url = '/v1/\u00e4/c/\u00f3'.encode('utf-8') + key = 'k\u00e9y' expected_body = b'\n'.join([ b'GET', b'1400003600', @@ -408,10 +450,8 @@ class TestTempURLBytesPathUnicodeKey(TestTempURL): class TestTempURLBytesPathAndKey(TestTempURL): - url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8') - key = u'k\u00e9y'.encode('utf-8') - expected_url = url + (b'?temp_url_sig=temp_url_signature' - b'&temp_url_expires=1400003600') + url = '/v1/\u00e4/c/\u00f3'.encode('utf-8') + key = 'k\u00e9y'.encode('utf-8') expected_body = b'\n'.join([ b'GET', b'1400003600', @@ -420,10 +460,8 @@ class TestTempURLBytesPathAndKey(TestTempURL): class TestTempURLBytesPathAndNonUtf8Key(TestTempURL): - url = u'/v1/\u00e4/c/\u00f3'.encode('utf-8') + url = '/v1/\u00e4/c/\u00f3'.encode('utf-8') key = b'k\xffy' - expected_url = url + (b'?temp_url_sig=temp_url_signature' - b'&temp_url_expires=1400003600') expected_body = b'\n'.join([ b'GET', b'1400003600', @@ -436,7 +474,7 @@ class TestReadableToIterable(unittest.TestCase): def test_iter(self): chunk_size = 4 write_data = tuple(x.encode() for x in ('a', 'b', 'c', 'd')) - actual_md5sum = md5() + actual_md5sum = hashlib.md5() with tempfile.TemporaryFile() as f: for x in write_data: @@ -454,8 +492,8 @@ class TestReadableToIterable(unittest.TestCase): def test_md5_creation(self): # Check creation with a real and noop md5 class data = u.ReadableToIterable(None, None, md5=True) - self.assertEqual(md5().hexdigest(), data.get_md5sum()) - self.assertIs(type(md5()), type(data.md5sum)) + self.assertEqual(hashlib.md5().hexdigest(), data.get_md5sum()) + self.assertIs(type(hashlib.md5()), type(data.md5sum)) data = u.ReadableToIterable(None, None, md5=False) self.assertEqual('', data.get_md5sum()) @@ -463,8 +501,8 @@ class TestReadableToIterable(unittest.TestCase): def test_unicode(self): # Check no errors are raised if unicode data is feed in. - unicode_data = u'abc' - actual_md5sum = md5(unicode_data.encode()).hexdigest() + unicode_data = 'abc' + actual_md5sum = hashlib.md5(unicode_data.encode()).hexdigest() chunk_size = 2 with tempfile.TemporaryFile(mode='w+') as f: @@ -486,27 +524,29 @@ class TestReadableToIterable(unittest.TestCase): class TestLengthWrapper(unittest.TestCase): def test_stringio(self): - contents = six.StringIO(u'a' * 50 + u'b' * 50) + contents = io.StringIO('a' * 50 + 'b' * 50) contents.seek(22) data = u.LengthWrapper(contents, 42, True) - s = u'a' * 28 + u'b' * 14 - read_data = u''.join(iter(data.read, '')) + s = 'a' * 28 + 'b' * 14 + read_data = ''.join(iter(data.read, '')) self.assertEqual(42, len(data)) self.assertEqual(42, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s.encode()).hexdigest(), + data.get_md5sum()) data.reset() - self.assertEqual(md5().hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5().hexdigest(), data.get_md5sum()) - read_data = u''.join(iter(data.read, '')) + read_data = ''.join(iter(data.read, '')) self.assertEqual(42, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s.encode()).hexdigest(), + data.get_md5sum()) def test_bytesio(self): - contents = six.BytesIO(b'a' * 50 + b'b' * 50) + contents = io.BytesIO(b'a' * 50 + b'b' * 50) contents.seek(22) data = u.LengthWrapper(contents, 42, True) s = b'a' * 28 + b'b' * 14 @@ -515,7 +555,7 @@ class TestLengthWrapper(unittest.TestCase): self.assertEqual(42, len(data)) self.assertEqual(42, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s).hexdigest(), data.get_md5sum()) def test_tempfile(self): with tempfile.NamedTemporaryFile(mode='wb') as f: @@ -529,7 +569,7 @@ class TestLengthWrapper(unittest.TestCase): self.assertEqual(42, len(data)) self.assertEqual(42, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s).hexdigest(), data.get_md5sum()) def test_segmented_file(self): with tempfile.NamedTemporaryFile(mode='wb') as f: @@ -548,15 +588,18 @@ class TestLengthWrapper(unittest.TestCase): self.assertEqual(segment_length, len(data)) self.assertEqual(segment_length, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s).hexdigest(), + data.get_md5sum()) data.reset() - self.assertEqual(md5().hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5().hexdigest(), + data.get_md5sum()) read_data = b''.join(iter(data.read, '')) self.assertEqual(segment_length, len(data)) self.assertEqual(segment_length, len(read_data)) self.assertEqual(s, read_data) - self.assertEqual(md5(s).hexdigest(), data.get_md5sum()) + self.assertEqual(hashlib.md5(s).hexdigest(), + data.get_md5sum()) class TestGroupers(unittest.TestCase): @@ -591,12 +634,12 @@ class TestApiResponeParser(unittest.TestCase): def test_utf8_default(self): result = u.parse_api_response( - {}, u'{"test": "\u2603"}'.encode('utf8')) - self.assertEqual({'test': u'\u2603'}, result) + {}, '{"test": "\u2603"}'.encode('utf8')) + self.assertEqual({'test': '\u2603'}, result) result = u.parse_api_response( - {}, u'{"test": "\\u2603"}'.encode('utf8')) - self.assertEqual({'test': u'\u2603'}, result) + {}, '{"test": "\\u2603"}'.encode('utf8')) + self.assertEqual({'test': '\u2603'}, result) def test_bad_json(self): self.assertRaises(ValueError, u.parse_api_response, @@ -610,38 +653,38 @@ class TestApiResponeParser(unittest.TestCase): result = u.parse_api_response( {'content-type': 'application/json; charset=iso8859-1'}, b'{"t\xe9st": "\xff"}') - self.assertEqual({u't\xe9st': u'\xff'}, result) + self.assertEqual({'t\xe9st': '\xff'}, result) def test_gzipped_utf8(self): - buf = six.BytesIO() + buf = io.BytesIO() gz = gzip.GzipFile(fileobj=buf, mode='w') - gz.write(u'{"test": "\u2603"}'.encode('utf8')) + gz.write('{"test": "\u2603"}'.encode('utf8')) gz.close() result = u.parse_api_response( {'content-encoding': 'gzip'}, buf.getvalue()) - self.assertEqual({'test': u'\u2603'}, result) + self.assertEqual({'test': '\u2603'}, result) class TestGetBody(unittest.TestCase): def test_not_gzipped(self): result = u.parse_api_response( - {}, u'{"test": "\\u2603"}'.encode('utf8')) - self.assertEqual({'test': u'\u2603'}, result) + {}, '{"test": "\\u2603"}'.encode('utf8')) + self.assertEqual({'test': '\u2603'}, result) def test_gzipped_body(self): - buf = six.BytesIO() + buf = io.BytesIO() gz = gzip.GzipFile(fileobj=buf, mode='w') - gz.write(u'{"test": "\u2603"}'.encode('utf8')) + gz.write('{"test": "\u2603"}'.encode('utf8')) gz.close() result = u.parse_api_response( {'content-encoding': 'gzip'}, buf.getvalue()) - self.assertEqual({'test': u'\u2603'}, result) + self.assertEqual({'test': '\u2603'}, result) -class JSONTracker(object): +class JSONTracker: def __init__(self, data): self.data = data self.calls = [] |