summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/swift100
-rw-r--r--swiftclient/client.py2
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/test_swiftclient.py14
4 files changed, 93 insertions, 23 deletions
diff --git a/bin/swift b/bin/swift
index bce6bbc..acc0855 100755
--- a/bin/swift
+++ b/bin/swift
@@ -13,16 +13,17 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import signal
import socket
from errno import EEXIST, ENOENT
from hashlib import md5
from optparse import OptionParser
-from os import environ, listdir, makedirs, utime
+from os import environ, listdir, makedirs, utime, _exit as os_exit
from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Empty, Queue
from sys import argv, exc_info, exit, stderr, stdout
-from threading import enumerate as threading_enumerate, Thread
+from threading import current_thread, enumerate as threading_enumerate, Thread
from time import sleep
from traceback import format_exception
@@ -67,6 +68,29 @@ def put_errors_from_threads(threads, error_queue):
return was_error
+def attempt_graceful_exit(signum, frame):
+ """
+ Try to gracefully shut down. Sets abort=True on all non-main threads.
+
+ More importantly, installs the immediate_exit handler on the
+ signal that triggered this handler. If this function is installed
+ as a signal handler for SIGINT, then pressing Ctrl-C once will
+ cause this program to finish operations in progress, then exit.
+ Pressing it again will cause an immediate exit; no cleanup
+ handlers will get called.
+ """
+ stderr.write("Attempting graceful exit. "
+ "Press Ctrl-C again to exit immediately.\n")
+ main_thread = current_thread()
+ for thread in [t for t in threading_enumerate() if t is not main_thread]:
+ thread.abort = True
+ signal.signal(signum, immediate_exit)
+
+
+def immediate_exit(signum, frame):
+ os_exit(2)
+
+
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
@@ -83,23 +107,24 @@ class QueueFunctionThread(Thread):
self.exc_infos = []
def run(self):
- try:
- while True:
+ while True:
+ try:
+ item = self.queue.get_nowait()
+ except Empty:
+ if self.abort:
+ break
+ sleep(0.01)
+ else:
try:
- item = self.queue.get_nowait()
if not self.abort:
self.func(item, *self.args, **self.kwargs)
+ except Exception:
+ self.exc_infos.append(exc_info())
+ finally:
self.queue.task_done()
- except Empty:
- if self.abort:
- break
- sleep(0.01)
- except Exception:
- self.exc_infos.append(exc_info())
-
st_delete_help = '''
-delete --all OR delete container [--leave-segments] [object] [object] ...
+delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given. Segments of
manifest objects will be deleted as well, unless you specify the
@@ -113,6 +138,12 @@ def st_delete(parser, args, print_queue, error_queue):
parser.add_option('', '--leave-segments', action='store_true',
dest='leave_segments', default=False, help='Indicates that you want '
'the segments of manifest objects left alone')
+ parser.add_option('', '--object-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'deleting objects')
+ parser.add_option('', '--container-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'deleting containers')
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
@@ -151,7 +182,7 @@ def st_delete(parser, args, print_queue, error_queue):
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_delete_segment, create_connection()) for _junk in
- xrange(10)]
+ xrange(options.object_threads)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
@@ -209,12 +240,15 @@ def st_delete(parser, args, print_queue, error_queue):
error_queue.put('Container %s not found' % repr(container))
create_connection = lambda: get_conn(options)
- object_threads = [QueueFunctionThread(object_queue, _delete_object,
- create_connection()) for _junk in xrange(10)]
+ object_threads = \
+ [QueueFunctionThread(object_queue, _delete_object, create_connection())
+ for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
- container_threads = [QueueFunctionThread(container_queue,
- _delete_container, create_connection()) for _junk in xrange(10)]
+ container_threads = \
+ [QueueFunctionThread(container_queue, _delete_container,
+ create_connection())
+ for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
@@ -281,6 +315,12 @@ def st_download(parser, args, print_queue, error_queue):
'account download')
parser.add_option('-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
+ parser.add_option('', '--object-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'downloading objects')
+ parser.add_option('', '--container-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'listing containers')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.out_file == '-':
@@ -386,11 +426,12 @@ def st_download(parser, args, print_queue, error_queue):
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(object_queue, _download_object,
- create_connection()) for _junk in xrange(10)]
+ create_connection()) for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
container_threads = [QueueFunctionThread(container_queue,
- _download_container, create_connection()) for _junk in xrange(10)]
+ _download_container, create_connection())
+ for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
@@ -703,6 +744,12 @@ def st_upload(parser, args, print_queue, error_queue):
dest='leave_segments', default=False, help='Indicates that you want '
'the older segments of manifest objects left alone (in the case of '
'overwrites)')
+ parser.add_option('', '--object-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'uploading full objects')
+ parser.add_option('', '--segment-threads', type=int,
+ default=10, help='Number of threads to use for '
+ 'uploading object segments')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
@@ -780,7 +827,7 @@ def st_upload(parser, args, print_queue, error_queue):
segment_queue = Queue(10000)
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in
- xrange(10)]
+ xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
segment = 0
@@ -829,7 +876,7 @@ def st_upload(parser, args, print_queue, error_queue):
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in
- xrange(10)]
+ xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
while not segment_queue.empty():
@@ -864,7 +911,7 @@ def st_upload(parser, args, print_queue, error_queue):
create_connection = lambda: get_conn(options)
object_threads = [QueueFunctionThread(object_queue, _object_job,
- create_connection()) for _junk in xrange(10)]
+ create_connection()) for _junk in xrange(options.object_threads)]
for thread in object_threads:
thread.start()
conn = create_connection()
@@ -1030,6 +1077,8 @@ Example:
exit('no such command: %s' % args[0])
exit()
+ signal.signal(signal.SIGINT, attempt_graceful_exit)
+
print_queue = Queue(10000)
def _print(item):
@@ -1040,9 +1089,12 @@ Example:
print_thread = QueueFunctionThread(print_queue, _print)
print_thread.start()
+ error_count = 0
error_queue = Queue(10000)
def _error(item):
+ global error_count
+ error_count += 1
if isinstance(item, unicode):
item = item.encode('utf8')
print >> stderr, item
@@ -1067,6 +1119,8 @@ Example:
error_thread.abort = True
while error_thread.isAlive():
error_thread.join(0.01)
+ if error_count:
+ exit(1)
except (SystemExit, Exception):
for thread in threading_enumerate():
thread.abort = True
diff --git a/swiftclient/client.py b/swiftclient/client.py
index 2b78060..79e6594 100644
--- a/swiftclient/client.py
+++ b/swiftclient/client.py
@@ -282,6 +282,8 @@ def get_auth(url, user, key, snet=False, tenant_name=None, auth_version="1.0"):
if auth_version in ["1.0", "1"]:
return _get_auth_v1_0(url, user, key, snet)
elif auth_version in ["2.0", "2"]:
+ if not tenant_name and ':' in user:
+ (tenant_name, user) = user.split(':')
if not tenant_name:
raise ClientException('No tenant specified')
return _get_auth_v2_0(url, user, tenant_name, key, snet)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_swiftclient.py b/tests/test_swiftclient.py
index bf8adb9..b165dee 100644
--- a/tests/test_swiftclient.py
+++ b/tests/test_swiftclient.py
@@ -201,6 +201,20 @@ class TestGetAuth(MockHttpTest):
'http://www.tests.com', 'asdf', 'asdf',
auth_version='2.0')
+ def test_auth_v2_with_tenant_user_in_user(self):
+ def read(*args, **kwargs):
+ acct_url = 'http://127.0.01/AUTH_FOO'
+ body = {'access': {'serviceCatalog':
+ [{u'endpoints': [{'publicURL': acct_url}],
+ 'type': 'object-store'}],
+ 'token': {'id': 'XXXXXXX'}}}
+ return c.json_dumps(body)
+ c.http_connection = self.fake_http_connection(200, return_read=read)
+ url, token = c.get_auth('http://www.test.com', 'foo:bar', 'asdf',
+ auth_version="2.0")
+ self.assertTrue(url.startswith("http"))
+ self.assertTrue(token)
+
class TestGetAccount(MockHttpTest):