summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.unittests2
-rwxr-xr-xbin/swift658
-rw-r--r--doc/source/conf.py7
-rw-r--r--doc/source/swiftclient.rst16
-rw-r--r--swiftclient/client.py60
-rw-r--r--swiftclient/exceptions.py72
-rw-r--r--swiftclient/multithreading.py241
-rw-r--r--swiftclient/utils.py1
-rw-r--r--tests/test_multithreading.py334
9 files changed, 928 insertions, 463 deletions
diff --git a/.unittests b/.unittests
index 4758296..bf5b027 100755
--- a/.unittests
+++ b/.unittests
@@ -3,6 +3,6 @@ set -e
python setup.py testr --coverage
RET=$?
-coverage report
+coverage report -m
rm -f .coverage
exit $RET
diff --git a/bin/swift b/bin/swift
index 28ee31d..35a7675 100755
--- a/bin/swift
+++ b/bin/swift
@@ -22,12 +22,9 @@ from hashlib import md5
from optparse import OptionParser, SUPPRESS_HELP
from os import environ, listdir, makedirs, utime, _exit as os_exit
from os.path import basename, dirname, getmtime, getsize, isdir, join
-from Queue import Queue
from random import shuffle
-from sys import argv, exc_info, exit, stderr, stdout
-from threading import Thread
+from sys import argv, exit, stderr, stdout
from time import sleep, time, gmtime, strftime
-from traceback import format_exception
from urllib import quote, unquote
try:
@@ -35,7 +32,10 @@ try:
except ImportError:
import json
-from swiftclient import Connection, ClientException, HTTPException, utils
+from swiftclient import Connection, HTTPException
+from swiftclient.utils import config_true_value
+from swiftclient.multithreading import MultiThreadingManager
+from swiftclient.exceptions import ClientException
from swiftclient.version import version_info
@@ -63,75 +63,6 @@ def mkdirs(path):
raise
-def put_errors_from_threads(threads, error_queue):
- """
- Places any errors from the threads into error_queue.
- :param threads: A list of QueueFunctionThread instances.
- :param error_queue: A queue to put error strings into.
- :returns: True if any errors were found.
- """
- was_error = False
- for thread in threads:
- for info in thread.exc_infos:
- was_error = True
- if isinstance(info[1], ClientException):
- error_queue.put(str(info[1]))
- else:
- error_queue.put(''.join(format_exception(*info)))
- return was_error
-
-
-class StopWorkerThreadSignal(object):
- pass
-
-
-class QueueFunctionThread(Thread):
-
- def __init__(self, queue, func, *args, **kwargs):
- """
- Calls func for each item in queue; func is called with a queued
- item as the first arg followed by *args and **kwargs. Use the
- PriorityQueue for sending quit signal when Ctrl-C is pressed.
- """
- Thread.__init__(self)
- self.queue = queue
- self.func = func
- self.args = args
- self.kwargs = kwargs
- self.exc_infos = []
- self.results = []
- self.store_results = kwargs.pop('store_results', False)
-
- def run(self):
- while True:
- try:
- item = self.queue.get()
- if isinstance(item, StopWorkerThreadSignal):
- break
- except:
- # This catch is important and it may occur when ctrl-C is
- # pressed, in this case simply quit the thread
- break
- else:
- try:
- self.func(item, *self.args, **self.kwargs)
- except Exception:
- self.exc_infos.append(exc_info())
-
-
-def shutdown_worker_threads(queue, thread_list):
- """
- Takes a job queue and a list of associated QueueFunctionThread objects,
- puts a StopWorkerThreadSignal object into the queue, and waits for the
- queue to flush.
- """
- for thread in [t for t in thread_list if t.isAlive()]:
- queue.put(StopWorkerThreadSignal())
-
- while any(map(QueueFunctionThread.is_alive, thread_list)):
- sleep(0.05)
-
-
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
@@ -145,7 +76,7 @@ delete [options] --all OR delete container [options] [object] [object] ...
--leave-segments option.'''.strip('\n')
-def st_delete(parser, args, print_queue, error_queue):
+def st_delete(parser, args, thread_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to delete '
@@ -164,20 +95,19 @@ def st_delete(parser, args, print_queue, error_queue):
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_delete_help))
+ thread_manager.error('Usage: %s [options] %s',
+ basename(argv[0]), st_delete_help)
return
def _delete_segment((container, obj), conn):
conn.delete_object(container, obj)
if options.verbose:
if conn.attempts > 2:
- print_queue.put('%s/%s [after %d attempts]' %
- (container, obj, conn.attempts))
+ thread_manager.print_msg(
+ '%s/%s [after %d attempts]', container,
+ obj, conn.attempts)
else:
- print_queue.put('%s/%s' % (container, obj))
-
- object_queue = Queue(10000)
+ thread_manager.print_msg('%s/%s', container, obj)
def _delete_object((container, obj), conn):
try:
@@ -187,7 +117,7 @@ def st_delete(parser, args, print_queue, error_queue):
try:
headers = conn.head_object(container, obj)
old_manifest = headers.get('x-object-manifest')
- if utils.config_true_value(
+ if config_true_value(
headers.get('x-static-large-object')):
query_string = 'multipart-manifest=delete'
except ClientException as err:
@@ -195,7 +125,10 @@ def st_delete(parser, args, print_queue, error_queue):
raise
conn.delete_object(container, obj, query_string=query_string)
if old_manifest:
- segment_queue = Queue(10000)
+ segment_manager = thread_manager.queue_manager(
+ _delete_segment, options.object_threads,
+ connection_maker=create_connection)
+ segment_queue = segment_manager.queue
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
sprefix = unquote(sprefix).rstrip('/') + '/'
@@ -203,32 +136,23 @@ def st_delete(parser, args, print_queue, error_queue):
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
if not segment_queue.empty():
- segment_threads = [QueueFunctionThread(
- segment_queue,
- _delete_segment, create_connection()) for _junk in
- xrange(options.object_threads)]
- for thread in segment_threads:
- thread.start()
- shutdown_worker_threads(segment_queue, segment_threads)
- put_errors_from_threads(segment_threads, error_queue)
+ with segment_manager:
+ pass
if options.verbose:
path = options.yes_all and join(container, obj) or obj
if path[:1] in ('/', '\\'):
path = path[1:]
if conn.attempts > 1:
- print_queue.put('%s [after %d attempts]' %
- (path, conn.attempts))
+ thread_manager.print_msg('%s [after %d attempts]', path,
+ conn.attempts)
else:
- print_queue.put(path)
+ thread_manager.print_msg(path)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Object %s not found' %
- repr('%s/%s' % (container, obj)))
+ thread_manager.error("Object '%s/%s' not found", container, obj)
- container_queue = Queue(10000)
-
- def _delete_container(container, conn):
+ def _delete_container(container, conn, object_queue):
try:
marker = ''
while True:
@@ -256,54 +180,43 @@ def st_delete(parser, args, print_queue, error_queue):
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Container %s not found' % repr(container))
+ thread_manager.error('Container %r not found', container)
create_connection = lambda: get_conn(options)
- object_threads = \
- [QueueFunctionThread(object_queue, _delete_object, create_connection())
- for _junk in xrange(options.object_threads)]
- for thread in object_threads:
- thread.start()
- container_threads = \
- [QueueFunctionThread(container_queue, _delete_container,
- create_connection())
- for _junk in xrange(options.container_threads)]
- for thread in container_threads:
- thread.start()
-
- try:
- if not args:
- conn = create_connection()
- try:
- marker = ''
- while True:
- containers = [
- c['name'] for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- for container in containers:
- container_queue.put(container)
- marker = containers[-1]
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might ' \
- 'have meant %r instead of %r.' % (
+ obj_manager = thread_manager.queue_manager(
+ _delete_object, options.object_threads,
+ connection_maker=create_connection)
+ with obj_manager as object_queue:
+ cont_manager = thread_manager.queue_manager(
+ _delete_container, options.container_threads, object_queue,
+ connection_maker=create_connection)
+ with cont_manager as container_queue:
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = [
+ c['name']
+ for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ thread_manager.error('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you '
+ 'might have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0])
- conn = create_connection()
- _delete_container(args[0], conn)
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- finally:
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
-
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ container_queue.put(args[0])
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
st_download_help = '''
@@ -318,7 +231,7 @@ download --all [options] OR download container [options] [object] [object] ...
just redirect to stdout.'''.strip('\n')
-def st_download(parser, args, print_queue, error_queue):
+def st_download(parser, args, thread_manager):
parser.add_option(
'-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to download '
@@ -350,12 +263,10 @@ def st_download(parser, args, print_queue, error_queue):
if options.out_file and len(args) != 2:
exit('-o option only allowed for single file downloads')
if (not args and not options.yes_all) or (args and options.yes_all):
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_download_help))
+ thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
+ st_download_help)
return
- object_queue = Queue(10000)
-
def _download_object(queue_arg, conn):
if len(queue_arg) == 2:
container, obj = queue_arg
@@ -415,11 +326,12 @@ def st_download(parser, args, print_queue, error_queue):
if not options.no_download:
fp.close()
if md5sum and md5sum.hexdigest() != etag:
- error_queue.put('%s: md5sum != etag, %s != %s' %
- (path, md5sum.hexdigest(), etag))
+ thread_manager.error('%s: md5sum != etag, %s != %s',
+ path, md5sum.hexdigest(), etag)
if content_length is not None and read_length != content_length:
- error_queue.put('%s: read_length != content_length, %d != %d' %
- (path, read_length, content_length))
+ thread_manager.error(
+ '%s: read_length != content_length, %d != %d',
+ path, read_length, content_length)
if 'x-object-meta-mtime' in headers and not options.out_file \
and not options.no_download:
@@ -431,19 +343,23 @@ def st_download(parser, args, print_queue, error_queue):
header_receipt - start_time, finish_time - start_time,
float(read_length) / (finish_time - start_time) / 1000000)
if conn.attempts > 1:
- print_queue.put('%s [%s after %d attempts]' %
- (path, time_str, conn.attempts))
+ thread_manager.print_msg('%s [%s after %d attempts]', path,
+ time_str, conn.attempts)
else:
- print_queue.put('%s [%s]' % (path, time_str))
+ thread_manager.print_msg('%s [%s]', path, time_str)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Object %s not found' %
- repr('%s/%s' % (container, obj)))
+ thread_manager.error("Object '%s/%s' not found", container, obj)
- container_queue = Queue(10000)
-
- def _download_container(container, conn, prefix=None):
+ def _download_container(queue_arg, conn):
+ if len(queue_arg) == 2:
+ container, object_queue = queue_arg
+ prefix = None
+ elif len(queue_arg) == 3:
+ container, object_queue, prefix = queue_arg
+ else:
+ raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
try:
marker = options.marker
while True:
@@ -460,64 +376,49 @@ def st_download(parser, args, print_queue, error_queue):
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Container %s not found' % repr(container))
+ thread_manager.error('Container %r not found', container)
create_connection = lambda: get_conn(options)
- object_threads = [QueueFunctionThread(
- object_queue, _download_object,
- create_connection()) for _junk in xrange(options.object_threads)]
- for thread in object_threads:
- thread.start()
- container_threads = [QueueFunctionThread(
- container_queue,
- _download_container, create_connection())
- for _junk in xrange(options.container_threads)]
- for thread in container_threads:
- thread.start()
-
- # We musn't let the main thread die with an exception while non-daemonic
- # threads exist or the process with hang and ignore Ctrl-C. So we catch
- # anything and tidy up the threads in a finally block.
- try:
- if not args:
- # --all case
- conn = create_connection()
- try:
- marker = options.marker
- while True:
- containers = [
- c['name'] for c in conn.get_account(
- marker=marker, prefix=options.prefix)[1]]
- if not containers:
- break
- marker = containers[-1]
- shuffle(containers)
- for container in containers:
- container_queue.put(container)
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, ('WARNING: / in container name; you might '
- 'have meant %r instead of %r.' % (
+ obj_manager = thread_manager.queue_manager(
+ _download_object, options.object_threads,
+ connection_maker=create_connection)
+ with obj_manager as object_queue:
+ cont_manager = thread_manager.queue_manager(
+ _download_container, options.container_threads,
+ connection_maker=create_connection)
+ with cont_manager as container_queue:
+ if not args:
+ # --all case
+ conn = create_connection()
+ try:
+ marker = options.marker
+ while True:
+ containers = [
+ c['name'] for c in conn.get_account(
+ marker=marker, prefix=options.prefix)[1]]
+ if not containers:
+ break
+ marker = containers[-1]
+ shuffle(containers)
+ for container in containers:
+ container_queue.put((container, object_queue))
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ thread_manager.error('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, ('WARNING: / in container name; you '
+ 'might have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0]))
- _download_container(args[0], create_connection(),
- options.prefix)
- else:
- if len(args) == 2:
- obj = args[1]
- object_queue.put((args[0], obj, options.out_file))
+ container_queue.put((args[0], object_queue, options.prefix))
else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- finally:
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
-
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ if len(args) == 2:
+ obj = args[1]
+ object_queue.put((args[0], obj, options.out_file))
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
def prt_bytes(bytes, human_flag):
@@ -560,7 +461,7 @@ list [options] [container]
'''.strip('\n')
-def st_list(parser, args, print_queue, error_queue):
+def st_list(parser, args, thread_manager):
parser.add_option(
'-l', '--long', dest='long', help='Long listing '
'similar to ls -l command', action='store_true', default=False)
@@ -583,8 +484,8 @@ def st_list(parser, args, print_queue, error_queue):
if options.delimiter and not args:
exit('-d option only allowed for container listings')
if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0:
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_list_help))
+ thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
+ st_list_help)
return
conn = get_conn(options)
@@ -605,12 +506,13 @@ def st_list(parser, args, print_queue, error_queue):
item_name = item.get('name')
if not options.long and not options.human:
- print_queue.put(item.get('name', item.get('subdir')))
+ thread_manager.print_msg(
+ item.get('name', item.get('subdir')))
else:
item_bytes = item.get('bytes')
total_bytes += item_bytes
if len(args) == 0: # listing containers
- bytes = prt_bytes(item_bytes, options.human)
+ byte_str = prt_bytes(item_bytes, options.human)
count = item.get('count')
total_count += count
try:
@@ -620,41 +522,42 @@ def st_list(parser, args, print_queue, error_queue):
except ClientException:
datestamp = '????-??-?? ??:??:??'
if not options.totals:
- print_queue.put("%5s %s %s %s" %
- (count, bytes, datestamp,
- item_name))
+ thread_manager.print_msg("%5s %s %s %s", count,
+ byte_str, datestamp,
+ item_name)
else: # list container contents
subdir = item.get('subdir')
if subdir is None:
- bytes = prt_bytes(item_bytes, options.human)
+ byte_str = prt_bytes(item_bytes, options.human)
date, xtime = item.get('last_modified').split('T')
xtime = xtime.split('.')[0]
else:
- bytes = prt_bytes(0, options.human)
+ byte_str = prt_bytes(0, options.human)
date = xtime = ''
item_name = subdir
if not options.totals:
- print_queue.put("%s %10s %8s %s" %
- (bytes, date, xtime, item_name))
+ thread_manager.print_msg("%s %10s %8s %s",
+ byte_str, date, xtime,
+ item_name)
marker = items[-1].get('name', items[-1].get('subdir'))
# report totals
if options.long or options.human:
if len(args) == 0:
- print_queue.put("%5s %s" % (prt_bytes(total_count, True),
- prt_bytes(total_bytes,
- options.human)))
+ thread_manager.print_msg(
+ "%5s %s", prt_bytes(total_count, True),
+ prt_bytes(total_bytes, options.human))
else:
- print_queue.put("%s" % (prt_bytes(total_bytes, options.human)))
+ thread_manager.print_msg(prt_bytes(total_bytes, options.human))
except ClientException as err:
if err.http_status != 404:
raise
if not args:
- error_queue.put('Account not found')
+ thread_manager.error('Account not found')
else:
- error_queue.put('Container %s not found' % repr(args[0]))
+ thread_manager.error('Container %r not found', args[0])
st_stat_help = '''
stat [container] [object]
@@ -663,7 +566,7 @@ stat [container] [object]
like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n')
-def st_stat(parser, args, print_queue, error_queue):
+def st_stat(parser, args, thread_manager):
parser.add_option(
'--lh', dest='human', help="report totals like 'list --lh'",
action='store_true', default=False)
@@ -674,36 +577,36 @@ def st_stat(parser, args, print_queue, error_queue):
try:
headers = conn.head_account()
if options.verbose > 1:
- print_queue.put('''
+ thread_manager.print_msg('''
StorageURL: %s
Auth Token: %s
-'''.strip('\n') % (conn.url, conn.token))
+'''.strip('\n'), conn.url, conn.token)
container_count = int(headers.get('x-account-container-count', 0))
object_count = prt_bytes(headers.get('x-account-object-count', 0),
options.human).lstrip()
bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0),
options.human).lstrip()
- print_queue.put('''
+ thread_manager.print_msg('''
Account: %s
Containers: %d
Objects: %s
- Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
- object_count, bytes_used))
+ Bytes: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], container_count,
+ object_count, bytes_used)
for key, value in headers.items():
if key.startswith('x-account-meta-'):
- print_queue.put(
- '%10s: %s' % ('Meta %s' %
- key[len('x-account-meta-'):].title(), value))
+ thread_manager.print_msg(
+ '%10s: %s',
+ 'Meta %s' % key[len('x-account-meta-'):].title(),
+ value)
for key, value in headers.items():
if not key.startswith('x-account-meta-') and key not in (
'content-length', 'date', 'x-account-container-count',
'x-account-object-count', 'x-account-bytes-used'):
- print_queue.put(
- '%10s: %s' % (key.title(), value))
+ thread_manager.print_msg('%10s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Account not found')
+ thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
@@ -716,7 +619,7 @@ Containers: %d
options.human).lstrip()
bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0),
options.human).lstrip()
- print_queue.put('''
+ thread_manager.print_msg('''
Account: %s
Container: %s
Objects: %s
@@ -724,69 +627,68 @@ Container: %s
Read ACL: %s
Write ACL: %s
Sync To: %s
- Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
- object_count, bytes_used,
- headers.get('x-container-read', ''),
- headers.get('x-container-write', ''),
- headers.get('x-container-sync-to', ''),
- headers.get('x-container-sync-key', '')))
+ Sync Key: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0],
+ object_count, bytes_used,
+ headers.get('x-container-read', ''),
+ headers.get('x-container-write', ''),
+ headers.get('x-container-sync-to', ''),
+ headers.get('x-container-sync-key', ''))
for key, value in headers.items():
if key.startswith('x-container-meta-'):
- print_queue.put(
- '%9s: %s' % ('Meta %s' %
- key[len('x-container-meta-'):].title(), value))
+ thread_manager.print_msg(
+ '%9s: %s',
+ 'Meta %s' % key[len('x-container-meta-'):].title(),
+ value)
for key, value in headers.items():
if not key.startswith('x-container-meta-') and key not in (
'content-length', 'date', 'x-container-object-count',
'x-container-bytes-used', 'x-container-read',
'x-container-write', 'x-container-sync-to',
'x-container-sync-key'):
- print_queue.put(
- '%9s: %s' % (key.title(), value))
+ thread_manager.print_msg('%9s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Container %s not found' % repr(args[0]))
+ thread_manager.error('Container %r not found', args[0])
elif len(args) == 2:
try:
headers = conn.head_object(args[0], args[1])
- print_queue.put('''
+ thread_manager.print_msg('''
Account: %s
Container: %s
Object: %s
- Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
- args[1], headers.get('content-type')))
+ Content Type: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0],
+ args[1], headers.get('content-type'))
if 'content-length' in headers:
- print_queue.put('Content Length: %s' %
- prt_bytes(headers['content-length'],
- options.human).lstrip())
+ thread_manager.print_msg('Content Length: %s',
+ prt_bytes(headers['content-length'],
+ options.human).lstrip())
if 'last-modified' in headers:
- print_queue.put(' Last Modified: %s' %
- headers['last-modified'])
+ thread_manager.print_msg(' Last Modified: %s',
+ headers['last-modified'])
if 'etag' in headers:
- print_queue.put(' ETag: %s' % headers['etag'])
+ thread_manager.print_msg(' ETag: %s', headers['etag'])
if 'x-object-manifest' in headers:
- print_queue.put(' Manifest: %s' %
- headers['x-object-manifest'])
+ thread_manager.print_msg(' Manifest: %s',
+ headers['x-object-manifest'])
for key, value in headers.items():
if key.startswith('x-object-meta-'):
- print_queue.put(
- '%14s: %s' % ('Meta %s' %
- key[len('x-object-meta-'):].title(), value))
+ thread_manager.print_msg(
+ '%14s: %s',
+ 'Meta %s' % key[len('x-object-meta-'):].title(),
+ value)
for key, value in headers.items():
if not key.startswith('x-object-meta-') and key not in (
'content-type', 'content-length', 'last-modified',
'etag', 'date', 'x-object-manifest'):
- print_queue.put(
- '%14s: %s' % (key.title(), value))
+ thread_manager.print_msg('%14s: %s', key.title(), value)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Object %s not found' %
- repr('%s/%s' % (args[0], args[1])))
+ thread_manager.error("Object %s/%s not found", args[0], args[1])
else:
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_stat_help))
+ thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
+ st_stat_help)
st_post_help = '''
@@ -800,7 +702,7 @@ post [options] [container] [object]
post -m Color:Blue -m Size:Large'''.strip('\n')
-def st_post(parser, args, print_queue, error_queue):
+def st_post(parser, args, thread_manager):
parser.add_option(
'-r', '--read-acl', dest='read_acl', help='Sets the '
'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
@@ -831,19 +733,21 @@ def st_post(parser, args, print_queue, error_queue):
exit('-r, -w, -t, and -k options only allowed for containers')
conn = get_conn(options)
if not args:
- headers = split_headers(options.meta, 'X-Account-Meta-', error_queue)
+ headers = split_headers(
+ options.meta, 'X-Account-Meta-', thread_manager)
try:
conn.post_account(headers=headers)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Account not found')
+ thread_manager.error('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
- headers = split_headers(options.meta, 'X-Container-Meta-', error_queue)
+ headers = split_headers(options.meta, 'X-Container-Meta-',
+ thread_manager)
if options.read_acl is not None:
headers['X-Container-Read'] = options.read_acl
if options.write_acl is not None:
@@ -859,19 +763,18 @@ def st_post(parser, args, print_queue, error_queue):
raise
conn.put_container(args[0], headers=headers)
elif len(args) == 2:
- headers = split_headers(options.meta, 'X-Object-Meta-', error_queue)
+ headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager)
# add header options to the headers object for the request.
- headers.update(split_headers(options.header, '', error_queue))
+ headers.update(split_headers(options.header, '', thread_manager))
try:
conn.post_object(args[0], args[1], headers=headers)
except ClientException as err:
if err.http_status != 404:
raise
- error_queue.put('Object %s not found' %
- repr('%s/%s' % (args[0], args[1])))
+ thread_manager.error("Object '%s/%s' not found", args[0], args[1])
else:
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_post_help))
+ thread_manager.error('Usage: %s [options] %s', basename(argv[0]),
+ st_post_help)
st_upload_help = '''
@@ -885,7 +788,7 @@ upload [options] container file_or_directory [file_or_directory] [...]
'''.strip('\n')
-def st_upload(parser, args, print_queue, error_queue):
+def st_upload(parser, args, thread_manager):
parser.add_option(
'-c', '--changed', action='store_true', dest='changed',
default=False, help='Will only upload files that have changed since '
@@ -924,10 +827,9 @@ def st_upload(parser, args, print_queue, error_queue):
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
- error_queue.put('Usage: %s [options] %s' %
- (basename(argv[0]), st_upload_help))
+ thread_manager.error(
+ 'Usage: %s [options] %s', basename(argv[0]), st_upload_help)
return
- object_queue = Queue(10000)
def _segment_job(job, conn):
if job.get('delete', False):
@@ -945,10 +847,10 @@ def st_upload(parser, args, print_queue, error_queue):
job['segment_etag'] = etag
if options.verbose and 'log_line' in job:
if conn.attempts > 1:
- print_queue.put('%s [after %d attempts]' %
- (job['log_line'], conn.attempts))
+ thread_manager.print_msg('%s [after %d attempts]',
+ job['log_line'], conn.attempts)
else:
- print_queue.put(job['log_line'])
+ thread_manager.print_msg(job['log_line'])
return job
def _object_job(job, conn):
@@ -998,13 +900,13 @@ def st_upload(parser, args, print_queue, error_queue):
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
- if utils.config_true_value(
+ if config_true_value(
headers.get('x-static-large-object')):
headers, manifest_data = conn.get_object(
container, obj,
query_string='multipart-manifest=get')
for old_seg in json.loads(manifest_data):
- seg_path = old_seg['name'].lstrip('/')
+ seg_path = old_seg['path'].lstrip('/')
if isinstance(seg_path, unicode):
seg_path = seg_path.encode('utf-8')
old_slo_manifest_paths.append(seg_path)
@@ -1013,7 +915,7 @@ def st_upload(parser, args, print_queue, error_queue):
raise
# Merge the command line header options to the put_headers
put_headers.update(split_headers(options.header, '',
- error_queue))
+ thread_manager))
# Don't do segment job if object is not big enough
if options.segment_size and \
getsize(path) > int(options.segment_size):
@@ -1021,15 +923,15 @@ def st_upload(parser, args, print_queue, error_queue):
if options.segment_container:
seg_container = options.segment_container
full_size = getsize(path)
- segment_queue = Queue(10000)
- segment_threads = [
- QueueFunctionThread(
- segment_queue, _segment_job,
- create_connection(), store_results=True)
- for _junk in xrange(options.segment_threads)]
- for thread in segment_threads:
- thread.start()
- try:
+
+ slo_segments = []
+ error_counter = [0]
+ segment_manager = thread_manager.queue_manager(
+ _segment_job, options.segment_threads,
+ store_results=slo_segments,
+ error_counter=error_counter,
+ connection_maker=create_connection)
+ with segment_manager as segment_queue:
segment = 0
segment_start = 0
while segment_start < full_size:
@@ -1052,18 +954,12 @@ def st_upload(parser, args, print_queue, error_queue):
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
- finally:
- shutdown_worker_threads(segment_queue, segment_threads)
- if put_errors_from_threads(segment_threads,
- error_queue):
- raise ClientException(
- 'Aborting manifest creation '
- 'because not all segments could be uploaded. '
- '%s/%s' % (container, obj))
+ if error_counter[0]:
+ raise ClientException(
+ 'Aborting manifest creation '
+ 'because not all segments could be uploaded. %s/%s'
+ % (container, obj))
if options.use_slo:
- slo_segments = []
- for thread in segment_threads:
- slo_segments += thread.results
slo_segments.sort(key=lambda d: d['segment_index'])
for seg in slo_segments:
seg_loc = seg['segment_location'].lstrip('/')
@@ -1097,7 +993,10 @@ def st_upload(parser, args, print_queue, error_queue):
container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers)
if old_manifest or old_slo_manifest_paths:
- segment_queue = Queue(10000)
+ segment_manager = thread_manager.queue_manager(
+ _segment_job, options.segment_threads,
+ connection_maker=create_connection)
+ segment_queue = segment_manager.queue
if old_manifest:
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
@@ -1118,27 +1017,20 @@ def st_upload(parser, args, print_queue, error_queue):
{'delete': True,
'container': scont, 'obj': sobj})
if not segment_queue.empty():
- segment_threads = [
- QueueFunctionThread(
- segment_queue,
- _segment_job, create_connection())
- for _junk in xrange(options.segment_threads)]
- for thread in segment_threads:
- thread.start()
- shutdown_worker_threads(segment_queue, segment_threads)
- put_errors_from_threads(segment_threads, error_queue)
+ with segment_manager:
+ pass
if options.verbose:
if conn.attempts > 1:
- print_queue.put(
- '%s [after %d attempts]' % (obj, conn.attempts))
+ thread_manager.print_msg('%s [after %d attempts]', obj,
+ conn.attempts)
else:
- print_queue.put(obj)
+ thread_manager.print_msg(obj)
except OSError as err:
if err.errno != ENOENT:
raise
- error_queue.put('Local file %s not found' % repr(path))
+ thread_manager.error('Local file %r not found', path)
- def _upload_dir(path):
+ def _upload_dir(path, object_queue):
names = listdir(path)
if not names:
object_queue.put({'path': path, 'dir_marker': True})
@@ -1146,17 +1038,13 @@ def st_upload(parser, args, print_queue, error_queue):
for name in listdir(path):
subpath = join(path, name)
if isdir(subpath):
- _upload_dir(subpath)
+ _upload_dir(subpath, object_queue)
else:
object_queue.put({'path': subpath})
create_connection = lambda: get_conn(options)
- object_threads = [
- QueueFunctionThread(object_queue, _object_job, create_connection())
- for _junk in xrange(options.object_threads)]
- for thread in object_threads:
- thread.start()
conn = create_connection()
+
# Try to create the container, just in case it doesn't exist. If this
# fails, it might just be because the user doesn't have container PUT
# permissions, so we'll ignore any error. If there's really a problem,
@@ -1174,34 +1062,38 @@ def st_upload(parser, args, print_queue, error_queue):
if msg:
msg += ': '
msg += err.http_response_content[:60]
- error_queue.put(
- 'Error trying to create container %r: %s' % (args[0], msg))
+ thread_manager.error(
+ 'Error trying to create container %r: %s', args[0],
+ msg)
except Exception as err:
- error_queue.put(
- 'Error trying to create container %r: %s' % (args[0], err))
-
- try:
- for arg in args[1:]:
- if isdir(arg):
- _upload_dir(arg)
- else:
- object_queue.put({'path': arg})
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- finally:
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+ thread_manager.error(
+ 'Error trying to create container %r: %s', args[0],
+ err)
+
+ object_manager = thread_manager.queue_manager(
+ _object_job, options.object_threads,
+ connection_maker=create_connection)
+ with object_manager as object_queue:
+ try:
+ for arg in args[1:]:
+ if isdir(arg):
+ _upload_dir(arg, object_queue)
+ else:
+ object_queue.put({'path': arg})
+ except ClientException as err:
+ if err.http_status != 404:
+ raise
+ thread_manager.error('Account not found')
-def split_headers(options, prefix='', error_queue=None):
+def split_headers(options, prefix='', thread_manager=None):
"""
Splits 'Key: Value' strings and returns them as a dictionary.
:param options: An array of 'Key: Value' strings
:param prefix: String to prepend to all of the keys in the dictionary.
- :param error_queue: Queue for thread safe error reporting.
+ :param thread_manager: MultiThreadingManager for thread safe error
+ reporting.
"""
headers = {}
for item in options:
@@ -1211,8 +1103,8 @@ def split_headers(options, prefix='', error_queue=None):
else:
error_string = "Metadata parameter %s must contain a ':'.\n%s" \
% (item, st_post_help)
- if error_queue:
- error_queue.put(error_string)
+ if thread_manager:
+ thread_manager.error(error_string)
else:
exit(error_string)
return headers
@@ -1391,7 +1283,7 @@ Examples:
help='Specify a CA bundle file to use in verifying a '
'TLS (https) server certificate. '
'Defaults to env[OS_CACERT]')
- default_val = utils.config_true_value(environ.get('SWIFTCLIENT_INSECURE'))
+ default_val = config_true_value(environ.get('SWIFTCLIENT_INSECURE'))
parser.add_option('--insecure',
action="store_true", dest="insecure",
default=default_val,
@@ -1422,38 +1314,16 @@ Examples:
logger = logging.getLogger("swiftclient")
logging.basicConfig(level=logging.DEBUG)
- print_queue = Queue(10000)
-
- def _print(item):
- if isinstance(item, unicode):
- item = item.encode('utf8')
- print item
+ had_error = False
- print_thread = QueueFunctionThread(print_queue, _print)
- print_thread.start()
-
- error_count = 0
- error_queue = Queue(10000)
-
- def _error(item):
- global error_count
- error_count += 1
- if isinstance(item, unicode):
- item = item.encode('utf8')
- print >> stderr, item
+ with MultiThreadingManager() as thread_manager:
+ parser.usage = globals()['st_%s_help' % args[0]]
+ try:
+ globals()['st_%s' % args[0]](parser, argv[1:], thread_manager)
+ except (ClientException, HTTPException, socket.error) as err:
+ thread_manager.error(str(err))
- error_thread = QueueFunctionThread(error_queue, _error)
- error_thread.start()
+ had_error = thread_manager.error_count
- parser.usage = globals()['st_%s_help' % args[0]]
- try:
- globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
- error_queue)
- except (ClientException, HTTPException, socket.error) as err:
- error_queue.put(str(err))
- finally:
- shutdown_worker_threads(print_queue, [print_thread])
- shutdown_worker_threads(error_queue, [error_thread])
-
- if error_count:
+ if had_error:
exit(1)
diff --git a/doc/source/conf.py b/doc/source/conf.py
index b1baab4..356b89e 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -15,8 +15,6 @@
import sys
import os
-import swiftclient
-
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
@@ -36,6 +34,9 @@ sys.path.insert(0, ROOT)
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.todo',
'sphinx.ext.coverage']
+autoclass_content = 'both'
+autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance']
+
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
@@ -50,7 +51,7 @@ master_doc = 'index'
# General information about the project.
project = u'Swiftclient'
-copyright = u'2012 OpenStack, LLC.'
+copyright = u'2013 OpenStack, LLC.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
diff --git a/doc/source/swiftclient.rst b/doc/source/swiftclient.rst
index bc2bac8..8c5a020 100644
--- a/doc/source/swiftclient.rst
+++ b/doc/source/swiftclient.rst
@@ -4,14 +4,18 @@ swiftclient
==============
.. automodule:: swiftclient
- :members:
- :undoc-members:
- :show-inheritance:
swiftclient.client
==================
.. automodule:: swiftclient.client
- :members:
- :undoc-members:
- :show-inheritance:
+
+swiftclient.exceptions
+======================
+
+.. automodule:: swiftclient.exceptions
+
+swiftclient.multithreading
+==========================
+
+.. automodule:: swiftclient.multithreading
diff --git a/swiftclient/client.py b/swiftclient/client.py
index c9012be..e50e674 100644
--- a/swiftclient/client.py
+++ b/swiftclient/client.py
@@ -28,6 +28,8 @@ from urlparse import urlparse, urlunparse
from httplib import HTTPException, HTTPConnection, HTTPSConnection
from time import sleep
+from swiftclient.exceptions import ClientException, InvalidHeadersException
+
try:
from swiftclient.https_connection import HTTPSConnectionNoSSLComp
except ImportError:
@@ -102,64 +104,6 @@ except ImportError:
from json import loads as json_loads
-class InvalidHeadersException(Exception):
- pass
-
-
-class ClientException(Exception):
-
- def __init__(self, msg, http_scheme='', http_host='', http_port='',
- http_path='', http_query='', http_status=0, http_reason='',
- http_device='', http_response_content=''):
- Exception.__init__(self, msg)
- self.msg = msg
- self.http_scheme = http_scheme
- self.http_host = http_host
- self.http_port = http_port
- self.http_path = http_path
- self.http_query = http_query
- self.http_status = http_status
- self.http_reason = http_reason
- self.http_device = http_device
- self.http_response_content = http_response_content
-
- def __str__(self):
- a = self.msg
- b = ''
- if self.http_scheme:
- b += '%s://' % self.http_scheme
- if self.http_host:
- b += self.http_host
- if self.http_port:
- b += ':%s' % self.http_port
- if self.http_path:
- b += self.http_path
- if self.http_query:
- b += '?%s' % self.http_query
- if self.http_status:
- if b:
- b = '%s %s' % (b, self.http_status)
- else:
- b = str(self.http_status)
- if self.http_reason:
- if b:
- b = '%s %s' % (b, self.http_reason)
- else:
- b = '- %s' % self.http_reason
- if self.http_device:
- if b:
- b = '%s: device %s' % (b, self.http_device)
- else:
- b = 'device %s' % self.http_device
- if self.http_response_content:
- if len(self.http_response_content) <= 60:
- b += ' %s' % self.http_response_content
- else:
- b += ' [first 60 chars of response] %s' \
- % self.http_response_content[:60]
- return b and '%s: %s' % (a, b) or a
-
-
def http_connection(url, proxy=None, ssl_compression=True):
"""
Make an HTTPConnection or HTTPSConnection
diff --git a/swiftclient/exceptions.py b/swiftclient/exceptions.py
new file mode 100644
index 0000000..fe730e5
--- /dev/null
+++ b/swiftclient/exceptions.py
@@ -0,0 +1,72 @@
+# Copyright (c) 2010-2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class ClientException(Exception):
+
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device='', http_response_content=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+ self.http_response_content = http_response_content
+
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ if self.http_response_content:
+ if len(self.http_response_content) <= 60:
+ b += ' %s' % self.http_response_content
+ else:
+ b += ' [first 60 chars of response] %s' \
+ % self.http_response_content[:60]
+ return b and '%s: %s' % (a, b) or a
+
+
+class InvalidHeadersException(Exception):
+ pass
diff --git a/swiftclient/multithreading.py b/swiftclient/multithreading.py
new file mode 100644
index 0000000..890a789
--- /dev/null
+++ b/swiftclient/multithreading.py
@@ -0,0 +1,241 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+from time import sleep
+from Queue import Queue
+from threading import Thread
+from traceback import format_exception
+
+from swiftclient.exceptions import ClientException
+
+
+class StopWorkerThreadSignal(object):
+ pass
+
+
+class QueueFunctionThread(Thread):
+ """
+ Calls `func`` for each item in ``queue``; ``func`` is called with a
+ de-queued item as the first arg followed by ``*args`` and ``**kwargs``.
+
+ Any exceptions raised by ``func`` are stored in :attr:`self.exc_infos`.
+
+ If the optional kwarg ``store_results`` is specified, it must be a list and
+ each result of invoking ``func`` will be appended to that list.
+
+ Putting a :class:`StopWorkerThreadSignal` instance into queue will cause
+ this thread to exit.
+ """
+
+ def __init__(self, queue, func, *args, **kwargs):
+ """
+ :param queue: A :class:`Queue` object from which work jobs will be
+ pulled.
+ :param func: A callable which will be invoked with a dequeued item
+ followed by ``*args`` and ``**kwargs``.
+ :param \*args: Optional positional arguments for ``func``.
+ :param \*\*kwargs: Optional kwargs for func. If the kwarg
+ ``store_results`` is specified, its value must be a
+ list, and every result from invoking ``func`` will
+ be appended to the supplied list. The kwarg
+ ``store_results`` will not be passed into ``func``.
+ """
+ Thread.__init__(self)
+ self.queue = queue
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ self.exc_infos = []
+ self.store_results = kwargs.pop('store_results', None)
+
+ def run(self):
+ while True:
+ item = self.queue.get()
+ if isinstance(item, StopWorkerThreadSignal):
+ break
+ try:
+ result = self.func(item, *self.args, **self.kwargs)
+ if self.store_results is not None:
+ self.store_results.append(result)
+ except Exception:
+ self.exc_infos.append(sys.exc_info())
+
+
+class QueueFunctionManager(object):
+ """
+ A context manager to handle the life-cycle of a single :class:`Queue`
+ and a list of associated :class:`QueueFunctionThread` instances.
+
+ This class is not usually instantiated directly. Instead, call the
+ :meth:`MultiThreadingManager.queue_manager` object method,
+ which will return an instance of this class.
+
+ When entering the context, ``thread_count`` :class:`QueueFunctionThread`
+ instances are created and started. The input queue is returned. Inside
+ the context, any work item put into the queue will get worked on by one of
+ the :class:`QueueFunctionThread` instances.
+
+ When the context is exited, all threads are sent a
+ :class:`StopWorkerThreadSignal` instance and then all threads are waited
+ upon. Finally, any exceptions from any of the threads are reported on via
+ the supplied ``thread_manager``'s :meth:`error` method. If an
+ ``error_counter`` list was supplied on instantiation, its first element is
+ incremented once for every exception which occurred.
+ """
+
+ def __init__(self, func, thread_count, thread_manager, thread_args=None,
+ thread_kwargs=None, error_counter=None,
+ connection_maker=None):
+ """
+ :param func: The worker function which will be passed into each
+ :class:`QueueFunctionThread`'s constructor.
+ :param thread_count: The number of worker threads to run.
+ :param thread_manager: An instance of :class:`MultiThreadingManager`.
+ :param thread_args: Optional positional arguments to be passed into
+ each invocation of ``func`` after the de-queued
+ work item.
+ :param thread_kwargs: Optional keyword arguments to be passed into each
+ invocation of ``func``. If a list is supplied as
+ the ``store_results`` keyword argument, it will
+ be filled with every result of invoking ``func``
+ in all threads.
+ :param error_counter: Optional list containing one integer. If
+ supplied, the list's first element will be
+ incremented once for each exception in any
+ thread. This happens only when exiting the
+ context.
+ :param connection_maker: Optional callable. If supplied, this callable
+ will be invoked once per created thread, and
+ the result will be passed into func after the
+ de-queued work item but before ``thread_args``
+ and ``thread_kwargs``. This is used to ensure
+ each thread has its own connection to Swift.
+ """
+ self.func = func
+ self.thread_count = thread_count
+ self.thread_manager = thread_manager
+ self.error_counter = error_counter
+ self.connection_maker = connection_maker
+ self.queue = Queue(10000)
+ self.thread_list = []
+ self.thread_args = thread_args if thread_args else ()
+ self.thread_kwargs = thread_kwargs if thread_kwargs else {}
+
+ def __enter__(self):
+ for _junk in xrange(self.thread_count):
+ if self.connection_maker:
+ thread_args = (self.connection_maker(),) + self.thread_args
+ else:
+ thread_args = self.thread_args
+ qf_thread = QueueFunctionThread(self.queue, self.func,
+ *thread_args, **self.thread_kwargs)
+ qf_thread.start()
+ self.thread_list.append(qf_thread)
+ return self.queue
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ for thread in [t for t in self.thread_list if t.isAlive()]:
+ self.queue.put(StopWorkerThreadSignal())
+
+ while any(map(QueueFunctionThread.is_alive, self.thread_list)):
+ sleep(0.05)
+
+ for thread in self.thread_list:
+ for info in thread.exc_infos:
+ if self.error_counter:
+ self.error_counter[0] += 1
+ if isinstance(info[1], ClientException):
+ self.thread_manager.error(str(info[1]))
+ else:
+ self.thread_manager.error(''.join(format_exception(*info)))
+
+
+class MultiThreadingManager(object):
+ """
+ One object to manage context for multi-threading. This should make
+ bin/swift less error-prone and allow us to test this code.
+
+ This object is a context manager and returns itself into the context. When
+ entering the context, two printing threads are created (see below) and they
+ are waited on and cleaned up when exiting the context.
+
+ A convenience method, :meth:`queue_manager`, is provided to create a
+ :class:`QueueFunctionManager` context manager (a thread-pool with an
+ associated input queue for work items).
+
+ Also, thread-safe printing to two streams is provided. The
+ :meth:`print_msg` method will print to the supplied ``print_stream``
+ (defaults to ``sys.stdout``) and the :meth:`error` method will print to the
+ supplied ``error_stream`` (defaults to ``sys.stderr``). Both of these
+ printing methods will format the given string with any supplied ``*args``
+ (a la printf) and encode the result to utf8 if necessary.
+
+ The attribute :attr:`self.error_count` is incremented once per error
+ message printed, so an application can tell if any worker threads
+ encountered exceptions or otherwise called :meth:`error` on this instance.
+ The swift command-line tool uses this to exit non-zero if any error strings
+ were printed.
+ """
+
+ def __init__(self, print_stream=sys.stdout, error_stream=sys.stderr):
+ """
+ :param print_stream: The stream to which :meth:`print_msg` sends
+ formatted messages, encoded to utf8 if necessary.
+ :param error_stream: The stream to which :meth:`error` sends formatted
+ messages, encoded to utf8 if necessary.
+ """
+ self.print_stream = print_stream
+ self.printer = QueueFunctionManager(self._print, 1, self)
+ self.error_stream = error_stream
+ self.error_printer = QueueFunctionManager(self._print_error, 1, self)
+ self.error_count = 0
+
+ def __enter__(self):
+ self.printer.__enter__()
+ self.error_printer.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.error_printer.__exit__(exc_type, exc_value, traceback)
+ self.printer.__exit__(exc_type, exc_value, traceback)
+
+ def queue_manager(self, func, thread_count, *args, **kwargs):
+ connection_maker = kwargs.pop('connection_maker', None)
+ error_counter = kwargs.pop('error_counter', None)
+ return QueueFunctionManager(func, thread_count, self, thread_args=args,
+ thread_kwargs=kwargs,
+ connection_maker=connection_maker,
+ error_counter=error_counter)
+
+ def print_msg(self, msg, *fmt_args):
+ if fmt_args:
+ msg = msg % fmt_args
+ self.printer.queue.put(msg)
+
+ def error(self, msg, *fmt_args):
+ if fmt_args:
+ msg = msg % fmt_args
+ self.error_printer.queue.put(msg)
+
+ def _print(self, item, stream=None):
+ if stream is None:
+ stream = self.print_stream
+ if isinstance(item, unicode):
+ item = item.encode('utf8')
+ print >>stream, item
+
+ def _print_error(self, item):
+ self.error_count += 1
+ return self._print(item, stream=self.error_stream)
diff --git a/swiftclient/utils.py b/swiftclient/utils.py
index f309d29..33d89a5 100644
--- a/swiftclient/utils.py
+++ b/swiftclient/utils.py
@@ -12,7 +12,6 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Miscellaneous utility functions for use with Swift."""
TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
diff --git a/tests/test_multithreading.py b/tests/test_multithreading.py
new file mode 100644
index 0000000..5a28582
--- /dev/null
+++ b/tests/test_multithreading.py
@@ -0,0 +1,334 @@
+# Copyright (c) 2010-2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import time
+import mock
+import testtools
+import threading
+from cStringIO import StringIO
+from Queue import Queue, Empty
+
+from swiftclient import multithreading as mt
+from swiftclient.exceptions import ClientException
+
+
+class ThreadTestCase(testtools.TestCase):
+ def setUp(self):
+ super(ThreadTestCase, self).setUp()
+ self.got_args_kwargs = Queue()
+ self.starting_thread_count = threading.active_count()
+
+ def _func(self, q_item, *args, **kwargs):
+ self.got_items.put(q_item)
+ self.got_args_kwargs.put((args, kwargs))
+
+ if q_item == 'go boom':
+ raise Exception('I went boom!')
+ if q_item == 'c boom':
+ raise ClientException(
+ 'Client Boom', http_scheme='http', http_host='192.168.22.1',
+ http_port=80, http_path='/booze', http_status=404,
+ http_reason='to much', http_response_content='no sir!')
+
+ return 'best result EVAR!'
+
+ def assertQueueContains(self, queue, expected_contents):
+ got_contents = []
+ try:
+ while True:
+ got_contents.append(queue.get(timeout=0.1))
+ except Empty:
+ pass
+ if isinstance(expected_contents, set):
+ got_contents = set(got_contents)
+ self.assertEqual(expected_contents, got_contents)
+
+
+class TestQueueFunctionThread(ThreadTestCase):
+ def setUp(self):
+ super(TestQueueFunctionThread, self).setUp()
+
+ self.input_queue = Queue()
+ self.got_items = Queue()
+ self.stored_results = []
+
+ self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
+ 'one_arg', 'two_arg',
+ red_fish='blue_arg',
+ store_results=self.stored_results)
+ self.qft.start()
+
+ def tearDown(self):
+ if self.qft.is_alive():
+ self.finish_up_thread()
+
+ super(TestQueueFunctionThread, self).tearDown()
+
+ def finish_up_thread(self):
+ self.input_queue.put(mt.StopWorkerThreadSignal())
+ while self.qft.is_alive():
+ time.sleep(0.05)
+
+ def test_plumbing_and_store_results(self):
+ self.input_queue.put('abc')
+ self.input_queue.put(123)
+ self.finish_up_thread()
+
+ self.assertQueueContains(self.got_items, ['abc', 123])
+ self.assertQueueContains(self.got_args_kwargs, [
+ (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
+ (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
+ self.assertEqual(self.stored_results,
+ ['best result EVAR!', 'best result EVAR!'])
+
+ def test_exception_handling(self):
+ self.input_queue.put('go boom')
+ self.input_queue.put('ok')
+ self.input_queue.put('go boom')
+ self.finish_up_thread()
+
+ self.assertQueueContains(self.got_items,
+ ['go boom', 'ok', 'go boom'])
+ self.assertEqual(len(self.qft.exc_infos), 2)
+ self.assertEqual(Exception, self.qft.exc_infos[0][0])
+ self.assertEqual(Exception, self.qft.exc_infos[1][0])
+ self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
+ self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
+
+
+class TestQueueFunctionManager(ThreadTestCase):
+ def setUp(self):
+ super(TestQueueFunctionManager, self).setUp()
+ self.thread_manager = mock.create_autospec(
+ mt.MultiThreadingManager, spec_set=True, instance=True)
+ self.thread_count = 4
+ self.error_counter = [0]
+ self.got_items = Queue()
+ self.stored_results = []
+ self.qfq = mt.QueueFunctionManager(
+ self._func, self.thread_count, self.thread_manager,
+ thread_args=('1arg', '2arg'),
+ thread_kwargs={'a': 'b', 'store_results': self.stored_results},
+ error_counter=self.error_counter,
+ connection_maker=self.connection_maker)
+
+ def connection_maker(self):
+ return 'yup, I made a connection'
+
+ def test_context_manager_without_error_counter(self):
+ self.qfq = mt.QueueFunctionManager(
+ self._func, self.thread_count, self.thread_manager,
+ thread_args=('1arg', '2arg'),
+ thread_kwargs={'a': 'b', 'store_results': self.stored_results},
+ connection_maker=self.connection_maker)
+
+ with self.qfq as input_queue:
+ self.assertEqual(self.starting_thread_count + self.thread_count,
+ threading.active_count())
+ input_queue.put('go boom')
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+ error_strs = map(str, self.thread_manager.error.call_args_list)
+ self.assertEqual(1, len(error_strs))
+ self.assertTrue('Exception: I went boom!' in error_strs[0])
+
+ def test_context_manager_without_conn_maker_or_error_counter(self):
+ self.qfq = mt.QueueFunctionManager(
+ self._func, self.thread_count, self.thread_manager,
+ thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
+
+ with self.qfq as input_queue:
+ self.assertEqual(self.starting_thread_count + self.thread_count,
+ threading.active_count())
+ for i in xrange(20):
+ input_queue.put('slap%d' % i)
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+ self.assertEqual([], self.thread_manager.error.call_args_list)
+ self.assertEqual(0, self.error_counter[0])
+ self.assertQueueContains(self.got_items,
+ set(['slap%d' % i for i in xrange(20)]))
+ self.assertQueueContains(
+ self.got_args_kwargs,
+ [(('1arg', '2arg'), {'a': 'b'})] * 20)
+ self.assertEqual(self.stored_results, [])
+
+ def test_context_manager_with_exceptions(self):
+ with self.qfq as input_queue:
+ self.assertEqual(self.starting_thread_count + self.thread_count,
+ threading.active_count())
+ for i in xrange(20):
+ input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+ error_strs = map(str, self.thread_manager.error.call_args_list)
+ self.assertEqual(10, len(error_strs))
+ self.assertTrue(all(['Exception: I went boom!' in s for s in
+ error_strs]))
+ self.assertEqual(10, self.error_counter[0])
+ expected_items = set(['go boom'] + ['item%d' % i for i in xrange(20)
+ if i % 2 == 0])
+ self.assertQueueContains(self.got_items, expected_items)
+ self.assertQueueContains(
+ self.got_args_kwargs,
+ [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
+ self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
+
+ def test_context_manager_with_client_exceptions(self):
+ with self.qfq as input_queue:
+ self.assertEqual(self.starting_thread_count + self.thread_count,
+ threading.active_count())
+ for i in xrange(20):
+ input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+ error_strs = map(str, self.thread_manager.error.call_args_list)
+ self.assertEqual(10, len(error_strs))
+ stringification = 'Client Boom: ' \
+ 'http://192.168.22.1:80/booze 404 to much no sir!'
+ self.assertTrue(all([stringification in s for s in error_strs]))
+ self.assertEqual(10, self.error_counter[0])
+ expected_items = set(['c boom'] + ['item%d' % i for i in xrange(20)
+ if i % 2 == 0])
+ self.assertQueueContains(self.got_items, expected_items)
+ self.assertQueueContains(
+ self.got_args_kwargs,
+ [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
+ self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
+
+ def test_context_manager_with_connection_maker(self):
+ with self.qfq as input_queue:
+ self.assertEqual(self.starting_thread_count + self.thread_count,
+ threading.active_count())
+ for i in xrange(20):
+ input_queue.put('item%d' % i)
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+ self.assertEqual([], self.thread_manager.error.call_args_list)
+ self.assertEqual(0, self.error_counter[0])
+ self.assertQueueContains(self.got_items,
+ set(['item%d' % i for i in xrange(20)]))
+ self.assertQueueContains(
+ self.got_args_kwargs,
+ [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
+ self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
+
+
+class TestMultiThreadingManager(ThreadTestCase):
+
+ @mock.patch('swiftclient.multithreading.QueueFunctionManager')
+ def test_instantiation(self, mock_qfq):
+ thread_manager = mt.MultiThreadingManager()
+
+ self.assertEqual([
+ mock.call(thread_manager._print, 1, thread_manager),
+ mock.call(thread_manager._print_error, 1, thread_manager),
+ ], mock_qfq.call_args_list)
+
+ # These contexts don't get entered into until the
+ # MultiThreadingManager's context is entered.
+ self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
+ self.assertEqual([],
+ thread_manager.error_printer.__enter__.call_args_list)
+
+ # Test default values for the streams.
+ self.assertEqual(sys.stdout, thread_manager.print_stream)
+ self.assertEqual(sys.stderr, thread_manager.error_stream)
+
+ @mock.patch('swiftclient.multithreading.QueueFunctionManager')
+ def test_queue_manager_no_args(self, mock_qfq):
+ thread_manager = mt.MultiThreadingManager()
+
+ mock_qfq.reset_mock()
+ mock_qfq.return_value = 'slap happy!'
+
+ self.assertEqual(
+ 'slap happy!',
+ thread_manager.queue_manager(self._func, 88))
+
+ self.assertEqual([
+ mock.call(self._func, 88, thread_manager, thread_args=(),
+ thread_kwargs={}, connection_maker=None,
+ error_counter=None)
+ ], mock_qfq.call_args_list)
+
+ @mock.patch('swiftclient.multithreading.QueueFunctionManager')
+ def test_queue_manager_with_args(self, mock_qfq):
+ thread_manager = mt.MultiThreadingManager()
+
+ mock_qfq.reset_mock()
+ mock_qfq.return_value = 'do run run'
+
+ self.assertEqual(
+ 'do run run',
+ thread_manager.queue_manager(self._func, 88, 'fun', times='are',
+ connection_maker='abc', to='be had',
+ error_counter='def'))
+
+ self.assertEqual([
+ mock.call(self._func, 88, thread_manager, thread_args=('fun',),
+ thread_kwargs={'times': 'are', 'to': 'be had'},
+ connection_maker='abc', error_counter='def')
+ ], mock_qfq.call_args_list)
+
+ def test_printers(self):
+ out_stream = StringIO()
+ err_stream = StringIO()
+
+ with mt.MultiThreadingManager(
+ print_stream=out_stream,
+ error_stream=err_stream) as thread_manager:
+
+ # Sanity-checking these gives power to the previous test which
+ # looked at the default values of thread_manager.print/error_stream
+ self.assertEqual(out_stream, thread_manager.print_stream)
+ self.assertEqual(err_stream, thread_manager.error_stream)
+
+ self.assertEqual(self.starting_thread_count + 2,
+ threading.active_count())
+
+ thread_manager.print_msg('one-argument')
+ thread_manager.print_msg('one %s, %d fish', 'fish', 88)
+ thread_manager.error('I have %d problems, but a %s is not one',
+ 99, u'\u062A\u062A')
+ thread_manager.print_msg('some\n%s\nover the %r', 'where',
+ u'\u062A\u062A')
+ thread_manager.error('one-error-argument')
+ thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
+ 3.14159)
+
+ self.assertEqual(self.starting_thread_count, threading.active_count())
+
+ out_stream.seek(0)
+ self.assertEqual([
+ 'one-argument\n',
+ 'one fish, 88 fish\n',
+ 'some\n', 'where\n', "over the u'\\u062a\\u062a'\n",
+ ], list(out_stream.readlines()))
+
+ err_stream.seek(0)
+ self.assertEqual([
+ u'I have 99 problems, but a \u062A\u062A is not one\n'.encode(
+ 'utf8'),
+ 'one-error-argument\n',
+ 'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n',
+ ], list(err_stream.readlines()))
+
+ self.assertEqual(3, thread_manager.error_count)
+
+
+if __name__ == '__main__':
+ testtools.main()