diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-06-01 17:18:21 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-06-01 17:18:21 +0000 |
commit | 49483a3b111e55405f7e522a8896225aad8473ac (patch) | |
tree | 8817e5e042fec72e8f17171e70780b4f1a957b7e | |
parent | bcca25a2c82cf4926613384a2b56b4f2577bc897 (diff) | |
parent | ff0b3b02f07de341fa9eb81156ac2a0565d85cd4 (diff) | |
download | python-swiftclient-49483a3b111e55405f7e522a8896225aad8473ac.tar.gz |
Merge "Compare each chunk of large objects when downloading"
-rw-r--r-- | swiftclient/service.py | 72 | ||||
-rw-r--r-- | tests/unit/test_service.py | 365 |
2 files changed, 425 insertions, 12 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py index d607d65..90daf5a 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -974,8 +974,7 @@ class SwiftService(object): for o_down in interruptable_as_completed(o_downs): yield o_down.result() - @staticmethod - def _download_object_job(conn, container, obj, options): + def _download_object_job(self, conn, container, obj, options): out_file = options['out_file'] results_dict = {} @@ -984,7 +983,10 @@ class SwiftService(object): pseudodir = False path = join(container, obj) if options['yes_all'] else obj path = path.lstrip(os_path_sep) - if options['skip_identical'] and out_file != '-': + options['skip_identical'] = (options['skip_identical'] and + out_file != '-') + + if options['skip_identical']: filename = out_file if out_file else path try: fp = open(filename, 'rb') @@ -1002,10 +1004,55 @@ class SwiftService(object): try: start_time = time() - headers, body = \ - conn.get_object(container, obj, resp_chunk_size=65536, - headers=req_headers, - response_dict=results_dict) + get_args = {'resp_chunk_size': 65536, + 'headers': req_headers, + 'response_dict': results_dict} + if options['skip_identical']: + # Assume the file is a large object; if we're wrong, the query + # string is ignored and the If-None-Match header will trigger + # the behavior we want + get_args['query_string'] = 'multipart-manifest=get' + + try: + headers, body = conn.get_object(container, obj, **get_args) + except ClientException as e: + if not options['skip_identical']: + raise + if e.http_status != 304: # Only handling Not Modified + raise + + headers = results_dict['headers'] + if 'x-object-manifest' in headers: + # DLO: most likely it has more than one page worth of + # segments and we have an empty file locally + body = [] + elif config_true_value(headers.get('x-static-large-object')): + # SLO: apparently we have a copy of the manifest locally? + # provide no chunking data to force a fresh download + body = [b'[]'] + else: + # Normal object: let it bubble up + raise + + if options['skip_identical']: + if config_true_value(headers.get('x-static-large-object')) or \ + 'x-object-manifest' in headers: + # The request was chunked, so stitch it back together + chunk_data = self._get_chunk_data(conn, container, obj, + headers, b''.join(body)) + else: + chunk_data = None + + if chunk_data is not None: + if self._is_identical(chunk_data, filename): + raise ClientException('Large object is identical', + http_status=304) + + # Large objects are different; start the real download + del get_args['query_string'] + get_args['response_dict'].clear() + headers, body = conn.get_object(container, obj, **get_args) + headers_receipt = time() obj_body = _SwiftReader(path, body, headers) @@ -1503,7 +1550,7 @@ class SwiftService(object): results_queue.put(res) return res - def _get_chunk_data(self, conn, container, obj, headers): + def _get_chunk_data(self, conn, container, obj, headers, manifest=None): chunks = [] if 'x-object-manifest' in headers: scontainer, sprefix = headers['x-object-manifest'].split('/', 1) @@ -1513,10 +1560,11 @@ class SwiftService(object): else: raise part["error"] elif config_true_value(headers.get('x-static-large-object')): - manifest_headers, manifest_data = conn.get_object( - container, obj, query_string='multipart-manifest=get') - manifest_data = parse_api_response(manifest_headers, manifest_data) - for chunk in manifest_data: + if manifest is None: + headers, manifest = conn.get_object( + container, obj, query_string='multipart-manifest=get') + manifest = parse_api_response(headers, manifest) + for chunk in manifest: if chunk.get('sub_slo'): scont, sobj = chunk['name'].lstrip('/').split('/', 1) chunks.extend(self._get_chunk_data( diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 1e0d96d..74a6ce3 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -988,3 +988,368 @@ class TestServiceUpload(testtools.TestCase): marker="test_o/prefix/02", delimiter=None), ] mock_conn.get_container.assert_has_calls(expected) + + +class TestServiceDownload(testtools.TestCase): + + def _assertDictEqual(self, a, b, m=None): + # assertDictEqual is not available in py2.6 so use a shallow check + # instead + if not m: + m = '{0} != {1}'.format(a, b) + + if hasattr(self, 'assertDictEqual'): + self.assertDictEqual(a, b, m) + else: + self.assertTrue(isinstance(a, dict), m) + self.assertTrue(isinstance(b, dict), m) + self.assertEqual(len(a), len(b), m) + for k, v in a.items(): + self.assertIn(k, b, m) + self.assertEqual(b[k], v, m) + + def test_download_object_job_skip_identical(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + + err = swiftclient.ClientException('Object GET failed', + http_status=304) + + def fake_get(*args, **kwargs): + kwargs['response_dict']['headers'] = {} + raise err + + mock_conn = mock.Mock() + mock_conn.get_object.side_effect = fake_get + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + expected_r = { + 'action': 'download_object', + 'container': 'test_c', + 'object': 'test_o', + 'success': False, + 'error': err, + 'response_dict': {'headers': {}}, + 'path': 'test_o', + 'pseudodir': False, + 'attempts': 2, + } + + s = SwiftService() + r = s._download_object_job(conn=mock_conn, + container='test_c', + obj='test_o', + options={'out_file': f.name, + 'header': {}, + 'yes_all': False, + 'skip_identical': True}) + self._assertDictEqual(r, expected_r) + + self.assertEqual(mock_conn.get_object.call_count, 1) + mock_conn.get_object.assert_called_with( + 'test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': md5(b'a' * 30).hexdigest()}, + query_string='multipart-manifest=get', + response_dict=expected_r['response_dict']) + + def test_download_object_job_skip_identical_dlo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + on_disk_md5 = md5(b'a' * 30).hexdigest() + segment_md5 = md5(b'a' * 10).hexdigest() + + mock_conn = mock.Mock() + mock_conn.get_object.return_value = ( + {'x-object-manifest': 'test_c_segments/test_o/prefix'}, [b'']) + mock_conn.get_container.side_effect = [ + (None, [{'name': 'test_o/prefix/1', + 'bytes': 10, 'hash': segment_md5}, + {'name': 'test_o/prefix/2', + 'bytes': 10, 'hash': segment_md5}]), + (None, [{'name': 'test_o/prefix/3', + 'bytes': 10, 'hash': segment_md5}]), + (None, [])] + + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + expected_r = { + 'action': 'download_object', + 'container': 'test_c', + 'object': 'test_o', + 'success': False, + 'response_dict': {}, + 'path': 'test_o', + 'pseudodir': False, + 'attempts': 2, + } + + s = SwiftService() + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._download_object_job(conn=mock_conn, + container='test_c', + obj='test_o', + options={'out_file': f.name, + 'header': {}, + 'yes_all': False, + 'skip_identical': True}) + + err = r.pop('error') + self.assertEqual("Large object is identical", err.msg) + self.assertEqual(304, err.http_status) + + self._assertDictEqual(r, expected_r) + + self.assertEqual(mock_conn.get_object.call_count, 1) + mock_conn.get_object.assert_called_with( + 'test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + query_string='multipart-manifest=get', + response_dict=expected_r['response_dict']) + self.assertEqual(mock_conn.get_container.mock_calls, [ + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker=''), + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker='test_o/prefix/2'), + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker='test_o/prefix/3')]) + + def test_download_object_job_skip_identical_nested_slo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.flush() + on_disk_md5 = md5(b'a' * 30).hexdigest() + + seg_etag = md5(b'a' * 10).hexdigest() + submanifest = "[%s]" % ",".join( + ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2) + submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest() + manifest = "[%s]" % ",".join([ + '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",' + '"bytes":20,"hash":"%s"}' % submanifest_etag, + '{"bytes":10,"hash":"%s"}' % seg_etag]) + + mock_conn = mock.Mock() + mock_conn.get_object.side_effect = [ + ({'x-static-large-object': True, + 'content-length': 30, + 'etag': md5(submanifest_etag.encode('ascii') + + seg_etag.encode('ascii')).hexdigest()}, + [manifest.encode('ascii')]), + ({'x-static-large-object': True, + 'content-length': 20, + 'etag': submanifest_etag}, + submanifest.encode('ascii'))] + + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + expected_r = { + 'action': 'download_object', + 'container': 'test_c', + 'object': 'test_o', + 'success': False, + 'response_dict': {}, + 'path': 'test_o', + 'pseudodir': False, + 'attempts': 2, + } + + s = SwiftService() + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._download_object_job(conn=mock_conn, + container='test_c', + obj='test_o', + options={'out_file': f.name, + 'header': {}, + 'yes_all': False, + 'skip_identical': True}) + + err = r.pop('error') + self.assertEqual("Large object is identical", err.msg) + self.assertEqual(304, err.http_status) + + self._assertDictEqual(r, expected_r) + self.assertEqual(mock_conn.get_object.mock_calls, [ + mock.call('test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + query_string='multipart-manifest=get', + response_dict={}), + mock.call('test_c_segments', + 'test_sub_slo', + query_string='multipart-manifest=get')]) + + def test_download_object_job_skip_identical_diff_dlo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 30) + f.write(b'b') + f.flush() + on_disk_md5 = md5(b'a' * 30 + b'b').hexdigest() + segment_md5 = md5(b'a' * 10).hexdigest() + + mock_conn = mock.Mock() + mock_conn.get_object.side_effect = [ + ({'x-object-manifest': 'test_c_segments/test_o/prefix'}, + [b'']), + ({'x-object-manifest': 'test_c_segments/test_o/prefix'}, + [b'a' * 30])] + mock_conn.get_container.side_effect = [ + (None, [{'name': 'test_o/prefix/1', + 'bytes': 10, 'hash': segment_md5}, + {'name': 'test_o/prefix/2', + 'bytes': 10, 'hash': segment_md5}]), + (None, [{'name': 'test_o/prefix/3', + 'bytes': 10, 'hash': segment_md5}]), + (None, [])] + + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14) + expected_r = { + 'action': 'download_object', + 'container': 'test_c', + 'object': 'test_o', + 'success': True, + 'response_dict': {}, + 'path': 'test_o', + 'pseudodir': False, + 'read_length': 30, + 'attempts': 2, + 'start_time': 0, + 'headers_receipt': 1, + 'finish_time': 2, + 'auth_end_time': mock_conn.auth_end_time, + } + + s = SwiftService() + with mock.patch('swiftclient.service.time', side_effect=range(3)): + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._download_object_job( + conn=mock_conn, + container='test_c', + obj='test_o', + options={'out_file': f.name, + 'header': {}, + 'no_download': True, + 'yes_all': False, + 'skip_identical': True}) + + self._assertDictEqual(r, expected_r) + + self.assertEqual(mock_conn.get_container.mock_calls, [ + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker=''), + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker='test_o/prefix/2'), + mock.call('test_c_segments', + delimiter=None, + prefix='test_o/prefix', + marker='test_o/prefix/3')]) + self.assertEqual(mock_conn.get_object.mock_calls, [ + mock.call('test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + query_string='multipart-manifest=get', + response_dict={}), + mock.call('test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + response_dict={})]) + + def test_download_object_job_skip_identical_diff_nested_slo(self): + with tempfile.NamedTemporaryFile() as f: + f.write(b'a' * 29) + f.flush() + on_disk_md5 = md5(b'a' * 29).hexdigest() + + seg_etag = md5(b'a' * 10).hexdigest() + submanifest = "[%s]" % ",".join( + ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2) + submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest() + manifest = "[%s]" % ",".join([ + '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",' + '"bytes":20,"hash":"%s"}' % submanifest_etag, + '{"bytes":10,"hash":"%s"}' % seg_etag]) + + mock_conn = mock.Mock() + mock_conn.get_object.side_effect = [ + ({'x-static-large-object': True, + 'content-length': 30, + 'etag': md5(submanifest_etag.encode('ascii') + + seg_etag.encode('ascii')).hexdigest()}, + [manifest.encode('ascii')]), + ({'x-static-large-object': True, + 'content-length': 20, + 'etag': submanifest_etag}, + submanifest.encode('ascii')), + ({'x-static-large-object': True, + 'content-length': 30, + 'etag': md5(submanifest_etag.encode('ascii') + + seg_etag.encode('ascii')).hexdigest()}, + [b'a' * 30])] + + type(mock_conn).attempts = mock.PropertyMock(return_value=2) + type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14) + expected_r = { + 'action': 'download_object', + 'container': 'test_c', + 'object': 'test_o', + 'success': True, + 'response_dict': {}, + 'path': 'test_o', + 'pseudodir': False, + 'read_length': 30, + 'attempts': 2, + 'start_time': 0, + 'headers_receipt': 1, + 'finish_time': 2, + 'auth_end_time': mock_conn.auth_end_time, + } + + s = SwiftService() + with mock.patch('swiftclient.service.time', side_effect=range(3)): + with mock.patch('swiftclient.service.get_conn', + return_value=mock_conn): + r = s._download_object_job( + conn=mock_conn, + container='test_c', + obj='test_o', + options={'out_file': f.name, + 'header': {}, + 'no_download': True, + 'yes_all': False, + 'skip_identical': True}) + + self._assertDictEqual(r, expected_r) + self.assertEqual(mock_conn.get_object.mock_calls, [ + mock.call('test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + query_string='multipart-manifest=get', + response_dict={}), + mock.call('test_c_segments', + 'test_sub_slo', + query_string='multipart-manifest=get'), + mock.call('test_c', + 'test_o', + resp_chunk_size=65536, + headers={'If-None-Match': on_disk_md5}, + response_dict={})]) |