diff options
author | Timur Alperovich <timuralp@swiftstack.com> | 2017-06-28 12:02:21 -0700 |
---|---|---|
committer | Tim Burke <tim.burke@gmail.com> | 2018-01-18 04:56:12 +0000 |
commit | 2faea932870956583f83226886d33304ee1eee46 (patch) | |
tree | 0d23a3d3389d506d5564613c296e0e92736f5cd6 /tests/unit/test_service.py | |
parent | a9b8f0a0d191873ac88b0c70166a2b889096fa69 (diff) | |
download | python-swiftclient-2faea932870956583f83226886d33304ee1eee46.tar.gz |
Allow for object uploads > 5GB from stdin.
When uploading from standard input, swiftclient should turn the upload
into an SLO in the case of large objects. This patch picks the
threshold as 10MB (and uses that as the default segment size). The
consumers can also supply the --segment-size option to alter that
threshold and the SLO segment size. The patch does buffer one segment
in memory (which is why 10MB default was chosen).
(test is updated)
Change-Id: Ib13e0b687bc85930c29fe9f151cf96bc53b2e594
Diffstat (limited to 'tests/unit/test_service.py')
-rw-r--r-- | tests/unit/test_service.py | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 5ccc081..12fbaa0 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -36,6 +36,8 @@ from swiftclient.service import ( SwiftService, SwiftError, SwiftUploadObject ) +from tests.unit import utils as test_utils + clean_os_environ = {} environ_prefixes = ('ST_', 'OS_') @@ -1088,6 +1090,83 @@ class TestService(unittest.TestCase): self.assertEqual(upload_obj_resp['path'], obj['path']) self.assertTrue(mock_open.return_value.closed) + @mock.patch('swiftclient.service.Connection') + def test_upload_stream(self, mock_conn): + service = SwiftService({}) + + stream = test_utils.FakeStream(2048) + segment_etag = md5(b'A' * 1024).hexdigest() + + mock_conn.return_value.head_object.side_effect = \ + ClientException('Not Found', http_status=404) + mock_conn.return_value.put_object.return_value = \ + segment_etag + options = {'use_slo': True, 'segment_size': 1024} + resp_iter = service.upload( + 'container', + [SwiftUploadObject(stream, object_name='streamed')], + options) + responses = [x for x in resp_iter] + for resp in responses: + self.assertFalse('error' in resp) + self.assertTrue(resp['success']) + self.assertEqual(5, len(responses)) + container_resp, segment_container_resp = responses[0:2] + segment_response = responses[2:4] + upload_obj_resp = responses[-1] + self.assertEqual(container_resp['action'], + 'create_container') + self.assertEqual(upload_obj_resp['action'], + 'upload_object') + self.assertEqual(upload_obj_resp['object'], + 'streamed') + self.assertTrue(upload_obj_resp['path'] is None) + self.assertTrue(upload_obj_resp['large_object']) + self.assertIn('manifest_response_dict', upload_obj_resp) + self.assertEqual(upload_obj_resp['manifest_response_dict'], {}) + for i, resp in enumerate(segment_response): + self.assertEqual(i, resp['segment_index']) + self.assertEqual(1024, resp['segment_size']) + self.assertEqual('d47b127bc2de2d687ddc82dac354c415', + resp['segment_etag']) + self.assertTrue(resp['segment_location'].endswith( + '/0000000%d' % i)) + self.assertTrue(resp['segment_location'].startswith( + '/container_segments/streamed')) + + @mock.patch('swiftclient.service.Connection') + def test_upload_stream_fits_in_one_segment(self, mock_conn): + service = SwiftService({}) + + stream = test_utils.FakeStream(2048) + whole_etag = md5(b'A' * 2048).hexdigest() + + mock_conn.return_value.head_object.side_effect = \ + ClientException('Not Found', http_status=404) + mock_conn.return_value.put_object.return_value = \ + whole_etag + options = {'use_slo': True, 'segment_size': 10240} + resp_iter = service.upload( + 'container', + [SwiftUploadObject(stream, object_name='streamed')], + options) + responses = [x for x in resp_iter] + for resp in responses: + self.assertNotIn('error', resp) + self.assertTrue(resp['success']) + self.assertEqual(3, len(responses)) + container_resp, segment_container_resp = responses[0:2] + upload_obj_resp = responses[-1] + self.assertEqual(container_resp['action'], + 'create_container') + self.assertEqual(upload_obj_resp['action'], + 'upload_object') + self.assertEqual(upload_obj_resp['object'], + 'streamed') + self.assertTrue(upload_obj_resp['path'] is None) + self.assertFalse(upload_obj_resp['large_object']) + self.assertNotIn('manifest_response_dict', upload_obj_resp) + class TestServiceUpload(_TestServiceBase): @@ -1226,6 +1305,141 @@ class TestServiceUpload(_TestServiceBase): self.assertIsInstance(contents, utils.LengthWrapper) self.assertEqual(len(contents), 10) + def test_upload_stream_segment(self): + common_params = { + 'segment_container': 'segments', + 'segment_name': 'test_stream_2', + 'container': 'test_stream', + 'object': 'stream_object', + } + tests = [ + {'test_params': { + 'segment_size': 1024, + 'segment_index': 2, + 'content_size': 1024}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': False, + 'segment_etag': md5(b'A' * 1024).hexdigest()}}, + {'test_params': { + 'segment_size': 2048, + 'segment_index': 0, + 'content_size': 512}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 512).hexdigest()}}, + # 0-sized segment should not be uploaded + {'test_params': { + 'segment_size': 1024, + 'segment_index': 1, + 'content_size': 0}, + 'put_object_args': {}, + 'expected': { + 'complete': True}}, + # 0-sized objects should be uploaded + {'test_params': { + 'segment_size': 1024, + 'segment_index': 0, + 'content_size': 0}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'').hexdigest()}}, + # Test boundary conditions + {'test_params': { + 'segment_size': 1024, + 'segment_index': 1, + 'content_size': 1023}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 1023).hexdigest()}}, + {'test_params': { + 'segment_size': 2048, + 'segment_index': 0, + 'content_size': 2047}, + 'put_object_args': { + 'container': 'test_stream', + 'object': 'stream_object'}, + 'expected': { + 'complete': True, + 'segment_etag': md5(b'A' * 2047).hexdigest()}}, + {'test_params': { + 'segment_size': 1024, + 'segment_index': 2, + 'content_size': 1025}, + 'put_object_args': { + 'container': 'segments', + 'object': 'test_stream_2'}, + 'expected': { + 'complete': False, + 'segment_etag': md5(b'A' * 1024).hexdigest()}}, + ] + + for test_args in tests: + params = test_args['test_params'] + stream = test_utils.FakeStream(params['content_size']) + segment_size = params['segment_size'] + segment_index = params['segment_index'] + + def _fake_put_object(*args, **kwargs): + contents = args[2] + # Consume and compute md5 + return md5(contents).hexdigest() + + mock_conn = mock.Mock() + mock_conn.put_object.side_effect = _fake_put_object + + s = SwiftService() + resp = s._upload_stream_segment( + conn=mock_conn, + container=common_params['container'], + object_name=common_params['object'], + segment_container=common_params['segment_container'], + segment_name=common_params['segment_name'], + segment_size=segment_size, + segment_index=segment_index, + headers={}, + fd=stream) + expected_args = test_args['expected'] + put_args = test_args['put_object_args'] + expected_response = { + 'segment_size': min(len(stream), segment_size), + 'complete': expected_args['complete'], + 'success': True, + } + if len(stream) or segment_index == 0: + segment_location = '/%s/%s' % (put_args['container'], + put_args['object']) + expected_response.update( + {'segment_index': segment_index, + 'segment_location': segment_location, + 'segment_etag': expected_args['segment_etag'], + 'for_object': common_params['object']}) + mock_conn.put_object.assert_called_once_with( + put_args['container'], + put_args['object'], + mock.ANY, + content_length=min(len(stream), segment_size), + headers={'etag': expected_args['segment_etag']}, + response_dict=mock.ANY) + else: + self.assertEqual([], mock_conn.put_object.mock_calls) + expected_response.update( + {'segment_index': None, + 'segment_location': None, + 'segment_etag': None}) + self.assertEqual(expected_response, resp) + def test_etag_mismatch_with_ignore_checksum(self): def _consuming_conn(*a, **kw): contents = a[2] |