summaryrefslogtreecommitdiff
path: root/swiftclient
diff options
context:
space:
mode:
authorTimur Alperovich <timuralp@swiftstack.com>2017-06-28 12:02:21 -0700
committerTim Burke <tim.burke@gmail.com>2018-01-18 04:56:12 +0000
commit2faea932870956583f83226886d33304ee1eee46 (patch)
tree0d23a3d3389d506d5564613c296e0e92736f5cd6 /swiftclient
parenta9b8f0a0d191873ac88b0c70166a2b889096fa69 (diff)
downloadpython-swiftclient-2faea932870956583f83226886d33304ee1eee46.tar.gz
Allow for object uploads > 5GB from stdin.
When uploading from standard input, swiftclient should turn the upload into an SLO in the case of large objects. This patch picks the threshold as 10MB (and uses that as the default segment size). The consumers can also supply the --segment-size option to alter that threshold and the SLO segment size. The patch does buffer one segment in memory (which is why 10MB default was chosen). (test is updated) Change-Id: Ib13e0b687bc85930c29fe9f151cf96bc53b2e594
Diffstat (limited to 'swiftclient')
-rw-r--r--swiftclient/service.py244
-rwxr-xr-xswiftclient/shell.py8
2 files changed, 227 insertions, 25 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7b5ecd4..ed5e9e9 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1502,7 +1502,8 @@ class SwiftService(object):
if hasattr(s, 'read'):
# We've got a file like object to upload to o
file_future = self.thread_manager.object_uu_pool.submit(
- self._upload_object_job, container, s, o, object_options
+ self._upload_object_job, container, s, o, object_options,
+ results_queue=rq
)
details['file'] = s
details['object'] = o
@@ -1784,6 +1785,132 @@ class SwiftService(object):
if fp is not None:
fp.close()
+ @staticmethod
+ def _put_object(conn, container, name, content, headers=None, md5=None):
+ """
+ Upload object into a given container and verify the resulting ETag, if
+ the md5 optional parameter is passed.
+
+ :param conn: The Swift connection to use for uploads.
+ :param container: The container to put the object into.
+ :param name: The name of the object.
+ :param content: Object content.
+ :param headers: Headers (optional) to associate with the object.
+ :param md5: MD5 sum of the content. If passed in, will be used to
+ verify the returned ETag.
+
+ :returns: A dictionary as the response from calling put_object.
+ The keys are:
+ - status
+ - reason
+ - headers
+ On error, the dictionary contains the following keys:
+ - success (with value False)
+ - error - the encountered exception (object)
+ - error_timestamp
+ - response_dict - results from the put_object call, as
+ documented above
+ - attempts - number of attempts made
+ """
+ if headers is None:
+ headers = {}
+ else:
+ headers = dict(headers)
+ if md5 is not None:
+ headers['etag'] = md5
+ results = {}
+ try:
+ etag = conn.put_object(
+ container, name, content, content_length=len(content),
+ headers=headers, response_dict=results)
+ if md5 is not None and etag != md5:
+ raise SwiftError('Upload verification failed for {0}: md5 '
+ 'mismatch {1} != {2}'.format(name, md5, etag))
+ results['success'] = True
+ except Exception as err:
+ traceback, err_time = report_traceback()
+ logger.exception(err)
+ return {
+ 'success': False,
+ 'error': err,
+ 'error_timestamp': err_time,
+ 'response_dict': results,
+ 'attempts': conn.attempts,
+ 'traceback': traceback
+ }
+ return results
+
+ @staticmethod
+ def _upload_stream_segment(conn, container, object_name,
+ segment_container, segment_name,
+ segment_size, segment_index,
+ headers, fd):
+ """
+ Upload a segment from a stream, buffering it in memory first. The
+ resulting object is placed either as a segment in the segment
+ container, or if it is smaller than a single segment, as the given
+ object name.
+
+ :param conn: Swift Connection to use.
+ :param container: Container in which the object would be placed.
+ :param object_name: Name of the final object (used in case the stream
+ is smaller than the segment_size)
+ :param segment_container: Container to hold the object segments.
+ :param segment_name: The name of the segment.
+ :param segment_size: Minimum segment size.
+ :param segment_index: The segment index.
+ :param headers: Headers to attach to the segment/object.
+ :param fd: File-like handle for the content. Must implement read().
+
+ :returns: Dictionary, containing the following keys:
+ - complete -- whether the stream is exhausted
+ - segment_size - the actual size of the segment (may be
+ smaller than the passed in segment_size)
+ - segment_location - path to the segment
+ - segment_index - index of the segment
+ - segment_etag - the ETag for the segment
+ """
+ buf = []
+ dgst = md5()
+ bytes_read = 0
+ while bytes_read < segment_size:
+ data = fd.read(segment_size - bytes_read)
+ if not data:
+ break
+ bytes_read += len(data)
+ dgst.update(data)
+ buf.append(data)
+ buf = b''.join(buf)
+ segment_hash = dgst.hexdigest()
+
+ if not buf and segment_index > 0:
+ # Happens if the segment size aligns with the object size
+ return {'complete': True,
+ 'segment_size': 0,
+ 'segment_index': None,
+ 'segment_etag': None,
+ 'segment_location': None,
+ 'success': True}
+
+ if segment_index == 0 and len(buf) < segment_size:
+ ret = SwiftService._put_object(
+ conn, container, object_name, buf, headers, segment_hash)
+ ret['segment_location'] = '/%s/%s' % (container, object_name)
+ else:
+ ret = SwiftService._put_object(
+ conn, segment_container, segment_name, buf, headers,
+ segment_hash)
+ ret['segment_location'] = '/%s/%s' % (
+ segment_container, segment_name)
+
+ ret.update(
+ dict(complete=len(buf) < segment_size,
+ segment_size=len(buf),
+ segment_index=segment_index,
+ segment_etag=segment_hash,
+ for_object=object_name))
+ return ret
+
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
if 'x-object-manifest' in headers:
@@ -1833,6 +1960,47 @@ class SwiftService(object):
# Each chunk is verified; check that we're at the end of the file
return not fp.read(1)
+ @staticmethod
+ def _upload_slo_manifest(conn, segment_results, container, obj, headers):
+ """
+ Upload an SLO manifest, given the results of uploading each segment, to
+ the specified container.
+
+ :param segment_results: List of response_dict structures, as populated
+ by _upload_segment_job. Specifically, each
+ entry must container the following keys:
+ - segment_location
+ - segment_etag
+ - segment_size
+ - segment_index
+ :param container: The container to put the manifest into.
+ :param obj: The name of the manifest object to use.
+ :param headers: Optional set of headers to attach to the manifest.
+ """
+ if headers is None:
+ headers = {}
+ segment_results.sort(key=lambda di: di['segment_index'])
+ for seg in segment_results:
+ seg_loc = seg['segment_location'].lstrip('/')
+ if isinstance(seg_loc, text_type):
+ seg_loc = seg_loc.encode('utf-8')
+
+ manifest_data = json.dumps([
+ {
+ 'path': d['segment_location'],
+ 'etag': d['segment_etag'],
+ 'size_bytes': d['segment_size']
+ } for d in segment_results
+ ])
+
+ response = {}
+ conn.put_object(
+ container, obj, manifest_data,
+ headers=headers,
+ query_string='multipart-manifest=put',
+ response_dict=response)
+ return response
+
def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None):
if obj.startswith('./') or obj.startswith('.\\'):
@@ -1990,29 +2158,11 @@ class SwiftService(object):
res['segment_results'] = segment_results
if options['use_slo']:
- segment_results.sort(key=lambda di: di['segment_index'])
- for seg in segment_results:
- seg_loc = seg['segment_location'].lstrip('/')
- if isinstance(seg_loc, text_type):
- seg_loc = seg_loc.encode('utf-8')
- new_slo_manifest_paths.add(seg_loc)
-
- manifest_data = json.dumps([
- {
- 'path': d['segment_location'],
- 'etag': d['segment_etag'],
- 'size_bytes': d['segment_size']
- } for d in segment_results
- ])
-
- mr = {}
- conn.put_object(
- container, obj, manifest_data,
- headers=put_headers,
- query_string='multipart-manifest=put',
- response_dict=mr
- )
- res['manifest_response_dict'] = mr
+ response = self._upload_slo_manifest(
+ conn, segment_results, container, obj, put_headers)
+ res['manifest_response_dict'] = response
+ new_slo_manifest_paths = {
+ seg['segment_location'] for seg in segment_results}
else:
new_object_manifest = '%s/%s/%s/%s/%s/' % (
quote(seg_container.encode('utf8')),
@@ -2030,6 +2180,51 @@ class SwiftService(object):
response_dict=mr
)
res['manifest_response_dict'] = mr
+ elif options['use_slo'] and segment_size and not path:
+ segment = 0
+ results = []
+ while True:
+ segment_name = '%s/slo/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ segment_size, segment
+ )
+ seg_container = container + '_segments'
+ if options['segment_container']:
+ seg_container = options['segment_container']
+ ret = self._upload_stream_segment(
+ conn, container, obj,
+ seg_container,
+ segment_name,
+ segment_size,
+ segment,
+ put_headers,
+ stream
+ )
+ if not ret['success']:
+ return ret
+ if (ret['complete'] and segment == 0) or\
+ ret['segment_size'] > 0:
+ results.append(ret)
+ if results_queue is not None:
+ # Don't insert the 0-sized segments or objects
+ # themselves
+ if ret['segment_location'] != '/%s/%s' % (
+ container, obj) and ret['segment_size'] > 0:
+ results_queue.put(ret)
+ if ret['complete']:
+ break
+ segment += 1
+ if results[0]['segment_location'] != '/%s/%s' % (
+ container, obj):
+ response = self._upload_slo_manifest(
+ conn, results, container, obj, put_headers)
+ res['manifest_response_dict'] = response
+ new_slo_manifest_paths = {
+ r['segment_location'] for r in results}
+ res['large_object'] = True
+ else:
+ res['response_dict'] = ret
+ res['large_object'] = False
else:
res['large_object'] = False
obr = {}
@@ -2063,7 +2258,6 @@ class SwiftService(object):
finally:
if fp is not None:
fp.close()
-
if old_manifest or old_slo_manifest_paths:
drs = []
delobjsmap = {}
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index 43fcf47..d02c709 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -947,6 +947,8 @@ Optional arguments:
def st_upload(parser, args, output_manager):
+ DEFAULT_STDIN_SEGMENT = 10 * 1024 * 1024
+
parser.add_argument(
'-c', '--changed', action='store_true', dest='changed',
default=False, help='Only upload files that have changed since '
@@ -1060,6 +1062,12 @@ def st_upload(parser, args, output_manager):
st_upload_help)
return
+ if from_stdin:
+ if not options['use_slo']:
+ options['use_slo'] = True
+ if not options['segment_size']:
+ options['segment_size'] = DEFAULT_STDIN_SEGMENT
+
options['object_uu_threads'] = options['object_threads']
with SwiftService(options=options) as swift:
try: