diff options
-rwxr-xr-x | bin/swift | 100 | ||||
-rw-r--r-- | swiftclient/client.py | 2 | ||||
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/test_swiftclient.py | 14 |
4 files changed, 93 insertions, 23 deletions
@@ -13,16 +13,17 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import signal import socket from errno import EEXIST, ENOENT from hashlib import md5 from optparse import OptionParser -from os import environ, listdir, makedirs, utime +from os import environ, listdir, makedirs, utime, _exit as os_exit from os.path import basename, dirname, getmtime, getsize, isdir, join from Queue import Empty, Queue from sys import argv, exc_info, exit, stderr, stdout -from threading import enumerate as threading_enumerate, Thread +from threading import current_thread, enumerate as threading_enumerate, Thread from time import sleep from traceback import format_exception @@ -67,6 +68,29 @@ def put_errors_from_threads(threads, error_queue): return was_error +def attempt_graceful_exit(signum, frame): + """ + Try to gracefully shut down. Sets abort=True on all non-main threads. + + More importantly, installs the immediate_exit handler on the + signal that triggered this handler. If this function is installed + as a signal handler for SIGINT, then pressing Ctrl-C once will + cause this program to finish operations in progress, then exit. + Pressing it again will cause an immediate exit; no cleanup + handlers will get called. + """ + stderr.write("Attempting graceful exit. " + "Press Ctrl-C again to exit immediately.\n") + main_thread = current_thread() + for thread in [t for t in threading_enumerate() if t is not main_thread]: + thread.abort = True + signal.signal(signum, immediate_exit) + + +def immediate_exit(signum, frame): + os_exit(2) + + class QueueFunctionThread(Thread): def __init__(self, queue, func, *args, **kwargs): @@ -83,23 +107,24 @@ class QueueFunctionThread(Thread): self.exc_infos = [] def run(self): - try: - while True: + while True: + try: + item = self.queue.get_nowait() + except Empty: + if self.abort: + break + sleep(0.01) + else: try: - item = self.queue.get_nowait() if not self.abort: self.func(item, *self.args, **self.kwargs) + except Exception: + self.exc_infos.append(exc_info()) + finally: self.queue.task_done() - except Empty: - if self.abort: - break - sleep(0.01) - except Exception: - self.exc_infos.append(exc_info()) - st_delete_help = ''' -delete --all OR delete container [--leave-segments] [object] [object] ... +delete [options] --all OR delete container [options] [object] [object] ... Deletes everything in the account (with --all), or everything in a container, or a list of objects depending on the args given. Segments of manifest objects will be deleted as well, unless you specify the @@ -113,6 +138,12 @@ def st_delete(parser, args, print_queue, error_queue): parser.add_option('', '--leave-segments', action='store_true', dest='leave_segments', default=False, help='Indicates that you want ' 'the segments of manifest objects left alone') + parser.add_option('', '--object-threads', type=int, + default=10, help='Number of threads to use for ' + 'deleting objects') + parser.add_option('', '--container-threads', type=int, + default=10, help='Number of threads to use for ' + 'deleting containers') (options, args) = parse_args(parser, args) args = args[1:] if (not args and not options.yes_all) or (args and options.yes_all): @@ -151,7 +182,7 @@ def st_delete(parser, args, print_queue, error_queue): if not segment_queue.empty(): segment_threads = [QueueFunctionThread(segment_queue, _delete_segment, create_connection()) for _junk in - xrange(10)] + xrange(options.object_threads)] for thread in segment_threads: thread.start() while not segment_queue.empty(): @@ -209,12 +240,15 @@ def st_delete(parser, args, print_queue, error_queue): error_queue.put('Container %s not found' % repr(container)) create_connection = lambda: get_conn(options) - object_threads = [QueueFunctionThread(object_queue, _delete_object, - create_connection()) for _junk in xrange(10)] + object_threads = \ + [QueueFunctionThread(object_queue, _delete_object, create_connection()) + for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() - container_threads = [QueueFunctionThread(container_queue, - _delete_container, create_connection()) for _junk in xrange(10)] + container_threads = \ + [QueueFunctionThread(container_queue, _delete_container, + create_connection()) + for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() if not args: @@ -281,6 +315,12 @@ def st_download(parser, args, print_queue, error_queue): 'account download') parser.add_option('-o', '--output', dest='out_file', help='For a single ' 'file download, stream the output to an alternate location ') + parser.add_option('', '--object-threads', type=int, + default=10, help='Number of threads to use for ' + 'downloading objects') + parser.add_option('', '--container-threads', type=int, + default=10, help='Number of threads to use for ' + 'listing containers') (options, args) = parse_args(parser, args) args = args[1:] if options.out_file == '-': @@ -386,11 +426,12 @@ def st_download(parser, args, print_queue, error_queue): create_connection = lambda: get_conn(options) object_threads = [QueueFunctionThread(object_queue, _download_object, - create_connection()) for _junk in xrange(10)] + create_connection()) for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() container_threads = [QueueFunctionThread(container_queue, - _download_container, create_connection()) for _junk in xrange(10)] + _download_container, create_connection()) + for _junk in xrange(options.container_threads)] for thread in container_threads: thread.start() if not args: @@ -703,6 +744,12 @@ def st_upload(parser, args, print_queue, error_queue): dest='leave_segments', default=False, help='Indicates that you want ' 'the older segments of manifest objects left alone (in the case of ' 'overwrites)') + parser.add_option('', '--object-threads', type=int, + default=10, help='Number of threads to use for ' + 'uploading full objects') + parser.add_option('', '--segment-threads', type=int, + default=10, help='Number of threads to use for ' + 'uploading object segments') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: @@ -780,7 +827,7 @@ def st_upload(parser, args, print_queue, error_queue): segment_queue = Queue(10000) segment_threads = [QueueFunctionThread(segment_queue, _segment_job, create_connection()) for _junk in - xrange(10)] + xrange(options.segment_threads)] for thread in segment_threads: thread.start() segment = 0 @@ -829,7 +876,7 @@ def st_upload(parser, args, print_queue, error_queue): if not segment_queue.empty(): segment_threads = [QueueFunctionThread(segment_queue, _segment_job, create_connection()) for _junk in - xrange(10)] + xrange(options.segment_threads)] for thread in segment_threads: thread.start() while not segment_queue.empty(): @@ -864,7 +911,7 @@ def st_upload(parser, args, print_queue, error_queue): create_connection = lambda: get_conn(options) object_threads = [QueueFunctionThread(object_queue, _object_job, - create_connection()) for _junk in xrange(10)] + create_connection()) for _junk in xrange(options.object_threads)] for thread in object_threads: thread.start() conn = create_connection() @@ -1030,6 +1077,8 @@ Example: exit('no such command: %s' % args[0]) exit() + signal.signal(signal.SIGINT, attempt_graceful_exit) + print_queue = Queue(10000) def _print(item): @@ -1040,9 +1089,12 @@ Example: print_thread = QueueFunctionThread(print_queue, _print) print_thread.start() + error_count = 0 error_queue = Queue(10000) def _error(item): + global error_count + error_count += 1 if isinstance(item, unicode): item = item.encode('utf8') print >> stderr, item @@ -1067,6 +1119,8 @@ Example: error_thread.abort = True while error_thread.isAlive(): error_thread.join(0.01) + if error_count: + exit(1) except (SystemExit, Exception): for thread in threading_enumerate(): thread.abort = True diff --git a/swiftclient/client.py b/swiftclient/client.py index 2b78060..79e6594 100644 --- a/swiftclient/client.py +++ b/swiftclient/client.py @@ -282,6 +282,8 @@ def get_auth(url, user, key, snet=False, tenant_name=None, auth_version="1.0"): if auth_version in ["1.0", "1"]: return _get_auth_v1_0(url, user, key, snet) elif auth_version in ["2.0", "2"]: + if not tenant_name and ':' in user: + (tenant_name, user) = user.split(':') if not tenant_name: raise ClientException('No tenant specified') return _get_auth_v2_0(url, user, tenant_name, key, snet) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/test_swiftclient.py b/tests/test_swiftclient.py index bf8adb9..b165dee 100644 --- a/tests/test_swiftclient.py +++ b/tests/test_swiftclient.py @@ -201,6 +201,20 @@ class TestGetAuth(MockHttpTest): 'http://www.tests.com', 'asdf', 'asdf', auth_version='2.0') + def test_auth_v2_with_tenant_user_in_user(self): + def read(*args, **kwargs): + acct_url = 'http://127.0.01/AUTH_FOO' + body = {'access': {'serviceCatalog': + [{u'endpoints': [{'publicURL': acct_url}], + 'type': 'object-store'}], + 'token': {'id': 'XXXXXXX'}}} + return c.json_dumps(body) + c.http_connection = self.fake_http_connection(200, return_read=read) + url, token = c.get_auth('http://www.test.com', 'foo:bar', 'asdf', + auth_version="2.0") + self.assertTrue(url.startswith("http")) + self.assertTrue(token) + class TestGetAccount(MockHttpTest): |