summaryrefslogtreecommitdiff
path: root/swiftclient/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'swiftclient/service.py')
-rw-r--r--swiftclient/service.py225
1 files changed, 224 insertions, 1 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 6ccba55..af412d1 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -200,7 +200,9 @@ _default_local_options = {
'human': False,
'dir_marker': False,
'checksum': True,
- 'shuffle': False
+ 'shuffle': False,
+ 'destination': None,
+ 'fresh_metadata': False,
}
POLICY = 'X-Storage-Policy'
@@ -330,6 +332,42 @@ class SwiftPostObject(object):
self.options = options
+class SwiftCopyObject(object):
+ """
+ Class for specifying an object copy,
+ allowing the destination/headers/metadata/fresh_metadata to be specified
+ separately for each individual object.
+ destination and fresh_metadata should be set in options
+ """
+ def __init__(self, object_name, options=None):
+ if not isinstance(object_name, string_types) or not object_name:
+ raise SwiftError(
+ "Object names must be specified as non-empty strings"
+ )
+
+ self.object_name = object_name
+ self.options = options
+
+ if self.options is None:
+ self.destination = None
+ self.fresh_metadata = False
+ else:
+ self.destination = self.options.get('destination')
+ self.fresh_metadata = self.options.get('fresh_metadata', False)
+
+ if self.destination is not None:
+ destination_components = self.destination.split('/')
+ if destination_components[0] or len(destination_components) < 2:
+ raise SwiftError("destination must be in format /cont[/obj]")
+ if not destination_components[-1]:
+ raise SwiftError("destination must not end in a slash")
+ if len(destination_components) == 2:
+ # only container set in destination
+ self.destination = "{0}/{1}".format(
+ self.destination, object_name
+ )
+
+
class _SwiftReader(object):
"""
Class for downloading objects from swift and raising appropriate
@@ -2391,6 +2429,191 @@ class SwiftService(object):
return res
+ # Copy related methods
+ #
+ def copy(self, container, objects, options=None):
+ """
+ Copy operations on a list of objects in a container. Destination
+ containers will be created.
+
+ :param container: The container from which to copy the objects.
+ :param objects: A list of object names (strings) or SwiftCopyObject
+ instances containing an object name and an
+ options dict (can be None) to override the options for
+ that individual copy operation::
+
+ [
+ 'object_name',
+ SwiftCopyObject(
+ 'object_name',
+ options={
+ 'destination': '/container/object',
+ 'fresh_metadata': False,
+ ...
+ }),
+ ...
+ ]
+
+ The options dict is described below.
+ :param options: A dictionary containing options to override the global
+ options specified during the service object creation.
+ These options are applied to all copy operations
+ performed by this call, unless overridden on a per
+ object basis.
+ The options "destination" and "fresh_metadata" do
+ not need to be set, in this case objects will be
+ copied onto themselves and metadata will not be
+ refreshed.
+ The option "destination" can also be specified in the
+ format '/container', in which case objects without an
+ explicit destination will be copied to the destination
+ /container/original_object_name. Combinations of
+ multiple objects and a destination in the format
+ '/container/object' is invalid. Possible options are
+ given below::
+
+ {
+ 'meta': [],
+ 'header': [],
+ 'destination': '/container/object',
+ 'fresh_metadata': False,
+ }
+
+ :returns: A generator returning the results of copying the given list
+ of objects.
+
+ :raises: SwiftError
+ """
+ if options is not None:
+ options = dict(self._options, **options)
+ else:
+ options = self._options
+
+ # Try to create the container, just in case it doesn't exist. If this
+ # fails, it might just be because the user doesn't have container PUT
+ # permissions, so we'll ignore any error. If there's really a problem,
+ # it'll surface on the first object COPY.
+ containers = set(
+ next(p for p in obj.destination.split("/") if p)
+ for obj in objects
+ if isinstance(obj, SwiftCopyObject) and obj.destination
+ )
+ if options.get('destination'):
+ destination_split = options['destination'].split('/')
+ if destination_split[0]:
+ raise SwiftError("destination must be in format /cont[/obj]")
+ _str_objs = [
+ o for o in objects if not isinstance(o, SwiftCopyObject)
+ ]
+ if len(destination_split) > 2 and len(_str_objs) > 1:
+ # TODO (clayg): could be useful to copy multiple objects into
+ # a destination like "/container/common/prefix/for/objects/"
+ # where the trailing "/" indicates the destination option is a
+ # prefix!
+ raise SwiftError("Combination of multiple objects and "
+ "destination including object is invalid")
+ if destination_split[-1] == '':
+ # N.B. this protects the above case
+ raise SwiftError("destination can not end in a slash")
+ containers.add(destination_split[1])
+
+ policy_header = {}
+ _header = split_headers(options["header"])
+ if POLICY in _header:
+ policy_header[POLICY] = _header[POLICY]
+ create_containers = [
+ self.thread_manager.container_pool.submit(
+ self._create_container_job, cont, headers=policy_header)
+ for cont in containers
+ ]
+
+ # wait for container creation jobs to complete before any COPY
+ for r in interruptable_as_completed(create_containers):
+ res = r.result()
+ yield res
+
+ copy_futures = []
+ copy_objects = self._make_copy_objects(objects, options)
+ for copy_object in copy_objects:
+ obj = copy_object.object_name
+ obj_options = copy_object.options
+ destination = copy_object.destination
+ fresh_metadata = copy_object.fresh_metadata
+ headers = split_headers(
+ options['meta'], 'X-Object-Meta-')
+ # add header options to the headers object for the request.
+ headers.update(
+ split_headers(options['header'], ''))
+ if obj_options is not None:
+ if 'meta' in obj_options:
+ headers.update(
+ split_headers(
+ obj_options['meta'], 'X-Object-Meta-'
+ )
+ )
+ if 'header' in obj_options:
+ headers.update(
+ split_headers(obj_options['header'], '')
+ )
+
+ copy = self.thread_manager.object_uu_pool.submit(
+ self._copy_object_job, container, obj, destination,
+ headers, fresh_metadata
+ )
+ copy_futures.append(copy)
+
+ for r in interruptable_as_completed(copy_futures):
+ res = r.result()
+ yield res
+
+ @staticmethod
+ def _make_copy_objects(objects, options):
+ copy_objects = []
+
+ for o in objects:
+ if isinstance(o, string_types):
+ obj = SwiftCopyObject(o, options)
+ copy_objects.append(obj)
+ elif isinstance(o, SwiftCopyObject):
+ copy_objects.append(o)
+ else:
+ raise SwiftError(
+ "The copy operation takes only strings or "
+ "SwiftCopyObjects as input",
+ obj=o)
+
+ return copy_objects
+
+ @staticmethod
+ def _copy_object_job(conn, container, obj, destination, headers,
+ fresh_metadata):
+ response_dict = {}
+ res = {
+ 'success': True,
+ 'action': 'copy_object',
+ 'container': container,
+ 'object': obj,
+ 'destination': destination,
+ 'headers': headers,
+ 'fresh_metadata': fresh_metadata,
+ 'response_dict': response_dict
+ }
+ try:
+ conn.copy_object(
+ container, obj, destination=destination, headers=headers,
+ fresh_metadata=fresh_metadata, response_dict=response_dict)
+ except Exception as err:
+ traceback, err_time = report_traceback()
+ logger.exception(err)
+ res.update({
+ 'success': False,
+ 'error': err,
+ 'traceback': traceback,
+ 'error_timestamp': err_time
+ })
+
+ return res
+
# Capabilities related methods
#
def capabilities(self, url=None, refresh_cache=False):