-rw-r--r--  swiftclient/service.py      244
-rwxr-xr-x  swiftclient/shell.py          8
-rw-r--r--  tests/unit/test_service.py  214
-rw-r--r--  tests/unit/utils.py          21
4 files changed, 462 insertions, 25 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7b5ecd4..ed5e9e9 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1502,7 +1502,8 @@ class SwiftService(object):
if hasattr(s, 'read'):
# We've got a file like object to upload to o
file_future = self.thread_manager.object_uu_pool.submit(
- self._upload_object_job, container, s, o, object_options
+ self._upload_object_job, container, s, o, object_options,
+ results_queue=rq
)
details['file'] = s
details['object'] = o
@@ -1784,6 +1785,132 @@ class SwiftService(object):
if fp is not None:
fp.close()
+ @staticmethod
+ def _put_object(conn, container, name, content, headers=None, md5=None):
+ """
+ Upload an object to the given container and, if the optional md5
+ parameter is passed, verify the resulting ETag.
+
+ :param conn: The Swift connection to use for uploads.
+ :param container: The container to put the object into.
+ :param name: The name of the object.
+ :param content: Object content.
+ :param headers: Headers (optional) to associate with the object.
+ :param md5: MD5 sum of the content. If passed in, will be used to
+ verify the returned ETag.
+
+ :returns: A dictionary as the response from calling put_object.
+ The keys are:
+ - status
+ - reason
+ - headers
+ On error, the dictionary contains the following keys:
+ - success (with value False)
+ - error - the encountered exception (object)
+ - error_timestamp
+ - response_dict - results from the put_object call, as
+ documented above
+ - attempts - number of attempts made
+ """
+ if headers is None:
+ headers = {}
+ else:
+ headers = dict(headers)
+ if md5 is not None:
+ headers['etag'] = md5
+ results = {}
+ try:
+ etag = conn.put_object(
+ container, name, content, content_length=len(content),
+ headers=headers, response_dict=results)
+ if md5 is not None and etag != md5:
+ raise SwiftError('Upload verification failed for {0}: md5 '
+ 'mismatch {1} != {2}'.format(name, md5, etag))
+ results['success'] = True
+ except Exception as err:
+ traceback, err_time = report_traceback()
+ logger.exception(err)
+ return {
+ 'success': False,
+ 'error': err,
+ 'error_timestamp': err_time,
+ 'response_dict': results,
+ 'attempts': conn.attempts,
+ 'traceback': traceback
+ }
+ return results
+
+ @staticmethod
+ def _upload_stream_segment(conn, container, object_name,
+ segment_container, segment_name,
+ segment_size, segment_index,
+ headers, fd):
+ """
+ Upload a segment from a stream, buffering it in memory first. The
+ resulting object is placed either as a segment in the segment
+ container or, when the whole stream fits within a single segment, as
+ the given object name in the main container.
+
+ :param conn: Swift Connection to use.
+ :param container: Container in which the final object will be placed.
+ :param object_name: Name of the final object (used when the stream
+ fits within a single segment).
+ :param segment_container: Container to hold the object segments.
+ :param segment_name: The name of the segment.
+ :param segment_size: Size of each segment (the last one may be smaller).
+ :param segment_index: The segment index.
+ :param headers: Headers to attach to the segment/object.
+ :param fd: File-like handle for the content. Must implement read().
+
+ :returns: Dictionary containing the following keys:
+ - complete -- whether the stream is exhausted
+ - segment_size -- the actual size of the segment (may be
+ smaller than the passed-in segment_size)
+ - segment_location -- path to the segment
+ - segment_index -- index of the segment
+ - segment_etag -- the ETag for the segment
+ - for_object -- name of the object the segment belongs to
+ - success -- whether the upload succeeded
+ """
+ buf = []
+ dgst = md5()
+ bytes_read = 0
+ while bytes_read < segment_size:
+ data = fd.read(segment_size - bytes_read)
+ if not data:
+ break
+ bytes_read += len(data)
+ dgst.update(data)
+ buf.append(data)
+ buf = b''.join(buf)
+ segment_hash = dgst.hexdigest()
+
+ if not buf and segment_index > 0:
+ # Happens when the stream length is an exact multiple of the segment size
+ return {'complete': True,
+ 'segment_size': 0,
+ 'segment_index': None,
+ 'segment_etag': None,
+ 'segment_location': None,
+ 'success': True}
+
+ if segment_index == 0 and len(buf) < segment_size:
+ ret = SwiftService._put_object(
+ conn, container, object_name, buf, headers, segment_hash)
+ ret['segment_location'] = '/%s/%s' % (container, object_name)
+ else:
+ ret = SwiftService._put_object(
+ conn, segment_container, segment_name, buf, headers,
+ segment_hash)
+ ret['segment_location'] = '/%s/%s' % (
+ segment_container, segment_name)
+
+ ret.update(
+ dict(complete=len(buf) < segment_size,
+ segment_size=len(buf),
+ segment_index=segment_index,
+ segment_etag=segment_hash,
+ for_object=object_name))
+ return ret
+
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
if 'x-object-manifest' in headers:
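Taken together, the new _put_object and _upload_stream_segment helpers let an
upload job consume an arbitrary file-like object one segment at a time. A
minimal sketch of such a driver loop, using only the return keys documented
above (the connection, segment container and segment-name scheme here are
illustrative, not the patch's exact naming):

    from swiftclient.service import SwiftService

    def drive_stream_upload(conn, container, object_name, fd, segment_size):
        # Sketch only: feed a stream through _upload_stream_segment until
        # it reports completion.
        results = []
        index = 0
        while True:
            ret = SwiftService._upload_stream_segment(
                conn, container, object_name,
                container + '_segments',                # illustrative segment container
                '%s/slo/%08d' % (object_name, index),   # illustrative segment name
                segment_size, index, {}, fd)
            if not ret['success']:
                return ret                              # propagate the failure
            if ret['segment_size'] > 0 or index == 0:
                results.append(ret)                     # drop trailing 0-sized segments
            if ret['complete']:
                return results                          # stream exhausted
            index += 1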
@@ -1833,6 +1960,47 @@ class SwiftService(object):
# Each chunk is verified; check that we're at the end of the file
return not fp.read(1)
+ @staticmethod
+ def _upload_slo_manifest(conn, segment_results, container, obj, headers):
+ """
+ Upload an SLO manifest, given the results of uploading each segment, to
+ the specified container.
+
+ :param conn: The Swift connection to use for the upload.
+ :param segment_results: List of response_dict structures, as populated
+ by _upload_segment_job. Specifically, each
+ entry must contain the following keys:
+ - segment_location
+ - segment_etag
+ - segment_size
+ - segment_index
+ :param container: The container to put the manifest into.
+ :param obj: The name of the manifest object to use.
+ :param headers: Optional set of headers to attach to the manifest.
+
+ :returns: The response_dict of the manifest PUT request.
+ """
+ if headers is None:
+ headers = {}
+ segment_results.sort(key=lambda di: di['segment_index'])
+ for seg in segment_results:
+ seg_loc = seg['segment_location'].lstrip('/')
+ if isinstance(seg_loc, text_type):
+ seg_loc = seg_loc.encode('utf-8')
+
+ manifest_data = json.dumps([
+ {
+ 'path': d['segment_location'],
+ 'etag': d['segment_etag'],
+ 'size_bytes': d['segment_size']
+ } for d in segment_results
+ ])
+
+ response = {}
+ conn.put_object(
+ container, obj, manifest_data,
+ headers=headers,
+ query_string='multipart-manifest=put',
+ response_dict=response)
+ return response
+
def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None):
if obj.startswith('./') or obj.startswith('.\\'):
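For reference, the multipart-manifest=put body that _upload_slo_manifest
builds is a JSON list with one entry per segment, ordered by segment_index.
A hand-written illustration with two segments (paths, sizes and etags are
placeholders, not real values):

    import json

    manifest_data = json.dumps([
        {'path': '/container_segments/streamed/slo/1478915082.000000/1048576/00000000',
         'etag': '<md5 of segment 0>',
         'size_bytes': 1048576},
        {'path': '/container_segments/streamed/slo/1478915082.000000/1048576/00000001',
         'etag': '<md5 of segment 1>',
         'size_bytes': 524288},
    ])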
@@ -1990,29 +2158,11 @@ class SwiftService(object):
res['segment_results'] = segment_results
if options['use_slo']:
- segment_results.sort(key=lambda di: di['segment_index'])
- for seg in segment_results:
- seg_loc = seg['segment_location'].lstrip('/')
- if isinstance(seg_loc, text_type):
- seg_loc = seg_loc.encode('utf-8')
- new_slo_manifest_paths.add(seg_loc)
-
- manifest_data = json.dumps([
- {
- 'path': d['segment_location'],
- 'etag': d['segment_etag'],
- 'size_bytes': d['segment_size']
- } for d in segment_results
- ])
-
- mr = {}
- conn.put_object(
- container, obj, manifest_data,
- headers=put_headers,
- query_string='multipart-manifest=put',
- response_dict=mr
- )
- res['manifest_response_dict'] = mr
+ response = self._upload_slo_manifest(
+ conn, segment_results, container, obj, put_headers)
+ res['manifest_response_dict'] = response
+ new_slo_manifest_paths = {
+ seg['segment_location'] for seg in segment_results}
else:
new_object_manifest = '%s/%s/%s/%s/%s/' % (
quote(seg_container.encode('utf8')),
@@ -2030,6 +2180,51 @@ class SwiftService(object):
response_dict=mr
)
res['manifest_response_dict'] = mr
+ elif options['use_slo'] and segment_size and not path:
+ segment = 0
+ results = []
+ while True:
+ segment_name = '%s/slo/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ segment_size, segment
+ )
+ seg_container = container + '_segments'
+ if options['segment_container']:
+ seg_container = options['segment_container']
+ ret = self._upload_stream_segment(
+ conn, container, obj,
+ seg_container,
+ segment_name,
+ segment_size,
+ segment,
+ put_headers,
+ stream
+ )
+ if not ret['success']:
+ return ret
+ if (ret['complete'] and segment == 0) or\
+ ret['segment_size'] > 0:
+ results.append(ret)
+ if results_queue is not None:
+ # Don't report zero-sized segments, or the object
+ # itself when the stream fit in a single segment
+ if ret['segment_location'] != '/%s/%s' % (
+ container, obj) and ret['segment_size'] > 0:
+ results_queue.put(ret)
+ if ret['complete']:
+ break
+ segment += 1
+ if results[0]['segment_location'] != '/%s/%s' % (
+ container, obj):
+ response = self._upload_slo_manifest(
+ conn, results, container, obj, put_headers)
+ res['manifest_response_dict'] = response
+ new_slo_manifest_paths = {
+ r['segment_location'] for r in results}
+ res['large_object'] = True
+ else:
+ res['response_dict'] = ret
+ res['large_object'] = False
else:
res['large_object'] = False
obr = {}
@@ -2063,7 +2258,6 @@ class SwiftService(object):
finally:
if fp is not None:
fp.close()
-
if old_manifest or old_slo_manifest_paths:
drs = []
delobjsmap = {}
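From the Python API side, the new code path is taken when a SwiftUploadObject
wraps a file-like source and use_slo plus segment_size are set. A minimal
usage sketch streaming standard input into Swift (container and object names
are illustrative, and authentication options are assumed to come from the
environment):

    import sys
    from swiftclient.service import SwiftService, SwiftUploadObject

    # Binary stdin stream (sys.stdin.buffer on Python 3, sys.stdin on Python 2).
    stream = getattr(sys.stdin, 'buffer', sys.stdin)

    options = {'use_slo': True, 'segment_size': 10 * 1024 * 1024}
    with SwiftService(options=options) as swift:
        for result in swift.upload(
                'container',
                [SwiftUploadObject(stream, object_name='streamed_object')]):
            if not result['success']:
                print('failed: %s' % result.get('error'))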
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index 43fcf47..d02c709 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -947,6 +947,8 @@ Optional arguments:
def st_upload(parser, args, output_manager):
+ DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024
+
parser.add_argument(
'-c', '--changed', action='store_true', dest='changed',
default=False, help='Only upload files that have changed since '
@@ -1060,6 +1062,12 @@ def st_upload(parser, args, output_manager):
st_upload_help)
return
+ if from_stdin:
+ if not options['use_slo']:
+ options['use_slo'] = True
+ if not options['segment_size']:
+ options['segment_size'] = DEFAULT_STDIN_SEGMENT
+
options['object_uu_threads'] = options['object_threads']
with SwiftService(options=options) as swift:
try:
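The net effect for the CLI is that an upload whose source is standard input is
always treated as an SLO and is cut into 10 MiB segments unless the user
supplied a segment size. Restated as a small sketch (not the actual shell
code; how from_stdin gets set is determined elsewhere in st_upload):

    DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024

    def apply_stdin_defaults(options, from_stdin):
        # Mirrors the defaulting added above for stdin uploads.
        if from_stdin:
            options['use_slo'] = True
            if not options['segment_size']:
                options['segment_size'] = DEFAULT_STDIN_SEGMENT
        return options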
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 5ccc081..12fbaa0 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -36,6 +36,8 @@ from swiftclient.service import (
SwiftService, SwiftError, SwiftUploadObject
)
+from tests.unit import utils as test_utils
+
clean_os_environ = {}
environ_prefixes = ('ST_', 'OS_')
@@ -1088,6 +1090,83 @@ class TestService(unittest.TestCase):
self.assertEqual(upload_obj_resp['path'], obj['path'])
self.assertTrue(mock_open.return_value.closed)
+ @mock.patch('swiftclient.service.Connection')
+ def test_upload_stream(self, mock_conn):
+ service = SwiftService({})
+
+ stream = test_utils.FakeStream(2048)
+ segment_etag = md5(b'A' * 1024).hexdigest()
+
+ mock_conn.return_value.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+ mock_conn.return_value.put_object.return_value = \
+ segment_etag
+ options = {'use_slo': True, 'segment_size': 1024}
+ resp_iter = service.upload(
+ 'container',
+ [SwiftUploadObject(stream, object_name='streamed')],
+ options)
+ responses = [x for x in resp_iter]
+ for resp in responses:
+ self.assertFalse('error' in resp)
+ self.assertTrue(resp['success'])
+ self.assertEqual(5, len(responses))
+ container_resp, segment_container_resp = responses[0:2]
+ segment_response = responses[2:4]
+ upload_obj_resp = responses[-1]
+ self.assertEqual(container_resp['action'],
+ 'create_container')
+ self.assertEqual(upload_obj_resp['action'],
+ 'upload_object')
+ self.assertEqual(upload_obj_resp['object'],
+ 'streamed')
+ self.assertTrue(upload_obj_resp['path'] is None)
+ self.assertTrue(upload_obj_resp['large_object'])
+ self.assertIn('manifest_response_dict', upload_obj_resp)
+ self.assertEqual(upload_obj_resp['manifest_response_dict'], {})
+ for i, resp in enumerate(segment_response):
+ self.assertEqual(i, resp['segment_index'])
+ self.assertEqual(1024, resp['segment_size'])
+ self.assertEqual(segment_etag, resp['segment_etag'])
+ self.assertTrue(resp['segment_location'].endswith(
+ '/0000000%d' % i))
+ self.assertTrue(resp['segment_location'].startswith(
+ '/container_segments/streamed'))
+
+ @mock.patch('swiftclient.service.Connection')
+ def test_upload_stream_fits_in_one_segment(self, mock_conn):
+ service = SwiftService({})
+
+ stream = test_utils.FakeStream(2048)
+ whole_etag = md5(b'A' * 2048).hexdigest()
+
+ mock_conn.return_value.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+ mock_conn.return_value.put_object.return_value = \
+ whole_etag
+ options = {'use_slo': True, 'segment_size': 10240}
+ resp_iter = service.upload(
+ 'container',
+ [SwiftUploadObject(stream, object_name='streamed')],
+ options)
+ responses = [x for x in resp_iter]
+ for resp in responses:
+ self.assertNotIn('error', resp)
+ self.assertTrue(resp['success'])
+ self.assertEqual(3, len(responses))
+ container_resp, segment_container_resp = responses[0:2]
+ upload_obj_resp = responses[-1]
+ self.assertEqual(container_resp['action'],
+ 'create_container')
+ self.assertEqual(upload_obj_resp['action'],
+ 'upload_object')
+ self.assertEqual(upload_obj_resp['object'],
+ 'streamed')
+ self.assertTrue(upload_obj_resp['path'] is None)
+ self.assertFalse(upload_obj_resp['large_object'])
+ self.assertNotIn('manifest_response_dict', upload_obj_resp)
+
class TestServiceUpload(_TestServiceBase):
@@ -1226,6 +1305,141 @@ class TestServiceUpload(_TestServiceBase):
self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 10)
+ def test_upload_stream_segment(self):
+ common_params = {
+ 'segment_container': 'segments',
+ 'segment_name': 'test_stream_2',
+ 'container': 'test_stream',
+ 'object': 'stream_object',
+ }
+ tests = [
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 2,
+ 'content_size': 1024},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': False,
+ 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 2048,
+ 'segment_index': 0,
+ 'content_size': 512},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 512).hexdigest()}},
+ # 0-sized segment should not be uploaded
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 1,
+ 'content_size': 0},
+ 'put_object_args': {},
+ 'expected': {
+ 'complete': True}},
+ # 0-sized objects should be uploaded
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 0,
+ 'content_size': 0},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'').hexdigest()}},
+ # Test boundary conditions
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 1,
+ 'content_size': 1023},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 1023).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 2048,
+ 'segment_index': 0,
+ 'content_size': 2047},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 2047).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 2,
+ 'content_size': 1025},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': False,
+ 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+ ]
+
+ for test_args in tests:
+ params = test_args['test_params']
+ stream = test_utils.FakeStream(params['content_size'])
+ segment_size = params['segment_size']
+ segment_index = params['segment_index']
+
+ def _fake_put_object(*args, **kwargs):
+ contents = args[2]
+ # Consume and compute md5
+ return md5(contents).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _fake_put_object
+
+ s = SwiftService()
+ resp = s._upload_stream_segment(
+ conn=mock_conn,
+ container=common_params['container'],
+ object_name=common_params['object'],
+ segment_container=common_params['segment_container'],
+ segment_name=common_params['segment_name'],
+ segment_size=segment_size,
+ segment_index=segment_index,
+ headers={},
+ fd=stream)
+ expected_args = test_args['expected']
+ put_args = test_args['put_object_args']
+ expected_response = {
+ 'segment_size': min(len(stream), segment_size),
+ 'complete': expected_args['complete'],
+ 'success': True,
+ }
+ if len(stream) or segment_index == 0:
+ segment_location = '/%s/%s' % (put_args['container'],
+ put_args['object'])
+ expected_response.update(
+ {'segment_index': segment_index,
+ 'segment_location': segment_location,
+ 'segment_etag': expected_args['segment_etag'],
+ 'for_object': common_params['object']})
+ mock_conn.put_object.assert_called_once_with(
+ put_args['container'],
+ put_args['object'],
+ mock.ANY,
+ content_length=min(len(stream), segment_size),
+ headers={'etag': expected_args['segment_etag']},
+ response_dict=mock.ANY)
+ else:
+ self.assertEqual([], mock_conn.put_object.mock_calls)
+ expected_response.update(
+ {'segment_index': None,
+ 'segment_location': None,
+ 'segment_etag': None})
+ self.assertEqual(expected_response, resp)
+
def test_etag_mismatch_with_ignore_checksum(self):
def _consuming_conn(*a, **kw):
contents = a[2]
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index c05146e..2def73f 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -548,3 +548,24 @@ def _make_fake_import_keystone_client(fake_import):
return fake_import, fake_import
return _fake_import_keystone_client
+
+
+class FakeStream(object):
+ def __init__(self, size):
+ self.bytes_read = 0
+ self.size = size
+
+ def read(self, size=-1):
+ if self.bytes_read == self.size:
+ return b''
+
+ if size == -1 or size + self.bytes_read > self.size:
+ remaining = self.size - self.bytes_read
+ self.bytes_read = self.size
+ return b'A' * remaining
+
+ self.bytes_read += size
+ return b'A' * size
+
+ def __len__(self):
+ return self.size
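FakeStream hands back runs of b'A' until size bytes have been consumed, which
is what lets the tests above predict segment contents and etags. A quick
illustration of its read semantics:

    from tests.unit.utils import FakeStream

    stream = FakeStream(2500)
    assert len(stream.read(1024)) == 1024   # first full chunk
    assert len(stream.read(1024)) == 1024   # second full chunk
    assert len(stream.read(1024)) == 452    # only the remainder comes back
    assert stream.read(1024) == b''         # exhausted
    assert len(stream) == 2500              # __len__ reports the total size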