summaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorTong Li <litong01@us.ibm.com>2013-05-23 16:39:31 -0400
committerGerrit Code Review <review@openstack.org>2013-05-29 19:09:49 +0000
commit874e0e4427b80e1b15b74a1557b73ba9d61443ca (patch)
tree321df2707e8eab2441f19ef94f270bc9216ee6d0 /bin
parent7e84b69291efbc07be6ced17b24ffa42efdc67c0 (diff)
downloadpython-swiftclient-874e0e4427b80e1b15b74a1557b73ba9d61443ca.tar.gz
remove busy-wait so that swift client won't use up all CPU cycles
The current implementation uses busy-wait and checking flags to communicate between threads. It wastes a lot of CPU powers. With python 2.6 is required for Swift, the communication between threads should now be using queue and signal mechanisms. This patch removed the busy-wait loops and use queue and queue join for threads coordination which will not consume CPU cycles if a thread is blocked. Change-Id: I648cd637a92a159d5c13baa83f357cee2dfe7937
Diffstat (limited to 'bin')
-rwxr-xr-xbin/swift178
1 files changed, 61 insertions, 117 deletions
diff --git a/bin/swift b/bin/swift
index f43e5fd..e30d697 100755
--- a/bin/swift
+++ b/bin/swift
@@ -22,10 +22,10 @@ from hashlib import md5
from optparse import OptionParser, SUPPRESS_HELP
from os import environ, listdir, makedirs, utime, _exit as os_exit
from os.path import basename, dirname, getmtime, getsize, isdir, join
-from Queue import Empty, Queue
+from Queue import Queue
from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout
-from threading import current_thread, enumerate as threading_enumerate, Thread
+from threading import enumerate as threading_enumerate, Thread
from time import sleep, time
from traceback import format_exception
from urllib import quote, unquote
@@ -79,38 +79,29 @@ def put_errors_from_threads(threads, error_queue):
return was_error
-def attempt_graceful_exit(signum, frame):
- """
- Try to gracefully shut down. Sets abort=True on all non-main threads.
-
- More importantly, installs the immediate_exit handler on the
- signal that triggered this handler. If this function is installed
- as a signal handler for SIGINT, then pressing Ctrl-C once will
- cause this program to finish operations in progress, then exit.
- Pressing it again will cause an immediate exit; no cleanup
- handlers will get called.
- """
- stderr.write("Attempting graceful exit. "
- "Press Ctrl-C again to exit immediately.\n")
- main_thread = current_thread()
- for thread in [t for t in threading_enumerate() if t is not main_thread]:
- thread.abort = True
- signal.signal(signum, immediate_exit)
+class StopWorkerThreadSignal(object):
+ pass
+
+
+def shutdown_worker_threads(queue, thread_list):
+ for thread in [t for t in thread_list if t.isAlive()]:
+ queue.put(StopWorkerThreadSignal())
def immediate_exit(signum, frame):
+ stderr.write(" Aborted\n")
os_exit(2)
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
- """ Calls func for each item in queue; func is called with a queued
- item as the first arg followed by *args and **kwargs. Use the abort
- attribute to have the thread empty the queue (without processing)
- and exit. """
+ """
+ Calls func for each item in queue; func is called with a queued
+ item as the first arg followed by *args and **kwargs. Use the
+ PriorityQueue for sending quit signal when Ctrl-C is pressed.
+ """
Thread.__init__(self)
- self.abort = False
self.queue = queue
self.func = func
self.args = args
@@ -122,21 +113,19 @@ class QueueFunctionThread(Thread):
def run(self):
while True:
try:
- item = self.queue.get_nowait()
- except Empty:
- if self.abort:
+ item = self.queue.get()
+ if isinstance(item, StopWorkerThreadSignal):
break
- sleep(0.01)
+ except:
+ # This catch is important and it may occur when ctrl-C is
+ # pressed, in this case simply quit the thread
+ break
else:
try:
- if not self.abort:
- res = self.func(item, *self.args, **self.kwargs)
- if self.store_results:
- self.results.append(res)
+ self.func(item, *self.args, **self.kwargs)
except Exception:
self.exc_infos.append(exc_info())
- finally:
- self.queue.task_done()
+
st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ...
@@ -210,12 +199,7 @@ def st_delete(parser, args, print_queue, error_queue):
xrange(options.object_threads)]
for thread in segment_threads:
thread.start()
- while not segment_queue.empty():
- sleep(0.01)
- for thread in segment_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+ shutdown_worker_threads(segment_queue, segment_threads)
put_errors_from_threads(segment_threads, error_queue)
if options.verbose:
path = options.yes_all and join(container, obj) or obj
@@ -237,20 +221,16 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_container(container, conn):
try:
marker = ''
- had_objects = False
while True:
objects = [o['name'] for o in
conn.get_container(container, marker=marker)[1]]
if not objects:
break
- had_objects = True
for obj in objects:
object_queue.put((container, obj))
marker = objects[-1]
- if had_objects:
- # By using join() instead of empty() we should avoid most
- # occurrences of 409 below.
- object_queue.join()
+ while not object_queue.empty():
+ sleep(0.05)
attempts = 1
while True:
try:
@@ -292,10 +272,6 @@ def st_delete(parser, args, print_queue, error_queue):
for container in containers:
container_queue.put(container)
marker = containers[-1]
- while not container_queue.empty():
- sleep(0.01)
- while not object_queue.empty():
- sleep(0.01)
except ClientException as err:
if err.http_status != 404:
raise
@@ -310,19 +286,11 @@ def st_delete(parser, args, print_queue, error_queue):
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
- while not container_queue.empty():
- sleep(0.01)
- for thread in container_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+
+ shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
- while not object_queue.empty():
- sleep(0.01)
- for thread in object_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+
+ shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
@@ -516,19 +484,11 @@ def st_download(parser, args, print_queue, error_queue):
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
- while not container_queue.empty():
- sleep(0.01)
- for thread in container_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+
+ shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
- while not object_queue.empty():
- sleep(0.01)
- for thread in object_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+
+ shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
@@ -969,12 +929,7 @@ def st_upload(parser, args, print_queue, error_queue):
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
- while not segment_queue.empty():
- sleep(0.01)
- for thread in segment_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+ shutdown_worker_threads(segment_queue, segment_threads)
if put_errors_from_threads(segment_threads, error_queue):
raise ClientException(
'Aborting manifest creation '
@@ -1045,12 +1000,7 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
- while not segment_queue.empty():
- sleep(0.01)
- for thread in segment_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+ shutdown_worker_threads(segment_queue, segment_threads)
put_errors_from_threads(segment_threads, error_queue)
if options.verbose:
if conn.attempts > 1:
@@ -1110,12 +1060,8 @@ def st_upload(parser, args, print_queue, error_queue):
_upload_dir(arg)
else:
object_queue.put({'path': arg})
- while not object_queue.empty():
- sleep(0.01)
- for thread in object_threads:
- thread.abort = True
- while thread.isAlive():
- thread.join(0.01)
+
+ shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
except ClientException as err:
if err.http_status != 404:
@@ -1333,7 +1279,7 @@ Examples:
exit('no such command: %s' % args[0])
exit()
- signal.signal(signal.SIGINT, attempt_graceful_exit)
+ signal.signal(signal.SIGINT, immediate_exit)
if options.debug:
logger = logging.getLogger("swiftclient")
@@ -1347,7 +1293,7 @@ Examples:
print item
print_thread = QueueFunctionThread(print_queue, _print)
- print_thread.start()
+ print_thread.setDaemon(True)
error_count = 0
error_queue = Queue(10000)
@@ -1360,28 +1306,26 @@ Examples:
print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error)
- error_thread.start()
+ error_thread.setDaemon(True)
+ parser.usage = globals()['st_%s_help' % args[0]]
try:
- parser.usage = globals()['st_%s_help' % args[0]]
- try:
- globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
- error_queue)
- except (ClientException, HTTPException, socket.error) as err:
- error_queue.put(str(err))
- while not print_queue.empty():
- sleep(0.01)
- print_thread.abort = True
- while print_thread.isAlive():
- print_thread.join(0.01)
- while not error_queue.empty():
- sleep(0.01)
- error_thread.abort = True
- while error_thread.isAlive():
- error_thread.join(0.01)
- if error_count:
- exit(1)
- except (SystemExit, Exception):
- for thread in threading_enumerate():
- thread.abort = True
- raise
+ globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
+ error_queue)
+ except (ClientException, HTTPException, socket.error) as err:
+ error_queue.put(str(err))
+
+ # Let other threads start working, now start print and error thread,
+ # this is to prevent the main thread shutdown two thread prematurely
+ print_thread.start()
+ error_thread.start()
+
+ # If not all the worker threads have finished, then the main thread
+ # has to wait. Only when there are main, error and print thread left
+ # the main thread can proceed to finish up.
+ while (len(threading_enumerate()) > 3 or not error_queue.empty() or
+ not print_queue.empty()):
+ sleep(0.5)
+
+ if error_count:
+ exit(1)