summaryrefslogtreecommitdiff
path: root/swiftclient/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'swiftclient/service.py')
-rw-r--r--swiftclient/service.py119
1 files changed, 74 insertions, 45 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 8c6880a..223641b 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -43,7 +43,8 @@ from swiftclient.command_helpers import (
)
from swiftclient.utils import (
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
- parse_api_response, report_traceback, n_groups, split_request_headers
+ parse_api_response, report_traceback, n_groups, split_request_headers,
+ n_at_a_time
)
from swiftclient.exceptions import ClientException
from swiftclient.multithreading import MultiThreadingManager
@@ -473,7 +474,7 @@ class SwiftService(object):
or container, or an iterator for returning the results of the
stat operations on a list of objects.
- :raises: SwiftError
+ :raises SwiftError:
"""
if options is not None:
options = dict(self._options, **options)
@@ -637,7 +638,7 @@ class SwiftService(object):
container/account, or an iterator for returning the results
of posts to a list of objects.
- :raises: SwiftError
+ :raises SwiftError:
"""
if options is not None:
options = dict(self._options, **options)
@@ -1031,8 +1032,8 @@ class SwiftService(object):
'download_object' dictionary containing the results of an
individual file download.
- :raises: ClientException
- :raises: SwiftError
+ :raises ClientException:
+ :raises SwiftError:
"""
if options is not None:
options = dict(self._options, **options)
@@ -1396,8 +1397,8 @@ class SwiftService(object):
:returns: A generator for returning the results of the uploads.
- :raises: SwiftError
- :raises: ClientException
+ :raises SwiftError:
+ :raises ClientException:
"""
if options is not None:
options = dict(self._options, **options)
@@ -1714,6 +1715,7 @@ class SwiftService(object):
segment_name),
'log_line': '%s segment %s' % (obj_name, segment_index),
}
+ fp = None
try:
fp = open(path, 'rb')
fp.seek(segment_start)
@@ -1761,6 +1763,9 @@ class SwiftService(object):
if results_queue is not None:
results_queue.put(res)
return res
+ finally:
+ if fp is not None:
+ fp.close()
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
@@ -2008,29 +2013,36 @@ class SwiftService(object):
else:
res['large_object'] = False
obr = {}
- if path is not None:
- content_length = getsize(path)
- contents = LengthWrapper(open(path, 'rb'),
- content_length,
- md5=options['checksum'])
- else:
- content_length = None
- contents = ReadableToIterable(stream,
- md5=options['checksum'])
-
- etag = conn.put_object(
- container, obj, contents,
- content_length=content_length, headers=put_headers,
- response_dict=obr
- )
- res['response_dict'] = obr
-
- if (options['checksum'] and
- etag and etag != contents.get_md5sum()):
- raise SwiftError('Object upload verification failed: '
- 'md5 mismatch, local {0} != remote {1} '
- '(remote object has not been removed)'
- .format(contents.get_md5sum(), etag))
+ fp = None
+ try:
+ if path is not None:
+ content_length = getsize(path)
+ fp = open(path, 'rb')
+ contents = LengthWrapper(fp,
+ content_length,
+ md5=options['checksum'])
+ else:
+ content_length = None
+ contents = ReadableToIterable(stream,
+ md5=options['checksum'])
+
+ etag = conn.put_object(
+ container, obj, contents,
+ content_length=content_length, headers=put_headers,
+ response_dict=obr
+ )
+ res['response_dict'] = obr
+
+ if (options['checksum'] and
+ etag and etag != contents.get_md5sum()):
+ raise SwiftError(
+ 'Object upload verification failed: '
+ 'md5 mismatch, local {0} != remote {1} '
+ '(remote object has not been removed)'
+ .format(contents.get_md5sum(), etag))
+ finally:
+ if fp is not None:
+ fp.close()
if old_manifest or old_slo_manifest_paths:
drs = []
@@ -2124,8 +2136,8 @@ class SwiftService(object):
'bulk_delete' dictionary containing the results of an
individual delete operation.
- :raises: ClientException
- :raises: SwiftError
+ :raises ClientException:
+ :raises SwiftError:
"""
if options is not None:
options = dict(self._options, **options)
@@ -2140,11 +2152,15 @@ class SwiftService(object):
rq = Queue()
obj_dels = {}
- if self._should_bulk_delete(objects):
- for obj_slice in n_groups(
- objects, self._options['object_dd_threads']):
- self._bulk_delete(container, obj_slice, options,
- obj_dels)
+ bulk_page_size = self._bulk_delete_page_size(objects)
+ if bulk_page_size > 1:
+ page_at_a_time = n_at_a_time(objects, bulk_page_size)
+ for page_slice in page_at_a_time:
+ for obj_slice in n_groups(
+ page_slice,
+ self._options['object_dd_threads']):
+ self._bulk_delete(container, obj_slice, options,
+ obj_dels)
else:
self._per_item_delete(container, objects, options,
obj_dels, rq)
@@ -2197,23 +2213,36 @@ class SwiftService(object):
and not res['success']):
cancelled = True
- def _should_bulk_delete(self, objects):
- if len(objects) < 2 * self._options['object_dd_threads']:
+ def _bulk_delete_page_size(self, objects):
+ '''
+ Given the iterable 'objects', will return how many items should be
+ deleted at a time.
+
+ :param objects: An iterable that supports 'len()'
+ :returns: The bulk delete page size (i.e. the max number of
+ objects that can be bulk deleted at once, as reported by
+ the cluster). If bulk delete is disabled, return 1
+ '''
+ if len(objects) <= 2 * self._options['object_dd_threads']:
# Not many objects; may as well delete one-by-one
- return False
+ return 1
try:
cap_result = self.capabilities()
if not cap_result['success']:
# This shouldn't actually happen, but just in case we start
# being more nuanced about our capabilities result...
- return False
+ return 1
except ClientException:
# Old swift, presumably; assume no bulk middleware
- return False
+ return 1
swift_info = cap_result['capabilities']
- return 'bulk_delete' in swift_info
+ if 'bulk_delete' in swift_info:
+ return swift_info['bulk_delete'].get(
+ 'max_deletes_per_request', 10000)
+ else:
+ return 1
def _per_item_delete(self, container, objects, options, rdict, rq):
for obj in objects:
@@ -2493,7 +2522,7 @@ class SwiftService(object):
:returns: A generator returning the results of copying the given list
of objects.
- :raises: SwiftError
+ :raises SwiftError:
"""
if options is not None:
options = dict(self._options, **options)
@@ -2635,7 +2664,7 @@ class SwiftService(object):
:returns: A dictionary containing the capabilities of the cluster.
- :raises: ClientException
+ :raises ClientException:
"""
if not refresh_cache and url in self.capabilities_cache:
return self.capabilities_cache[url]