summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Wright <joel.wright@sohonet.com>2014-10-14 16:54:41 +0100
committerJoel Wright <joel.wright@sohonet.com>2015-07-20 20:44:51 +0100
commita8c4df98eee43b419d6dd30e80c838d9f2efd025 (patch)
tree31eaad7eb51f4c9ad1e04b073887b270cc6b9edc
parent63998b481c7fd2d242efa12e2ca5b959bcdd113b (diff)
downloadpython-swiftclient-a8c4df98eee43b419d6dd30e80c838d9f2efd025.tar.gz
Reduce memory usage for download/delete and add --no-shuffle option to st_download
The current code builds a full object listing before performing either a multiple download or delete operation (and also shuffles this complete list in the case of a download). This patch removes the creation of the full object list and adds the ability to turn off shuffle for files when downloading. Also added is a limit on the number of list results that can be queued by a single call to service.list without consuming results (reduces memory overhead for large listings). Some tests added for service.py download and list. Change-Id: Ie737cbb7f8b1fa8a79bbb88914730b05aa7f2906
-rw-r--r--swiftclient/service.py129
-rwxr-xr-xswiftclient/shell.py18
-rw-r--r--tests/unit/test_service.py492
-rw-r--r--tests/unit/test_shell.py56
4 files changed, 595 insertions, 100 deletions
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 7c55769..4f331c4 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -176,7 +176,8 @@ _default_local_options = {
'fail_fast': False,
'human': False,
'dir_marker': False,
- 'checksum': True
+ 'checksum': True,
+ 'shuffle': False
}
POLICY = 'X-Storage-Policy'
@@ -752,7 +753,7 @@ class SwiftService(object):
else:
options = self._options
- rq = Queue()
+ rq = Queue(maxsize=10) # Just stop list running away consuming memory
if container is None:
listing_future = self.thread_manager.container_pool.submit(
@@ -895,6 +896,7 @@ class SwiftService(object):
'out_directory': None,
'out_file': None,
'remove_prefix': False,
+ 'shuffle' : False
}
:returns: A generator for returning the results of the download
@@ -916,39 +918,20 @@ class SwiftService(object):
try:
options_copy = deepcopy(options)
options_copy["long"] = False
- containers = []
+
for part in self.list(options=options_copy):
if part["success"]:
- containers.extend([
- i['name'] for i in part["listing"]
- ])
- else:
- raise part["error"]
+ containers = [i['name'] for i in part["listing"]]
- shuffle(containers)
-
- o_downs = []
- for con in containers:
- objs = []
- for part in self.list(
- container=con, options=options_copy):
- if part["success"]:
- objs.extend([
- i['name'] for i in part["listing"]
- ])
- else:
- raise part["error"]
- shuffle(objs)
-
- o_downs.extend(
- self.thread_manager.object_dd_pool.submit(
- self._download_object_job, con, obj,
- options_copy
- ) for obj in objs
- )
+ if options['shuffle']:
+ shuffle(containers)
- for o_down in interruptable_as_completed(o_downs):
- yield o_down.result()
+ for con in containers:
+ for res in self._download_container(
+ con, options_copy):
+ yield res
+ else:
+ raise part["error"]
# If we see a 404 here, the listing of the account failed
except ClientException as err:
@@ -1153,14 +1136,17 @@ class SwiftService(object):
}
return res
- def _download_container(self, container, options):
+ def _submit_page_downloads(self, container, page_generator, options):
try:
- objects = []
- for part in self.list(container=container, options=options):
- if part["success"]:
- objects.extend([o["name"] for o in part["listing"]])
- else:
- raise part["error"]
+ list_page = next(page_generator)
+ except StopIteration:
+ return None
+
+ if list_page["success"]:
+ objects = [o["name"] for o in list_page["listing"]]
+
+ if options["shuffle"]:
+ shuffle(objects)
o_downs = [
self.thread_manager.object_dd_pool.submit(
@@ -1168,14 +1154,60 @@ class SwiftService(object):
) for obj in objects
]
- for o_down in interruptable_as_completed(o_downs):
- yield o_down.result()
+ return o_downs
+ else:
+ raise list_page["error"]
+ def _download_container(self, container, options):
+ _page_generator = self.list(container=container, options=options)
+ try:
+ next_page_downs = self._submit_page_downloads(
+ container, _page_generator, options
+ )
except ClientException as err:
if err.http_status != 404:
raise
- raise SwiftError('Container %r not found' % container,
- container=container)
+ raise SwiftError(
+ 'Container %r not found' % container, container=container
+ )
+
+ error = None
+ while next_page_downs:
+ page_downs = next_page_downs
+ next_page_downs = None
+
+ # Start downloading the next page of list results when
+ # we have completed 80% of the previous page
+ next_page_triggered = False
+ next_page_trigger_point = 0.8 * len(page_downs)
+
+ page_results_yielded = 0
+ for o_down in interruptable_as_completed(page_downs):
+ yield o_down.result()
+
+ # Do we need to start the next set of downloads yet?
+ if not next_page_triggered:
+ page_results_yielded += 1
+ if page_results_yielded >= next_page_trigger_point:
+ try:
+ next_page_downs = self._submit_page_downloads(
+ container, _page_generator, options
+ )
+ except ClientException as err:
+ # Allow the current page to finish downloading
+ error = err
+ except Exception:
+ # Something unexpected went wrong - cancel
+ # remaining downloads
+ for _d in page_downs:
+ _d.cancel()
+ raise
+ finally:
+ # Stop counting and testing
+ next_page_triggered = True
+
+ if error:
+ raise error
# Upload related methods
#
@@ -2080,17 +2112,18 @@ class SwiftService(object):
def _delete_container(self, container, options):
try:
- objs = []
for part in self.list(container=container):
if part["success"]:
- objs.extend([o['name'] for o in part['listing']])
+ objs = [o['name'] for o in part['listing']]
+
+ o_dels = self.delete(
+ container=container, objects=objs, options=options
+ )
+ for res in o_dels:
+ yield res
else:
raise part["error"]
- for res in self.delete(
- container=container, objects=objs, options=options):
- yield res
-
con_del = self.thread_manager.container_pool.submit(
self._delete_empty_container, container
)
diff --git a/swiftclient/shell.py b/swiftclient/shell.py
index a77ea07..35d7c50 100755
--- a/swiftclient/shell.py
+++ b/swiftclient/shell.py
@@ -198,6 +198,14 @@ Optional arguments:
Example --header "content-type:text/plain"
--skip-identical Skip downloading files that are identical on both
sides.
+ --no-shuffle By default, when downloading a complete account or
+ container, download order is randomised in order to
+ to reduce the load on individual drives when multiple
+ clients are executed simultaneously to download the
+ same set of objects (e.g. a nightly automated download
+ script to multiple servers). Enable this option to
+ submit download jobs to the thread pool in the order
+ they are listed in the object store.
'''.strip("\n")
@@ -247,6 +255,14 @@ def st_download(parser, args, output_manager):
'--skip-identical', action='store_true', dest='skip_identical',
default=False, help='Skip downloading files that are identical on '
'both sides.')
+ parser.add_option(
+ '--no-shuffle', action='store_false', dest='shuffle',
+ default=True, help='By default, download order is randomised in order '
+ 'to reduce the load on individual drives when multiple clients are '
+ 'executed simultaneously to download the same set of objects (e.g. a '
+ 'nightly automated download script to multiple servers). Enable this '
+ 'option to submit download jobs to the thread pool in the order they '
+ 'are listed in the object store.')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.out_file == '-':
@@ -353,6 +369,8 @@ def st_download(parser, args, output_manager):
except SwiftError as e:
output_manager.error(e.value)
+ except Exception as e:
+ output_manager.error(e)
st_list_options = '''[--long] [--lh] [--totals] [--prefix <prefix>]
diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py
index 339aca1..db47263 100644
--- a/tests/unit/test_service.py
+++ b/tests/unit/test_service.py
@@ -14,23 +14,25 @@
# limitations under the License.
import mock
import os
+import six
import tempfile
import testtools
import time
+
+from concurrent.futures import Future
from hashlib import md5
from mock import Mock, PropertyMock
from six.moves.queue import Queue, Empty as QueueEmptyError
from six import BytesIO
+from time import sleep
+
import swiftclient
import swiftclient.utils as utils
from swiftclient.client import Connection, ClientException
-from swiftclient.service import SwiftService, SwiftError,\
- SwiftUploadObject
-import six
-if six.PY2:
- import __builtin__ as builtins
-else:
- import builtins
+from swiftclient.service import (
+ SwiftService, SwiftError, SwiftUploadObject
+)
+
clean_os_environ = {}
environ_prefixes = ('ST_', 'OS_')
@@ -39,6 +41,12 @@ for key in os.environ:
clean_os_environ[key] = ''
+if six.PY2:
+ import __builtin__ as builtins
+else:
+ import builtins
+
+
class TestSwiftPostObject(testtools.TestCase):
def setUp(self):
@@ -142,25 +150,24 @@ class TestSwiftReader(testtools.TestCase):
'97ac82a5b825239e782d0339e2d7b910')
-class TestServiceDelete(testtools.TestCase):
- def setUp(self):
- super(TestServiceDelete, self).setUp()
- self.opts = {'leave_segments': False, 'yes_all': False}
- self.exc = Exception('test_exc')
- # Base response to be copied and updated to matched the expected
- # response for each test
- self.expected = {
- 'action': None, # Should be string in the form delete_XX
- 'container': 'test_c',
- 'object': 'test_o',
- 'attempts': 2,
- 'response_dict': {},
- 'success': None # Should be a bool
- }
+class _TestServiceBase(testtools.TestCase):
+ def _assertDictEqual(self, a, b, m=None):
+ # assertDictEqual is not available in py2.6 so use a shallow check
+ # instead
+ if hasattr(self, 'assertDictEqual'):
+ self.assertDictEqual(a, b, m)
+ else:
+ self.assertTrue(isinstance(a, dict))
+ self.assertTrue(isinstance(b, dict))
+ self.assertEqual(len(a), len(b), m)
+ for k, v in a.items():
+ self.assertTrue(k in b, m)
+ self.assertEqual(b[k], v, m)
def _get_mock_connection(self, attempts=2):
m = Mock(spec=Connection)
type(m).attempts = PropertyMock(return_value=attempts)
+ type(m).auth_end_time = PropertyMock(return_value=4)
return m
def _get_queue(self, q):
@@ -178,18 +185,22 @@ class TestServiceDelete(testtools.TestCase):
return expected
- def _assertDictEqual(self, a, b, m=None):
- # assertDictEqual is not available in py2.6 so use a shallow check
- # instead
- if hasattr(self, 'assertDictEqual'):
- self.assertDictEqual(a, b, m)
- else:
- self.assertTrue(isinstance(a, dict))
- self.assertTrue(isinstance(b, dict))
- self.assertEqual(len(a), len(b), m)
- for k, v in a.items():
- self.assertTrue(k in b, m)
- self.assertEqual(b[k], v, m)
+
+class TestServiceDelete(_TestServiceBase):
+ def setUp(self):
+ super(TestServiceDelete, self).setUp()
+ self.opts = {'leave_segments': False, 'yes_all': False}
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': None, # Should be string in the form delete_XX
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'attempts': 2,
+ 'response_dict': {},
+ 'success': None # Should be a bool
+ }
def test_delete_segment(self):
mock_q = Queue()
@@ -542,6 +553,226 @@ class TestSwiftUploadObject(testtools.TestCase):
self.assertRaises(SwiftError, self.suo, [])
+class TestServiceList(_TestServiceBase):
+ def setUp(self):
+ super(TestServiceList, self).setUp()
+ self.opts = {'prefix': None, 'long': False, 'delimiter': ''}
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': None, # Should be list_X_part (account or container)
+ 'container': None, # Should be a string when listing a container
+ 'prefix': None,
+ 'success': None # Should be a bool
+ }
+
+ def test_list_account(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_account_returns = [
+ (None, [{'name': 'test_c'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': True,
+ 'listing': [{'name': 'test_c'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, self.opts, mock_q
+ )
+ self._assertDictEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ long_opts = dict(self.opts, **{'long': True})
+ mock_conn.head_container = Mock(return_value={'test_m': '1'})
+ get_account_returns = [
+ (None, [{'name': 'test_c'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+
+ expected_r_long = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': True,
+ 'listing': [{'name': 'test_c', 'meta': {'test_m': '1'}}],
+ 'marker': '',
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, long_opts, mock_q
+ )
+ self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_account_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_account = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'list_account_part',
+ 'success': False,
+ 'error': self.exc,
+ 'marker': ''
+ })
+
+ SwiftService._list_account_job(
+ mock_conn, self.opts, mock_q)
+
+ mock_conn.get_account.assert_called_once_with(
+ marker='', prefix=None
+ )
+ self._assertDictEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_container(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ get_container_returns = [
+ (None, [{'name': 'test_o'}]),
+ (None, [])
+ ]
+ mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'test_o'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', self.opts, mock_q
+ )
+ self._assertDictEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ long_opts = dict(self.opts, **{'long': True})
+ mock_conn.head_container = Mock(return_value={'test_m': '1'})
+ get_container_returns = [
+ (None, [{'name': 'test_o'}]),
+ (None, [])
+ ]
+ mock_conn.get_container = Mock(side_effect=get_container_returns)
+
+ expected_r_long = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': True,
+ 'listing': [{'name': 'test_o'}],
+ 'marker': ''
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', long_opts, mock_q
+ )
+ self._assertDictEqual(expected_r_long, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ def test_list_container_exception(self):
+ mock_q = Queue()
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_container = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'action': 'list_container_part',
+ 'container': 'test_c',
+ 'success': False,
+ 'error': self.exc,
+ 'marker': ''
+ })
+
+ SwiftService._list_container_job(
+ mock_conn, 'test_c', self.opts, mock_q
+ )
+
+ mock_conn.get_container.assert_called_once_with(
+ 'test_c', marker='', delimiter='', prefix=None
+ )
+ self._assertDictEqual(expected_r, self._get_queue(mock_q))
+ self.assertIsNone(self._get_queue(mock_q))
+
+ @mock.patch('swiftclient.service.get_conn')
+ def test_list_queue_size(self, mock_get_conn):
+ mock_conn = self._get_mock_connection()
+ # Return more results than should fit in the results queue
+ get_account_returns = [
+ (None, [{'name': 'container1'}]),
+ (None, [{'name': 'container2'}]),
+ (None, [{'name': 'container3'}]),
+ (None, [{'name': 'container4'}]),
+ (None, [{'name': 'container5'}]),
+ (None, [{'name': 'container6'}]),
+ (None, [{'name': 'container7'}]),
+ (None, [{'name': 'container8'}]),
+ (None, [{'name': 'container9'}]),
+ (None, [{'name': 'container10'}]),
+ (None, [{'name': 'container11'}]),
+ (None, [{'name': 'container12'}]),
+ (None, [{'name': 'container13'}]),
+ (None, [{'name': 'container14'}]),
+ (None, [])
+ ]
+ mock_conn.get_account = Mock(side_effect=get_account_returns)
+ mock_get_conn.return_value = mock_conn
+
+ s = SwiftService(options=self.opts)
+ lg = s.list()
+
+ # Start the generator
+ first_list_part = next(lg)
+
+ # Wait for the number of calls to get_account to reach our expected
+ # value, then let it run some more to make sure the value remains
+ # stable
+ count = mock_conn.get_account.call_count
+ stable = 0
+ while mock_conn.get_account.call_count != count or stable < 5:
+ if mock_conn.get_account.call_count == count:
+ stable += 1
+ else:
+ count = mock_conn.get_account.call_count
+ stable = 0
+ # The test requires a small sleep to allow other threads to
+ # execute - in this mocked environment we assume that if the call
+ # count to get_account has not changed in 0.25s then no more calls
+ # will be made.
+ sleep(0.05)
+
+ stable_get_account_call_count = mock_conn.get_account.call_count
+
+ # Collect all remaining results from the generator
+ list_results = [first_list_part] + list(lg)
+
+ # Make sure the stable call count is correct - this should be 12 calls
+ # to get_account;
+ # 1 for first_list_part
+ # 10 for the values on the queue
+ # 1 for the value blocking whilst trying to place onto the queue
+ self.assertEqual(12, stable_get_account_call_count)
+
+ # Make sure all the containers were listed and placed onto the queue
+ self.assertEqual(15, mock_conn.get_account.call_count)
+
+ # Check the results were all returned
+ observed_listing = []
+ for lir in list_results:
+ observed_listing.append(
+ [li['name'] for li in lir['listing']]
+ )
+ expected_listing = []
+ for gar in get_account_returns[:-1]: # The empty list is not returned
+ expected_listing.append(
+ [li['name'] for li in gar[1]]
+ )
+ self.assertEqual(observed_listing, expected_listing)
+
+
class TestService(testtools.TestCase):
def test_upload_with_bad_segment_size(self):
@@ -589,23 +820,7 @@ class TestService(testtools.TestCase):
self.assertEqual(upload_obj_resp['path'], obj['path'])
-class TestServiceUpload(testtools.TestCase):
-
- def _assertDictEqual(self, a, b, m=None):
- # assertDictEqual is not available in py2.6 so use a shallow check
- # instead
- if not m:
- m = '{0} != {1}'.format(a, b)
-
- if hasattr(self, 'assertDictEqual'):
- self.assertDictEqual(a, b, m)
- else:
- self.assertIsInstance(a, dict, m)
- self.assertIsInstance(b, dict, m)
- self.assertEqual(len(a), len(b), m)
- for k, v in a.items():
- self.assertIn(k, b, m)
- self.assertEqual(b[k], v, m)
+class TestServiceUpload(_TestServiceBase):
def test_upload_segment_job(self):
with tempfile.NamedTemporaryFile() as f:
@@ -1027,7 +1242,7 @@ class TestServiceUpload(testtools.TestCase):
mock_conn.get_container.assert_has_calls(expected)
-class TestServiceDownload(testtools.TestCase):
+class TestServiceDownload(_TestServiceBase):
def setUp(self):
super(TestServiceDownload, self).setUp()
@@ -1036,6 +1251,19 @@ class TestServiceDownload(testtools.TestCase):
self.obj_content = b'c' * 10
self.obj_etag = md5(self.obj_content).hexdigest()
self.obj_len = len(self.obj_content)
+ self.exc = Exception('test_exc')
+ # Base response to be copied and updated to matched the expected
+ # response for each test
+ self.expected = {
+ 'action': 'download_object', # Should always be download_object
+ 'container': 'test_c',
+ 'object': 'test_o',
+ 'attempts': 2,
+ 'response_dict': {},
+ 'path': 'test_o',
+ 'pseudodir': False,
+ 'success': None # Should be a bool
+ }
def _readbody(self):
yield self.obj_content
@@ -1056,6 +1284,166 @@ class TestServiceDownload(testtools.TestCase):
self.assertIn(k, b, m)
self.assertEqual(b[k], v, m)
+ @mock.patch('swiftclient.service.SwiftService.list')
+ @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ def test_download_container_job(self, as_comp, sub_page, service_list):
+ """
+ Check that paged downloads work correctly
+ """
+ as_comp.side_effect = [
+
+ ]
+ sub_page.side_effect = [
+ range(0, 10), range(0, 10), [] # simulate multiple result pages
+ ]
+ r = Mock(spec=Future)
+ r.result.return_value = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+ as_comp.side_effect = [
+ [r for _ in range(0, 10)],
+ [r for _ in range(0, 10)]
+ ]
+
+ s = SwiftService()
+ down_gen = s._download_container('test_c', self.opts)
+ results = list(down_gen)
+ self.assertEqual(20, len(results))
+
+ @mock.patch('swiftclient.service.SwiftService.list')
+ @mock.patch('swiftclient.service.SwiftService._submit_page_downloads')
+ @mock.patch('swiftclient.service.interruptable_as_completed')
+ def test_download_container_job_error(
+ self, as_comp, sub_page, service_list):
+ """
+ Check that paged downloads work correctly
+ """
+ class BoomError(Exception):
+ def __init__(self, value):
+ self.value = value
+
+ def __str__(self):
+ return repr(self.value)
+
+ def _make_result():
+ r = Mock(spec=Future)
+ r.result.return_value = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+ return r
+
+ as_comp.side_effect = [
+
+ ]
+ # We need Futures here because the error will cause a call to .cancel()
+ sub_page_effects = [
+ [_make_result() for _ in range(0, 10)],
+ BoomError('Go Boom')
+ ]
+ sub_page.side_effect = sub_page_effects
+ # ...but we must also mock the returns to as_completed
+ as_comp.side_effect = [
+ [_make_result() for _ in range(0, 10)]
+ ]
+
+ s = SwiftService()
+ self.assertRaises(
+ BoomError,
+ lambda: list(s._download_container('test_c', self.opts))
+ )
+ # This was an unknown error, so make sure we attempt to cancel futures
+ for spe in sub_page_effects[0]:
+ spe.cancel.assert_called_once_with()
+
+ # Now test ClientException
+ sub_page_effects = [
+ [_make_result() for _ in range(0, 10)],
+ ClientException('Go Boom')
+ ]
+ sub_page.side_effect = sub_page_effects
+ as_comp.side_effect = [
+ [_make_result() for _ in range(0, 10)],
+ [_make_result() for _ in range(0, 10)]
+ ]
+ self.assertRaises(
+ ClientException,
+ lambda: list(s._download_container('test_c', self.opts))
+ )
+ # This was a ClientException, so make sure we don't cancel futures
+ for spe in sub_page_effects[0]:
+ self.assertFalse(spe.cancel.called)
+
+ def test_download_object_job(self):
+ mock_conn = self._get_mock_connection()
+ objcontent = six.BytesIO(b'objcontent')
+ mock_conn.get_object.side_effect = [
+ ({'content-type': 'text/plain',
+ 'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
+ objcontent)
+ ]
+ expected_r = self._get_expected({
+ 'success': True,
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3,
+ 'auth_end_time': 4,
+ 'read_length': len(b'objcontent'),
+ })
+
+ with mock.patch.object(builtins, 'open') as mock_open:
+ written_content = Mock()
+ mock_open.return_value = written_content
+ s = SwiftService()
+ _opts = self.opts.copy()
+ _opts['no_download'] = False
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', _opts)
+ actual_r = dict( # Need to override the times we got from the call
+ actual_r,
+ **{
+ 'start_time': 1,
+ 'finish_time': 2,
+ 'headers_receipt': 3
+ }
+ )
+ mock_open.assert_called_once_with('test_o', 'wb')
+ written_content.write.assert_called_once_with(b'objcontent')
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self._assertDictEqual(expected_r, actual_r)
+
+ def test_download_object_job_exception(self):
+ mock_conn = self._get_mock_connection()
+ mock_conn.get_object = Mock(side_effect=self.exc)
+ expected_r = self._get_expected({
+ 'success': False,
+ 'error': self.exc
+ })
+
+ s = SwiftService()
+ actual_r = s._download_object_job(
+ mock_conn, 'test_c', 'test_o', self.opts)
+
+ mock_conn.get_object.assert_called_once_with(
+ 'test_c', 'test_o', resp_chunk_size=65536, headers={},
+ response_dict={}
+ )
+ self._assertDictEqual(expected_r, actual_r)
+
def test_download(self):
service = SwiftService()
with mock.patch('swiftclient.service.Connection') as mock_conn:
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index 8384f7a..5f9d497 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -377,6 +377,62 @@ class TestShell(unittest.TestCase):
swiftclient.shell.main(argv)
self.assertEqual('objcontent', output.out)
+ @mock.patch('swiftclient.service.shuffle')
+ @mock.patch('swiftclient.service.Connection')
+ def test_download_shuffle(self, connection, mock_shuffle):
+ # Test that the container and object lists are shuffled
+ mock_shuffle.side_effect = lambda l: l
+ connection.return_value.get_object.return_value = [
+ {'content-type': 'text/plain',
+ 'etag': EMPTY_ETAG},
+ '']
+
+ connection.return_value.get_container.side_effect = [
+ (None, [{'name': 'object'}]),
+ (None, [{'name': 'pseudo/'}]),
+ (None, []),
+ ]
+ connection.return_value.auth_end_time = 0
+ connection.return_value.attempts = 0
+ connection.return_value.get_account.side_effect = [
+ (None, [{'name': 'container'}]),
+ (None, [])
+ ]
+
+ with mock.patch(BUILTIN_OPEN) as mock_open:
+ argv = ["", "download", "--all"]
+ swiftclient.shell.main(argv)
+ self.assertEqual(3, mock_shuffle.call_count)
+ mock_shuffle.assert_any_call(['container'])
+ mock_shuffle.assert_any_call(['object'])
+ mock_shuffle.assert_any_call(['pseudo/'])
+ mock_open.assert_called_once_with('container/object', 'wb')
+
+ # Test that the container and object lists are not shuffled
+ mock_shuffle.reset_mock()
+ connection.return_value.get_object.return_value = [
+ {'content-type': 'text/plain',
+ 'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
+ '']
+
+ connection.return_value.get_container.side_effect = [
+ (None, [{'name': 'object'}]),
+ (None, [{'name': 'pseudo/'}]),
+ (None, []),
+ ]
+ connection.return_value.auth_end_time = 0
+ connection.return_value.attempts = 0
+ connection.return_value.get_account.side_effect = [
+ (None, [{'name': 'container'}]),
+ (None, [])
+ ]
+
+ with mock.patch(BUILTIN_OPEN) as mock_open:
+ argv = ["", "download", "--all", "--no-shuffle"]
+ swiftclient.shell.main(argv)
+ self.assertEqual(0, mock_shuffle.call_count)
+ mock_open.assert_called_once_with('container/object', 'wb')
+
@mock.patch('swiftclient.service.Connection')
def test_download_no_content_type(self, connection):
connection.return_value.get_object.return_value = [