From f022aac0cf460ca4d3208ba1c22fae5f32ae34af Mon Sep 17 00:00:00 2001 From: Darrell Bishop Date: Wed, 26 Jun 2013 11:41:29 -0700 Subject: Add -p option to download command. Allow the ability to download a subset of containers (--all with -p) or a subset of objects within a container (container name with -p). This patch also includes a drive-by fix for "download --all" which would not actually download any objects (for me, at least) because the object queue got filled with "stop" messages before the container workers had run long enough to put work in the object queue. Doh! I also closed up a few holes where an (unexpected, obviously) Exception could cause the process to hang because non-daemon threads still existed. Change-Id: I71c6935c60282b5353badc2dfce8a935d47e3bb7 --- bin/swift | 281 +++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 151 insertions(+), 130 deletions(-) (limited to 'bin') diff --git a/bin/swift b/bin/swift index 8cce1c3..9ca0745 100755 --- a/bin/swift +++ b/bin/swift @@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join from Queue import Queue from random import shuffle from sys import argv, exc_info, exit, stderr, stdout -from threading import enumerate as threading_enumerate, Thread +from threading import Thread from time import sleep, time, gmtime, strftime from traceback import format_exception from urllib import quote, unquote @@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object): pass -def shutdown_worker_threads(queue, thread_list): - for thread in [t for t in thread_list if t.isAlive()]: - queue.put(StopWorkerThreadSignal()) - - -def immediate_exit(signum, frame): - stderr.write(" Aborted\n") - os_exit(2) - - class QueueFunctionThread(Thread): def __init__(self, queue, func, *args, **kwargs): @@ -128,6 +118,24 @@ class QueueFunctionThread(Thread): self.exc_infos.append(exc_info()) +def shutdown_worker_threads(queue, thread_list): + """ + Takes a job queue and a list of 
associated QueueFunctionThread objects, + puts a StopWorkerThreadSignal object into the queue, and waits for the + queue to flush. + """ + for thread in [t for t in thread_list if t.isAlive()]: + queue.put(StopWorkerThreadSignal()) + + while any(map(QueueFunctionThread.is_alive, thread_list)): + sleep(0.05) + + +def immediate_exit(signum, frame): + stderr.write(" Aborted\n") + os_exit(2) + + st_delete_help = ''' delete [options] --all OR delete container [options] [object] [object] ... Deletes everything in the account (with --all), or everything in a @@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue): for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = '' - while True: - containers = \ - [c['name'] for c in conn.get_account(marker=marker)[1]] - if not containers: - break - for container in containers: - container_queue.put(container) - marker = containers[-1] - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' 
% \ - (args[0].replace('/', ' ', 1), args[0]) - conn = create_connection() - _delete_container(args[0], conn) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) + try: + if not args: + conn = create_connection() + try: + marker = '' + while True: + containers = [ + c['name'] for c in conn.get_account(marker=marker)[1]] + if not containers: + break + for container in containers: + container_queue.put(container) + marker = containers[-1] + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, 'WARNING: / in container name; you might ' \ + 'have meant %r instead of %r.' % ( + args[0].replace('/', ' ', 1), args[0]) + conn = create_connection() + _delete_container(args[0], conn) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) st_download_help = ''' -download --all OR download container [options] [object] [object] ... - Downloads everything in the account (with --all), or everything in a - container, or a list of objects depending on the args given. For a single - object download, you may use the -o [--output] option to - redirect the output to a specific file or if "-" then just redirect to - stdout.'''.strip('\n') +download --all [options] OR download container [options] [object] [object] ... 
+ Downloads everything in the account (with --all), or everything in all + containers in the account matching a prefix (with --all and -p [--prefix]), + or everything in a container, or a subset of a container with -p + [--prefix], or a list of objects depending on the args given. -p or + --prefix is an option that will only download items beginning with that + prefix. For a single object download, you may use the -o [--output] + option to redirect the output to a specific file or if "-" then + just redirect to stdout.'''.strip('\n') def st_download(parser, args, print_queue, error_queue): @@ -313,6 +326,9 @@ def st_download(parser, args, print_queue, error_queue): '-m', '--marker', dest='marker', default='', help='Marker to use when starting a container or ' 'account download') + parser.add_option( + '-p', '--prefix', dest='prefix', + help='Will only download items beginning with the prefix') parser.add_option( '-o', '--output', dest='out_file', help='For a single ' 'file download, stream the output to an alternate location ') @@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue): container_queue = Queue(10000) - def _download_container(container, conn): + def _download_container(container, conn, prefix=None): try: marker = options.marker while True: - objects = [o['name'] for o in - conn.get_container(container, marker=marker)[1]] + objects = [ + o['name'] for o in + conn.get_container(container, marker=marker, + prefix=prefix)[1]] if not objects: break marker = objects[-1] @@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue): for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() - if not args: - conn = create_connection() - try: - marker = options.marker - while True: - containers = [c['name'] - for c in conn.get_account(marker=marker)[1]] - if not containers: - break - marker = containers[-1] - shuffle(containers) - for container in containers: - 
container_queue.put(container) - except ClientException as err: - if err.http_status != 404: - raise - error_queue.put('Account not found') - elif len(args) == 1: - if '/' in args[0]: - print >> stderr, 'WARNING: / in container name; you might have ' \ - 'meant %r instead of %r.' % \ - (args[0].replace('/', ' ', 1), args[0]) - _download_container(args[0], create_connection()) - else: - if len(args) == 2: - obj = args[1] - object_queue.put((args[0], obj, options.out_file)) - else: - for obj in args[1:]: - object_queue.put((args[0], obj)) - shutdown_worker_threads(container_queue, container_threads) - put_errors_from_threads(container_threads, error_queue) + # We mustn't let the main thread die with an exception while non-daemonic + # threads exist or the process will hang and ignore Ctrl-C. So we catch + # anything and tidy up the threads in a finally block. + try: + if not args: + # --all case + conn = create_connection() + try: + marker = options.marker + while True: + containers = [ + c['name'] for c in conn.get_account( + marker=marker, prefix=options.prefix)[1]] + if not containers: + break + marker = containers[-1] + shuffle(containers) + for container in containers: + container_queue.put(container) + except ClientException as err: + if err.http_status != 404: + raise + error_queue.put('Account not found') + elif len(args) == 1: + if '/' in args[0]: + print >> stderr, ('WARNING: / in container name; you might ' + 'have meant %r instead of %r.' 
% ( + args[0].replace('/', ' ', 1), args[0])) + _download_container(args[0], create_connection(), + options.prefix) + else: + if len(args) == 2: + obj = args[1] + object_queue.put((args[0], obj, options.out_file)) + else: + for obj in args[1:]: + object_queue.put((args[0], obj)) + finally: + shutdown_worker_threads(container_queue, container_threads) + put_errors_from_threads(container_threads, error_queue) - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) def prt_bytes(bytes, human_flag): @@ -546,7 +572,7 @@ def st_list(parser, args, print_queue, error_queue): parser.add_option( '-d', '--delimiter', dest='delimiter', help='Will roll up items with the given delimiter' - ' (see Cloud Files general documentation for what this means)') + ' (see OpenStack Swift API documentation for what this means)') (options, args) = parse_args(parser, args) args = args[1:] if options.delimiter and not args: @@ -971,34 +997,37 @@ def st_upload(parser, args, print_queue, error_queue): for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() - segment = 0 - segment_start = 0 - while segment_start < full_size: - segment_size = int(options.segment_size) - if segment_start + segment_size > full_size: - segment_size = full_size - segment_start - if options.use_slo: - segment_name = '%s/slo/%s/%s/%s/%08d' % ( - obj, put_headers['x-object-meta-mtime'], - full_size, options.segment_size, segment) - else: - segment_name = '%s/%s/%s/%s/%08d' % ( - obj, put_headers['x-object-meta-mtime'], - full_size, options.segment_size, segment) - segment_queue.put( - {'path': path, 'obj': segment_name, - 'segment_start': segment_start, - 'segment_size': segment_size, - 'segment_index': segment, - 'log_line': '%s segment %s' % (obj, segment)}) - segment += 1 - segment_start += segment_size - 
shutdown_worker_threads(segment_queue, segment_threads) - if put_errors_from_threads(segment_threads, error_queue): - raise ClientException( - 'Aborting manifest creation ' - 'because not all segments could be uploaded. %s/%s' - % (container, obj)) + try: + segment = 0 + segment_start = 0 + while segment_start < full_size: + segment_size = int(options.segment_size) + if segment_start + segment_size > full_size: + segment_size = full_size - segment_start + if options.use_slo: + segment_name = '%s/slo/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + else: + segment_name = '%s/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + segment_queue.put( + {'path': path, 'obj': segment_name, + 'segment_start': segment_start, + 'segment_size': segment_size, + 'segment_index': segment, + 'log_line': '%s segment %s' % (obj, segment)}) + segment += 1 + segment_start += segment_size + finally: + shutdown_worker_threads(segment_queue, segment_threads) + if put_errors_from_threads(segment_threads, + error_queue): + raise ClientException( + 'Aborting manifest creation ' + 'because not all segments could be uploaded. 
' + '%s/%s' % (container, obj)) if options.use_slo: slo_segments = [] for thread in segment_threads: @@ -1118,19 +1147,20 @@ def st_upload(parser, args, print_queue, error_queue): except Exception as err: error_queue.put( 'Error trying to create container %r: %s' % (args[0], err)) + try: for arg in args[1:]: if isdir(arg): _upload_dir(arg) else: object_queue.put({'path': arg}) - - shutdown_worker_threads(object_queue, object_threads) - put_errors_from_threads(object_threads, error_queue) except ClientException as err: if err.http_status != 404: raise error_queue.put('Account not found') + finally: + shutdown_worker_threads(object_queue, object_threads) + put_errors_from_threads(object_threads, error_queue) def split_headers(options, prefix='', error_queue=None): @@ -1364,7 +1394,7 @@ Examples: print item print_thread = QueueFunctionThread(print_queue, _print) - print_thread.setDaemon(True) + print_thread.start() error_count = 0 error_queue = Queue(10000) @@ -1377,7 +1407,7 @@ Examples: print >> stderr, item error_thread = QueueFunctionThread(error_queue, _error) - error_thread.setDaemon(True) + error_thread.start() parser.usage = globals()['st_%s_help' % args[0]] try: @@ -1385,18 +1415,9 @@ Examples: error_queue) except (ClientException, HTTPException, socket.error) as err: error_queue.put(str(err)) - - # Let other threads start working, now start print and error thread, - # this is to prevent the main thread shutdown two thread prematurely - print_thread.start() - error_thread.start() - - # If not all the worker threads have finished, then the main thread - # has to wait. Only when there are main, error and print thread left - # the main thread can proceed to finish up. - while (len(threading_enumerate()) > 3 or not error_queue.empty() or - not print_queue.empty()): - sleep(0.5) + finally: + shutdown_worker_threads(print_queue, [print_thread]) + shutdown_worker_threads(error_queue, [error_thread]) if error_count: exit(1) -- cgit v1.2.1