diff options
author | Darrell Bishop <darrell@swiftstack.com> | 2013-06-26 22:47:49 -0700 |
---|---|---|
committer | Darrell Bishop <darrell@swiftstack.com> | 2013-07-28 22:08:17 -0700 |
commit | 9198e95468b3005730c931da1701f34b1a9ce2d9 (patch) | |
tree | d6034f1a489b67273c19dc43221b142bd0c7bb65 /bin/swift | |
parent | 5d9c6f845cc98da720fea7e2343fdbb0db9a42a5 (diff) | |
download | python-swiftclient-9198e95468b3005730c931da1701f34b1a9ce2d9.tar.gz |
Move multi-threading code to a library.
This patch extracts the multi-threading code from bin/swift into
swiftclient/multithreading and adds tests. In particular, this new way
of doing it (with context managers) will prevent non-daemonic threads
from wedging the process when unexpected exceptions happen.
I enabled reporting of which lines, specifically, are not covered by
unit tests (added -m option to "coverage report" in .unittests).
This patch includes a drive-by fix for uploading a segmented file with
--use-slo when that object already exists. A key of "name" was used
instead of "path", raising KeyError.
There's also another drive-by fix for uploading segmented objects with
--use-slo. Commit 874e0e4427b80e1b15b74a1557b73ba9d61443ca regressed
this by removing the capturing of thread-worker results in
QueueFunctionThread.run(). This patch restores that functionality and
the feature (uploading SLO objects).
Change-Id: I0b4f677e4a734e83d1a25088d9a74f7d46384e53
Diffstat (limited to 'bin/swift')
-rwxr-xr-x | bin/swift | 658 |
1 files changed, 264 insertions, 394 deletions
@@ -22,12 +22,9 @@ from hashlib import md5 from optparse import OptionParser, SUPPRESS_HELP from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join -from Queue import Queue from random import shuffle -from sys import argv, exc_info, exit, stderr, stdout -from threading import Thread +from sys import argv, exit, stderr, stdout from time import sleep, time, gmtime, strftime -from traceback import format_exception from urllib import quote, unquote try: @@ -35,7 +32,10 @@ try: except ImportError: import json -from swiftclient import Connection, ClientException, HTTPException, utils +from swiftclient import Connection, HTTPException +from swiftclient.utils import config_true_value +from swiftclient.multithreading import MultiThreadingManager +from swiftclient.exceptions import ClientException from swiftclient.version import version_info @@ -63,75 +63,6 @@ def mkdirs(path): raise -def put_errors_from_threads(threads, error_queue): - """ - Places any errors from the threads into error_queue. - :param threads: A list of QueueFunctionThread instances. - :param error_queue: A queue to put error strings into. - :returns: True if any errors were found. - """ - was_error = False - for thread in threads: - for info in thread.exc_infos: - was_error = True - if isinstance(info[1], ClientException): - error_queue.put(str(info[1])) - else: - error_queue.put(''.join(format_exception(*info))) - return was_error - - -class StopWorkerThreadSignal(object): - pass - - -class QueueFunctionThread(Thread): - - def __init__(self, queue, func, *args, **kwargs): - """ - Calls func for each item in queue; func is called with a queued - item as the first arg followed by *args and **kwargs. Use the - PriorityQueue for sending quit signal when Ctrl-C is pressed. - """ - Thread.__init__(self) - self.queue = queue - self.func = func - self.args = args - self.kwargs = kwargs - self.exc_infos = [] - self.results = [] - self.store_results = kwargs.pop('store_results', False) - - def run(self): - while True: - try: - item = self.queue.get() - if isinstance(item, StopWorkerThreadSignal): - break - except: - # This catch is important and it may occur when ctrl-C is - # pressed, in this case simply quit the thread - break - else: - try: - self.func(item, *self.args, **self.kwargs) - except Exception: - self.exc_infos.append(exc_info()) - - -def shutdown_worker_threads(queue, thread_list): - """ - Takes a job queue and a list of associated QueueFunctionThread objects, - puts a StopWorkerThreadSignal object into the queue, and waits for the - queue to flush. - """ - for thread in [t for t in thread_list if t.isAlive()]: - queue.put(StopWorkerThreadSignal()) - - while any(map(QueueFunctionThread.is_alive, thread_list)): - sleep(0.05) - - def immediate_exit(signum, frame): stderr.write(" Aborted\n") os_exit(2) @@ -145,7 +76,7 @@ delete [options] --all OR delete container [options] [object] [object] ... --leave-segments option.'''.strip('\n') -def st_delete(parser, args, print_queue, error_queue): +def st_delete(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to delete ' @@ -164,20 +95,19 @@ def st_delete(parser, args, print_queue, error_queue): (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_delete_help)) + thread_manager.error('Usage: %s [options] %s', + basename(argv[0]), st_delete_help) return def _delete_segment((container, obj), conn): conn.delete_object(container, obj) if options.verbose: if conn.attempts > 2: - print_queue.put('%s/%s [after %d attempts]' % - (container, obj, conn.attempts)) + thread_manager.print_msg( + '%s/%s [after %d attempts]', container, + obj, conn.attempts) else: - print_queue.put('%s/%s' % (container, obj)) - - object_queue = Queue(10000) + thread_manager.print_msg('%s/%s', container, obj) def _delete_object((container, obj), conn): try: @@ -187,7 +117,7 @@ def st_delete(parser, args, print_queue, error_queue): try: headers = conn.head_object(container, obj) old_manifest = headers.get('x-object-manifest') - if utils.config_true_value( + if config_true_value( headers.get('x-static-large-object')): query_string = 'multipart-manifest=delete' except ClientException as err: @@ -195,7 +125,10 @@ def st_delete(parser, args, print_queue, error_queue): raise conn.delete_object(container, obj, query_string=query_string) if old_manifest: - segment_queue = Queue(10000) + segment_manager = thread_manager.queue_manager( + _delete_segment, options.object_threads, + connection_maker=create_connection) + segment_queue = segment_manager.queue scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) sprefix = unquote(sprefix).rstrip('/') + '/' @@ -203,32 +136,23 @@ def st_delete(parser, args, print_queue, error_queue): prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) if not segment_queue.empty(): - segment_threads = [QueueFunctionThread( - segment_queue, - _delete_segment, create_connection()) for _junk in - xrange(options.object_threads)] - for thread in segment_threads: - thread.start() - shutdown_worker_threads(segment_queue, segment_threads) - put_errors_from_threads(segment_threads, error_queue) + with segment_manager: + pass if options.verbose: path = options.yes_all and join(container, obj) or obj if path[:1] in ('/', '\\'): path = path[1:] if conn.attempts > 1: - print_queue.put('%s [after %d attempts]' % - (path, conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', path, + conn.attempts) else: - print_queue.put(path) + thread_manager.print_msg(path) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (container, obj))) + thread_manager.error("Object '%s/%s' not found", container, obj) - container_queue = Queue(10000) - - def _delete_container(container, conn): + def _delete_container(container, conn, object_queue): try: marker = '' while True: @@ -256,54 +180,43 @@ def st_delete(parser, args, print_queue, error_queue): except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(container)) + thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) - object_threads = \ - [QueueFunctionThread(object_queue, _delete_object, create_connection()) - for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() - container_threads = \ - [QueueFunctionThread(container_queue, _delete_container, - create_connection()) - for _junk in xrange(options.container_threads)] - for thread in container_threads: - thread.start() - - try: - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = [ - c['name'] for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might ' \ - 'have meant %r instead of %r.' % ( + obj_manager = thread_manager.queue_manager( + _delete_object, options.object_threads, + connection_maker=create_connection) + with obj_manager as object_queue: + cont_manager = thread_manager.queue_manager( + _delete_container, options.container_threads, object_queue, + connection_maker=create_connection) + with cont_manager as container_queue: + if not args: + conn = create_connection() + try: + marker = '' + while True: + containers = [ + c['name'] + for c in conn.get_account(marker=marker)[1]] + if not containers: + break + for container in containers: + container_queue.put(container) + marker = containers[-1] + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you ' + 'might have meant %r instead of %r.' % ( args[0].replace('/', ' ', 1), args[0]) - conn = create_connection() - _delete_container(args[0], conn) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - finally: - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + container_queue.put(args[0]) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) st_download_help = ''' @@ -318,7 +231,7 @@ download --all [options] OR download container [options] [object] [object] ... just redirect to stdout.'''.strip('\n') -def st_download(parser, args, print_queue, error_queue): +def st_download(parser, args, thread_manager): parser.add_option( '-a', '--all', action='store_true', dest='yes_all', default=False, help='Indicates that you really want to download ' @@ -350,12 +263,10 @@ def st_download(parser, args, print_queue, error_queue): if options.out_file and len(args) != 2: exit('-o option only allowed for single file downloads') if (not args and not options.yes_all) or (args and options.yes_all): - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_download_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_download_help) return - object_queue = Queue(10000) - def _download_object(queue_arg, conn): if len(queue_arg) == 2: container, obj = queue_arg @@ -415,11 +326,12 @@ def st_download(parser, args, print_queue, error_queue): if not options.no_download: fp.close() if md5sum and md5sum.hexdigest() != etag: - error_queue.put('%s: md5sum != etag, %s != %s' % - (path, md5sum.hexdigest(), etag)) + thread_manager.error('%s: md5sum != etag, %s != %s', + path, md5sum.hexdigest(), etag) if content_length is not None and read_length != content_length: - error_queue.put('%s: read_length != content_length, %d != %d' % - (path, read_length, content_length)) + thread_manager.error( + '%s: read_length != content_length, %d != %d', + path, read_length, content_length) if 'x-object-meta-mtime' in headers and not options.out_file \ and not options.no_download: @@ -431,19 +343,23 @@ def st_download(parser, args, print_queue, error_queue): header_receipt - start_time, finish_time - start_time, float(read_length) / (finish_time - start_time) / 1000000) if conn.attempts > 1: - print_queue.put('%s [%s after %d attempts]' % - (path, time_str, conn.attempts)) + thread_manager.print_msg('%s [%s after %d attempts]', path, + time_str, conn.attempts) else: - print_queue.put('%s [%s]' % (path, time_str)) + thread_manager.print_msg('%s [%s]', path, time_str) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (container, obj))) + thread_manager.error("Object '%s/%s' not found", container, obj) - container_queue = Queue(10000) - - def _download_container(container, conn, prefix=None): + def _download_container(queue_arg, conn): + if len(queue_arg) == 2: + container, object_queue = queue_arg + prefix = None + elif len(queue_arg) == 3: + container, object_queue, prefix = queue_arg + else: + raise Exception("Invalid queue_arg length of %s" % len(queue_arg)) try: marker = options.marker while True: @@ -460,64 +376,49 @@ def st_download(parser, args, print_queue, error_queue): except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(container)) + thread_manager.error('Container %r not found', container) create_connection = lambda: get_conn(options) - object_threads = [QueueFunctionThread( - object_queue, _download_object, - create_connection()) for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() - container_threads = [QueueFunctionThread( - container_queue, - _download_container, create_connection()) - for _junk in xrange(options.container_threads)] - for thread in container_threads: - thread.start() - - # We musn't let the main thread die with an exception while non-daemonic - # threads exist or the process with hang and ignore Ctrl-C. So we catch - # anything and tidy up the threads in a finally block. - try: - if not args: - # --all case - conn = create_connection() - try: - marker = options.marker - while True: - containers = [ - c['name'] for c in conn.get_account( - marker=marker, prefix=options.prefix)[1]] - if not containers: - break - marker = containers[-1] - shuffle(containers) - for container in containers: - container_queue.put(container) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, ('WARNING: / in container name; you might ' - 'have meant %r instead of %r.' % ( + obj_manager = thread_manager.queue_manager( + _download_object, options.object_threads, + connection_maker=create_connection) + with obj_manager as object_queue: + cont_manager = thread_manager.queue_manager( + _download_container, options.container_threads, + connection_maker=create_connection) + with cont_manager as container_queue: + if not args: + # --all case + conn = create_connection() + try: + marker = options.marker + while True: + containers = [ + c['name'] for c in conn.get_account( + marker=marker, prefix=options.prefix)[1]] + if not containers: + break + marker = containers[-1] + shuffle(containers) + for container in containers: + container_queue.put((container, object_queue)) + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, ('WARNING: / in container name; you ' + 'might have meant %r instead of %r.' % ( args[0].replace('/', ' ', 1), args[0])) - _download_container(args[0], create_connection(), - options.prefix) - else: - if len(args) == 2: - obj = args[1] - object_queue.put((args[0], obj, options.out_file)) + container_queue.put((args[0], object_queue, options.prefix)) else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - finally: - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) def prt_bytes(bytes, human_flag): @@ -560,7 +461,7 @@ list [options] [container] '''.strip('\n') -def st_list(parser, args, print_queue, error_queue): +def st_list(parser, args, thread_manager): parser.add_option( '-l', '--long', dest='long', help='Long listing ' 'similar to ls -l command', action='store_true', default=False) @@ -583,8 +484,8 @@ def st_list(parser, args, print_queue, error_queue): if options.delimiter and not args: exit('-d option only allowed for container listings') if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_list_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_list_help) return conn = get_conn(options) @@ -605,12 +506,13 @@ def st_list(parser, args, print_queue, error_queue): item_name = item.get('name') if not options.long and not options.human: - print_queue.put(item.get('name', item.get('subdir'))) + thread_manager.print_msg( + item.get('name', item.get('subdir'))) else: item_bytes = item.get('bytes') total_bytes += item_bytes if len(args) == 0: # listing containers - bytes = prt_bytes(item_bytes, options.human) + byte_str = prt_bytes(item_bytes, options.human) count = item.get('count') total_count += count try: @@ -620,41 +522,42 @@ def st_list(parser, args, print_queue, error_queue): except ClientException: datestamp = '????-??-?? ??:??:??' if not options.totals: - print_queue.put("%5s %s %s %s" % - (count, bytes, datestamp, - item_name)) + thread_manager.print_msg("%5s %s %s %s", count, + byte_str, datestamp, + item_name) else: # list container contents subdir = item.get('subdir') if subdir is None: - bytes = prt_bytes(item_bytes, options.human) + byte_str = prt_bytes(item_bytes, options.human) date, xtime = item.get('last_modified').split('T') xtime = xtime.split('.')[0] else: - bytes = prt_bytes(0, options.human) + byte_str = prt_bytes(0, options.human) date = xtime = '' item_name = subdir if not options.totals: - print_queue.put("%s %10s %8s %s" % - (bytes, date, xtime, item_name)) + thread_manager.print_msg("%s %10s %8s %s", + byte_str, date, xtime, + item_name) marker = items[-1].get('name', items[-1].get('subdir')) # report totals if options.long or options.human: if len(args) == 0: - print_queue.put("%5s %s" % (prt_bytes(total_count, True), - prt_bytes(total_bytes, - options.human))) + thread_manager.print_msg( + "%5s %s", prt_bytes(total_count, True), + prt_bytes(total_bytes, options.human)) else: - print_queue.put("%s" % (prt_bytes(total_bytes, options.human))) + thread_manager.print_msg(prt_bytes(total_bytes, options.human)) except ClientException as err: if err.http_status != 404: raise if not args: - error_queue.put('Account not found') + thread_manager.error('Account not found') else: - error_queue.put('Container %s not found' % repr(args[0])) + thread_manager.error('Container %r not found', args[0]) st_stat_help = ''' stat [container] [object] @@ -663,7 +566,7 @@ stat [container] [object] like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n') -def st_stat(parser, args, print_queue, error_queue): +def st_stat(parser, args, thread_manager): parser.add_option( '--lh', dest='human', help="report totals like 'list --lh'", action='store_true', default=False) @@ -674,36 +577,36 @@ def st_stat(parser, args, print_queue, error_queue): try: headers = conn.head_account() if options.verbose > 1: - print_queue.put(''' + thread_manager.print_msg(''' StorageURL: %s Auth Token: %s -'''.strip('\n') % (conn.url, conn.token)) +'''.strip('\n'), conn.url, conn.token) container_count = int(headers.get('x-account-container-count', 0)) object_count = prt_bytes(headers.get('x-account-object-count', 0), options.human).lstrip() bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0), options.human).lstrip() - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Containers: %d Objects: %s - Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count, - object_count, bytes_used)) + Bytes: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], container_count, + object_count, bytes_used) for key, value in headers.items(): if key.startswith('x-account-meta-'): - print_queue.put( - '%10s: %s' % ('Meta %s' % - key[len('x-account-meta-'):].title(), value)) + thread_manager.print_msg( + '%10s: %s', + 'Meta %s' % key[len('x-account-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-account-meta-') and key not in ( 'content-length', 'date', 'x-account-container-count', 'x-account-object-count', 'x-account-bytes-used'): - print_queue.put( - '%10s: %s' % (key.title(), value)) + thread_manager.print_msg('%10s: %s', key.title(), value) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Account not found') + thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ @@ -716,7 +619,7 @@ Containers: %d options.human).lstrip() bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0), options.human).lstrip() - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Container: %s Objects: %s @@ -724,69 +627,68 @@ Container: %s Read ACL: %s Write ACL: %s Sync To: %s - Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - object_count, bytes_used, - headers.get('x-container-read', ''), - headers.get('x-container-write', ''), - headers.get('x-container-sync-to', ''), - headers.get('x-container-sync-key', ''))) + Sync Key: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0], + object_count, bytes_used, + headers.get('x-container-read', ''), + headers.get('x-container-write', ''), + headers.get('x-container-sync-to', ''), + headers.get('x-container-sync-key', '')) for key, value in headers.items(): if key.startswith('x-container-meta-'): - print_queue.put( - '%9s: %s' % ('Meta %s' % - key[len('x-container-meta-'):].title(), value)) + thread_manager.print_msg( + '%9s: %s', + 'Meta %s' % key[len('x-container-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-container-meta-') and key not in ( 'content-length', 'date', 'x-container-object-count', 'x-container-bytes-used', 'x-container-read', 'x-container-write', 'x-container-sync-to', 'x-container-sync-key'): - print_queue.put( - '%9s: %s' % (key.title(), value)) + thread_manager.print_msg('%9s: %s', key.title(), value) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Container %s not found' % repr(args[0])) + thread_manager.error('Container %r not found', args[0]) elif len(args) == 2: try: headers = conn.head_object(args[0], args[1]) - print_queue.put(''' + thread_manager.print_msg(''' Account: %s Container: %s Object: %s - Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], - args[1], headers.get('content-type'))) + Content Type: %s'''.strip('\n'), conn.url.rsplit('/', 1)[-1], args[0], + args[1], headers.get('content-type')) if 'content-length' in headers: - print_queue.put('Content Length: %s' % - prt_bytes(headers['content-length'], - options.human).lstrip()) + thread_manager.print_msg('Content Length: %s', + prt_bytes(headers['content-length'], + options.human).lstrip()) if 'last-modified' in headers: - print_queue.put(' Last Modified: %s' % - headers['last-modified']) + thread_manager.print_msg(' Last Modified: %s', + headers['last-modified']) if 'etag' in headers: - print_queue.put(' ETag: %s' % headers['etag']) + thread_manager.print_msg(' ETag: %s', headers['etag']) if 'x-object-manifest' in headers: - print_queue.put(' Manifest: %s' % - headers['x-object-manifest']) + thread_manager.print_msg(' Manifest: %s', + headers['x-object-manifest']) for key, value in headers.items(): if key.startswith('x-object-meta-'): - print_queue.put( - '%14s: %s' % ('Meta %s' % - key[len('x-object-meta-'):].title(), value)) + thread_manager.print_msg( + '%14s: %s', + 'Meta %s' % key[len('x-object-meta-'):].title(), + value) for key, value in headers.items(): if not key.startswith('x-object-meta-') and key not in ( 'content-type', 'content-length', 'last-modified', 'etag', 'date', 'x-object-manifest'): - print_queue.put( - '%14s: %s' % (key.title(), value)) + thread_manager.print_msg('%14s: %s', key.title(), value) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (args[0], args[1]))) + thread_manager.error("Object %s/%s not found", args[0], args[1]) else: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_stat_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_stat_help) st_post_help = ''' @@ -800,7 +702,7 @@ post [options] [container] [object] post -m Color:Blue -m Size:Large'''.strip('\n') -def st_post(parser, args, print_queue, error_queue): +def st_post(parser, args, thread_manager): parser.add_option( '-r', '--read-acl', dest='read_acl', help='Sets the ' 'Read ACL for containers. Quick summary of ACL syntax: .r:*, ' @@ -831,19 +733,21 @@ def st_post(parser, args, print_queue, error_queue): exit('-r, -w, -t, and -k options only allowed for containers') conn = get_conn(options) if not args: - headers = split_headers(options.meta, 'X-Account-Meta-', error_queue) + headers = split_headers( + options.meta, 'X-Account-Meta-', thread_manager) try: conn.post_account(headers=headers) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Account not found') + thread_manager.error('Account not found') elif len(args) == 1: if '/' in args[0]: print >> stderr, 'WARNING: / in container name; you might have ' \ 'meant %r instead of %r.' % \ (args[0].replace('/', ' ', 1), args[0]) - headers = split_headers(options.meta, 'X-Container-Meta-', error_queue) + headers = split_headers(options.meta, 'X-Container-Meta-', + thread_manager) if options.read_acl is not None: headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: @@ -859,19 +763,18 @@ def st_post(parser, args, print_queue, error_queue): raise conn.put_container(args[0], headers=headers) elif len(args) == 2: - headers = split_headers(options.meta, 'X-Object-Meta-', error_queue) + headers = split_headers(options.meta, 'X-Object-Meta-', thread_manager) # add header options to the headers object for the request. - headers.update(split_headers(options.header, '', error_queue)) + headers.update(split_headers(options.header, '', thread_manager)) try: conn.post_object(args[0], args[1], headers=headers) except ClientException as err: if err.http_status != 404: raise - error_queue.put('Object %s not found' % - repr('%s/%s' % (args[0], args[1]))) + thread_manager.error("Object '%s/%s' not found", args[0], args[1]) else: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_post_help)) + thread_manager.error('Usage: %s [options] %s', basename(argv[0]), + st_post_help) st_upload_help = ''' @@ -885,7 +788,7 @@ upload [options] container file_or_directory [file_or_directory] [...] '''.strip('\n') -def st_upload(parser, args, print_queue, error_queue): +def st_upload(parser, args, thread_manager): parser.add_option( '-c', '--changed', action='store_true', dest='changed', default=False, help='Will only upload files that have changed since ' @@ -924,10 +827,9 @@ def st_upload(parser, args, print_queue, error_queue): (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: - error_queue.put('Usage: %s [options] %s' % - (basename(argv[0]), st_upload_help)) + thread_manager.error( + 'Usage: %s [options] %s', basename(argv[0]), st_upload_help) return - object_queue = Queue(10000) def _segment_job(job, conn): if job.get('delete', False): @@ -945,10 +847,10 @@ def st_upload(parser, args, print_queue, error_queue): job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: - print_queue.put('%s [after %d attempts]' % - (job['log_line'], conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', + job['log_line'], conn.attempts) else: - print_queue.put(job['log_line']) + thread_manager.print_msg(job['log_line']) return job def _object_job(job, conn): @@ -998,13 +900,13 @@ def st_upload(parser, args, print_queue, error_queue): return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') - if utils.config_true_value( + if config_true_value( headers.get('x-static-large-object')): headers, manifest_data = conn.get_object( container, obj, query_string='multipart-manifest=get') for old_seg in json.loads(manifest_data): - seg_path = old_seg['name'].lstrip('/') + seg_path = old_seg['path'].lstrip('/') if isinstance(seg_path, unicode): seg_path = seg_path.encode('utf-8') old_slo_manifest_paths.append(seg_path) @@ -1013,7 +915,7 @@ def st_upload(parser, args, print_queue, error_queue): raise # Merge the command line header options to the put_headers put_headers.update(split_headers(options.header, '', - error_queue)) + thread_manager)) # Don't do segment job if object is not big enough if options.segment_size and \ getsize(path) > int(options.segment_size): @@ -1021,15 +923,15 @@ def st_upload(parser, args, print_queue, error_queue): if options.segment_container: seg_container = options.segment_container full_size = getsize(path) - segment_queue = Queue(10000) - segment_threads = [ - QueueFunctionThread( - segment_queue, _segment_job, - create_connection(), store_results=True) - for _junk in xrange(options.segment_threads)] - for thread in segment_threads: - thread.start() - try: + + slo_segments = [] + error_counter = [0] + segment_manager = thread_manager.queue_manager( + _segment_job, options.segment_threads, + store_results=slo_segments, + error_counter=error_counter, + connection_maker=create_connection) + with segment_manager as segment_queue: segment = 0 segment_start = 0 while segment_start < full_size: @@ -1052,18 +954,12 @@ def st_upload(parser, args, print_queue, error_queue): 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size - finally: - shutdown_worker_threads(segment_queue, segment_threads) - if put_errors_from_threads(segment_threads, - error_queue): - raise ClientException( - 'Aborting manifest creation ' - 'because not all segments could be uploaded. ' - '%s/%s' % (container, obj)) + if error_counter[0]: + raise ClientException( + 'Aborting manifest creation ' + 'because not all segments could be uploaded. %s/%s' + % (container, obj)) if options.use_slo: - slo_segments = [] - for thread in segment_threads: - slo_segments += thread.results slo_segments.sort(key=lambda d: d['segment_index']) for seg in slo_segments: seg_loc = seg['segment_location'].lstrip('/') @@ -1097,7 +993,10 @@ def st_upload(parser, args, print_queue, error_queue): container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) if old_manifest or old_slo_manifest_paths: - segment_queue = Queue(10000) + segment_manager = thread_manager.queue_manager( + _segment_job, options.segment_threads, + connection_maker=create_connection) + segment_queue = segment_manager.queue if old_manifest: scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) @@ -1118,27 +1017,20 @@ def st_upload(parser, args, print_queue, error_queue): {'delete': True, 'container': scont, 'obj': sobj}) if not segment_queue.empty(): - segment_threads = [ - QueueFunctionThread( - segment_queue, - _segment_job, create_connection()) - for _junk in xrange(options.segment_threads)] - for thread in segment_threads: - thread.start() - shutdown_worker_threads(segment_queue, segment_threads) - put_errors_from_threads(segment_threads, error_queue) + with segment_manager: + pass if options.verbose: if conn.attempts > 1: - print_queue.put( - '%s [after %d attempts]' % (obj, conn.attempts)) + thread_manager.print_msg('%s [after %d attempts]', obj, + conn.attempts) else: - print_queue.put(obj) + thread_manager.print_msg(obj) except OSError as err: if err.errno != ENOENT: raise - error_queue.put('Local file %s not found' % repr(path)) + thread_manager.error('Local file %r not found', path) - def _upload_dir(path): + def _upload_dir(path, object_queue): names = listdir(path) if not names: object_queue.put({'path': path, 'dir_marker': True}) @@ -1146,17 +1038,13 @@ def st_upload(parser, args, print_queue, error_queue): for name in listdir(path): subpath = join(path, name) if isdir(subpath): - _upload_dir(subpath) + _upload_dir(subpath, object_queue) else: object_queue.put({'path': subpath}) create_connection = lambda: get_conn(options) - object_threads = [ - QueueFunctionThread(object_queue, _object_job, create_connection()) - for _junk in xrange(options.object_threads)] - for thread in object_threads: - thread.start() conn = create_connection() + # Try to create the container, just in case it doesn't exist. If this # fails, it might just be because the user doesn't have container PUT # permissions, so we'll ignore any error. If there's really a problem, @@ -1174,34 +1062,38 @@ def st_upload(parser, args, print_queue, error_queue): if msg: msg += ': ' msg += err.http_response_content[:60] - error_queue.put( - 'Error trying to create container %r: %s' % (args[0], msg)) + thread_manager.error( + 'Error trying to create container %r: %s', args[0], + msg) except Exception as err: - error_queue.put( - 'Error trying to create container %r: %s' % (args[0], err)) - - try: - for arg in args[1:]: - if isdir(arg): - _upload_dir(arg) - else: - object_queue.put({'path': arg}) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - finally: - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + thread_manager.error( + 'Error trying to create container %r: %s', args[0], + err) + + object_manager = thread_manager.queue_manager( + _object_job, options.object_threads, + connection_maker=create_connection) + with object_manager as object_queue: + try: + for arg in args[1:]: + if isdir(arg): + _upload_dir(arg, object_queue) + else: + object_queue.put({'path': arg}) + except ClientException as err: + if err.http_status != 404: + raise + thread_manager.error('Account not found') -def split_headers(options, prefix='', error_queue=None): +def split_headers(options, prefix='', thread_manager=None): """ Splits 'Key: Value' strings and returns them as a dictionary. :param options: An array of 'Key: Value' strings :param prefix: String to prepend to all of the keys in the dictionary. - :param error_queue: Queue for thread safe error reporting. + :param thread_manager: MultiThreadingManager for thread safe error + reporting. """ headers = {} for item in options: @@ -1211,8 +1103,8 @@ def split_headers(options, prefix='', error_queue=None): else: error_string = "Metadata parameter %s must contain a ':'.\n%s" \ % (item, st_post_help) - if error_queue: - error_queue.put(error_string) + if thread_manager: + thread_manager.error(error_string) else: exit(error_string) return headers @@ -1391,7 +1283,7 @@ Examples: help='Specify a CA bundle file to use in verifying a ' 'TLS (https) server certificate. ' 'Defaults to env[OS_CACERT]') - default_val = utils.config_true_value(environ.get('SWIFTCLIENT_INSECURE')) + default_val = config_true_value(environ.get('SWIFTCLIENT_INSECURE')) parser.add_option('--insecure', action="store_true", dest="insecure", default=default_val, @@ -1422,38 +1314,16 @@ Examples: logger = logging.getLogger("swiftclient") logging.basicConfig(level=logging.DEBUG) - print_queue = Queue(10000) - - def _print(item): - if isinstance(item, unicode): - item = item.encode('utf8') - print item + had_error = False - print_thread = QueueFunctionThread(print_queue, _print) - print_thread.start() - - error_count = 0 - error_queue = Queue(10000) - - def _error(item): - global error_count - error_count += 1 - if isinstance(item, unicode): - item = item.encode('utf8') - print >> stderr, item + with MultiThreadingManager() as thread_manager: + parser.usage = globals()['st_%s_help' % args[0]] + try: + globals()['st_%s' % args[0]](parser, argv[1:], thread_manager) + except (ClientException, HTTPException, socket.error) as err: + thread_manager.error(str(err)) - error_thread = QueueFunctionThread(error_queue, _error) - error_thread.start() + had_error = thread_manager.error_count - parser.usage = globals()['st_%s_help' % args[0]] - try: - globals()['st_%s' % args[0]](parser, argv[1:], print_queue, - error_queue) - except (ClientException, HTTPException, socket.error) as err: - error_queue.put(str(err)) - finally: - shutdown_worker_threads(print_queue, [print_thread]) - shutdown_worker_threads(error_queue, [error_thread]) - - if error_count: + if had_error: exit(1) |