summaryrefslogtreecommitdiff
path: root/test/unit/test_service.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/test_service.py')
-rw-r--r--test/unit/test_service.py2909
1 files changed, 2909 insertions, 0 deletions
diff --git a/test/unit/test_service.py b/test/unit/test_service.py
new file mode 100644
index 0000000..ed3a2d6
--- /dev/null
+++ b/test/unit/test_service.py
@@ -0,0 +1,2909 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2014 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import unicode_literals
+import contextlib
+import mock
+import os
+import six
+import tempfile
+import unittest
+import time
+
+from concurrent.futures import Future
+from hashlib import md5
+from mock import Mock, PropertyMock
+from six.moves.queue import Queue, Empty as QueueEmptyError
+from six import BytesIO
+from time import sleep
+
+import swiftclient
+import swiftclient.utils as utils
+from swiftclient.client import Connection, ClientException
+from swiftclient.service import (
+ SwiftService, SwiftError, SwiftUploadObject
+)
+
+from test.unit import utils as test_utils
+
+
+clean_os_environ = {}
+environ_prefixes = ('ST_', 'OS_')
+for key in os.environ:
+ if any(key.startswith(m) for m in environ_prefixes):
+ clean_os_environ[key] = ''
+
+
+if six.PY2:
+ import __builtin__ as builtins
+else:
+ import builtins
+
+
+class TestSwiftPostObject(unittest.TestCase):
+
+ def setUp(self):
+ super(TestSwiftPostObject, self).setUp()
+ self.spo = swiftclient.service.SwiftPostObject
+
+ def test_create(self):
+ spo = self.spo('obj_name')
+
+ self.assertEqual(spo.object_name, 'obj_name')
+ self.assertIsNone(spo.options)
+
+ def test_create_with_invalid_name(self):
+ # empty strings are not allowed as names
+ self.assertRaises(SwiftError, self.spo, '')
+
+ # names cannot be anything but strings
+ self.assertRaises(SwiftError, self.spo, 1)
+
+
+class TestSwiftCopyObject(unittest.TestCase):
+
+ def setUp(self):
+ super(TestSwiftCopyObject, self).setUp()
+ self.sco = swiftclient.service.SwiftCopyObject
+
+ def test_create(self):
+ sco = self.sco('obj_name')
+
+ self.assertEqual(sco.object_name, 'obj_name')
+ self.assertIsNone(sco.destination)
+ self.assertFalse(sco.fresh_metadata)
+
+ sco = self.sco('obj_name',
+ {'destination': '/dest', 'fresh_metadata': True})
+
+ self.assertEqual(sco.object_name, 'obj_name')
+ self.assertEqual(sco.destination, '/dest/obj_name')
+ self.assertTrue(sco.fresh_metadata)
+
+ sco = self.sco('obj_name',
+ {'destination': '/dest/new_obj/a',
+ 'fresh_metadata': False})
+
+ self.assertEqual(sco.object_name, 'obj_name')
+ self.assertEqual(sco.destination, '/dest/new_obj/a')
+ self.assertFalse(sco.fresh_metadata)
+
+ def test_create_with_invalid_name(self):
+ # empty strings are not allowed as names
+ self.assertRaises(SwiftError, self.sco, '')
+
+ # names cannot be anything but strings
+ self.assertRaises(SwiftError, self.sco, 1)
+
+
+class TestSwiftReader(unittest.TestCase):
+
+ def setUp(self):
+ super(TestSwiftReader, self).setUp()
+ self.sr = swiftclient.service._SwiftReader
+ self.md5_type = type(md5())
+
+ def test_create(self):
+ sr = self.sr('path', 'body', {})
+
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertIsNone(sr._content_length)
+ self.assertFalse(sr._expected_md5)
+
+ self.assertIsNone(sr._actual_md5)
+
+ def test_create_with_large_object_headers(self):
+ # md5 should not be initialized if large object headers are present
+ sr = self.sr('path', 'body', {'x-object-manifest': 'test',
+ 'etag': '"%s"' % ('0' * 32)})
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertIsNone(sr._content_length)
+ self.assertFalse(sr._expected_md5)
+ self.assertIsNone(sr._actual_md5)
+
+ sr = self.sr('path', 'body', {'x-static-large-object': 'test',
+ 'etag': '"%s"' % ('0' * 32)})
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertIsNone(sr._content_length)
+ self.assertFalse(sr._expected_md5)
+ self.assertIsNone(sr._actual_md5)
+
+ def test_create_with_content_range_header(self):
+ # md5 should not be initialized if large object headers are present
+ sr = self.sr('path', 'body', {'content-range': 'bytes 0-3/10',
+ 'etag': '"%s"' % ('0' * 32)})
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertIsNone(sr._content_length)
+ self.assertFalse(sr._expected_md5)
+ self.assertIsNone(sr._actual_md5)
+
+ def test_create_with_ignore_checksum(self):
+ # md5 should not be initialized if checksum is False
+ sr = self.sr('path', 'body', {}, False)
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertIsNone(sr._content_length)
+ self.assertFalse(sr._expected_md5)
+ self.assertIsNone(sr._actual_md5)
+
+ def test_create_with_content_length(self):
+ sr = self.sr('path', 'body', {'content-length': 5})
+
+ self.assertEqual(sr._path, 'path')
+ self.assertEqual(sr._body, 'body')
+ self.assertEqual(sr._content_length, 5)
+ self.assertFalse(sr._expected_md5)
+
+ self.assertIsNone(sr._actual_md5)
+
+ # Check Contentlength raises error if it isn't an integer
+ self.assertRaises(SwiftError, self.sr, 'path', 'body',
+ {'content-length': 'notanint'})
+
+ def test_iterator_usage(self):
+ def _consume(sr):
+ for _ in sr:
+ pass
+
+ sr = self.sr('path', BytesIO(b'body'), {})
+ _consume(sr)
+
+ # Check error is raised if expected etag doesn't match calculated md5.
+ # md5 for a SwiftReader that has done nothing is
+ # d41d8cd98f00b204e9800998ecf8427e i.e md5 of nothing
+ sr = self.sr('path', BytesIO(b'body'),
+ {'etag': md5(b'doesntmatch').hexdigest()})
+ self.assertRaises(SwiftError, _consume, sr)
+
+ sr = self.sr('path', BytesIO(b'body'),
+ {'etag': md5(b'body').hexdigest()})
+ _consume(sr)
+
+ # Should still work if etag was quoted
+ sr = self.sr('path', BytesIO(b'body'),
+ {'etag': '"%s"' % md5(b'body').hexdigest()})
+ _consume(sr)
+
+ # Check error is raised if SwiftReader doesn't read the same length
+ # as the content length it is created with
+ sr = self.sr('path', BytesIO(b'body'), {'content-length': 5})
+ self.assertRaises(SwiftError, _consume, sr)
+
+ sr = self.sr('path', BytesIO(b'body'), {'content-length': 4})
+ _consume(sr)
+
+ # Check that the iterator generates expected length and etag values
+ sr = self.sr('path', ['abc'.encode()] * 3,
+ {'content-length': 9,
+ 'etag': md5('abc'.encode() * 3).hexdigest()})
+ _consume(sr)
+ self.assertEqual(sr._actual_read, 9)
+ self.assertEqual(sr._actual_md5.hexdigest(),
+ md5('abc'.encode() * 3).hexdigest())
+
+
+class _TestServiceBase(unittest.TestCase):
+ def _get_mock_connection(self, attempts=2):
+ m = Mock(spec=Connection)
+ type(m).attempts = PropertyMock(return_value=attempts)
+ type(m).auth_end_time = PropertyMock(return_value=4)
+ return m
+
+ def _get_queue(self, q):
+ # Instead of blocking pull items straight from the queue.
+ # expects at least one item otherwise the test will fail.
+ try:
+ return q.get_nowait()
+ except QueueEmptyError:
+ self.fail('Expected item in queue but found none')
+
+ def _get_expected(self, update=None):
+ expected = self.expected.copy()
+ if update:
+ expected.update(update)
+
+ return expected
+
+
+class TestServiceDelete(_TestServiceBase):
+ def setUp(self):
+ super(TestServiceDelete, self).setUp()
+ self.opts = {'leave_segments': False, 'yes_all': False}
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': None, # Should be string in the form delete_XX
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'attempts': 2,
+ 'response_dict': {},
+ 'success': None # Should be a bool
+ }
+
+ def test_delete_segment(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ expected_r = self._get_expected({
+ 'action': 'delete_segment',
+ 'object': 'test_s',
+ 'success': True,
+ })
+
+ r = SwiftService._delete_segment(mock_conn, 'test_c', 'test_s', mock_q)
+
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_s', response_dict={}
+ )
+ self.assertEqual(expected_r, r)
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+
+ def test_delete_segment_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.delete_object = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'delete_segment',
+ 'object': 'test_s',
+ 'success': False,
+ 'error': self.exc,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ })
+
+ before = time.time()
+ r = SwiftService._delete_segment(mock_conn, 'test_c', 'test_s', mock_q)
+ after = time.time()
+
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_s', response_dict={}
+ )
+ self.assertEqual(expected_r, r)
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertGreaterEqual(r['error_timestamp'], before)
+ self.assertLessEqual(r['error_timestamp'], after)
+ self.assertIn('Traceback', r['traceback'])
+
+ def test_delete_object(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.head_object = Mock(return_value={})
+ expected_r = self._get_expected({
+ 'action': 'delete_object',
+ 'success': True
+ })
+
+ s = SwiftService()
+ r = s._delete_object(mock_conn, 'test_c', 'test_o', self.opts, mock_q)
+
+ mock_conn.head_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string='symlink=get', headers={})
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string=None, response_dict={},
+ headers={}
+ )
+ self.assertEqual(expected_r, r)
+
+ def test_delete_object_with_headers(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.head_object = Mock(return_value={})
+ expected_r = self._get_expected({
+ 'action': 'delete_object',
+ 'success': True
+ })
+ opt_c = self.opts.copy()
+ opt_c['header'] = ['Skip-Middleware: Test']
+
+ s = SwiftService()
+ r = s._delete_object(mock_conn, 'test_c', 'test_o', opt_c, mock_q)
+
+ mock_conn.head_object.assert_called_once_with(
+ 'test_c', 'test_o', headers={'Skip-Middleware': 'Test'},
+ query_string='symlink=get')
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string=None, response_dict={},
+ headers={'Skip-Middleware': 'Test'}
+ )
+ self.assertEqual(expected_r, r)
+
+ def test_delete_object_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.delete_object = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'delete_object',
+ 'success': False,
+ 'error': self.exc,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ })
+ # _delete_object doesn't populate attempts or response dict if it hits
+ # an error. This may not be the correct behaviour.
+ del expected_r['response_dict'], expected_r['attempts']
+
+ before = time.time()
+ s = SwiftService()
+ r = s._delete_object(mock_conn, 'test_c', 'test_o', self.opts, mock_q)
+ after = time.time()
+
+ mock_conn.head_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string='symlink=get', headers={})
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string=None, response_dict={},
+ headers={}
+ )
+ self.assertEqual(expected_r, r)
+ self.assertGreaterEqual(r['error_timestamp'], before)
+ self.assertLessEqual(r['error_timestamp'], after)
+ self.assertIn('Traceback', r['traceback'])
+
+ def test_delete_object_slo_support(self):
+ # If SLO headers are present the delete call should include an
+ # additional query string to cause the right delete server side
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.head_object = Mock(
+ return_value={'x-static-large-object': True}
+ )
+ expected_r = self._get_expected({
+ 'action': 'delete_object',
+ 'success': True
+ })
+
+ s = SwiftService()
+ r = s._delete_object(mock_conn, 'test_c', 'test_o', self.opts, mock_q)
+
+ mock_conn.head_object.assert_called_once_with(
+ 'test_c', 'test_o', query_string='symlink=get', headers={})
+ mock_conn.delete_object.assert_called_once_with(
+ 'test_c', 'test_o',
+ query_string='multipart-manifest=delete',
+ response_dict={},
+ headers={}
+ )
+ self.assertEqual(expected_r, r)
+
+ def test_delete_object_dlo_support(self):
+ mock_q = Queue()
+ s = SwiftService()
+ mock_conn = self._get_mock_connection()
+ expected_r = self._get_expected({
+ 'action': 'delete_object',
+ 'success': True,
+ 'dlo_segments_deleted': True
+ })
+ # A DLO object is determined in _delete_object by heading the object
+ # and checking for the existence of a x-object-manifest header.
+ # Mock that here.
+ mock_conn.head_object = Mock(
+ return_value={'x-object-manifest': 'manifest_c/manifest_p'}
+ )
+ mock_conn.get_container = Mock(
+ side_effect=[(None, [{'name': 'test_seg_1'},
+ {'name': 'test_seg_2'}]),
+ (None, {})]
+ )
+
+ def get_mock_list_conn(options):
+ return mock_conn
+
+ with mock.patch('swiftclient.service.get_conn', get_mock_list_conn):
+ r = s._delete_object(
+ mock_conn, 'test_c', 'test_o', self.opts, mock_q
+ )
+
+ self.assertEqual(expected_r, r)
+ expected = [
+ mock.call('test_c', 'test_o', query_string=None, response_dict={},
+ headers={}),
+ mock.call('manifest_c', 'test_seg_1', response_dict={}),
+ mock.call('manifest_c', 'test_seg_2', response_dict={})]
+ mock_conn.delete_object.assert_has_calls(expected, any_order=True)
+
+ def test_delete_empty_container(self):
+ mock_conn = self._get_mock_connection()
+ expected_r = self._get_expected({
+ 'action': 'delete_container',
+ 'success': True,
+ 'object': None
+ })
+
+ r = SwiftService._delete_empty_container(mock_conn, 'test_c',
+ self.opts)
+
+ mock_conn.delete_container.assert_called_once_with(
+ 'test_c', response_dict={}, headers={}
+ )
+ self.assertEqual(expected_r, r)
+
+ def test_delete_empty_container_with_headers(self):
+ mock_conn = self._get_mock_connection()
+ expected_r = self._get_expected({
+ 'action': 'delete_container',
+ 'success': True,
+ 'object': None
+ })
+ opt_c = self.opts.copy()
+ opt_c['header'] = ['Skip-Middleware: Test']
+
+ r = SwiftService._delete_empty_container(mock_conn, 'test_c', opt_c)
+
+ mock_conn.delete_container.assert_called_once_with(
+ 'test_c', response_dict={}, headers={'Skip-Middleware': 'Test'}
+ )
+ self.assertEqual(expected_r, r)
+
+ def test_delete_empty_container_exception(self):
+ mock_conn = self._get_mock_connection()
+ mock_conn.delete_container = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'delete_container',
+ 'success': False,
+ 'object': None,
+ 'error': self.exc,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ })
+
+ before = time.time()
+ s = SwiftService()
+ r = s._delete_empty_container(mock_conn, 'test_c', {})
+ after = time.time()
+
+ mock_conn.delete_container.assert_called_once_with(
+ 'test_c', response_dict={}, headers={}
+ )
+ self.assertEqual(expected_r, r)
+ self.assertGreaterEqual(r['error_timestamp'], before)
+ self.assertLessEqual(r['error_timestamp'], after)
+ self.assertIn('Traceback', r['traceback'])
+
+ @mock.patch.object(swiftclient.service.SwiftService, 'capabilities',
+ lambda *a: {'action': 'capabilities',
+ 'timestamp': time.time(),
+ 'success': True,
+ 'capabilities': {
+ 'bulk_delete':
+ {'max_deletes_per_request': 10}}
+ })
+ def test_bulk_delete_page_size(self):
+ # make a list of 100 objects
+ obj_list = ['x%02d' % i for i in range(100)]
+ errors = []
+
+ # _bulk_delete_page_size uses 2x the number of threads to determine
+ # if if there are "many" object to delete or not
+
+ # format is: [(thread_count, expected result), ...]
+ obj_threads_exp = [
+ (10, 10), # something small
+ (49, 10), # just under the bounds
+ (50, 1), # cutover point
+ (51, 1), # just over bounds
+ (100, 1), # something big
+ ]
+ for thread_count, exp in obj_threads_exp:
+ s = SwiftService(options={'object_dd_threads': thread_count})
+ res = s._bulk_delete_page_size(obj_list)
+ if res != exp:
+ msg = 'failed for thread_count %d: got %r expected %r' % \
+ (thread_count, res, exp)
+ errors.append(msg)
+ if errors:
+ self.fail('_bulk_delete_page_size() failed\n' + '\n'.join(errors))
+
+
+class TestSwiftError(unittest.TestCase):
+
+ def test_is_exception(self):
+ se = SwiftError(5)
+ self.assertIsInstance(se, Exception)
+
+ def test_empty_swifterror_creation(self):
+ se = SwiftError(5)
+
+ self.assertEqual(se.value, 5)
+ self.assertIsNone(se.container)
+ self.assertIsNone(se.obj)
+ self.assertIsNone(se.segment)
+ self.assertIsNone(se.exception)
+
+ self.assertEqual(str(se), '5')
+
+ def test_swifterror_creation(self):
+ test_exc = Exception('test exc')
+ se = SwiftError(5, 'con', 'obj', 'seg', test_exc)
+
+ self.assertEqual(se.value, 5)
+ self.assertEqual(se.container, 'con')
+ self.assertEqual(se.obj, 'obj')
+ self.assertEqual(se.segment, 'seg')
+ self.assertEqual(se.exception, test_exc)
+
+ self.assertEqual(str(se), '5 container:con object:obj segment:seg')
+
+
+class TestServiceUtils(unittest.TestCase):
+
+ def setUp(self):
+ super(TestServiceUtils, self).setUp()
+ with mock.patch.dict(swiftclient.service.environ, clean_os_environ):
+ swiftclient.service._default_global_options = \
+ swiftclient.service._build_default_global_options()
+ self.opts = swiftclient.service._default_global_options.copy()
+
+ def test_process_options_defaults(self):
+ # The only actions that should be taken on default options set is
+ # to change the auth version to v2.0 and create the os_options dict
+ opt_c = self.opts.copy()
+
+ swiftclient.service.process_options(opt_c)
+
+ self.assertIn('os_options', opt_c)
+ del opt_c['os_options']
+ self.assertEqual(opt_c['auth_version'], '2.0')
+ opt_c['auth_version'] = '1.0'
+
+ self.assertEqual(opt_c, self.opts)
+
+ def test_process_options_auth_version(self):
+ # auth_version should be set to 2.0
+ # if it isn't already set to 3.0
+ # and the v1 command line arguments aren't present
+ opt_c = self.opts.copy()
+
+ # Check v3 isn't changed
+ opt_c['auth_version'] = '3'
+ swiftclient.service.process_options(opt_c)
+ self.assertEqual(opt_c['auth_version'], '3')
+
+ # Check v1 isn't changed if user, key and auth are set
+ opt_c = self.opts.copy()
+ opt_c['auth_version'] = '1'
+ opt_c['auth'] = True
+ opt_c['user'] = True
+ opt_c['key'] = True
+ swiftclient.service.process_options(opt_c)
+ self.assertEqual(opt_c['auth_version'], '1')
+
+ def test_process_options_new_style_args(self):
+ # checks new style args are copied to old style
+ # when old style don't exist
+ opt_c = self.opts.copy()
+
+ opt_c['auth'] = ''
+ opt_c['user'] = ''
+ opt_c['key'] = ''
+ opt_c['os_auth_url'] = 'os_auth'
+ opt_c['os_username'] = 'os_user'
+ opt_c['os_password'] = 'os_pass'
+ swiftclient.service.process_options(opt_c)
+ self.assertEqual(opt_c['auth_version'], '2.0')
+ self.assertEqual(opt_c['auth'], 'os_auth')
+ self.assertEqual(opt_c['user'], 'os_user')
+ self.assertEqual(opt_c['key'], 'os_pass')
+
+ # Check old style args are left alone if they exist
+ opt_c = self.opts.copy()
+ opt_c['auth'] = 'auth'
+ opt_c['user'] = 'user'
+ opt_c['key'] = 'key'
+ opt_c['os_auth_url'] = 'os_auth'
+ opt_c['os_username'] = 'os_user'
+ opt_c['os_password'] = 'os_pass'
+ swiftclient.service.process_options(opt_c)
+ self.assertEqual(opt_c['auth_version'], '1.0')
+ self.assertEqual(opt_c['auth'], 'auth')
+ self.assertEqual(opt_c['user'], 'user')
+ self.assertEqual(opt_c['key'], 'key')
+
+ def test_split_headers(self):
+ mock_headers = ['color:blue', 'SIZE: large']
+ expected = {'Color': 'blue', 'Size': 'large'}
+
+ actual = swiftclient.service.split_headers(mock_headers)
+ self.assertEqual(expected, actual)
+
+ def test_split_headers_prefix(self):
+ mock_headers = ['color:blue', 'size:large']
+ expected = {'Prefix-Color': 'blue', 'Prefix-Size': 'large'}
+
+ actual = swiftclient.service.split_headers(mock_headers, 'prefix-')
+ self.assertEqual(expected, actual)
+
+ def test_split_headers_list_of_tuples(self):
+ mock_headers = [('color', 'blue'), ('size', 'large')]
+ expected = {'Prefix-Color': 'blue', 'Prefix-Size': 'large'}
+
+ actual = swiftclient.service.split_headers(mock_headers, 'prefix-')
+ self.assertEqual(expected, actual)
+
+ def test_split_headers_dict(self):
+ expected = {'Color': 'blue', 'Size': 'large'}
+
+ actual = swiftclient.service.split_headers(expected)
+ self.assertEqual(expected, actual)
+
+ def test_split_headers_error(self):
+ self.assertRaises(SwiftError, swiftclient.service.split_headers,
+ ['notvalid'])
+ self.assertRaises(SwiftError, swiftclient.service.split_headers,
+ [('also', 'not', 'valid')])
+
+
+class TestSwiftUploadObject(unittest.TestCase):
+
+ def setUp(self):
+ self.suo = swiftclient.service.SwiftUploadObject
+ super(TestSwiftUploadObject, self).setUp()
+
+ def test_create_with_string(self):
+ suo = self.suo('source')
+ self.assertEqual(suo.source, 'source')
+ self.assertEqual(suo.object_name, 'source')
+ self.assertIsNone(suo.options)
+
+ suo = self.suo('source', 'obj_name')
+ self.assertEqual(suo.source, 'source')
+ self.assertEqual(suo.object_name, 'obj_name')
+ self.assertIsNone(suo.options)
+
+ suo = self.suo('source', 'obj_name', {'opt': '123'})
+ self.assertEqual(suo.source, 'source')
+ self.assertEqual(suo.object_name, 'obj_name')
+ self.assertEqual(suo.options, {'opt': '123'})
+
+ def test_create_with_file(self):
+ with tempfile.TemporaryFile() as mock_file:
+ # Check error is raised if no object name is provided with a
+ # filelike object
+ self.assertRaises(SwiftError, self.suo, mock_file)
+
+ # Check that empty strings are invalid object names
+ self.assertRaises(SwiftError, self.suo, mock_file, '')
+
+ suo = self.suo(mock_file, 'obj_name')
+ self.assertEqual(suo.source, mock_file)
+ self.assertEqual(suo.object_name, 'obj_name')
+ self.assertIsNone(suo.options)
+
+ suo = self.suo(mock_file, 'obj_name', {'opt': '123'})
+ self.assertEqual(suo.source, mock_file)
+ self.assertEqual(suo.object_name, 'obj_name')
+ self.assertEqual(suo.options, {'opt': '123'})
+
+ def test_create_with_no_source(self):
+ suo = self.suo(None, 'obj_name')
+ self.assertIsNone(suo.source)
+ self.assertEqual(suo.object_name, 'obj_name')
+ self.assertIsNone(suo.options)
+
+ # Check error is raised if source is None without an object name
+ self.assertRaises(SwiftError, self.suo, None)
+
+ def test_create_with_invalid_source(self):
+ # Source can only be None, string or filelike object,
+ # check an error is raised with an invalid type.
+ self.assertRaises(SwiftError, self.suo, [])
+
+
+class TestServiceList(_TestServiceBase):
+ def setUp(self):
+ super(TestServiceList, self).setUp()
+ self.opts = {'prefix': None, 'long': False, 'delimiter': ''}
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': None, # Should be list_X_part (account or container)
+ 'container': None, # Should be a string when listing a container
+ 'prefix': None,
+ 'success': None # Should be a bool
+ }
+
+ def test_list_account(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_account_returns = [
+ (None, [{'name': 'test_c'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': True,
+ 'listing': [{'name': 'test_c'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, self.opts, mock_q
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ long_opts = dict(self.opts, **{'long': True})
+ mock_conn.head_container = Mock(return_value={'test_m': '1'})
+ get_account_returns = [
+ (None, [{'name': 'test_c'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+ expected_r_long = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': True,
+ 'listing': [{'name': 'test_c', 'meta': {'test_m': '1'}}],
+ 'marker': '',
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, long_opts, mock_q
+ )
+ self.assertEqual(expected_r_long, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_account_with_headers(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_account_returns = [
+ (None, [{'name': 'test_c'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': True,
+ 'listing': [{'name': 'test_c'}],
+ 'marker': ''
+ })
+ opt_c = self.opts.copy()
+ opt_c['header'] = ['Skip-Middleware: True']
+ SwiftService._list_account_job(
+ mock_conn, opt_c, mock_q
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+ self.assertEqual(mock_conn.get_account.mock_calls, [
+ mock.call(headers={'Skip-Middleware': 'True'}, marker='',
+ prefix=None),
+ mock.call(headers={'Skip-Middleware': 'True'}, marker='test_c',
+ prefix=None)])
+
+ def test_list_account_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_account = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': False,
+ 'error': self.exc,
+ 'marker': '',
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, self.opts, mock_q)
+
+ mock_conn.get_account.assert_called_once_with(
+ marker='', prefix=None, headers={}
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_container(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_container_returns = [
+ (None, [{'name': 'test_o'}]),
+ (None, [])
+ ]
+ mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'test_o'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', self.opts, mock_q
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ long_opts = dict(self.opts, **{'long': True})
+ mock_conn.head_container = Mock(return_value={'test_m': '1'})
+ get_container_returns = [
+ (None, [{'name': 'test_o'}]),
+ (None, [])
+ ]
+ mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+ expected_r_long = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'test_o'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', long_opts, mock_q
+ )
+ self.assertEqual(expected_r_long, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_container_marker(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+
+ get_container_returns = [
+ (None, [{'name': 'b'}, {'name': 'c'}]),
+ (None, [])
+ ]
+ mock_get_cont = Mock(side_effect=get_container_returns)
+ mock_conn.get_container = mock_get_cont
+
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'b'}, {'name': 'c'}],
+ 'marker': 'b'
+ })
+
+ _opts = self.opts.copy()
+ _opts['marker'] = 'b'
+ SwiftService._list_container_job(mock_conn, 'test_c', _opts, mock_q)
+
+ # This does not test if the marker is propagated, because we always
+ # get the final call to the get_container with the final item 'c',
+ # even if marker wasn't set. This test just makes sure the whole
+ # stack works in a sane way.
+ mock_kw = mock_get_cont.call_args[1]
+ self.assertEqual(mock_kw['marker'], 'c')
+
+ # This tests that the lower levels get the marker delivered.
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_container_with_headers(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_container_returns = [
+ (None, [{'name': 'test_o'}]),
+ (None, [])
+ ]
+ mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'test_o'}],
+ 'marker': ''
+ })
+
+ opt_c = self.opts.copy()
+ opt_c['header'] = ['Skip-Middleware: Test']
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', opt_c, mock_q
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+ self.assertEqual(mock_conn.get_container.mock_calls, [
+ mock.call('test_c', headers={'Skip-Middleware': 'Test'},
+ delimiter='', marker='', prefix=None),
+ mock.call('test_c', headers={'Skip-Middleware': 'Test'},
+ delimiter='', marker='test_o', prefix=None)])
+
+ def test_list_container_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_container = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': False,
+ 'error': self.exc,
+ 'marker': '',
+ 'error_timestamp': mock.ANY,
+ 'traceback': mock.ANY
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', self.opts, mock_q
+ )
+
+ mock_conn.get_container.assert_called_once_with(
+ 'test_c', marker='', delimiter='', prefix=None, headers={}
+ )
+ self.assertEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ @mock.patch('swiftclient.service.get_conn')
+ def test_list_queue_size(self, mock_get_conn):
+ mock_conn = self._get_mock_connection()
+ # Return more results than should fit in the results queue
+ get_account_returns = [
+ (None, [{'name': 'container1'}]),
+ (None, [{'name': 'container2'}]),
+ (None, [{'name': 'container3'}]),
+ (None, [{'name': 'container4'}]),
+ (None, [{'name': 'container5'}]),
+ (None, [{'name': 'container6'}]),
+ (None, [{'name': 'container7'}]),
+ (None, [{'name': 'container8'}]),
+ (None, [{'name': 'container9'}]),
+ (None, [{'name': 'container10'}]),
+ (None, [{'name': 'container11'}]),
+ (None, [{'name': 'container12'}]),
+ (None, [{'name': 'container13'}]),
+ (None, [{'name': 'container14'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+ mock_get_conn.return_value = mock_conn
+
+ s = SwiftService(options=self.opts)
+ lg = s.list()
+
+ # Start the generator
+ first_list_part = next(lg)
+
+ # Wait for the number of calls to get_account to reach our expected
+ # value, then let it run some more to make sure the value remains
+ # stable
+ count = mock_conn.get_account.call_count
+ stable = 0
+ while mock_conn.get_account.call_count != count or stable < 5:
+ if mock_conn.get_account.call_count == count:
+ stable += 1
+ else:
+ count = mock_conn.get_account.call_count
+ stable = 0
+ # The test requires a small sleep to allow other threads to
+ # execute - in this mocked environment we assume that if the call
+ # count to get_account has not changed in 0.25s then no more calls
+ # will be made.
+ sleep(0.05)
+
+ stable_get_account_call_count = mock_conn.get_account.call_count
+
+ # Collect all remaining results from the generator
+ list_results = [first_list_part] + list(lg)
+
+ # Make sure the stable call count is correct - this should be 12 calls
+ # to get_account;
+ # 1 for first_list_part
+ # 10 for the values on the queue
+ # 1 for the value blocking whilst trying to place onto the queue
+ self.assertEqual(12, stable_get_account_call_count)
+
+ # Make sure all the containers were listed and placed onto the queue
+ self.assertEqual(15, mock_conn.get_account.call_count)
+
+ # Check the results were all returned
+ observed_listing = []
+ for lir in list_results:
+ observed_listing.append(
+ [li['name'] for li in lir['listing']]
+ )
+ expected_listing = []
+ for gar in get_account_returns[:-1]: # The empty list is not returned
+ expected_listing.append(
+ [li['name'] for li in gar[1]]
+ )
+ self.assertEqual(observed_listing, expected_listing)
+
+
+class TestService(unittest.TestCase):
+
+ def test_upload_with_bad_segment_size(self):
+ for bad in ('ten', '1234X', '100.3'):
+ options = {'segment_size': bad}
+ try:
+ service = SwiftService(options)
+ next(service.upload('c', 'o'))
+ self.fail('Expected SwiftError when segment_size=%s' % bad)
+ except SwiftError as exc:
+ self.assertEqual('Segment size should be an integer value',
+ exc.value)
+
+ @mock.patch('swiftclient.service.stat')
+ @mock.patch('swiftclient.service.getmtime', return_value=1.0)
+ @mock.patch('swiftclient.service.getsize', return_value=4)
+ def test_upload_with_relative_path(self, *args, **kwargs):
+ service = SwiftService({})
+ objects = [{'path': "./testobj",
+ 'strt_indx': 2},
+ {'path': os.path.join(os.getcwd(), "testobj"),
+ 'strt_indx': 1},
+ {'path': ".\\testobj",
+ 'strt_indx': 2}]
+ for obj in objects:
+ with mock.patch('swiftclient.service.Connection') as mock_conn, \
+ mock.patch.object(builtins, 'open') as mock_open:
+ mock_open.return_value = six.StringIO('asdf')
+ mock_conn.return_value.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+ mock_conn.return_value.put_object.return_value =\
+ 'd41d8cd98f00b204e9800998ecf8427e'
+ resp_iter = service.upload(
+ 'c', [SwiftUploadObject(obj['path'])])
+ responses = [x for x in resp_iter]
+ for resp in responses:
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(2, len(responses))
+ create_container_resp, upload_obj_resp = responses
+ self.assertEqual(create_container_resp['action'],
+ 'create_container')
+ self.assertEqual(upload_obj_resp['action'],
+ 'upload_object')
+ self.assertEqual(upload_obj_resp['object'],
+ obj['path'][obj['strt_indx']:])
+ self.assertEqual(upload_obj_resp['path'], obj['path'])
+ self.assertTrue(mock_open.return_value.closed)
+
+ @mock.patch('swiftclient.service.Connection')
+ def test_upload_stream(self, mock_conn):
+ service = SwiftService({})
+
+ stream = test_utils.FakeStream(2048)
+ segment_etag = md5(b'A' * 1024).hexdigest()
+
+ mock_conn.return_value.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+ mock_conn.return_value.put_object.return_value = \
+ segment_etag
+ options = {'use_slo': True, 'segment_size': 1024}
+ resp_iter = service.upload(
+ 'container',
+ [SwiftUploadObject(stream, object_name='streamed')],
+ options)
+ responses = [x for x in resp_iter]
+ for resp in responses:
+ self.assertFalse('error' in resp)
+ self.assertTrue(resp['success'])
+ self.assertEqual(5, len(responses))
+ container_resp, segment_container_resp = responses[0:2]
+ segment_response = responses[2:4]
+ upload_obj_resp = responses[-1]
+ self.assertEqual(container_resp['action'],
+ 'create_container')
+ self.assertEqual(upload_obj_resp['action'],
+ 'upload_object')
+ self.assertEqual(upload_obj_resp['object'],
+ 'streamed')
+ self.assertTrue(upload_obj_resp['path'] is None)
+ self.assertTrue(upload_obj_resp['large_object'])
+ self.assertIn('manifest_response_dict', upload_obj_resp)
+ self.assertEqual(upload_obj_resp['manifest_response_dict'], {})
+ for i, resp in enumerate(segment_response):
+ self.assertEqual(i, resp['segment_index'])
+ self.assertEqual(1024, resp['segment_size'])
+ self.assertEqual('d47b127bc2de2d687ddc82dac354c415',
+ resp['segment_etag'])
+ self.assertTrue(resp['segment_location'].endswith(
+ '/0000000%d' % i))
+ self.assertTrue(resp['segment_location'].startswith(
+ '/container_segments/streamed'))
+
+ @mock.patch('swiftclient.service.Connection')
+ def test_upload_stream_fits_in_one_segment(self, mock_conn):
+ service = SwiftService({})
+
+ stream = test_utils.FakeStream(2048)
+ whole_etag = md5(b'A' * 2048).hexdigest()
+
+ mock_conn.return_value.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+ mock_conn.return_value.put_object.return_value = \
+ whole_etag
+ options = {'use_slo': True, 'segment_size': 10240}
+ resp_iter = service.upload(
+ 'container',
+ [SwiftUploadObject(stream, object_name='streamed')],
+ options)
+ responses = [x for x in resp_iter]
+ for resp in responses:
+ self.assertNotIn('error', resp)
+ self.assertTrue(resp['success'])
+ self.assertEqual(3, len(responses))
+ container_resp, segment_container_resp = responses[0:2]
+ upload_obj_resp = responses[-1]
+ self.assertEqual(container_resp['action'],
+ 'create_container')
+ self.assertEqual(upload_obj_resp['action'],
+ 'upload_object')
+ self.assertEqual(upload_obj_resp['object'],
+ 'streamed')
+ self.assertTrue(upload_obj_resp['path'] is None)
+ self.assertFalse(upload_obj_resp['large_object'])
+ self.assertNotIn('manifest_response_dict', upload_obj_resp)
+
+
+class TestServiceUpload(_TestServiceBase):
+
+ @contextlib.contextmanager
+ def assert_open_results_are_closed(self):
+ opened_files = []
+ builtin_open = builtins.open
+
+ def open_wrapper(*a, **kw):
+ opened_files.append((builtin_open(*a, **kw), a, kw))
+ return opened_files[-1][0]
+
+ with mock.patch.object(builtins, 'open', open_wrapper):
+ yield
+ for fp, args, kwargs in opened_files:
+ formatted_args = [repr(a) for a in args]
+ formatted_args.extend('%s=%r' % kv for kv in kwargs.items())
+ formatted_args = ', '.join(formatted_args)
+ self.assertTrue(fp.closed,
+ 'Failed to close open(%s)' % formatted_args)
+
+ def test_upload_object_job_file_with_unicode_path(self):
+ # Uploading a file results in the file object being wrapped in a
+ # LengthWrapper. This test sets the options in such a way that much
+ # of _upload_object_job is skipped bringing the critical path down
+ # to around 60 lines to ease testing.
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ expected_r = {
+ 'action': 'upload_object',
+ 'attempts': 2,
+ 'container': 'test_c',
+ 'headers': {},
+ 'large_object': True,
+ 'object': 'テスト/dummy.dat',
+ 'manifest_response_dict': {},
+ 'segment_results': [{'action': 'upload_segment',
+ 'success': True}] * 3,
+ 'status': 'uploaded',
+ 'success': True,
+ }
+ expected_mtime = '%f' % os.path.getmtime(f.name)
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.return_value = ''
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ with mock.patch.object(s, '_upload_segment_job') as mock_job:
+ mock_job.return_value = {
+ 'action': 'upload_segment',
+ 'success': True}
+
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='テスト/dummy.dat',
+ options=dict(s._options,
+ segment_size=10,
+ leave_segments=True))
+
+ mtime = r['headers']['x-object-meta-mtime']
+ self.assertEqual(expected_mtime, mtime)
+ del r['headers']['x-object-meta-mtime']
+
+ self.assertEqual(
+ 'test_c_segments/%E3%83%86%E3%82%B9%E3%83%88/dummy.dat/' +
+ '%s/30/10/' % mtime, r['headers']['x-object-manifest'])
+ del r['headers']['x-object-manifest']
+
+ self.assertEqual(r['path'], f.name)
+ del r['path']
+
+ self.assertEqual(r, expected_r)
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with('test_c', 'テスト/dummy.dat',
+ '',
+ content_length=0,
+ headers={},
+ response_dict={})
+
+ def test_upload_segment_job(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 10)
+ f.write(b'b' * 10)
+ f.write(b'c' * 10)
+ f.flush()
+
+ # run read() when put_object is called to calculate md5sum
+ def _consuming_conn(*a, **kw):
+ contents = a[2]
+ contents.read() # Force md5 calculation
+ return contents.get_md5sum()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _consuming_conn
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ expected_r = {
+ 'action': 'upload_segment',
+ 'for_container': 'test_c',
+ 'for_object': 'test_o',
+ 'segment_index': 2,
+ 'segment_size': 10,
+ 'segment_location': '/test_c_segments/test_s_1',
+ 'log_line': 'test_o segment 2',
+ 'success': True,
+ 'response_dict': {},
+ 'segment_etag': md5(b'b' * 10).hexdigest(),
+ 'attempts': 2,
+ }
+
+ s = SwiftService()
+ with self.assert_open_results_are_closed():
+ r = s._upload_segment_job(conn=mock_conn,
+ path=f.name,
+ container='test_c',
+ segment_name='test_s_1',
+ segment_start=10,
+ segment_size=10,
+ segment_index=2,
+ obj_name='test_o',
+ options={'segment_container': None,
+ 'checksum': True})
+
+ self.assertEqual(r, expected_r)
+
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with(
+ 'test_c_segments', 'test_s_1',
+ mock.ANY,
+ content_length=10,
+ content_type='application/swiftclient-segment',
+ response_dict={})
+ contents = mock_conn.put_object.call_args[0][2]
+ self.assertIsInstance(contents, utils.LengthWrapper)
+ self.assertEqual(len(contents), 10)
+
+ def test_upload_stream_segment(self):
+ common_params = {
+ 'segment_container': 'segments',
+ 'segment_name': 'test_stream_2',
+ 'container': 'test_stream',
+ 'object': 'stream_object',
+ }
+ tests = [
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 2,
+ 'content_size': 1024},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': False,
+ 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 2048,
+ 'segment_index': 0,
+ 'content_size': 512},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 512).hexdigest()}},
+ # 0-sized segment should not be uploaded
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 1,
+ 'content_size': 0},
+ 'put_object_args': {},
+ 'expected': {
+ 'complete': True}},
+ # 0-sized objects should be uploaded
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 0,
+ 'content_size': 0},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'').hexdigest()}},
+ # Test boundary conditions
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 1,
+ 'content_size': 1023},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 1023).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 2048,
+ 'segment_index': 0,
+ 'content_size': 2047},
+ 'put_object_args': {
+ 'container': 'test_stream',
+ 'object': 'stream_object'},
+ 'expected': {
+ 'complete': True,
+ 'segment_etag': md5(b'A' * 2047).hexdigest()}},
+ {'test_params': {
+ 'segment_size': 1024,
+ 'segment_index': 2,
+ 'content_size': 1025},
+ 'put_object_args': {
+ 'container': 'segments',
+ 'object': 'test_stream_2'},
+ 'expected': {
+ 'complete': False,
+ 'segment_etag': md5(b'A' * 1024).hexdigest()}},
+ ]
+
+ for test_args in tests:
+ params = test_args['test_params']
+ stream = test_utils.FakeStream(params['content_size'])
+ segment_size = params['segment_size']
+ segment_index = params['segment_index']
+
+ def _fake_put_object(*args, **kwargs):
+ contents = args[2]
+ # Consume and compute md5
+ return md5(contents).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _fake_put_object
+
+ s = SwiftService()
+ resp = s._upload_stream_segment(
+ conn=mock_conn,
+ container=common_params['container'],
+ object_name=common_params['object'],
+ segment_container=common_params['segment_container'],
+ segment_name=common_params['segment_name'],
+ segment_size=segment_size,
+ segment_index=segment_index,
+ headers={},
+ fd=stream)
+ expected_args = test_args['expected']
+ put_args = test_args['put_object_args']
+ expected_response = {
+ 'segment_size': min(len(stream), segment_size),
+ 'complete': expected_args['complete'],
+ 'success': True,
+ }
+ if len(stream) or segment_index == 0:
+ segment_location = '/%s/%s' % (put_args['container'],
+ put_args['object'])
+ expected_response.update(
+ {'segment_index': segment_index,
+ 'segment_location': segment_location,
+ 'segment_etag': expected_args['segment_etag'],
+ 'for_object': common_params['object']})
+ mock_conn.put_object.assert_called_once_with(
+ put_args['container'],
+ put_args['object'],
+ mock.ANY,
+ content_length=min(len(stream), segment_size),
+ headers={'etag': expected_args['segment_etag']},
+ response_dict=mock.ANY)
+ else:
+ self.assertEqual([], mock_conn.put_object.mock_calls)
+ expected_response.update(
+ {'segment_index': None,
+ 'segment_location': None,
+ 'segment_etag': None})
+ self.assertEqual(expected_response, resp)
+
+ def test_etag_mismatch_with_ignore_checksum(self):
+ def _consuming_conn(*a, **kw):
+ contents = a[2]
+ contents.read() # Force md5 calculation
+ return 'badresponseetag'
+
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 10)
+ f.write(b'b' * 10)
+ f.write(b'c' * 10)
+ f.flush()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _consuming_conn
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_segment_job(conn=mock_conn,
+ path=f.name,
+ container='test_c',
+ segment_name='test_s_1',
+ segment_start=10,
+ segment_size=10,
+ segment_index=2,
+ obj_name='test_o',
+ options={'segment_container': None,
+ 'checksum': False})
+
+ self.assertIsNone(r.get('error'))
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with(
+ 'test_c_segments', 'test_s_1',
+ mock.ANY,
+ content_length=10,
+ content_type='application/swiftclient-segment',
+ response_dict={})
+ contents = mock_conn.put_object.call_args[0][2]
+ # Check that md5sum is not calculated.
+ self.assertEqual(contents.get_md5sum(), '')
+
+ def test_upload_segment_job_etag_mismatch(self):
+ def _consuming_conn(*a, **kw):
+ contents = a[2]
+ contents.read() # Force md5 calculation
+ return 'badresponseetag'
+
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 10)
+ f.write(b'b' * 10)
+ f.write(b'c' * 10)
+ f.flush()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _consuming_conn
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ with self.assert_open_results_are_closed():
+ r = s._upload_segment_job(conn=mock_conn,
+ path=f.name,
+ container='test_c',
+ segment_name='test_s_1',
+ segment_start=10,
+ segment_size=10,
+ segment_index=2,
+ obj_name='test_o',
+ options={'segment_container': None,
+ 'checksum': True})
+
+ self.assertIn('md5 mismatch', str(r.get('error')))
+
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with(
+ 'test_c_segments', 'test_s_1',
+ mock.ANY,
+ content_length=10,
+ content_type='application/swiftclient-segment',
+ response_dict={})
+ contents = mock_conn.put_object.call_args[0][2]
+ self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())
+
+ def test_upload_object_job_file(self):
+ # Uploading a file results in the file object being wrapped in a
+ # LengthWrapper. This test sets the options in such a way that much
+ # of _upload_object_job is skipped bringing the critical path down
+ # to around 60 lines to ease testing.
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ expected_r = {
+ 'action': 'upload_object',
+ 'attempts': 2,
+ 'container': 'test_c',
+ 'headers': {},
+ 'large_object': False,
+ 'object': 'test_o',
+ 'response_dict': {},
+ 'status': 'uploaded',
+ 'success': True,
+ }
+ expected_mtime = '%f' % os.path.getmtime(f.name)
+
+ # run read() when put_object is called to calculate md5sum
+ # md5sum is verified in _upload_object_job.
+ def _consuming_conn(*a, **kw):
+ contents = a[2]
+ contents.read() # Force md5 calculation
+ return contents.get_md5sum()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _consuming_conn
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ with self.assert_open_results_are_closed():
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options=dict(s._options,
+ leave_segments=True))
+
+ mtime = r['headers']['x-object-meta-mtime']
+ self.assertEqual(expected_mtime, mtime)
+ del r['headers']['x-object-meta-mtime']
+
+ self.assertEqual(r['path'], f.name)
+ del r['path']
+
+ self.assertEqual(r, expected_r)
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with('test_c', 'test_o',
+ mock.ANY,
+ content_length=30,
+ headers={},
+ response_dict={})
+ contents = mock_conn.put_object.call_args[0][2]
+ self.assertIsInstance(contents, utils.LengthWrapper)
+ self.assertEqual(len(contents), 30)
+
+ @mock.patch('swiftclient.service.time', return_value=1400000000)
+ def test_upload_object_job_stream(self, time_mock):
+ # Streams are wrapped as ReadableToIterable
+ with tempfile.TemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ f.seek(0)
+ expected_r = {
+ 'action': 'upload_object',
+ 'attempts': 2,
+ 'container': 'test_c',
+ 'headers': {},
+ 'large_object': False,
+ 'object': 'test_o',
+ 'response_dict': {},
+ 'status': 'uploaded',
+ 'success': True,
+ 'path': None,
+ }
+ expected_mtime = 1400000000
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.return_value = ''
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f,
+ obj='test_o',
+ options=dict(s._options,
+ leave_segments=True))
+
+ mtime = float(r['headers']['x-object-meta-mtime'])
+ self.assertEqual(mtime, expected_mtime)
+ del r['headers']['x-object-meta-mtime']
+
+ self.assertEqual(r, expected_r)
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ mock_conn.put_object.assert_called_with('test_c', 'test_o',
+ mock.ANY,
+ content_length=None,
+ headers={},
+ response_dict={})
+ contents = mock_conn.put_object.call_args[0][2]
+ self.assertIsInstance(contents, utils.ReadableToIterable)
+ self.assertEqual(contents.chunk_size, 65536)
+ # next retrieves the first chunk of the stream or len(chunk_size)
+ # or less, it also forces the md5 to be calculated.
+ self.assertEqual(next(contents), b'a' * 30)
+ self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+ def test_upload_object_job_etag_mismatch(self):
+ # The etag test for both streams and files use the same code
+ # so only one test should be needed.
+ def _consuming_conn(*a, **kw):
+ contents = a[2]
+ contents.read() # Force md5 calculation
+ return 'badresponseetag'
+
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+
+ mock_conn = mock.Mock()
+ mock_conn.put_object.side_effect = _consuming_conn
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options=dict(s._options,
+ leave_segments=True))
+
+ self.assertIs(r['success'], False)
+ self.assertIn('md5 mismatch', str(r.get('error')))
+
+ self.assertEqual(mock_conn.put_object.call_count, 1)
+ expected_headers = {'x-object-meta-mtime': mock.ANY}
+ mock_conn.put_object.assert_called_with('test_c', 'test_o',
+ mock.ANY,
+ content_length=30,
+ headers=expected_headers,
+ response_dict={})
+
+ contents = mock_conn.put_object.call_args[0][2]
+ self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
+
+ def test_upload_object_job_identical_etag(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'content-length': 30,
+ 'etag': md5(b'a' * 30).hexdigest()}
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 0})
+
+ self.assertIsNone(r.get('error'))
+ self.assertIs(True, r['success'])
+ self.assertEqual(r.get('status'), 'skipped-identical')
+ self.assertEqual(mock_conn.put_object.call_count, 0)
+ self.assertEqual(mock_conn.head_object.call_count, 1)
+ mock_conn.head_object.assert_called_with('test_c', 'test_o')
+
+ def test_upload_object_job_identical_slo_with_nesting(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ seg_etag = md5(b'a' * 10).hexdigest()
+ submanifest = "[%s]" % ",".join(
+ ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+ submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+ manifest = "[%s]" % ",".join([
+ '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+ '"bytes":20,"hash":"%s"}' % submanifest_etag,
+ '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'x-static-large-object': True,
+ 'content-length': 30,
+ 'etag': md5(submanifest_etag.encode('ascii') +
+ seg_etag.encode('ascii')).hexdigest()}
+ mock_conn.get_object.side_effect = [
+ ({}, manifest.encode('ascii')),
+ ({}, submanifest.encode('ascii'))]
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+
+ self.assertIsNone(r.get('error'))
+ self.assertIs(True, r['success'])
+ self.assertEqual('skipped-identical', r.get('status'))
+ self.assertEqual(0, mock_conn.put_object.call_count)
+ self.assertEqual([mock.call('test_c', 'test_o')],
+ mock_conn.head_object.mock_calls)
+ self.assertEqual([
+ mock.call('test_c', 'test_o',
+ query_string='multipart-manifest=get'),
+ mock.call('test_c_segments', 'test_sub_slo',
+ query_string='multipart-manifest=get'),
+ ], mock_conn.get_object.mock_calls)
+
+ def test_upload_object_job_identical_dlo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ segment_etag = md5(b'a' * 10).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'x-object-manifest': 'test_c_segments/test_o/prefix',
+ 'content-length': 30,
+ 'etag': md5(segment_etag.encode('ascii') * 3).hexdigest()}
+ mock_conn.get_container.side_effect = [
+ (None, [{"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/00"},
+ {"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/01"}]),
+ (None, [{"bytes": 10, "hash": segment_etag,
+ "name": "test_o/prefix/02"}]),
+ (None, {})]
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._upload_object_job(conn=mock_conn,
+ container='test_c',
+ source=f.name,
+ obj='test_o',
+ options={'changed': False,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+
+ self.assertIsNone(r.get('error'))
+ self.assertIs(True, r['success'])
+ self.assertEqual('skipped-identical', r.get('status'))
+ self.assertEqual(0, mock_conn.put_object.call_count)
+ self.assertEqual(1, mock_conn.head_object.call_count)
+ self.assertEqual(3, mock_conn.get_container.call_count)
+ mock_conn.head_object.assert_called_with('test_c', 'test_o')
+ expected = [
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker='', delimiter=None, headers={}),
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker="test_o/prefix/01", delimiter=None,
+ headers={}),
+ mock.call('test_c_segments', prefix='test_o/prefix',
+ marker="test_o/prefix/02", delimiter=None,
+ headers={}),
+ ]
+ mock_conn.get_container.assert_has_calls(expected)
+
+ def test_make_upload_objects(self):
+ check_names_pseudo_to_expected = {
+ (('/absolute/file/path',), ''): ['absolute/file/path'],
+ (('relative/file/path',), ''): ['relative/file/path'],
+ (('/absolute/file/path',), '/absolute/pseudo/dir'): [
+ 'absolute/pseudo/dir/absolute/file/path'],
+ (('/absolute/file/path',), 'relative/pseudo/dir'): [
+ 'relative/pseudo/dir/absolute/file/path'],
+ (('relative/file/path',), '/absolute/pseudo/dir'): [
+ 'absolute/pseudo/dir/relative/file/path'],
+ (('relative/file/path',), 'relative/pseudo/dir'): [
+ 'relative/pseudo/dir/relative/file/path'],
+ }
+ errors = []
+ for (filenames, pseudo_folder), expected in \
+ check_names_pseudo_to_expected.items():
+ actual = SwiftService._make_upload_objects(
+ filenames, pseudo_folder=pseudo_folder)
+ try:
+ self.assertEqual(expected, [o.object_name for o in actual])
+ except AssertionError as e:
+ msg = 'given (%r, %r) expected %r, got %s' % (
+ filenames, pseudo_folder, expected, e)
+ errors.append(msg)
+ self.assertFalse(errors, "\nERRORS:\n%s" % '\n'.join(errors))
+
+ def test_create_dir_marker_job_unchanged(self):
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'content-type': 'application/directory',
+ 'content-length': '0',
+ 'x-object-meta-mtime': '1.234000',
+ 'etag': md5().hexdigest()}
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ with mock.patch('swiftclient.service.getmtime',
+ return_value=1.234):
+ r = s._create_dir_marker_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ path='test',
+ options={'changed': True,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+ self.assertEqual({
+ 'action': 'create_dir_marker',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'path': 'test',
+ 'headers': {'x-object-meta-mtime': '1.234000'},
+ # NO response dict!
+ 'success': True,
+ }, r)
+ self.assertEqual([], mock_conn.put_object.mock_calls)
+
+ def test_create_dir_marker_job_unchanged_old_type(self):
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'content-type': 'text/directory',
+ 'content-length': '0',
+ 'x-object-meta-mtime': '1.000000',
+ 'etag': md5().hexdigest()}
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ with mock.patch('swiftclient.service.time',
+ return_value=1.234):
+ r = s._create_dir_marker_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'changed': True,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+ self.assertEqual({
+ 'action': 'create_dir_marker',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'path': None,
+ 'headers': {'x-object-meta-mtime': '1.000000'},
+ # NO response dict!
+ 'success': True,
+ }, r)
+ self.assertEqual([], mock_conn.put_object.mock_calls)
+
+ def test_create_dir_marker_job_overwrites_bad_type(self):
+ mock_conn = mock.Mock()
+ mock_conn.head_object.return_value = {
+ 'content-type': 'text/plain',
+ 'content-length': '0',
+ 'x-object-meta-mtime': '1.000000',
+ 'etag': md5().hexdigest()}
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ with mock.patch('swiftclient.service.time',
+ return_value=1.234):
+ r = s._create_dir_marker_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'changed': True,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+ self.assertEqual({
+ 'action': 'create_dir_marker',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'path': None,
+ 'headers': {'x-object-meta-mtime': '1.000000'},
+ 'response_dict': {},
+ 'success': True,
+ }, r)
+ self.assertEqual([mock.call(
+ 'test_c', 'test_o', '',
+ content_length=0,
+ content_type='application/directory',
+ headers={'x-object-meta-mtime': '1.000000'},
+ response_dict={})], mock_conn.put_object.mock_calls)
+
+ def test_create_dir_marker_job_missing(self):
+ mock_conn = mock.Mock()
+ mock_conn.head_object.side_effect = \
+ ClientException('Not Found', http_status=404)
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ with mock.patch('swiftclient.service.time',
+ return_value=1.234):
+ r = s._create_dir_marker_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'changed': True,
+ 'skip_identical': True,
+ 'leave_segments': True,
+ 'header': '',
+ 'segment_size': 10})
+ self.assertEqual({
+ 'action': 'create_dir_marker',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'path': None,
+ 'headers': {'x-object-meta-mtime': '1.000000'},
+ 'response_dict': {},
+ 'success': True,
+ }, r)
+ self.assertEqual([mock.call(
+ 'test_c', 'test_o', '',
+ content_length=0,
+ content_type='application/directory',
+ headers={'x-object-meta-mtime': '1.000000'},
+ response_dict={})], mock_conn.put_object.mock_calls)
+
+
+class TestServiceDownload(_TestServiceBase):
+
+ def setUp(self):
+ super(TestServiceDownload, self).setUp()
+ self.opts = swiftclient.service._default_local_options.copy()
+ self.opts['no_download'] = True
+ self.obj_content = b'c' * 10
+ self.obj_etag = md5(self.obj_content).hexdigest()
+ self.obj_len = len(self.obj_content)
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': 'download_object', # Should always be download_object
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'attempts': 2,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'success': None # Should be a bool
+ }
+
+ def _readbody(self):
+ yield self.obj_content
+
+ @mock.patch('swiftclient.service.SwiftService.list')
+ @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ def test_download_container_job(self, as_comp, sub_page, service_list):
+ """
+ Check that paged downloads work correctly
+ """
+ obj_count = [0]
+
+ def make_counting_generator(object_to_yield, total_count):
+ # maintain a counter of objects yielded
+ count = [0]
+
+ def counting_generator():
+ while count[0] < 10:
+ yield object_to_yield
+ count[0] += 1
+ total_count[0] += 1
+ return counting_generator()
+
+ obj_count_on_sub_page_call = []
+ sub_page_call_count = [0]
+
+ def fake_sub_page(*args):
+ # keep a record of obj_count when this function is called
+ obj_count_on_sub_page_call.append(obj_count[0])
+ sub_page_call_count[0] += 1
+ if sub_page_call_count[0] < 3:
+ return range(0, 10)
+ return None
+
+ sub_page.side_effect = fake_sub_page
+
+ r = Mock(spec=Future)
+ r.result.return_value = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ as_comp.side_effect = [
+ make_counting_generator(r, obj_count),
+ make_counting_generator(r, obj_count)
+ ]
+
+ s = SwiftService()
+ down_gen = s._download_container('test_c', self.opts)
+ results = list(down_gen)
+ self.assertEqual(20, len(results))
+ self.assertEqual(2, as_comp.call_count)
+ self.assertEqual(3, sub_page_call_count[0])
+ self.assertEqual([0, 7, 17], obj_count_on_sub_page_call)
+
+ @mock.patch('swiftclient.service.SwiftService.list')
+ @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ def test_download_container_job_error(
+ self, as_comp, sub_page, service_list):
+ """
+ Check that paged downloads work correctly
+ """
+ class BoomError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+ def _make_result():
+ r = Mock(spec=Future)
+ r.result.return_value = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+ return r
+
+ as_comp.side_effect = [
+
+ ]
+ # We need Futures here because the error will cause a call to .cancel()
+ sub_page_effects = [
+ [_make_result() for _ in range(0, 10)],
+ BoomError('Go Boom')
+ ]
+ sub_page.side_effect = sub_page_effects
+ # ...but we must also mock the returns to as_completed
+ as_comp.side_effect = [
+ [_make_result() for _ in range(0, 10)]
+ ]
+
+ s = SwiftService()
+ self.assertRaises(
+ BoomError,
+ lambda: list(s._download_container('test_c', self.opts))
+ )
+ # This was an unknown error, so make sure we attempt to cancel futures
+ for spe in sub_page_effects[0]:
+ spe.cancel.assert_called_once_with()
+ self.assertEqual(1, as_comp.call_count)
+
+ # Now test ClientException
+ sub_page_effects = [
+ [_make_result() for _ in range(0, 10)],
+ ClientException('Go Boom')
+ ]
+ sub_page.side_effect = sub_page_effects
+ as_comp.reset_mock()
+ as_comp.side_effect = [
+ [_make_result() for _ in range(0, 10)],
+ ]
+ self.assertRaises(
+ ClientException,
+ lambda: list(s._download_container('test_c', self.opts))
+ )
+ # This was a ClientException, so make sure we don't cancel futures
+ for spe in sub_page_effects[0]:
+ self.assertFalse(spe.cancel.called)
+ self.assertEqual(1, as_comp.call_count)
+
+ def test_download_object_job(self):
+ mock_conn = self._get_mock_connection()
+ objcontent = six.BytesIO(b'objcontent')
+ mock_conn.get_object.side_effect = [
+ ({'content-type': 'text/plain',
+ 'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
+ objcontent)
+ ]
+ expected_r = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ with mock.patch.object(builtins, 'open') as mock_open:
+ written_content = Mock()
+ mock_open.return_value = written_content
+ s = SwiftService()
+ _opts = self.opts.copy()
+ _opts['no_download'] = False
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', _opts)
+ actual_r = dict( # Need to override the times we got from the call
+ actual_r,
+ **{
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3
+ }
+ )
+ mock_open.assert_called_once_with('test_o', 'wb', 65536)
+ written_content.write.assert_called_once_with(b'objcontent')
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self.assertEqual(expected_r, actual_r)
+
+ def test_download_object_job_with_mtime(self):
+ mock_conn = self._get_mock_connection()
+ objcontent = six.BytesIO(b'objcontent')
+ mock_conn.get_object.side_effect = [
+ ({'content-type': 'text/plain',
+ 'etag': '2cbbfe139a744d6abbe695e17f3c1991',
+ 'x-object-meta-mtime': '1454113727.682512'},
+ objcontent)
+ ]
+ expected_r = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ with mock.patch.object(builtins, 'open') as mock_open, \
+ mock.patch('swiftclient.service.utime') as mock_utime:
+ written_content = Mock()
+ mock_open.return_value = written_content
+ s = SwiftService()
+ _opts = self.opts.copy()
+ _opts['no_download'] = False
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', _opts)
+ actual_r = dict( # Need to override the times we got from the call
+ actual_r,
+ **{
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3
+ }
+ )
+ mock_open.assert_called_once_with('test_o', 'wb', 65536)
+ mock_utime.assert_called_once_with(
+ 'test_o', (1454113727.682512, 1454113727.682512))
+ written_content.write.assert_called_once_with(b'objcontent')
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self.assertEqual(expected_r, actual_r)
+
+ def test_download_object_job_bad_mtime(self):
+ mock_conn = self._get_mock_connection()
+ objcontent = six.BytesIO(b'objcontent')
+ mock_conn.get_object.side_effect = [
+ ({'content-type': 'text/plain',
+ 'etag': '2cbbfe139a744d6abbe695e17f3c1991',
+ 'x-object-meta-mtime': 'foo'},
+ objcontent)
+ ]
+ expected_r = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ with mock.patch.object(builtins, 'open') as mock_open, \
+ mock.patch('swiftclient.service.utime') as mock_utime:
+ written_content = Mock()
+ mock_open.return_value = written_content
+ s = SwiftService()
+ _opts = self.opts.copy()
+ _opts['no_download'] = False
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', _opts)
+ actual_r = dict( # Need to override the times we got from the call
+ actual_r,
+ **{
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3
+ }
+ )
+ mock_open.assert_called_once_with('test_o', 'wb', 65536)
+ self.assertEqual(0, len(mock_utime.mock_calls))
+ written_content.write.assert_called_once_with(b'objcontent')
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self.assertEqual(expected_r, actual_r)
+
+ def test_download_object_job_ignore_mtime(self):
+ mock_conn = self._get_mock_connection()
+ objcontent = six.BytesIO(b'objcontent')
+ mock_conn.get_object.side_effect = [
+ ({'content-type': 'text/plain',
+ 'etag': '2cbbfe139a744d6abbe695e17f3c1991',
+ 'x-object-meta-mtime': '1454113727.682512'},
+ objcontent)
+ ]
+ expected_r = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ with mock.patch.object(builtins, 'open') as mock_open, \
+ mock.patch('swiftclient.service.utime') as mock_utime:
+ written_content = Mock()
+ mock_open.return_value = written_content
+ s = SwiftService()
+ _opts = self.opts.copy()
+ _opts['no_download'] = False
+ _opts['ignore_mtime'] = True
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', _opts)
+ actual_r = dict( # Need to override the times we got from the call
+ actual_r,
+ **{
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3
+ }
+ )
+ mock_open.assert_called_once_with('test_o', 'wb', 65536)
+ self.assertEqual([], mock_utime.mock_calls)
+ written_content.write.assert_called_once_with(b'objcontent')
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self.assertEqual(expected_r, actual_r)
+
+ def test_download_object_job_exception(self):
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_object = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'success': False,
+ 'error': self.exc,
+ 'error_timestamp': mock.ANY,
+ 'traceback': mock.ANY
+ })
+
+ s = SwiftService()
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', self.opts)
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self.assertEqual(expected_r, actual_r)
+
+ def test_download(self):
+ with mock.patch('swiftclient.service.Connection') as mock_conn:
+ header = {'content-length': self.obj_len,
+ 'etag': self.obj_etag}
+ mock_conn.get_object.return_value = header, self._readbody()
+
+ resp = SwiftService()._download_object_job(mock_conn,
+ 'c',
+ 'test',
+ self.opts)
+
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(resp['action'], 'download_object')
+ self.assertEqual(resp['object'], 'test')
+ self.assertEqual(resp['path'], 'test')
+
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ @mock.patch('swiftclient.service.SwiftService._download_container')
+ @mock.patch('swiftclient.service.SwiftService._download_object_job')
+ def test_download_with_objects_empty(self, mock_down_obj,
+ mock_down_cont, mock_as_comp):
+ fake_future = Future()
+ fake_future.set_result(1)
+ mock_as_comp.return_value = [fake_future]
+ service = SwiftService()
+ next(service.download('c', [], self.opts), None)
+ mock_down_obj.assert_not_called()
+ mock_down_cont.assert_not_called()
+
+ next(service.download('c', options=self.opts), None)
+ self.assertTrue(mock_down_cont.called)
+
+ def test_download_with_output_dir(self):
+ with mock.patch('swiftclient.service.Connection') as mock_conn:
+ header = {'content-length': self.obj_len,
+ 'etag': self.obj_etag}
+ mock_conn.get_object.return_value = header, self._readbody()
+
+ options = self.opts.copy()
+ options['out_directory'] = 'temp_dir'
+ resp = SwiftService()._download_object_job(mock_conn,
+ 'c',
+ 'example/test',
+ options)
+
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(resp['action'], 'download_object')
+ self.assertEqual(resp['object'], 'example/test')
+ self.assertEqual(resp['path'], 'temp_dir/example/test')
+
+ def test_download_with_remove_prefix(self):
+ with mock.patch('swiftclient.service.Connection') as mock_conn:
+ header = {'content-length': self.obj_len,
+ 'etag': self.obj_etag}
+ mock_conn.get_object.return_value = header, self._readbody()
+
+ options = self.opts.copy()
+ options['prefix'] = 'example/'
+ options['remove_prefix'] = True
+ resp = SwiftService()._download_object_job(mock_conn,
+ 'c',
+ 'example/test',
+ options)
+
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(resp['action'], 'download_object')
+ self.assertEqual(resp['object'], 'example/test')
+ self.assertEqual(resp['path'], 'test')
+
+ def test_download_with_remove_prefix_and_remove_slashes(self):
+ with mock.patch('swiftclient.service.Connection') as mock_conn:
+ header = {'content-length': self.obj_len,
+ 'etag': self.obj_etag}
+ mock_conn.get_object.return_value = header, self._readbody()
+
+ options = self.opts.copy()
+ options['prefix'] = 'example'
+ options['remove_prefix'] = True
+ resp = SwiftService()._download_object_job(mock_conn,
+ 'c',
+ 'example/test',
+ options)
+
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(resp['action'], 'download_object')
+ self.assertEqual(resp['object'], 'example/test')
+ self.assertEqual(resp['path'], 'test')
+
+ def test_download_with_output_dir_and_remove_prefix(self):
+ with mock.patch('swiftclient.service.Connection') as mock_conn:
+ header = {'content-length': self.obj_len,
+ 'etag': self.obj_etag}
+ mock_conn.get_object.return_value = header, self._readbody()
+
+ options = self.opts.copy()
+ options['prefix'] = 'example'
+ options['out_directory'] = 'new/dir'
+ options['remove_prefix'] = True
+ resp = SwiftService()._download_object_job(mock_conn,
+ 'c',
+ 'example/test',
+ options)
+
+ self.assertIsNone(resp.get('error'))
+ self.assertIs(True, resp['success'])
+ self.assertEqual(resp['action'], 'download_object')
+ self.assertEqual(resp['object'], 'example/test')
+ self.assertEqual(resp['path'], 'new/dir/test')
+
+ def test_download_object_job_skip_identical(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+
+ err = swiftclient.ClientException('Object GET failed',
+ http_status=304)
+
+ def fake_get(*args, **kwargs):
+ kwargs['response_dict']['headers'] = {}
+ raise err
+
+ mock_conn = mock.Mock()
+ mock_conn.get_object.side_effect = fake_get
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ expected_r = {
+ 'action': 'download_object',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'success': False,
+ 'error': err,
+ 'response_dict': {'headers': {}},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'attempts': 2,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ }
+
+ s = SwiftService()
+ r = s._download_object_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'out_file': f.name,
+ 'out_directory': None,
+ 'prefix': None,
+ 'remove_prefix': False,
+ 'header': {},
+ 'yes_all': False,
+ 'skip_identical': True})
+ self.assertEqual(r, expected_r)
+
+ self.assertEqual(mock_conn.get_object.call_count, 1)
+ mock_conn.get_object.assert_called_with(
+ 'test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': md5(b'a' * 30).hexdigest()},
+ query_string='multipart-manifest=get',
+ response_dict=expected_r['response_dict'])
+
+ def test_download_object_job_skip_identical_dlo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ on_disk_md5 = md5(b'a' * 30).hexdigest()
+ segment_md5 = md5(b'a' * 10).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.get_object.return_value = (
+ {'x-object-manifest': 'test_c_segments/test_o/prefix'}, [b''])
+ mock_conn.get_container.side_effect = [
+ (None, [{'name': 'test_o/prefix/1',
+ 'bytes': 10, 'hash': segment_md5},
+ {'name': 'test_o/prefix/2',
+ 'bytes': 10, 'hash': segment_md5}]),
+ (None, [{'name': 'test_o/prefix/3',
+ 'bytes': 10, 'hash': segment_md5}]),
+ (None, [])]
+
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ expected_r = {
+ 'action': 'download_object',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'success': False,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'attempts': 2,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ }
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._download_object_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'out_file': f.name,
+ 'out_directory': None,
+ 'prefix': None,
+ 'remove_prefix': False,
+ 'header': {},
+ 'yes_all': False,
+ 'skip_identical': True})
+
+ err = r.pop('error')
+ self.assertEqual("Large object is identical", err.msg)
+ self.assertEqual(304, err.http_status)
+
+ self.assertEqual(r, expected_r)
+
+ self.assertEqual(mock_conn.get_object.call_count, 1)
+ mock_conn.get_object.assert_called_with(
+ 'test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ query_string='multipart-manifest=get',
+ response_dict=expected_r['response_dict'])
+ self.assertEqual(mock_conn.get_container.mock_calls, [
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='',
+ headers={}),
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='test_o/prefix/2',
+ headers={}),
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='test_o/prefix/3',
+ headers={})])
+
+ def test_download_object_job_skip_identical_nested_slo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.flush()
+ on_disk_md5 = md5(b'a' * 30).hexdigest()
+
+ seg_etag = md5(b'a' * 10).hexdigest()
+ submanifest = "[%s]" % ",".join(
+ ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+ submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+ manifest = "[%s]" % ",".join([
+ '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+ '"bytes":20,"hash":"%s"}' % submanifest_etag,
+ '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+ mock_conn = mock.Mock()
+ mock_conn.get_object.side_effect = [
+ ({'x-static-large-object': True,
+ 'content-length': 30,
+ 'etag': md5(submanifest_etag.encode('ascii') +
+ seg_etag.encode('ascii')).hexdigest()},
+ [manifest.encode('ascii')]),
+ ({'x-static-large-object': True,
+ 'content-length': 20,
+ 'etag': submanifest_etag},
+ submanifest.encode('ascii'))]
+
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ expected_r = {
+ 'action': 'download_object',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'success': False,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'attempts': 2,
+ 'traceback': mock.ANY,
+ 'error_timestamp': mock.ANY
+ }
+
+ s = SwiftService()
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._download_object_job(conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options={'out_file': f.name,
+ 'out_directory': None,
+ 'prefix': None,
+ 'remove_prefix': False,
+ 'header': {},
+ 'yes_all': False,
+ 'skip_identical': True})
+
+ err = r.pop('error')
+ self.assertEqual("Large object is identical", err.msg)
+ self.assertEqual(304, err.http_status)
+
+ self.assertEqual(r, expected_r)
+ self.assertEqual(mock_conn.get_object.mock_calls, [
+ mock.call('test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ query_string='multipart-manifest=get',
+ response_dict={}),
+ mock.call('test_c_segments',
+ 'test_sub_slo',
+ query_string='multipart-manifest=get')])
+
+ def test_download_object_job_skip_identical_diff_dlo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 30)
+ f.write(b'b')
+ f.flush()
+ on_disk_md5 = md5(b'a' * 30 + b'b').hexdigest()
+ segment_md5 = md5(b'a' * 10).hexdigest()
+
+ mock_conn = mock.Mock()
+ mock_conn.get_object.side_effect = [
+ ({'x-object-manifest': 'test_c_segments/test_o/prefix'},
+ [b'']),
+ ({'x-object-manifest': 'test_c_segments/test_o/prefix'},
+ [b'a' * 30])]
+ mock_conn.get_container.side_effect = [
+ (None, [{'name': 'test_o/prefix/1',
+ 'bytes': 10, 'hash': segment_md5},
+ {'name': 'test_o/prefix/2',
+ 'bytes': 10, 'hash': segment_md5}]),
+ (None, [{'name': 'test_o/prefix/3',
+ 'bytes': 10, 'hash': segment_md5}]),
+ (None, [])]
+
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14)
+ expected_r = {
+ 'action': 'download_object',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'success': True,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'read_length': 30,
+ 'attempts': 2,
+ 'start_time': 0,
+ 'headers_receipt': 1,
+ 'finish_time': 2,
+ 'auth_end_time': mock_conn.auth_end_time,
+ }
+
+ options = self.opts.copy()
+ options['out_file'] = f.name
+ options['skip_identical'] = True
+ s = SwiftService()
+ with mock.patch('swiftclient.service.time', side_effect=range(3)):
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._download_object_job(
+ conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options=options)
+
+ self.assertEqual(r, expected_r)
+
+ self.assertEqual(mock_conn.get_container.mock_calls, [
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='',
+ headers={}),
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='test_o/prefix/2',
+ headers={}),
+ mock.call('test_c_segments',
+ delimiter=None,
+ prefix='test_o/prefix',
+ marker='test_o/prefix/3',
+ headers={})])
+ self.assertEqual(mock_conn.get_object.mock_calls, [
+ mock.call('test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ query_string='multipart-manifest=get',
+ response_dict={}),
+ mock.call('test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ response_dict={})])
+
+ def test_download_object_job_skip_identical_diff_nested_slo(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(b'a' * 29)
+ f.flush()
+ on_disk_md5 = md5(b'a' * 29).hexdigest()
+
+ seg_etag = md5(b'a' * 10).hexdigest()
+ submanifest = "[%s]" % ",".join(
+ ['{"bytes":10,"hash":"%s"}' % seg_etag] * 2)
+ submanifest_etag = md5(seg_etag.encode('ascii') * 2).hexdigest()
+ manifest = "[%s]" % ",".join([
+ '{"sub_slo":true,"name":"/test_c_segments/test_sub_slo",'
+ '"bytes":20,"hash":"%s"}' % submanifest_etag,
+ '{"bytes":10,"hash":"%s"}' % seg_etag])
+
+ mock_conn = mock.Mock()
+ mock_conn.get_object.side_effect = [
+ ({'x-static-large-object': True,
+ 'content-length': 30,
+ 'etag': md5(submanifest_etag.encode('ascii') +
+ seg_etag.encode('ascii')).hexdigest()},
+ [manifest.encode('ascii')]),
+ ({'x-static-large-object': True,
+ 'content-length': 20,
+ 'etag': submanifest_etag},
+ submanifest.encode('ascii')),
+ ({'x-static-large-object': True,
+ 'content-length': 30,
+ 'etag': md5(submanifest_etag.encode('ascii') +
+ seg_etag.encode('ascii')).hexdigest()},
+ [b'a' * 30])]
+
+ type(mock_conn).attempts = mock.PropertyMock(return_value=2)
+ type(mock_conn).auth_end_time = mock.PropertyMock(return_value=14)
+ expected_r = {
+ 'action': 'download_object',
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'success': True,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'read_length': 30,
+ 'attempts': 2,
+ 'start_time': 0,
+ 'headers_receipt': 1,
+ 'finish_time': 2,
+ 'auth_end_time': mock_conn.auth_end_time,
+ }
+
+ options = self.opts.copy()
+ options['out_file'] = f.name
+ options['skip_identical'] = True
+ s = SwiftService()
+ with mock.patch('swiftclient.service.time', side_effect=range(3)):
+ with mock.patch('swiftclient.service.get_conn',
+ return_value=mock_conn):
+ r = s._download_object_job(
+ conn=mock_conn,
+ container='test_c',
+ obj='test_o',
+ options=options)
+
+ self.assertEqual(r, expected_r)
+ self.assertEqual(mock_conn.get_object.mock_calls, [
+ mock.call('test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ query_string='multipart-manifest=get',
+ response_dict={}),
+ mock.call('test_c_segments',
+ 'test_sub_slo',
+ query_string='multipart-manifest=get'),
+ mock.call('test_c',
+ 'test_o',
+ resp_chunk_size=65536,
+ headers={'If-None-Match': on_disk_md5},
+ response_dict={})])
+
+
+class TestServicePost(_TestServiceBase):
+
+ def setUp(self):
+ super(TestServicePost, self).setUp()
+ self.opts = swiftclient.service._default_local_options.copy()
+
+ @mock.patch('swiftclient.service.MultiThreadingManager')
+ @mock.patch('swiftclient.service.ResultsIterator')
+ def test_object_post(self, res_iter, thread_manager):
+ """
+ Check post method translates strings and objects to _post_object_job
+ calls correctly
+ """
+ tm_instance = Mock()
+ thread_manager.return_value = tm_instance
+
+ self.opts.update({'meta': ["meta1:test1"], "header": ["hdr1:test1"]})
+ spo = swiftclient.service.SwiftPostObject(
+ "test_spo",
+ {'meta': ["meta1:test2"], "header": ["hdr1:test2"]})
+
+ SwiftService().post('test_c', ['test_o', spo], self.opts)
+
+ calls = [
+ mock.call(
+ SwiftService._post_object_job, 'test_c', 'test_o',
+ {
+ "X-Object-Meta-Meta1": "test1",
+ "Hdr1": "test1"},
+ {}),
+ mock.call(
+ SwiftService._post_object_job, 'test_c', 'test_spo',
+ {
+ "X-Object-Meta-Meta1": "test2",
+ "Hdr1": "test2"},
+ {}),
+ ]
+ tm_instance.object_uu_pool.submit.assert_has_calls(calls)
+ self.assertEqual(
+ tm_instance.object_uu_pool.submit.call_count, len(calls))
+
+ res_iter.assert_called_with(
+ [tm_instance.object_uu_pool.submit()] * len(calls))
+
+
+class TestServiceCopy(_TestServiceBase):
+
+ def setUp(self):
+ super(TestServiceCopy, self).setUp()
+ self.opts = swiftclient.service._default_local_options.copy()
+
+ @mock.patch('swiftclient.service.MultiThreadingManager')
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ def test_object_copy(self, inter_compl, thread_manager):
+ """
+ Check copy method translates strings and objects to _copy_object_job
+ calls correctly
+ """
+ tm_instance = Mock()
+ thread_manager.return_value = tm_instance
+
+ self.opts.update({'meta': ["meta1:test1"], "header": ["hdr1:test1"]})
+ sco = swiftclient.service.SwiftCopyObject(
+ "test_sco",
+ options={'meta': ["meta1:test2"], "header": ["hdr1:test2"],
+ 'destination': "/cont_new/test_sco"})
+
+ res = SwiftService().copy('test_c', ['test_o', sco], self.opts)
+ res = list(res)
+
+ calls = [
+ mock.call(
+ SwiftService._create_container_job, 'cont_new', headers={}),
+ ]
+ tm_instance.container_pool.submit.assert_has_calls(calls,
+ any_order=True)
+ self.assertEqual(
+ tm_instance.container_pool.submit.call_count, len(calls))
+
+ calls = [
+ mock.call(
+ SwiftService._copy_object_job, 'test_c', 'test_o',
+ None,
+ {
+ "X-Object-Meta-Meta1": "test1",
+ "Hdr1": "test1"},
+ False),
+ mock.call(
+ SwiftService._copy_object_job, 'test_c', 'test_sco',
+ '/cont_new/test_sco',
+ {
+ "X-Object-Meta-Meta1": "test2",
+ "Hdr1": "test2"},
+ False),
+ ]
+ tm_instance.object_uu_pool.submit.assert_has_calls(calls)
+ self.assertEqual(
+ tm_instance.object_uu_pool.submit.call_count, len(calls))
+
+ inter_compl.assert_called_with(
+ [tm_instance.object_uu_pool.submit()] * len(calls))
+
+ def test_object_copy_fail_dest(self):
+ """
+ Destination in incorrect format and destination with object
+ used when multiple objects are copied raises SwiftError
+ """
+ with self.assertRaises(SwiftError):
+ list(SwiftService().copy('test_c', ['test_o'],
+ {'destination': 'cont'}))
+ with self.assertRaises(SwiftError):
+ list(SwiftService().copy('test_c', ['test_o', 'test_o2'],
+ {'destination': '/cont/obj'}))