diff options
Diffstat (limited to 'swiftclient/service.py')
-rw-r--r-- | swiftclient/service.py | 119 |
1 files changed, 74 insertions, 45 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py index 8c6880a..223641b 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -43,7 +43,8 @@ from swiftclient.command_helpers import ( ) from swiftclient.utils import ( config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG, - parse_api_response, report_traceback, n_groups, split_request_headers + parse_api_response, report_traceback, n_groups, split_request_headers, + n_at_a_time ) from swiftclient.exceptions import ClientException from swiftclient.multithreading import MultiThreadingManager @@ -473,7 +474,7 @@ class SwiftService(object): or container, or an iterator for returning the results of the stat operations on a list of objects. - :raises: SwiftError + :raises SwiftError: """ if options is not None: options = dict(self._options, **options) @@ -637,7 +638,7 @@ class SwiftService(object): container/account, or an iterator for returning the results of posts to a list of objects. - :raises: SwiftError + :raises SwiftError: """ if options is not None: options = dict(self._options, **options) @@ -1031,8 +1032,8 @@ class SwiftService(object): 'download_object' dictionary containing the results of an individual file download. - :raises: ClientException - :raises: SwiftError + :raises ClientException: + :raises SwiftError: """ if options is not None: options = dict(self._options, **options) @@ -1396,8 +1397,8 @@ class SwiftService(object): :returns: A generator for returning the results of the uploads. - :raises: SwiftError - :raises: ClientException + :raises SwiftError: + :raises ClientException: """ if options is not None: options = dict(self._options, **options) @@ -1714,6 +1715,7 @@ class SwiftService(object): segment_name), 'log_line': '%s segment %s' % (obj_name, segment_index), } + fp = None try: fp = open(path, 'rb') fp.seek(segment_start) @@ -1761,6 +1763,9 @@ class SwiftService(object): if results_queue is not None: results_queue.put(res) return res + finally: + if fp is not None: + fp.close() def _get_chunk_data(self, conn, container, obj, headers, manifest=None): chunks = [] @@ -2008,29 +2013,36 @@ class SwiftService(object): else: res['large_object'] = False obr = {} - if path is not None: - content_length = getsize(path) - contents = LengthWrapper(open(path, 'rb'), - content_length, - md5=options['checksum']) - else: - content_length = None - contents = ReadableToIterable(stream, - md5=options['checksum']) - - etag = conn.put_object( - container, obj, contents, - content_length=content_length, headers=put_headers, - response_dict=obr - ) - res['response_dict'] = obr - - if (options['checksum'] and - etag and etag != contents.get_md5sum()): - raise SwiftError('Object upload verification failed: ' - 'md5 mismatch, local {0} != remote {1} ' - '(remote object has not been removed)' - .format(contents.get_md5sum(), etag)) + fp = None + try: + if path is not None: + content_length = getsize(path) + fp = open(path, 'rb') + contents = LengthWrapper(fp, + content_length, + md5=options['checksum']) + else: + content_length = None + contents = ReadableToIterable(stream, + md5=options['checksum']) + + etag = conn.put_object( + container, obj, contents, + content_length=content_length, headers=put_headers, + response_dict=obr + ) + res['response_dict'] = obr + + if (options['checksum'] and + etag and etag != contents.get_md5sum()): + raise SwiftError( + 'Object upload verification failed: ' + 'md5 mismatch, local {0} != remote {1} ' + '(remote object has not been removed)' + .format(contents.get_md5sum(), etag)) + finally: + if fp is not None: + fp.close() if old_manifest or old_slo_manifest_paths: drs = [] @@ -2124,8 +2136,8 @@ class SwiftService(object): 'bulk_delete' dictionary containing the results of an individual delete operation. - :raises: ClientException - :raises: SwiftError + :raises ClientException: + :raises SwiftError: """ if options is not None: options = dict(self._options, **options) @@ -2140,11 +2152,15 @@ class SwiftService(object): rq = Queue() obj_dels = {} - if self._should_bulk_delete(objects): - for obj_slice in n_groups( - objects, self._options['object_dd_threads']): - self._bulk_delete(container, obj_slice, options, - obj_dels) + bulk_page_size = self._bulk_delete_page_size(objects) + if bulk_page_size > 1: + page_at_a_time = n_at_a_time(objects, bulk_page_size) + for page_slice in page_at_a_time: + for obj_slice in n_groups( + page_slice, + self._options['object_dd_threads']): + self._bulk_delete(container, obj_slice, options, + obj_dels) else: self._per_item_delete(container, objects, options, obj_dels, rq) @@ -2197,23 +2213,36 @@ class SwiftService(object): and not res['success']): cancelled = True - def _should_bulk_delete(self, objects): - if len(objects) < 2 * self._options['object_dd_threads']: + def _bulk_delete_page_size(self, objects): + ''' + Given the iterable 'objects', will return how many items should be + deleted at a time. + + :param objects: An iterable that supports 'len()' + :returns: The bulk delete page size (i.e. the max number of + objects that can be bulk deleted at once, as reported by + the cluster). If bulk delete is disabled, return 1 + ''' + if len(objects) <= 2 * self._options['object_dd_threads']: # Not many objects; may as well delete one-by-one - return False + return 1 try: cap_result = self.capabilities() if not cap_result['success']: # This shouldn't actually happen, but just in case we start # being more nuanced about our capabilities result... - return False + return 1 except ClientException: # Old swift, presumably; assume no bulk middleware - return False + return 1 swift_info = cap_result['capabilities'] - return 'bulk_delete' in swift_info + if 'bulk_delete' in swift_info: + return swift_info['bulk_delete'].get( + 'max_deletes_per_request', 10000) + else: + return 1 def _per_item_delete(self, container, objects, options, rdict, rq): for obj in objects: @@ -2493,7 +2522,7 @@ class SwiftService(object): :returns: A generator returning the results of copying the given list of objects. - :raises: SwiftError + :raises SwiftError: """ if options is not None: options = dict(self._options, **options) @@ -2635,7 +2664,7 @@ class SwiftService(object): :returns: A dictionary containing the capabilities of the cluster. - :raises: ClientException + :raises ClientException: """ if not refresh_cache and url in self.capabilities_cache: return self.capabilities_cache[url] |