| author | Timur Alperovich <timuralp@swiftstack.com> | 2017-06-28 12:02:21 -0700 |
|---|---|---|
| committer | Tim Burke <tim.burke@gmail.com> | 2018-01-18 04:56:12 +0000 |
| commit | 2faea932870956583f83226886d33304ee1eee46 (patch) | |
| tree | 0d23a3d3389d506d5564613c296e0e92736f5cd6 | |
| parent | a9b8f0a0d191873ac88b0c70166a2b889096fa69 (diff) | |
| download | python-swiftclient-2faea932870956583f83226886d33304ee1eee46.tar.gz | |
Allow for object uploads > 5GB from stdin.
When uploading from standard input, swiftclient should turn the upload
into an SLO when the object is large. This patch picks 10MB as the
threshold (and uses it as the default segment size). Consumers can also
supply the --segment-size option to alter that threshold and the SLO
segment size. The patch buffers one segment in memory, which is why the
10MB default was chosen.

(tests are updated)
Change-Id: Ib13e0b687bc85930c29fe9f151cf96bc53b2e594
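For orientation, here is a hedged sketch of how the new behaviour can be exercised through the Python API, mirroring the unit tests added by this patch; the container name `mycontainer` and object name `streamed` are illustrative, not taken from the change itself.

```python
# A minimal sketch, assuming a Python 3 interpreter; it mirrors the new unit
# tests rather than the CLI code path. 'mycontainer' and 'streamed' are
# illustrative names only.
import sys

from swiftclient.service import SwiftService, SwiftUploadObject

# 10MB is the default segment size this patch picks for stdin uploads.
options = {'use_slo': True, 'segment_size': 10 * 1024 * 1024}

with SwiftService() as swift:
    # Any file-like object exposing read() works; at most one segment
    # (10MB here) is buffered in memory at a time.
    for result in swift.upload(
            'mycontainer',
            [SwiftUploadObject(sys.stdin.buffer, object_name='streamed')],
            options):
        if not result['success']:
            raise result['error']
```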
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | swiftclient/service.py | 244 |
| -rwxr-xr-x | swiftclient/shell.py | 8 |
| -rw-r--r-- | tests/unit/test_service.py | 214 |
| -rw-r--r-- | tests/unit/utils.py | 21 |

4 files changed, 462 insertions, 25 deletions
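The bulk of the service.py change below is the new `_upload_stream_segment` helper, which buffers at most one segment in memory, hashes it as it reads, and reports whether the stream is exhausted. A minimal standalone sketch of just that buffering step is shown here; the helper name `read_one_segment` is invented for illustration and is not part of the patch.

```python
# A simplified illustration of the buffering strategy used by the new
# _upload_stream_segment method in the diff below; read_one_segment is an
# invented name, not part of the patch.
from hashlib import md5


def read_one_segment(fd, segment_size):
    """Buffer up to segment_size bytes from a file-like object.

    Returns (data, etag, complete), where complete is True when the stream
    produced fewer bytes than segment_size, i.e. it is exhausted.
    """
    chunks = []
    digest = md5()
    bytes_read = 0
    while bytes_read < segment_size:
        data = fd.read(segment_size - bytes_read)
        if not data:
            break
        bytes_read += len(data)
        digest.update(data)
        chunks.append(data)
    data = b''.join(chunks)
    return data, digest.hexdigest(), len(data) < segment_size
```

In the patch itself, the first segment gets special treatment: if it already comes back shorter than the segment size, the data is PUT directly as the target object rather than as an SLO segment, so small stdin uploads never become SLOs.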
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7b5ecd4..ed5e9e9 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1502,7 +1502,8 @@ class SwiftService(object):
                 if hasattr(s, 'read'):
                     # We've got a file like object to upload to o
                     file_future = self.thread_manager.object_uu_pool.submit(
-                        self._upload_object_job, container, s, o, object_options
+                        self._upload_object_job, container, s, o, object_options,
+                        results_queue=rq
                     )
                     details['file'] = s
                     details['object'] = o
@@ -1784,6 +1785,132 @@ class SwiftService(object):
             if fp is not None:
                 fp.close()

+    @staticmethod
+    def _put_object(conn, container, name, content, headers=None, md5=None):
+        """
+        Upload object into a given container and verify the resulting ETag, if
+        the md5 optional parameter is passed.
+
+        :param conn: The Swift connection to use for uploads.
+        :param container: The container to put the object into.
+        :param name: The name of the object.
+        :param content: Object content.
+        :param headers: Headers (optional) to associate with the object.
+        :param md5: MD5 sum of the content. If passed in, will be used to
+                    verify the returned ETag.
+
+        :returns: A dictionary as the response from calling put_object.
+                  The keys are:
+                      - status
+                      - reason
+                      - headers
+                  On error, the dictionary contains the following keys:
+                      - success (with value False)
+                      - error - the encountered exception (object)
+                      - error_timestamp
+                      - response_dict - results from the put_object call, as
+                        documented above
+                      - attempts - number of attempts made
+        """
+        if headers is None:
+            headers = {}
+        else:
+            headers = dict(headers)
+        if md5 is not None:
+            headers['etag'] = md5
+        results = {}
+        try:
+            etag = conn.put_object(
+                container, name, content, content_length=len(content),
+                headers=headers, response_dict=results)
+            if md5 is not None and etag != md5:
+                raise SwiftError('Upload verification failed for {0}: md5 '
+                                 'mismatch {1} != {2}'.format(name, md5, etag))
+            results['success'] = True
+        except Exception as err:
+            traceback, err_time = report_traceback()
+            logger.exception(err)
+            return {
+                'success': False,
+                'error': err,
+                'error_timestamp': err_time,
+                'response_dict': results,
+                'attempts': conn.attempts,
+                'traceback': traceback
+            }
+        return results
+
+    @staticmethod
+    def _upload_stream_segment(conn, container, object_name,
+                               segment_container, segment_name,
+                               segment_size, segment_index,
+                               headers, fd):
+        """
+        Upload a segment from a stream, buffering it in memory first. The
+        resulting object is placed either as a segment in the segment
+        container, or if it is smaller than a single segment, as the given
+        object name.
+
+        :param conn: Swift Connection to use.
+        :param container: Container in which the object would be placed.
+        :param object_name: Name of the final object (used in case the stream
+                            is smaller than the segment_size)
+        :param segment_container: Container to hold the object segments.
+        :param segment_name: The name of the segment.
+        :param segment_size: Minimum segment size.
+        :param segment_index: The segment index.
+        :param headers: Headers to attach to the segment/object.
+        :param fd: File-like handle for the content. Must implement read().
+
+        :returns: Dictionary, containing the following keys:
+                  - complete -- whether the stream is exhausted
+                  - segment_size - the actual size of the segment (may be
+                    smaller than the passed in segment_size)
+                  - segment_location - path to the segment
+                  - segment_index - index of the segment
+                  - segment_etag - the ETag for the segment
+        """
+        buf = []
+        dgst = md5()
+        bytes_read = 0
+        while bytes_read < segment_size:
+            data = fd.read(segment_size - bytes_read)
+            if not data:
+                break
+            bytes_read += len(data)
+            dgst.update(data)
+            buf.append(data)
+        buf = b''.join(buf)
+        segment_hash = dgst.hexdigest()
+
+        if not buf and segment_index > 0:
+            # Happens if the segment size aligns with the object size
+            return {'complete': True,
+                    'segment_size': 0,
+                    'segment_index': None,
+                    'segment_etag': None,
+                    'segment_location': None,
+                    'success': True}
+
+        if segment_index == 0 and len(buf) < segment_size:
+            ret = SwiftService._put_object(
+                conn, container, object_name, buf, headers, segment_hash)
+            ret['segment_location'] = '/%s/%s' % (container, object_name)
+        else:
+            ret = SwiftService._put_object(
+                conn, segment_container, segment_name, buf, headers,
+                segment_hash)
+            ret['segment_location'] = '/%s/%s' % (
+                segment_container, segment_name)
+
+        ret.update(
+            dict(complete=len(buf) < segment_size,
+                 segment_size=len(buf),
+                 segment_index=segment_index,
+                 segment_etag=segment_hash,
+                 for_object=object_name))
+        return ret
+
     def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
         chunks = []
         if 'x-object-manifest' in headers:
@@ -1833,6 +1960,47 @@ class SwiftService(object):
         # Each chunk is verified; check that we're at the end of the file
         return not fp.read(1)

+    @staticmethod
+    def _upload_slo_manifest(conn, segment_results, container, obj, headers):
+        """
+        Upload an SLO manifest, given the results of uploading each segment, to
+        the specified container.
+
+        :param segment_results: List of response_dict structures, as populated
+                                by _upload_segment_job. Specifically, each
+                                entry must container the following keys:
+                                - segment_location
+                                - segment_etag
+                                - segment_size
+                                - segment_index
+        :param container: The container to put the manifest into.
+        :param obj: The name of the manifest object to use.
+        :param headers: Optional set of headers to attach to the manifest.
+        """
+        if headers is None:
+            headers = {}
+        segment_results.sort(key=lambda di: di['segment_index'])
+        for seg in segment_results:
+            seg_loc = seg['segment_location'].lstrip('/')
+            if isinstance(seg_loc, text_type):
+                seg_loc = seg_loc.encode('utf-8')
+
+        manifest_data = json.dumps([
+            {
+                'path': d['segment_location'],
+                'etag': d['segment_etag'],
+                'size_bytes': d['segment_size']
+            } for d in segment_results
+        ])
+
+        response = {}
+        conn.put_object(
+            container, obj, manifest_data,
+            headers=headers,
+            query_string='multipart-manifest=put',
+            response_dict=response)
+        return response
+
     def _upload_object_job(self, conn, container, source, obj, options,
                            results_queue=None):
         if obj.startswith('./') or obj.startswith('.\\'):
@@ -1990,29 +2158,11 @@ class SwiftService(object):
                 res['segment_results'] = segment_results

                 if options['use_slo']:
-                    segment_results.sort(key=lambda di: di['segment_index'])
-                    for seg in segment_results:
-                        seg_loc = seg['segment_location'].lstrip('/')
-                        if isinstance(seg_loc, text_type):
-                            seg_loc = seg_loc.encode('utf-8')
-                        new_slo_manifest_paths.add(seg_loc)
-
-                    manifest_data = json.dumps([
-                        {
-                            'path': d['segment_location'],
-                            'etag': d['segment_etag'],
-                            'size_bytes': d['segment_size']
-                        } for d in segment_results
-                    ])
-
-                    mr = {}
-                    conn.put_object(
-                        container, obj, manifest_data,
-                        headers=put_headers,
-                        query_string='multipart-manifest=put',
-                        response_dict=mr
-                    )
-                    res['manifest_response_dict'] = mr
+                    response = self._upload_slo_manifest(
+                        conn, segment_results, container, obj, put_headers)
+                    res['manifest_response_dict'] = response
+                    new_slo_manifest_paths = {
+                        seg['segment_location'] for seg in segment_results}
                 else:
                     new_object_manifest = '%s/%s/%s/%s/%s/' % (
                         quote(seg_container.encode('utf8')),
@@ -2030,6 +2180,51 @@ class SwiftService(object):
                         response_dict=mr
                     )
                     res['manifest_response_dict'] = mr
+            elif options['use_slo'] and segment_size and not path:
+                segment = 0
+                results = []
+                while True:
+                    segment_name = '%s/slo/%s/%s/%08d' % (
+                        obj, put_headers['x-object-meta-mtime'],
+                        segment_size, segment
+                    )
+                    seg_container = container + '_segments'
+                    if options['segment_container']:
+                        seg_container = options['segment_container']
+                    ret = self._upload_stream_segment(
+                        conn, container, obj,
+                        seg_container,
+                        segment_name,
+                        segment_size,
+                        segment,
+                        put_headers,
+                        stream
+                    )
+                    if not ret['success']:
+                        return ret
+                    if (ret['complete'] and segment == 0) or\
+                            ret['segment_size'] > 0:
+                        results.append(ret)
+                    if results_queue is not None:
+                        # Don't insert the 0-sized segments or objects
+                        # themselves
+                        if ret['segment_location'] != '/%s/%s' % (
+                                container, obj) and ret['segment_size'] > 0:
+                            results_queue.put(ret)
+                    if ret['complete']:
+                        break
+                    segment += 1
+                if results[0]['segment_location'] != '/%s/%s' % (
+                        container, obj):
+                    response = self._upload_slo_manifest(
+                        conn, results, container, obj, put_headers)
+                    res['manifest_response_dict'] = response
+                    new_slo_manifest_paths = {
+                        r['segment_location'] for r in results}
+                    res['large_object'] = True
+                else:
+                    res['response_dict'] = ret
+                    res['large_object'] = False
             else:
                 res['large_object'] = False
                 obr = {}
@@ -2063,7 +2258,6 @@ class SwiftService(object):
         finally:
             if fp is not None:
                 fp.close()
-
         if old_manifest or old_slo_manifest_paths:
             drs = []
             delobjsmap = {}
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index 43fcf47..d02c709 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -947,6 +947,8 @@ Optional arguments:


 def st_upload(parser, args, output_manager):
+    DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024
+
     parser.add_argument(
         '-c', '--changed', action='store_true', dest='changed',
         default=False, help='Only upload files that have changed since '
@@ -1060,6 +1062,12 @@ def st_upload(parser, args, output_manager):
                 st_upload_help)
             return

+    if from_stdin:
+        if not options['use_slo']:
+            options['use_slo'] = True
+        if not options['segment_size']:
+            options['segment_size'] = DEFAULT_STDIN_SEGMENT
+
     options['object_uu_threads'] = options['object_threads']
     with SwiftService(options=options) as swift:
         try:
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 5ccc081..12fbaa0 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -36,6 +36,8 @@ from swiftclient.service import (
     SwiftService, SwiftError, SwiftUploadObject
 )

+from tests.unit import utils as test_utils
+
 clean_os_environ = {}
 environ_prefixes = ('ST_', 'OS_')
@@ -1088,6 +1090,83 @@ class TestService(unittest.TestCase):
         self.assertEqual(upload_obj_resp['path'], obj['path'])
         self.assertTrue(mock_open.return_value.closed)

+    @mock.patch('swiftclient.service.Connection')
+    def test_upload_stream(self, mock_conn):
+        service = SwiftService({})
+
+        stream = test_utils.FakeStream(2048)
+        segment_etag = md5(b'A' * 1024).hexdigest()
+
+        mock_conn.return_value.head_object.side_effect = \
+            ClientException('Not Found', http_status=404)
+        mock_conn.return_value.put_object.return_value = \
+            segment_etag
+        options = {'use_slo': True, 'segment_size': 1024}
+        resp_iter = service.upload(
+            'container',
+            [SwiftUploadObject(stream, object_name='streamed')],
+            options)
+        responses = [x for x in resp_iter]
+        for resp in responses:
+            self.assertFalse('error' in resp)
+            self.assertTrue(resp['success'])
+        self.assertEqual(5, len(responses))
+        container_resp, segment_container_resp = responses[0:2]
+        segment_response = responses[2:4]
+        upload_obj_resp = responses[-1]
+        self.assertEqual(container_resp['action'],
+                         'create_container')
+        self.assertEqual(upload_obj_resp['action'],
+                         'upload_object')
+        self.assertEqual(upload_obj_resp['object'],
+                         'streamed')
+        self.assertTrue(upload_obj_resp['path'] is None)
+        self.assertTrue(upload_obj_resp['large_object'])
+        self.assertIn('manifest_response_dict', upload_obj_resp)
+        self.assertEqual(upload_obj_resp['manifest_response_dict'], {})
+        for i, resp in enumerate(segment_response):
+            self.assertEqual(i, resp['segment_index'])
+            self.assertEqual(1024, resp['segment_size'])
+            self.assertEqual('d47b127bc2de2d687ddc82dac354c415',
+                             resp['segment_etag'])
+            self.assertTrue(resp['segment_location'].endswith(
+                '/0000000%d' % i))
+            self.assertTrue(resp['segment_location'].startswith(
+                '/container_segments/streamed'))
+
+    @mock.patch('swiftclient.service.Connection')
+    def test_upload_stream_fits_in_one_segment(self, mock_conn):
+        service = SwiftService({})
+
+        stream = test_utils.FakeStream(2048)
+        whole_etag = md5(b'A' * 2048).hexdigest()
+
+        mock_conn.return_value.head_object.side_effect = \
+            ClientException('Not Found', http_status=404)
+        mock_conn.return_value.put_object.return_value = \
+            whole_etag
+        options = {'use_slo': True, 'segment_size': 10240}
+        resp_iter = service.upload(
+            'container',
+            [SwiftUploadObject(stream, object_name='streamed')],
+            options)
+        responses = [x for x in resp_iter]
+        for resp in responses:
+            self.assertNotIn('error', resp)
+            self.assertTrue(resp['success'])
+        self.assertEqual(3, len(responses))
+        container_resp, segment_container_resp = responses[0:2]
+        upload_obj_resp = responses[-1]
+        self.assertEqual(container_resp['action'],
+                         'create_container')
+        self.assertEqual(upload_obj_resp['action'],
+                         'upload_object')
+        self.assertEqual(upload_obj_resp['object'],
+                         'streamed')
+        self.assertTrue(upload_obj_resp['path'] is None)
+        self.assertFalse(upload_obj_resp['large_object'])
+        self.assertNotIn('manifest_response_dict', upload_obj_resp)
+

 class TestServiceUpload(_TestServiceBase):
@@ -1226,6 +1305,141 @@ class TestServiceUpload(_TestServiceBase):
         self.assertIsInstance(contents, utils.LengthWrapper)
         self.assertEqual(len(contents), 10)

+    def test_upload_stream_segment(self):
+        common_params = {
+            'segment_container': 'segments',
+            'segment_name': 'test_stream_2',
+            'container': 'test_stream',
+            'object': 'stream_object',
+        }
+        tests = [
+            {'test_params': {
+                'segment_size': 1024,
+                'segment_index': 2,
+                'content_size': 1024},
+             'put_object_args': {
+                 'container': 'segments',
+                 'object': 'test_stream_2'},
+             'expected': {
+                 'complete': False,
+                 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+            {'test_params': {
+                'segment_size': 2048,
+                'segment_index': 0,
+                'content_size': 512},
+             'put_object_args': {
+                 'container': 'test_stream',
+                 'object': 'stream_object'},
+             'expected': {
+                 'complete': True,
+                 'segment_etag': md5(b'A' * 512).hexdigest()}},
+            # 0-sized segment should not be uploaded
+            {'test_params': {
+                'segment_size': 1024,
+                'segment_index': 1,
+                'content_size': 0},
+             'put_object_args': {},
+             'expected': {
+                 'complete': True}},
+            # 0-sized objects should be uploaded
+            {'test_params': {
+                'segment_size': 1024,
+                'segment_index': 0,
+                'content_size': 0},
+             'put_object_args': {
+                 'container': 'test_stream',
+                 'object': 'stream_object'},
+             'expected': {
+                 'complete': True,
+                 'segment_etag': md5(b'').hexdigest()}},
+            # Test boundary conditions
+            {'test_params': {
+                'segment_size': 1024,
+                'segment_index': 1,
+                'content_size': 1023},
+             'put_object_args': {
+                 'container': 'segments',
+                 'object': 'test_stream_2'},
+             'expected': {
+                 'complete': True,
+                 'segment_etag': md5(b'A' * 1023).hexdigest()}},
+            {'test_params': {
+                'segment_size': 2048,
+                'segment_index': 0,
+                'content_size': 2047},
+             'put_object_args': {
+                 'container': 'test_stream',
+                 'object': 'stream_object'},
+             'expected': {
+                 'complete': True,
+                 'segment_etag': md5(b'A' * 2047).hexdigest()}},
+            {'test_params': {
+                'segment_size': 1024,
+                'segment_index': 2,
+                'content_size': 1025},
+             'put_object_args': {
+                 'container': 'segments',
+                 'object': 'test_stream_2'},
+             'expected': {
+                 'complete': False,
+                 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+        ]
+
+        for test_args in tests:
+            params = test_args['test_params']
+            stream = test_utils.FakeStream(params['content_size'])
+            segment_size = params['segment_size']
+            segment_index = params['segment_index']
+
+            def _fake_put_object(*args, **kwargs):
+                contents = args[2]
+                # Consume and compute md5
+                return md5(contents).hexdigest()
+
+            mock_conn = mock.Mock()
+            mock_conn.put_object.side_effect = _fake_put_object
+
+            s = SwiftService()
+            resp = s._upload_stream_segment(
+                conn=mock_conn,
+                container=common_params['container'],
+                object_name=common_params['object'],
+                segment_container=common_params['segment_container'],
+                segment_name=common_params['segment_name'],
+                segment_size=segment_size,
+                segment_index=segment_index,
+                headers={},
+                fd=stream)
+            expected_args = test_args['expected']
+            put_args = test_args['put_object_args']
+            expected_response = {
+                'segment_size': min(len(stream), segment_size),
+                'complete': expected_args['complete'],
+                'success': True,
+            }
+            if len(stream) or segment_index == 0:
+                segment_location = '/%s/%s' % (put_args['container'],
+                                               put_args['object'])
+                expected_response.update(
+                    {'segment_index': segment_index,
+                     'segment_location': segment_location,
+                     'segment_etag': expected_args['segment_etag'],
+                     'for_object': common_params['object']})
+                mock_conn.put_object.assert_called_once_with(
+                    put_args['container'],
+                    put_args['object'],
+                    mock.ANY,
+                    content_length=min(len(stream), segment_size),
+                    headers={'etag': expected_args['segment_etag']},
+                    response_dict=mock.ANY)
+            else:
+                self.assertEqual([], mock_conn.put_object.mock_calls)
+                expected_response.update(
+                    {'segment_index': None,
+                     'segment_location': None,
+                     'segment_etag': None})
+            self.assertEqual(expected_response, resp)
+
     def test_etag_mismatch_with_ignore_checksum(self):
         def _consuming_conn(*a, **kw):
             contents = a[2]
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index c05146e..2def73f 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -548,3 +548,24 @@ def _make_fake_import_keystone_client(fake_import):
             return fake_import, fake_import

     return _fake_import_keystone_client
+
+
+class FakeStream(object):
+    def __init__(self, size):
+        self.bytes_read = 0
+        self.size = size
+
+    def read(self, size=-1):
+        if self.bytes_read == self.size:
+            return b''
+
+        if size == -1 or size + self.bytes_read > self.size:
+            remaining = self.size - self.bytes_read
+            self.bytes_read = self.size
+            return b'A' * remaining
+
+        self.bytes_read += size
+        return b'A' * size
+
+    def __len__(self):
+        return self.size
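As a usage note, the new FakeStream helper in tests/unit/utils.py yields b'A' bytes up to a fixed size and reports that size via len(). A hedged example of the behaviour the new tests rely on; the 2048/1024 sizes match values used in those tests:

```python
# Hedged example of the FakeStream test helper added above; the 2048/1024
# sizes match values used in the new unit tests.
from hashlib import md5

from tests.unit.utils import FakeStream

stream = FakeStream(2048)
first = stream.read(1024)                 # b'A' * 1024
assert md5(first).hexdigest() == md5(b'A' * 1024).hexdigest()
assert len(stream) == 2048                # total size, not bytes remaining
assert stream.read() == b'A' * 1024       # drains the remainder
assert stream.read(1) == b''              # stream is exhausted
```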