summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--swiftclient/multithreading.py7
-rw-r--r--swiftclient/service.py100
-rw-r--r--tests/functional/test_swiftclient.py10
-rw-r--r--tests/unit/test_service.py128
-rw-r--r--tests/unit/utils.py9
5 files changed, 214 insertions, 40 deletions
diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py
index 32d8ffa..c53d987 100644
--- a/swiftclient/multithreading.py
+++ b/swiftclient/multithreading.py
@@ -45,7 +45,7 @@ class OutputManager(object):
"""
DEFAULT_OFFSET = 14
- def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
+ def __init__(self, print_stream=None, error_stream=None):
"""
:param print_stream: The stream to which :meth:`print_msg` sends
formatted messages.
@@ -54,9 +54,10 @@ class OutputManager(object):
On Python 2, Unicode messages are encoded to utf8.
"""
- self.print_stream = print_stream
+ self.print_stream = print_stream or sys.stdout
self.print_pool = ThreadPoolExecutor(max_workers=1)
- self.error_stream = error_stream
+
+ self.error_stream = error_stream or sys.stderr
self.error_print_pool = ThreadPoolExecutor(max_workers=1)
self.error_count = 0
diff --git a/swiftclient/service.py b/swiftclient/service.py
index f24d430..bb132dd 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -1159,7 +1159,7 @@ class SwiftService(object):
'headers': [],
'segment_size': None,
'use_slo': False,
- 'segment_container: None,
+ 'segment_container': None,
'leave_segments': False,
'changed': None,
'skip_identical': False,
@@ -1502,6 +1502,51 @@ class SwiftService(object):
results_queue.put(res)
return res
+ def _get_chunk_data(self, conn, container, obj, headers):
+ chunks = []
+ if 'x-object-manifest' in headers:
+ scontainer, sprefix = headers['x-object-manifest'].split('/', 1)
+ for part in self.list(scontainer, {'prefix': sprefix}):
+ if part["success"]:
+ chunks.extend(part["listing"])
+ else:
+ raise part["error"]
+ elif config_true_value(headers.get('x-static-large-object')):
+ _, manifest_data = conn.get_object(
+ container, obj, query_string='multipart-manifest=get')
+ for chunk in json.loads(manifest_data):
+ if chunk.get('sub_slo'):
+ scont, sobj = chunk['name'].lstrip('/').split('/', 1)
+ chunks.extend(self._get_chunk_data(
+ conn, scont, sobj, {'x-static-large-object': True}))
+ else:
+ chunks.append(chunk)
+ else:
+ chunks.append({'hash': headers.get('etag').strip('"'),
+ 'bytes': int(headers.get('content-length'))})
+ return chunks
+
+ def _is_identical(self, chunk_data, path):
+ try:
+ fp = open(path, 'rb')
+ except IOError:
+ return False
+
+ with fp:
+ for chunk in chunk_data:
+ to_read = chunk['bytes']
+ md5sum = md5()
+ while to_read:
+ data = fp.read(min(65536, to_read))
+ if not data:
+ return False
+ md5sum.update(data)
+ to_read -= len(data)
+ if md5sum.hexdigest() != chunk['hash']:
+ return False
+ # Each chunk is verified; check that we're at the end of the file
+ return not fp.read(1)
+
def _upload_object_job(self, conn, container, source, obj, options,
results_queue=None):
res = {
@@ -1533,32 +1578,27 @@ class SwiftService(object):
old_manifest = None
old_slo_manifest_paths = []
new_slo_manifest_paths = set()
+ segment_size = int(0 if options['segment_size'] is None
+ else options['segment_size'])
if (options['changed'] or options['skip_identical']
or not options['leave_segments']):
- checksum = None
- if options['skip_identical']:
- try:
- fp = open(path, 'rb')
- except IOError:
- pass
- else:
- with fp:
- md5sum = md5()
- while True:
- data = fp.read(65536)
- if not data:
- break
- md5sum.update(data)
- checksum = md5sum.hexdigest()
try:
headers = conn.head_object(container, obj)
- if options['skip_identical'] and checksum is not None:
- if checksum == headers.get('etag'):
- res.update({
- 'success': True,
- 'status': 'skipped-identical'
- })
- return res
+ is_slo = config_true_value(
+ headers.get('x-static-large-object'))
+
+ if options['skip_identical'] or (
+ is_slo and not options['leave_segments']):
+ chunk_data = self._get_chunk_data(
+ conn, container, obj, headers)
+
+ if options['skip_identical'] and self._is_identical(
+ chunk_data, path):
+ res.update({
+ 'success': True,
+ 'status': 'skipped-identical'
+ })
+ return res
cl = int(headers.get('content-length'))
mt = headers.get('x-object-meta-mtime')
@@ -1572,13 +1612,8 @@ class SwiftService(object):
return res
if not options['leave_segments']:
old_manifest = headers.get('x-object-manifest')
- if config_true_value(
- headers.get('x-static-large-object')):
- headers, manifest_data = conn.get_object(
- container, obj,
- query_string='multipart-manifest=get'
- )
- for old_seg in json.loads(manifest_data):
+ if is_slo:
+ for old_seg in chunk_data:
seg_path = old_seg['name'].lstrip('/')
if isinstance(seg_path, text_type):
seg_path = seg_path.encode('utf-8')
@@ -1598,8 +1633,8 @@ class SwiftService(object):
# a segment job if we're reading from a stream - we may fail if we
# go over the single object limit, but this gives us a nice way
# to create objects from memory
- if (path is not None and options['segment_size']
- and (getsize(path) > int(options['segment_size']))):
+ if (path is not None and segment_size
+ and (getsize(path) > segment_size)):
res['large_object'] = True
seg_container = container + '_segments'
if options['segment_container']:
@@ -1612,7 +1647,6 @@ class SwiftService(object):
segment_start = 0
while segment_start < full_size:
- segment_size = int(options['segment_size'])
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
if options['use_slo']:
diff --git a/tests/functional/test_swiftclient.py b/tests/functional/test_swiftclient.py
index f5d14aa..4b57f1d 100644
--- a/tests/functional/test_swiftclient.py
+++ b/tests/functional/test_swiftclient.py
@@ -51,8 +51,13 @@ class TestFunctional(testtools.TestCase):
auth_ssl = config.getboolean('func_test', 'auth_ssl')
auth_prefix = config.get('func_test', 'auth_prefix')
self.auth_version = config.get('func_test', 'auth_version')
- self.account = config.get('func_test', 'account')
- self.username = config.get('func_test', 'username')
+ try:
+ self.account_username = config.get('func_test',
+ 'account_username')
+ except configparser.NoOptionError:
+ account = config.get('func_test', 'account')
+ username = config.get('func_test', 'username')
+ self.account_username = "%s:%s" % (account, username)
self.password = config.get('func_test', 'password')
self.auth_url = ""
if auth_ssl:
@@ -62,7 +67,6 @@ class TestFunctional(testtools.TestCase):
self.auth_url += "%s:%s%s" % (auth_host, auth_port, auth_prefix)
if self.auth_version == "1":
self.auth_url += 'v1.0'
- self.account_username = "%s:%s" % (self.account, self.username)
else:
self.skip_tests = True
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 3309813..073f06e 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -817,3 +817,131 @@ class TestServiceUpload(testtools.TestCase):
contents = mock_conn.put_object.call_args[0][2]
self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+ def test_upload_object_job_identical_etag(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'content-length': 30,
+ 'etag': md5(b'a' * 30).hexdigest()}
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 0})
+
+ self.assertTrue(r['success'])
+ self.assertIn('status', r)
+ self.assertEqual(r['status'], 'skipped-identical')
+ self.assertEqual(mock_conn.put_object.call_count, 0)
+ self.assertEqual(mock_conn.head_object.call_count, 1)
+ mock_conn.head_object.assert_called_with('test_c', 'test_o')
+
+ def test_upload_object_job_identical_slo_with_nesting(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ seg_etag = md5(b'a' * 10).hexdigest()
+ submanifest = "[%s]" % ",".join(
+ ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+ submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+ manifest = "[%s]" % ",".join([
+ '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+ '"bytes":20,"hash":"%s"}' % submanifest_etag,
+ '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'x-static-large-object': True,
+ 'content-length': 30,
+ 'etag': md5(submanifest_etag.encode('ascii') +
+ seg_etag.encode('ascii')).hexdigest()}
+ mock_conn.get_object.side_effect = [
+ (None, manifest),
+ (None, submanifest)]
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+
+ self.assertIsNone(r.get('error'))
+ self.assertTrue(r['success'])
+ self.assertEqual('skipped-identical', r.get('status'))
+ self.assertEqual(0, mock_conn.put_object.call_count)
+ self.assertEqual([mock.call('test_c', 'test_o')],
+ mock_conn.head_object.mock_calls)
+ self.assertEqual([
+ mock.call('test_c', 'test_o',
+ query_string='multipart-manifest=get'),
+ mock.call('test_c_segments', 'test_sub_slo',
+ query_string='multipart-manifest=get'),
+ ], mock_conn.get_object.mock_calls)
+
+ def test_upload_object_job_identical_dlo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ segment_etag = md5(b'a' * 10).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'x-object-manifest': 'test_c_segments/test_o/prefix',
+ 'content-length': 30,
+ 'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()}
+ mock_conn.get_container.side_effect = [
+ (None, [{"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/00"},
+ {"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/01"}]),
+ (None, [{"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/02"}]),
+ (None, {})]
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+
+ self.assertIsNone(r.get('error'))
+ self.assertTrue(r['success'])
+ self.assertEqual('skipped-identical', r.get('status'))
+ self.assertEqual(0, mock_conn.put_object.call_count)
+ self.assertEqual(1, mock_conn.head_object.call_count)
+ self.assertEqual(3, mock_conn.get_container.call_count)
+ mock_conn.head_object.assert_called_with('test_c', 'test_o')
+ expected = [
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker='', delimiter=None),
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker="test_o/prefix/01", delimiter=None),
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker="test_o/prefix/02", delimiter=None),
+ ]
+ mock_conn.get_container.assert_has_calls(expected)
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index 88d6d12..bb68f4f 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -207,6 +207,12 @@ class MockHttpTest(testtools.TestCase):
self.fake_connect = None
self.request_log = []
+ # Capture output, since the test-runner stdout/stderr moneky-patching
+ # won't cover the references to sys.stdout/sys.stderr in
+ # swiftclient.multithreading
+ self.capture_output = CaptureOutput()
+ self.capture_output.__enter__()
+
def fake_http_connection(*args, **kwargs):
self.validateMockedRequestsConsumed()
self.request_log = []
@@ -367,6 +373,7 @@ class MockHttpTest(testtools.TestCase):
# un-hygienic mocking on the swiftclient.client module; which may lead
# to some unfortunate test order dependency bugs by way of the broken
# window theory if any other modules are similarly patched
+ self.capture_output.__exit__()
reload_module(c)
@@ -392,7 +399,7 @@ class CaptureStream(object):
self.stream = stream
self._capture = six.StringIO()
self._buffer = CaptureStreamBuffer(self)
- self.streams = [self.stream, self._capture]
+ self.streams = [self._capture]
@property
def buffer(self):