diff options
Diffstat (limited to 'swiftclient/service.py')
-rw-r--r-- | swiftclient/service.py | 122 |
1 files changed, 105 insertions, 17 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py index 5292dc5..fb334fd 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -86,6 +86,9 @@ class SwiftError(Exception): value += " segment:%s" % self.segment return value + def __repr__(self): + return str(self) + def process_options(options): # tolerate sloppy auth_version @@ -186,6 +189,7 @@ _default_local_options = { 'leave_segments': False, 'changed': None, 'skip_identical': False, + 'version_id': None, 'yes_all': False, 'read_acl': None, 'write_acl': None, @@ -200,6 +204,7 @@ _default_local_options = { 'meta': [], 'prefix': None, 'delimiter': None, + 'versions': False, 'fail_fast': False, 'human': False, 'dir_marker': False, @@ -336,6 +341,20 @@ class SwiftPostObject(object): self.options = options +class SwiftDeleteObject(object): + """ + Class for specifying an object delete, allowing the headers/metadata to be + specified separately for each individual object. + """ + def __init__(self, object_name, options=None): + if not (isinstance(object_name, string_types) and object_name): + raise SwiftError( + "Object names must be specified as non-empty strings" + ) + self.object_name = object_name + self.options = options + + class SwiftCopyObject(object): """ Class for specifying an object copy, @@ -489,6 +508,7 @@ class SwiftService(object): { 'human': False, + 'version_id': None, 'header': [] } @@ -871,6 +891,7 @@ class SwiftService(object): 'long': False, 'prefix': None, 'delimiter': None, + 'versions': False, 'header': [] } @@ -967,13 +988,19 @@ class SwiftService(object): @staticmethod def _list_container_job(conn, container, options, result_queue): marker = options.get('marker', '') + version_marker = options.get('version_marker', '') error = None req_headers = split_headers(options.get('header', [])) + if options.get('versions', False): + query_string = 'versions=true' + else: + query_string = None try: while True: _, items = conn.get_container( - container, marker=marker, prefix=options['prefix'], - delimiter=options['delimiter'], headers=req_headers + container, marker=marker, version_marker=version_marker, + prefix=options['prefix'], delimiter=options['delimiter'], + headers=req_headers, query_string=query_string ) if not items: @@ -991,6 +1018,7 @@ class SwiftService(object): result_queue.put(res) marker = items[-1].get('name', items[-1].get('subdir')) + version_marker = items[-1].get('version_id', '') except ClientException as err: traceback, err_time = report_traceback() logger.exception(err) @@ -1016,6 +1044,7 @@ class SwiftService(object): 'prefix': options['prefix'], 'success': False, 'marker': marker, + 'version_marker': version_marker, 'error': error[0], 'traceback': error[1], 'error_timestamp': error[2] @@ -1042,6 +1071,7 @@ class SwiftService(object): 'no_download': False, 'header': [], 'skip_identical': False, + 'version_id': None, 'out_directory': None, 'checksum': True, 'out_file': None, @@ -1151,6 +1181,9 @@ class SwiftService(object): get_args = {'resp_chunk_size': DISK_BUFFER, 'headers': req_headers, 'response_dict': results_dict} + if options.get('version_id') is not None: + get_args['query_string'] = ( + 'version-id=%s' % options['version_id']) if options['skip_identical']: # Assume the file is a large object; if we're wrong, the query # string is ignored and the If-None-Match header will trigger @@ -2337,14 +2370,28 @@ class SwiftService(object): of objects. :param container: The container to delete or delete from. - :param objects: The list of objects to delete. + :param objects: A list of object names (strings) or SwiftDeleteObject + instances containing an object name, and an + options dict (can be None) to override the options for + that individual delete operation:: + + [ + 'object_name', + SwiftDeleteObject('object_name', + options={...}), + ... + ] + + The options dict is described below. :param options: A dictionary containing options to override the global options specified during the service object creation:: { 'yes_all': False, 'leave_segments': False, + 'version_id': None, 'prefix': None, + 'versions': False, 'header': [], } @@ -2364,23 +2411,28 @@ class SwiftService(object): if container is not None: if objects is not None: + delete_objects = self._make_delete_objects(objects) if options['prefix']: - objects = [obj for obj in objects - if obj.startswith(options['prefix'])] + delete_objects = [ + obj for obj in delete_objects + if obj.object_name.startswith(options['prefix'])] rq = Queue() obj_dels = {} - bulk_page_size = self._bulk_delete_page_size(objects) + bulk_page_size = self._bulk_delete_page_size(delete_objects) if bulk_page_size > 1: - page_at_a_time = n_at_a_time(objects, bulk_page_size) + page_at_a_time = n_at_a_time(delete_objects, + bulk_page_size) for page_slice in page_at_a_time: for obj_slice in n_groups( page_slice, self._options['object_dd_threads']): - self._bulk_delete(container, obj_slice, options, + object_names = [ + obj.object_name for obj in obj_slice] + self._bulk_delete(container, object_names, options, obj_dels) else: - self._per_item_delete(container, objects, options, + self._per_item_delete(container, delete_objects, options, obj_dels, rq) # Start a thread to watch for delete results @@ -2445,6 +2497,11 @@ class SwiftService(object): # Not many objects; may as well delete one-by-one return 1 + if any(obj.options for obj in objects + if isinstance(obj, SwiftDeleteObject)): + # we can't do per option deletes for bulk + return 1 + try: cap_result = self.capabilities() if not cap_result['success']: @@ -2463,9 +2520,11 @@ class SwiftService(object): return 1 def _per_item_delete(self, container, objects, options, rdict, rq): - for obj in objects: + for delete_obj in objects: + obj = delete_obj.object_name + obj_options = dict(options, **delete_obj.options or {}) obj_del = self.thread_manager.object_dd_pool.submit( - self._delete_object, container, obj, options, + self._delete_object, container, obj, obj_options, results_queue=rq ) obj_details = {'container': container, 'object': obj} @@ -2500,6 +2559,24 @@ class SwiftService(object): results_queue.put(res) return res + @staticmethod + def _make_delete_objects(objects): + delete_objects = [] + + for o in objects: + if isinstance(o, string_types): + obj = SwiftDeleteObject(o) + delete_objects.append(obj) + elif isinstance(o, SwiftDeleteObject): + delete_objects.append(o) + else: + raise SwiftError( + "The delete operation takes only strings or " + "SwiftDeleteObjects as input", + obj=o) + + return delete_objects + def _delete_object(self, conn, container, obj, options, results_queue=None): _headers = {} @@ -2511,7 +2588,7 @@ class SwiftService(object): } try: old_manifest = None - query_string = None + query_params = {} if not options['leave_segments']: try: @@ -2520,11 +2597,15 @@ class SwiftService(object): query_string='symlink=get') old_manifest = headers.get('x-object-manifest') if config_true_value(headers.get('x-static-large-object')): - query_string = 'multipart-manifest=delete' + query_params['multipart-manifest'] = 'delete' except ClientException as err: if err.http_status != 404: raise + if options.get('version_id') is not None: + query_params['version-id'] = options['version_id'] + query_string = '&'.join('%s=%s' % (k, v) for (k, v) + in sorted(query_params.items())) results_dict = {} conn.delete_object(container, obj, headers=_headers, @@ -2611,12 +2692,17 @@ class SwiftService(object): try: for part in self.list(container=container, options=options): if not part["success"]: - raise part["error"] - + delete_objects = [] + for item in part['listing']: + delete_opts = {} + if options.get('versions', False) and 'version_id' in item: + delete_opts['version_id'] = item['version_id'] + delete_obj = SwiftDeleteObject(item['name'], delete_opts) + delete_objects.append(delete_obj) for res in self.delete( container=container, - objects=[o['name'] for o in part['listing']], + objects=delete_objects, options=options): yield res if options['prefix']: @@ -2679,7 +2765,9 @@ class SwiftService(object): 'No content received on account POST. ' 'Is the bulk operations middleware enabled?')}) except Exception as e: - res.update({'success': False, 'error': e}) + traceback, err_time = report_traceback() + logger.exception(e) + res.update({'success': False, 'error': e, 'traceback': traceback}) res.update({ 'action': 'bulk_delete', |