diff options
author | Tong Li <litong01@us.ibm.com> | 2013-05-23 16:39:31 -0400 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-05-29 19:09:49 +0000 |
commit | 874e0e4427b80e1b15b74a1557b73ba9d61443ca (patch) | |
tree | 321df2707e8eab2441f19ef94f270bc9216ee6d0 /bin | |
parent | 7e84b69291efbc07be6ced17b24ffa42efdc67c0 (diff) | |
download | python-swiftclient-874e0e4427b80e1b15b74a1557b73ba9d61443ca.tar.gz |
remove busy-wait so that swift client won't use up all CPU cycles
The current implementation uses busy-wait and checking flags to
communicate between threads. It wastes a lot of CPU powers. With
python 2.6 is required for Swift, the communication between
threads should now be using queue and signal mechanisms. This
patch removed the busy-wait loops and use queue and queue join
for threads coordination which will not consume CPU cycles if
a thread is blocked.
Change-Id: I648cd637a92a159d5c13baa83f357cee2dfe7937
Diffstat (limited to 'bin')
-rwxr-xr-x | bin/swift | 178 |
1 files changed, 61 insertions, 117 deletions
@@ -22,10 +22,10 @@ from hashlib import md5 from optparse import OptionParser, SUPPRESS_HELP from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join -from Queue import Empty, Queue +from Queue import Queue from random import shuffle from sys import argv, exc_info, exit, stderr, stdout -from threading import current_thread, enumerate as threading_enumerate, Thread +from threading import enumerate as threading_enumerate, Thread from time import sleep, time from traceback import format_exception from urllib import quote, unquote @@ -79,38 +79,29 @@ def put_errors_from_threads(threads, error_queue): return was_error -def attempt_graceful_exit(signum, frame): - """ - Try to gracefully shut down. Sets abort=True on all non-main threads. - - More importantly, installs the immediate_exit handler on the - signal that triggered this handler. If this function is installed - as a signal handler for SIGINT, then pressing Ctrl-C once will - cause this program to finish operations in progress, then exit. - Pressing it again will cause an immediate exit; no cleanup - handlers will get called. - """ - stderr.write("Attempting graceful exit. " - "Press Ctrl-C again to exit immediately.\n") - main_thread = current_thread() - for thread in [t for t in threading_enumerate() if t is not main_thread]: - thread.abort = True - signal.signal(signum, immediate_exit) +class StopWorkerThreadSignal(object): + pass + + +def shutdown_worker_threads(queue, thread_list): + for thread in [t for t in thread_list if t.isAlive()]: + queue.put(StopWorkerThreadSignal()) def immediate_exit(signum, frame): + stderr.write(" Aborted\n") os_exit(2) class QueueFunctionThread(Thread): def __init__(self, queue, func, *args, **kwargs): - """ Calls func for each item in queue; func is called with a queued - item as the first arg followed by *args and **kwargs. Use the abort - attribute to have the thread empty the queue (without processing) - and exit. """ + """ + Calls func for each item in queue; func is called with a queued + item as the first arg followed by *args and **kwargs. Use the + PriorityQueue for sending quit signal when Ctrl-C is pressed. + """ Thread.__init__(self) - self.abort = False self.queue = queue self.func = func self.args = args @@ -122,21 +113,19 @@ class QueueFunctionThread(Thread): def run(self): while True: try: - item = self.queue.get_nowait() - except Empty: - if self.abort: + item = self.queue.get() + if isinstance(item, StopWorkerThreadSignal): break - sleep(0.01) + except: + # This catch is important and it may occur when ctrl-C is + # pressed, in this case simply quit the thread + break else: try: - if not self.abort: - res = self.func(item, *self.args, **self.kwargs) - if self.store_results: - self.results.append(res) + self.func(item, *self.args, **self.kwargs) except Exception: self.exc_infos.append(exc_info()) - finally: - self.queue.task_done() + st_delete_help = ''' delete [options] --all OR delete container [options] [object] [object] ... @@ -210,12 +199,7 @@ def st_delete(parser, args, print_queue, error_queue): xrange(options.object_threads)] for thread in segment_threads: thread.start() - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + shutdown_worker_threads(segment_queue, segment_threads) put_errors_from_threads(segment_threads, error_queue) if options.verbose: path = options.yes_all and join(container, obj) or obj @@ -237,20 +221,16 @@ def st_delete(parser, args, print_queue, error_queue): def _delete_container(container, conn): try: marker = '' - had_objects = False while True: objects = [o['name'] for o in conn.get_container(container, marker=marker)[1]] if not objects: break - had_objects = True for obj in objects: object_queue.put((container, obj)) marker = objects[-1] - if had_objects: - # By using join() instead of empty() we should avoid most - # occurrences of 409 below. - object_queue.join() + while not object_queue.empty(): + sleep(0.05) attempts = 1 while True: try: @@ -292,10 +272,6 @@ def st_delete(parser, args, print_queue, error_queue): for container in containers: container_queue.put(container) marker = containers[-1] - while not container_queue.empty(): - sleep(0.01) - while not object_queue.empty(): - sleep(0.01) except ClientException as err: if err.http_status != 404: raise @@ -310,19 +286,11 @@ def st_delete(parser, args, print_queue, error_queue): else: for obj in args[1:]: object_queue.put((args[0], obj)) - while not container_queue.empty(): - sleep(0.01) - for thread in container_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + + shutdown_worker_threads(container_queue, container_threads) put_errors_from_threads(container_threads, error_queue) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + + shutdown_worker_threads(object_queue, object_threads) put_errors_from_threads(object_threads, error_queue) @@ -516,19 +484,11 @@ def st_download(parser, args, print_queue, error_queue): else: for obj in args[1:]: object_queue.put((args[0], obj)) - while not container_queue.empty(): - sleep(0.01) - for thread in container_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + + shutdown_worker_threads(container_queue, container_threads) put_errors_from_threads(container_threads, error_queue) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + + shutdown_worker_threads(object_queue, object_threads) put_errors_from_threads(object_threads, error_queue) @@ -969,12 +929,7 @@ def st_upload(parser, args, print_queue, error_queue): 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + shutdown_worker_threads(segment_queue, segment_threads) if put_errors_from_threads(segment_threads, error_queue): raise ClientException( 'Aborting manifest creation ' @@ -1045,12 +1000,7 @@ def st_upload(parser, args, print_queue, error_queue): for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() - while not segment_queue.empty(): - sleep(0.01) - for thread in segment_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + shutdown_worker_threads(segment_queue, segment_threads) put_errors_from_threads(segment_threads, error_queue) if options.verbose: if conn.attempts > 1: @@ -1110,12 +1060,8 @@ def st_upload(parser, args, print_queue, error_queue): _upload_dir(arg) else: object_queue.put({'path': arg}) - while not object_queue.empty(): - sleep(0.01) - for thread in object_threads: - thread.abort = True - while thread.isAlive(): - thread.join(0.01) + + shutdown_worker_threads(object_queue, object_threads) put_errors_from_threads(object_threads, error_queue) except ClientException as err: if err.http_status != 404: @@ -1333,7 +1279,7 @@ Examples: exit('no such command: %s' % args[0]) exit() - signal.signal(signal.SIGINT, attempt_graceful_exit) + signal.signal(signal.SIGINT, immediate_exit) if options.debug: logger = logging.getLogger("swiftclient") @@ -1347,7 +1293,7 @@ Examples: print item print_thread = QueueFunctionThread(print_queue, _print) - print_thread.start() + print_thread.setDaemon(True) error_count = 0 error_queue = Queue(10000) @@ -1360,28 +1306,26 @@ Examples: print >> stderr, item error_thread = QueueFunctionThread(error_queue, _error) - error_thread.start() + error_thread.setDaemon(True) + parser.usage = globals()['st_%s_help' % args[0]] try: - parser.usage = globals()['st_%s_help' % args[0]] - try: - globals()['st_%s' % args[0]](parser, argv[1:], print_queue, - error_queue) - except (ClientException, HTTPException, socket.error) as err: - error_queue.put(str(err)) - while not print_queue.empty(): - sleep(0.01) - print_thread.abort = True - while print_thread.isAlive(): - print_thread.join(0.01) - while not error_queue.empty(): - sleep(0.01) - error_thread.abort = True - while error_thread.isAlive(): - error_thread.join(0.01) - if error_count: - exit(1) - except (SystemExit, Exception): - for thread in threading_enumerate(): - thread.abort = True - raise + globals()['st_%s' % args[0]](parser, argv[1:], print_queue, + error_queue) + except (ClientException, HTTPException, socket.error) as err: + error_queue.put(str(err)) + + # Let other threads start working, now start print and error thread, + # this is to prevent the main thread shutdown two thread prematurely + print_thread.start() + error_thread.start() + + # If not all the worker threads have finished, then the main thread + # has to wait. Only when there are main, error and print thread left + # the main thread can proceed to finish up. + while (len(threading_enumerate()) > 3 or not error_queue.empty() or + not print_queue.empty()): + sleep(0.5) + + if error_count: + exit(1) |