diff options
author | Tim Burke <tim.burke@gmail.com> | 2015-03-03 12:35:03 -0800 |
---|---|---|
committer | Tim Burke <tim.burke@gmail.com> | 2015-03-23 18:35:45 -0700 |
commit | a4fb70ece189aff85f234ab6b3f275b69e936c03 (patch) | |
tree | 5fe51c0846b7fbb45137bb5689bde08f61033dd8 | |
parent | 925c01ebfbdfb6478a3786f24a9572deae40f8f8 (diff) | |
download | python-swiftclient-a4fb70ece189aff85f234ab6b3f275b69e936c03.tar.gz |
Compare each chunk of large objects when uploading
Previously, we compared the ETag from Swift against the MD5 of the
entire large object. However, the ETag for large objects is generally
the MD5 of the concatenation of the ETags for each segment, unless the
object is a DLO whose segments span more than one page of a container
listing. Rather than worry about ETags, just compare each chunk of the
segmented file. This allows the use of --skip-identical when uploading
SLOs and DLOs.
Additionally, there are several test-related improvements:
* The default arguments for OutputManager are now evaluated on
construction, rather than on definition, so that
TestOutputManager.test_instantiation will succeed when using nosetest
as a test runner. (See also: bug 1251507)
* An account_username option is now available in the functional tests
config file for auth systems that do not follow the account:username
format.
* CaptureOutput no longer writes to the captured stream, and
MockHttpTest now captures output. These were polluting test output
unnecessarily. (See also: bug 1201376)
Change-Id: Ic484e9a0c186c9283c4012c6a2fa77b96b8edf8a
Closes-Bug: #1201376
Closes-Bug: #1379252
Related-Bug: #1251507
-rw-r--r-- | swiftclient/multithreading.py | 7 | ||||
-rw-r--r-- | swiftclient/service.py | 100 | ||||
-rw-r--r-- | tests/functional/test_swiftclient.py | 10 | ||||
-rw-r--r-- | tests/unit/test_service.py | 128 | ||||
-rw-r--r-- | tests/unit/utils.py | 9 |
5 files changed, 214 insertions, 40 deletions
diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py index 32d8ffa..c53d987 100644 --- a/swiftclient/multithreading.py +++ b/swiftclient/multithreading.py @@ -45,7 +45,7 @@ class OutputManager(object): """ DEFAULT_OFFSET = 14 - def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr): + def __init__(self, print_stream=None, error_stream=None): """ :param print_stream: The stream to which :meth:`print_msg` sends formatted messages. @@ -54,9 +54,10 @@ class OutputManager(object): On Python 2, Unicode messages are encoded to utf8. """ - self.print_stream = print_stream + self.print_stream = print_stream or sys.stdout self.print_pool = ThreadPoolExecutor(max_workers=1) - self.error_stream = error_stream + + self.error_stream = error_stream or sys.stderr self.error_print_pool = ThreadPoolExecutor(max_workers=1) self.error_count = 0 diff --git a/swiftclient/service.py b/swiftclient/service.py index f24d430..bb132dd 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -1159,7 +1159,7 @@ class SwiftService(object): 'headers': [], 'segment_size': None, 'use_slo': False, - 'segment_container: None, + 'segment_container': None, 'leave_segments': False, 'changed': None, 'skip_identical': False, @@ -1502,6 +1502,51 @@ class SwiftService(object): results_queue.put(res) return res + def _get_chunk_data(self, conn, container, obj, headers): + chunks = [] + if 'x-object-manifest' in headers: + scontainer, sprefix = headers['x-object-manifest'].split('/', 1) + for part in self.list(scontainer, {'prefix': sprefix}): + if part["success"]: + chunks.extend(part["listing"]) + else: + raise part["error"] + elif config_true_value(headers.get('x-static-large-object')): + _, manifest_data = conn.get_object( + container, obj, query_string='multipart-manifest=get') + for chunk in json.loads(manifest_data): + if chunk.get('sub_slo'): + scont, sobj = chunk['name'].lstrip('/').split('/', 1) + chunks.extend(self._get_chunk_data( + conn, scont, sobj, {'x-static-large-object': True})) + else: + chunks.append(chunk) + else: + chunks.append({'hash': headers.get('etag').strip('"'), + 'bytes': int(headers.get('content-length'))}) + return chunks + + def _is_identical(self, chunk_data, path): + try: + fp = open(path, 'rb') + except IOError: + return False + + with fp: + for chunk in chunk_data: + to_read = chunk['bytes'] + md5sum = md5() + while to_read: + data = fp.read(min(65536, to_read)) + if not data: + return False + md5sum.update(data) + to_read -= len(data) + if md5sum.hexdigest() != chunk['hash']: + return False + # Each chunk is verified; check that we're at the end of the file + return not fp.read(1) + def _upload_object_job(self, conn, container, source, obj, options, results_queue=None): res = { @@ -1533,32 +1578,27 @@ class SwiftService(object): old_manifest = None old_slo_manifest_paths = [] new_slo_manifest_paths = set() + segment_size = int(0 if options['segment_size'] is None + else options['segment_size']) if (options['changed'] or options['skip_identical'] or not options['leave_segments']): - checksum = None - if options['skip_identical']: - try: - fp = open(path, 'rb') - except IOError: - pass - else: - with fp: - md5sum = md5() - while True: - data = fp.read(65536) - if not data: - break - md5sum.update(data) - checksum = md5sum.hexdigest() try: headers = conn.head_object(container, obj) - if options['skip_identical'] and checksum is not None: - if checksum == headers.get('etag'): - res.update({ - 'success': True, - 'status': 'skipped-identical' - }) - return res + is_slo = config_true_value( + headers.get('x-static-large-object')) + + if options['skip_identical'] or ( + is_slo and not options['leave_segments']): + chunk_data = self._get_chunk_data( + conn, container, obj, headers) + + if options['skip_identical'] and self._is_identical( + chunk_data, path): + res.update({ + 'success': True, + 'status': 'skipped-identical' + }) + return res cl = int(headers.get('content-length')) mt = headers.get('x-object-meta-mtime') @@ -1572,13 +1612,8 @@ class SwiftService(object): return res if not options['leave_segments']: old_manifest = headers.get('x-object-manifest') - if config_true_value( - headers.get('x-static-large-object')): - headers, manifest_data = conn.get_object( - container, obj, - query_string='multipart-manifest=get' - ) - for old_seg in json.loads(manifest_data): + if is_slo: + for old_seg in chunk_data: seg_path = old_seg['name'].lstrip('/') if isinstance(seg_path, text_type): seg_path = seg_path.encode('utf-8') @@ -1598,8 +1633,8 @@ class SwiftService(object): # a segment job if we're reading from a stream - we may fail if we # go over the single object limit, but this gives us a nice way # to create objects from memory - if (path is not None and options['segment_size'] - and (getsize(path) > int(options['segment_size']))): + if (path is not None and segment_size + and (getsize(path) > segment_size)): res['large_object'] = True seg_container = container + '_segments' if options['segment_container']: @@ -1612,7 +1647,6 @@ class SwiftService(object): segment_start = 0 while segment_start < full_size: - segment_size = int(options['segment_size']) if segment_start + segment_size > full_size: segment_size = full_size - segment_start if options['use_slo']: diff --git a/tests/functional/test_swiftclient.py b/tests/functional/test_swiftclient.py index f5d14aa..4b57f1d 100644 --- a/tests/functional/test_swiftclient.py +++ b/tests/functional/test_swiftclient.py @@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase): auth_ssl = config.getboolean('func_test', 'auth_ssl') auth_prefix = config.get('func_test', 'auth_prefix') self.auth_version = config.get('func_test', 'auth_version') - self.account = config.get('func_test', 'account') - self.username = config.get('func_test', 'username') + try: + self.account_username = config.get('func_test', + 'account_username') + except configparser.NoOptionError: + account = config.get('func_test', 'account') + username = config.get('func_test', 'username') + self.account_username = "%s:%s" % (account, username) self.password = config.get('func_test', 'password') self.auth_url = "" if auth_ssl: @@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase): self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix) if self.auth_version == "1": self.auth_url += 'v1.0' - self.account_username = "%s:%s" % (self.account, self.username) else: self.skip_tests = True diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 3309813..073f06e 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -817,3 +817,131 @@ class TestServiceUpload(testtools.TestCase): contents = mock_conn.put_object.call_args[0][2] self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest()) + + def test_upload_object_job_identical_etag(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'content-length': 30, + 'etag': md5(b'a' * 30).hexdigest()} + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + + s = SwiftService() + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 0}) + + self.assertTrue(r['success']) + self.assertIn('status', r) + self.assertEqual(r['status'], 'skipped-identical') + self.assertEqual(mock_conn.put_object.call_count, 0) + self.assertEqual(mock_conn.head_object.call_count, 1) + mock_conn.head_object.assert_called_with('test_c', 'test_o') + + def test_upload_object_job_identical_slo_with_nesting(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + seg_etag = md5(b'a' * 10).hexdigest() + submanifest = "[%s]" % ",".join( + ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2) + submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest() + manifest = "[%s]" % ",".join([ + '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",' + '"bytes":20,"hash":"%s"}' % submanifest_etag, + '{"bytes":10,"hash":"%s"}' % seg_etag]) + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'x-static-large-object': True, + 'content-length': 30, + 'etag': md5(submanifest_etag.encode('ascii') + + seg_etag.encode('ascii')).hexdigest()} + mock_conn.get_object.side_effect = [ + (None, manifest), + (None, submanifest)] + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + + s = SwiftService() + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 10}) + + self.assertIsNone(r.get('error')) + self.assertTrue(r['success']) + self.assertEqual('skipped-identical', r.get('status')) + self.assertEqual(0, mock_conn.put_object.call_count) + self.assertEqual([mock.call('test_c', 'test_o')], + mock_conn.head_object.mock_calls) + self.assertEqual([ + mock.call('test_c', 'test_o', + query_string='multipart-manifest=get'), + mock.call('test_c_segments', 'test_sub_slo', + query_string='multipart-manifest=get'), + ], mock_conn.get_object.mock_calls) + + def test_upload_object_job_identical_dlo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + segment_etag = md5(b'a' * 10).hexdigest() + + mock_conn = mock.Mock() + mock_conn.head_object.return_value = { + 'x-object-manifest': 'test_c_segments/test_o/prefix', + 'content-length': 30, + 'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()} + mock_conn.get_container.side_effect = [ + (None, [{"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/00"}, + {"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/01"}]), + (None, [{"bytes": 10, "hash": segment_etag, + "name": "test_o/prefix/02"}]), + (None, {})] + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + + s = SwiftService() + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._upload_object_job(conn=mock_conn, + container='test_c', + source=f.name, + obj='test_o', + options={'changed': False, + 'skip_identical': True, + 'leave_segments': True, + 'header': '', + 'segment_size': 10}) + + self.assertIsNone(r.get('error')) + self.assertTrue(r['success']) + self.assertEqual('skipped-identical', r.get('status')) + self.assertEqual(0, mock_conn.put_object.call_count) + self.assertEqual(1, mock_conn.head_object.call_count) + self.assertEqual(3, mock_conn.get_container.call_count) + mock_conn.head_object.assert_called_with('test_c', 'test_o') + expected = [ + mock.call('test_c_segments', prefix='test_o/prefix', + marker='', delimiter=None), + mock.call('test_c_segments', prefix='test_o/prefix', + marker="test_o/prefix/01", delimiter=None), + mock.call('test_c_segments', prefix='test_o/prefix', + marker="test_o/prefix/02", delimiter=None), + ] + mock_conn.get_container.assert_has_calls(expected) diff --git a/tests/unit/utils.py b/tests/unit/utils.py index 88d6d12..bb68f4f 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -207,6 +207,12 @@ class MockHttpTest(testtools.TestCase): self.fake_connect = None self.request_log = [] + # Capture output, since the test-runner stdout/stderr moneky-patching + # won't cover the references to sys.stdout/sys.stderr in + # swiftclient.multithreading + self.capture_output = CaptureOutput() + self.capture_output.__enter__() + def fake_http_connection(*args, **kwargs): self.validateMockedRequestsConsumed() self.request_log = [] @@ -367,6 +373,7 @@ class MockHttpTest(testtools.TestCase): # un-hygienic mocking on the swiftclient.client module; which may lead # to some unfortunate test order dependency bugs by way of the broken # window theory if any other modules are similarly patched + self.capture_output.__exit__() reload_module(c) @@ -392,7 +399,7 @@ class CaptureStream(object): self.stream = stream self._capture = six.StringIO() self._buffer = CaptureStreamBuffer(self) - self.streams = [self.stream, self._capture] + self.streams = [self._capture] @property def buffer(self): |