author     Chmouel Boudjnah <chmouel@chmouel.com>    2012-05-08 11:17:04 +0100
committer  Chmouel Boudjnah <chmouel@chmouel.com>    2012-05-08 13:09:17 +0100
commit     b55acc34f0df54a48665eb3fbc7d9eb024dd7cb2 (patch)
tree       f2827325ace0f7aa2fad7208fcf59d2506d0eb84
First commit.
-rw-r--r--   README.rst               |   20
-rwxr-xr-x   bin/swift                | 1066
-rw-r--r--   setup.py                 |   44
-rw-r--r--   swiftclient/__init__.py  |    2
-rw-r--r--   swiftclient/client.py    |  950
5 files changed, 2082 insertions(+), 0 deletions(-)
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..0664b86
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,20 @@
+Python bindings to the OpenStack Object Storage API
+===================================================
+
+This is a python client for the Swift API. There's a Python API (the
+``swiftclient`` module), and a command-line script (``swift``).
+
+Development takes place via the usual OpenStack processes as outlined
+in the `OpenStack wiki`__. The master repository is on GitHub__.
+
+__ http://wiki.openstack.org/HowToContribute
+__ http://github.com/openstack/python-swiftclient
+
+This code is based on the original client previously included with
+`OpenStack's swift`__. The python-swiftclient is licensed under the
+Apache License, like the rest of OpenStack.
+
+__ http://github.com/openstack/swift
+
+.. contents:: Contents:
+ :local:
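As a quick orientation to the two entry points the README describes, here is a minimal usage sketch. It is not part of the commit: the auth URL, credentials, and container name are placeholders, and the functions come from swiftclient/client.py further down in this diff.

    # Command-line script (bin/swift), mirroring the usage string it prints:
    #   swift -A https://auth.example.com/v1.0 -U account:user -K secretkey stat
    #
    # Python API (the swiftclient module):
    from swiftclient import get_auth, get_container

    url, token = get_auth('https://auth.example.com/v1.0', 'account:user',
                          'secretkey', auth_version='1.0')
    headers, objects = get_container(url, token, 'mycontainer')
    for obj in objects:
        print obj['name']

The bin/swift script drives the same module through its Connection class (imported at the top of the script) rather than calling these module-level helpers directly.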
diff --git a/bin/swift b/bin/swift
new file mode 100755
index 0000000..fea6e9d
--- /dev/null
+++ b/bin/swift
@@ -0,0 +1,1066 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import socket
+
+from errno import EEXIST, ENOENT
+from hashlib import md5
+from optparse import OptionParser, SUPPRESS_HELP
+from os import environ, listdir, makedirs, utime
+from os.path import basename, dirname, getmtime, getsize, isdir, join
+from Queue import Empty, Queue
+from sys import argv, exc_info, exit, stderr, stdout
+from threading import enumerate as threading_enumerate, Thread
+from time import sleep
+from traceback import format_exception
+
+from swiftclient import Connection, ClientException, HTTPException
+
+
+def get_conn(options):
+ """
+    Return a Connection object built from the given options.
+ """
+ return Connection(options.auth,
+ options.user,
+ options.key,
+ snet=options.snet,
+ tenant_name=options.os_tenant_name,
+ auth_version=options.auth_version)
+
+
+def mkdirs(path):
+ try:
+ makedirs(path)
+ except OSError, err:
+ if err.errno != EEXIST:
+ raise
+
+
+def put_errors_from_threads(threads, error_queue):
+ """
+ Places any errors from the threads into error_queue.
+ :param threads: A list of QueueFunctionThread instances.
+ :param error_queue: A queue to put error strings into.
+ :returns: True if any errors were found.
+ """
+ was_error = False
+ for thread in threads:
+ for info in thread.exc_infos:
+ was_error = True
+ if isinstance(info[1], ClientException):
+ error_queue.put(str(info[1]))
+ else:
+ error_queue.put(''.join(format_exception(*info)))
+ return was_error
+
+
+class QueueFunctionThread(Thread):
+
+ def __init__(self, queue, func, *args, **kwargs):
+ """ Calls func for each item in queue; func is called with a queued
+ item as the first arg followed by *args and **kwargs. Use the abort
+ attribute to have the thread empty the queue (without processing)
+ and exit. """
+ Thread.__init__(self)
+ self.abort = False
+ self.queue = queue
+ self.func = func
+ self.args = args
+ self.kwargs = kwargs
+ self.exc_infos = []
+
+ def run(self):
+ try:
+ while True:
+ try:
+ item = self.queue.get_nowait()
+ if not self.abort:
+ self.func(item, *self.args, **self.kwargs)
+ self.queue.task_done()
+ except Empty:
+ if self.abort:
+ break
+ sleep(0.01)
+ except Exception:
+ self.exc_infos.append(exc_info())
+
+
+st_delete_help = '''
+delete --all OR delete container [--leave-segments] [object] [object] ...
+ Deletes everything in the account (with --all), or everything in a
+ container, or a list of objects depending on the args given. Segments of
+ manifest objects will be deleted as well, unless you specify the
+ --leave-segments option.'''.strip('\n')
+
+
+def st_delete(parser, args, print_queue, error_queue):
+ parser.add_option('-a', '--all', action='store_true', dest='yes_all',
+ default=False, help='Indicates that you really want to delete '
+ 'everything in the account')
+ parser.add_option('', '--leave-segments', action='store_true',
+ dest='leave_segments', default=False, help='Indicates that you want '
+ 'the segments of manifest objects left alone')
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ if (not args and not options.yes_all) or (args and options.yes_all):
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_delete_help))
+ return
+
+ def _delete_segment((container, obj), conn):
+ conn.delete_object(container, obj)
+ if options.verbose:
+ if conn.attempts > 2:
+ print_queue.put('%s/%s [after %d attempts]' %
+ (container, obj, conn.attempts))
+ else:
+ print_queue.put('%s/%s' % (container, obj))
+
+ object_queue = Queue(10000)
+
+ def _delete_object((container, obj), conn):
+ try:
+ old_manifest = None
+ if not options.leave_segments:
+ try:
+ old_manifest = conn.head_object(container, obj).get(
+ 'x-object-manifest')
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ conn.delete_object(container, obj)
+ if old_manifest:
+ segment_queue = Queue(10000)
+ scontainer, sprefix = old_manifest.split('/', 1)
+ for delobj in conn.get_container(scontainer,
+ prefix=sprefix)[1]:
+ segment_queue.put((scontainer, delobj['name']))
+ if not segment_queue.empty():
+ segment_threads = [QueueFunctionThread(segment_queue,
+ _delete_segment, create_connection()) for _junk in
+ xrange(10)]
+ for thread in segment_threads:
+ thread.start()
+ while not segment_queue.empty():
+ sleep(0.01)
+ for thread in segment_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(segment_threads, error_queue)
+ if options.verbose:
+ path = options.yes_all and join(container, obj) or obj
+ if path[:1] in ('/', '\\'):
+ path = path[1:]
+ if conn.attempts > 1:
+ print_queue.put('%s [after %d attempts]' %
+ (path, conn.attempts))
+ else:
+ print_queue.put(path)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Object %s not found' %
+ repr('%s/%s' % (container, obj)))
+
+ container_queue = Queue(10000)
+
+ def _delete_container(container, conn):
+ try:
+ marker = ''
+ while True:
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=marker)[1]]
+ if not objects:
+ break
+ for obj in objects:
+ object_queue.put((container, obj))
+ marker = objects[-1]
+ while not object_queue.empty():
+ sleep(0.01)
+ attempts = 1
+ while True:
+ try:
+ conn.delete_container(container)
+ break
+ except ClientException, err:
+ if err.http_status != 409:
+ raise
+ if attempts > 10:
+ raise
+ attempts += 1
+ sleep(1)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Container %s not found' % repr(container))
+
+ create_connection = lambda: get_conn(options)
+ object_threads = [QueueFunctionThread(object_queue, _delete_object,
+ create_connection()) for _junk in xrange(10)]
+ for thread in object_threads:
+ thread.start()
+ container_threads = [QueueFunctionThread(container_queue,
+ _delete_container, create_connection()) for _junk in xrange(10)]
+ for thread in container_threads:
+ thread.start()
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = \
+ [c['name'] for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ while not container_queue.empty():
+ sleep(0.01)
+ while not object_queue.empty():
+ sleep(0.01)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ conn = create_connection()
+ _delete_container(args[0], conn)
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(container_threads, error_queue)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(object_threads, error_queue)
+
+
+st_download_help = '''
+download --all OR download container [options] [object] [object] ...
+ Downloads everything in the account (with --all), or everything in a
+ container, or a list of objects depending on the args given. For a single
+    object download, you may use the -o [--output] <filename> option to
+    redirect the output to a specific file, or to stdout if "-" is
+    given.'''.strip('\n')
+
+
+def st_download(parser, args, print_queue, error_queue):
+ parser.add_option('-a', '--all', action='store_true', dest='yes_all',
+ default=False, help='Indicates that you really want to download '
+ 'everything in the account')
+ parser.add_option('-m', '--marker', dest='marker',
+ default='', help='Marker to use when starting a container or '
+ 'account download')
+ parser.add_option('-o', '--output', dest='out_file', help='For a single '
+ 'file download, stream the output to an alternate location ')
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ if options.out_file == '-':
+ options.verbose = 0
+ if options.out_file and len(args) != 2:
+ exit('-o option only allowed for single file downloads')
+ if (not args and not options.yes_all) or (args and options.yes_all):
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_download_help))
+ return
+
+ object_queue = Queue(10000)
+
+ def _download_object(queue_arg, conn):
+ if len(queue_arg) == 2:
+ container, obj = queue_arg
+ out_file = None
+ elif len(queue_arg) == 3:
+ container, obj, out_file = queue_arg
+ else:
+ raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
+ try:
+ headers, body = \
+ conn.get_object(container, obj, resp_chunk_size=65536)
+ content_type = headers.get('content-type')
+ if 'content-length' in headers:
+ content_length = int(headers.get('content-length'))
+ else:
+ content_length = None
+ etag = headers.get('etag')
+ path = options.yes_all and join(container, obj) or obj
+ if path[:1] in ('/', '\\'):
+ path = path[1:]
+ md5sum = None
+ make_dir = out_file != "-"
+ if content_type.split(';', 1)[0] == 'text/directory':
+ if make_dir and not isdir(path):
+ mkdirs(path)
+ read_length = 0
+ if 'x-object-manifest' not in headers:
+ md5sum = md5()
+ for chunk in body:
+ read_length += len(chunk)
+ if md5sum:
+ md5sum.update(chunk)
+ else:
+ dirpath = dirname(path)
+ if make_dir and dirpath and not isdir(dirpath):
+ mkdirs(dirpath)
+ if out_file == "-":
+ fp = stdout
+ elif out_file:
+ fp = open(out_file, 'wb')
+ else:
+ fp = open(path, 'wb')
+ read_length = 0
+ if 'x-object-manifest' not in headers:
+ md5sum = md5()
+ for chunk in body:
+ fp.write(chunk)
+ read_length += len(chunk)
+ if md5sum:
+ md5sum.update(chunk)
+ fp.close()
+ if md5sum and md5sum.hexdigest() != etag:
+ error_queue.put('%s: md5sum != etag, %s != %s' %
+ (path, md5sum.hexdigest(), etag))
+ if content_length is not None and read_length != content_length:
+ error_queue.put('%s: read_length != content_length, %d != %d' %
+ (path, read_length, content_length))
+ if 'x-object-meta-mtime' in headers and not options.out_file:
+ mtime = float(headers['x-object-meta-mtime'])
+ utime(path, (mtime, mtime))
+ if options.verbose:
+ if conn.attempts > 1:
+                    print_queue.put('%s [after %d attempts]' %
+ (path, conn.attempts))
+ else:
+ print_queue.put(path)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Object %s not found' %
+ repr('%s/%s' % (container, obj)))
+
+ container_queue = Queue(10000)
+
+ def _download_container(container, conn):
+ try:
+ marker = options.marker
+ while True:
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=marker)[1]]
+ if not objects:
+ break
+ for obj in objects:
+ object_queue.put((container, obj))
+ marker = objects[-1]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Container %s not found' % repr(container))
+
+ create_connection = lambda: get_conn(options)
+ object_threads = [QueueFunctionThread(object_queue, _download_object,
+ create_connection()) for _junk in xrange(10)]
+ for thread in object_threads:
+ thread.start()
+ container_threads = [QueueFunctionThread(container_queue,
+ _download_container, create_connection()) for _junk in xrange(10)]
+ for thread in container_threads:
+ thread.start()
+ if not args:
+ conn = create_connection()
+ try:
+ marker = options.marker
+ while True:
+ containers = [c['name']
+ for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ _download_container(args[0], create_connection())
+ else:
+ if len(args) == 2:
+ obj = args[1]
+ object_queue.put((args[0], obj, options.out_file))
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(container_threads, error_queue)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(object_threads, error_queue)
+
+
+st_list_help = '''
+list [options] [container]
+ Lists the containers for the account or the objects for a container. -p or
+ --prefix is an option that will only list items beginning with that prefix.
+  -d or --delimiter is an option (for container listings only) that will roll up
+ items with the given delimiter (see Cloud Files general documentation for
+ what this means).
+'''.strip('\n')
+
+
+def st_list(parser, args, print_queue, error_queue):
+ parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
+ 'items beginning with the prefix')
+ parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
+ 'up items with the given delimiter (see Cloud Files general '
+ 'documentation for what this means)')
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ if options.delimiter and not args:
+ exit('-d option only allowed for container listings')
+ if len(args) > 1:
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_list_help))
+ return
+
+ conn = get_conn(options)
+ try:
+ marker = ''
+ while True:
+ if not args:
+ items = \
+ conn.get_account(marker=marker, prefix=options.prefix)[1]
+ else:
+ items = conn.get_container(args[0], marker=marker,
+ prefix=options.prefix, delimiter=options.delimiter)[1]
+ if not items:
+ break
+ for item in items:
+ print_queue.put(item.get('name', item.get('subdir')))
+ marker = items[-1].get('name', items[-1].get('subdir'))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ if not args:
+ error_queue.put('Account not found')
+ else:
+ error_queue.put('Container %s not found' % repr(args[0]))
+
+
+st_stat_help = '''
+stat [container] [object]
+ Displays information for the account, container, or object depending on the
+ args given (if any).'''.strip('\n')
+
+
+def st_stat(parser, args, print_queue, error_queue):
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ conn = get_conn(options)
+ if not args:
+ try:
+ headers = conn.head_account()
+ if options.verbose > 1:
+ print_queue.put('''
+StorageURL: %s
+Auth Token: %s
+'''.strip('\n') % (conn.url, conn.token))
+ container_count = int(headers.get('x-account-container-count', 0))
+ object_count = int(headers.get('x-account-object-count', 0))
+ bytes_used = int(headers.get('x-account-bytes-used', 0))
+ print_queue.put('''
+ Account: %s
+Containers: %d
+ Objects: %d
+ Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
+ object_count, bytes_used))
+ for key, value in headers.items():
+ if key.startswith('x-account-meta-'):
+ print_queue.put('%10s: %s' % ('Meta %s' %
+ key[len('x-account-meta-'):].title(), value))
+ for key, value in headers.items():
+ if not key.startswith('x-account-meta-') and key not in (
+ 'content-length', 'date', 'x-account-container-count',
+ 'x-account-object-count', 'x-account-bytes-used'):
+ print_queue.put(
+ '%10s: %s' % (key.title(), value))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ try:
+ headers = conn.head_container(args[0])
+ object_count = int(headers.get('x-container-object-count', 0))
+ bytes_used = int(headers.get('x-container-bytes-used', 0))
+ print_queue.put('''
+ Account: %s
+Container: %s
+ Objects: %d
+ Bytes: %d
+ Read ACL: %s
+Write ACL: %s
+ Sync To: %s
+ Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
+ object_count, bytes_used,
+ headers.get('x-container-read', ''),
+ headers.get('x-container-write', ''),
+ headers.get('x-container-sync-to', ''),
+ headers.get('x-container-sync-key', '')))
+ for key, value in headers.items():
+ if key.startswith('x-container-meta-'):
+ print_queue.put('%9s: %s' % ('Meta %s' %
+ key[len('x-container-meta-'):].title(), value))
+ for key, value in headers.items():
+ if not key.startswith('x-container-meta-') and key not in (
+ 'content-length', 'date', 'x-container-object-count',
+ 'x-container-bytes-used', 'x-container-read',
+ 'x-container-write', 'x-container-sync-to',
+ 'x-container-sync-key'):
+ print_queue.put(
+ '%9s: %s' % (key.title(), value))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Container %s not found' % repr(args[0]))
+ elif len(args) == 2:
+ try:
+ headers = conn.head_object(args[0], args[1])
+ print_queue.put('''
+ Account: %s
+ Container: %s
+ Object: %s
+ Content Type: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
+ args[1], headers.get('content-type')))
+ if 'content-length' in headers:
+ print_queue.put('Content Length: %s' %
+ headers['content-length'])
+ if 'last-modified' in headers:
+ print_queue.put(' Last Modified: %s' %
+ headers['last-modified'])
+ if 'etag' in headers:
+ print_queue.put(' ETag: %s' % headers['etag'])
+ if 'x-object-manifest' in headers:
+ print_queue.put(' Manifest: %s' %
+ headers['x-object-manifest'])
+ for key, value in headers.items():
+ if key.startswith('x-object-meta-'):
+ print_queue.put('%14s: %s' % ('Meta %s' %
+ key[len('x-object-meta-'):].title(), value))
+ for key, value in headers.items():
+ if not key.startswith('x-object-meta-') and key not in (
+ 'content-type', 'content-length', 'last-modified',
+ 'etag', 'date', 'x-object-manifest'):
+ print_queue.put(
+ '%14s: %s' % (key.title(), value))
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Object %s not found' %
+ repr('%s/%s' % (args[0], args[1])))
+ else:
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_stat_help))
+
+
+st_post_help = '''
+post [options] [container] [object]
+ Updates meta information for the account, container, or object depending on
+ the args given. If the container is not found, it will be created
+ automatically; but this is not true for accounts and objects. Containers
+ also allow the -r (or --read-acl) and -w (or --write-acl) options. The -m
+ or --meta option is allowed on all and used to define the user meta data
+ items to set in the form Name:Value. This option can be repeated. Example:
+ post -m Color:Blue -m Size:Large'''.strip('\n')
+
+
+def st_post(parser, args, print_queue, error_queue):
+ parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
+ 'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
+ '.r:-.example.com, .r:www.example.com, account1, account2:user2')
+ parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
+ 'Write ACL for containers. Quick summary of ACL syntax: account1, '
+ 'account2:user2')
+ parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the '
+ 'Sync To for containers, for multi-cluster replication.')
+ parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the '
+ 'Sync Key for containers, for multi-cluster replication.')
+ parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
+ help='Sets a meta data item with the syntax name:value. This option '
+ 'may be repeated. Example: -m Color:Blue -m Size:Large')
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ if (options.read_acl or options.write_acl or options.sync_to or
+ options.sync_key) and not args:
+ exit('-r, -w, -t, and -k options only allowed for containers')
+ conn = get_conn(options)
+ if not args:
+ headers = split_headers(options.meta, 'X-Account-Meta-', error_queue)
+ try:
+ conn.post_account(headers=headers)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ headers = split_headers(options.meta, 'X-Container-Meta-', error_queue)
+ if options.read_acl is not None:
+ headers['X-Container-Read'] = options.read_acl
+ if options.write_acl is not None:
+ headers['X-Container-Write'] = options.write_acl
+ if options.sync_to is not None:
+ headers['X-Container-Sync-To'] = options.sync_to
+ if options.sync_key is not None:
+ headers['X-Container-Sync-Key'] = options.sync_key
+ try:
+ conn.post_container(args[0], headers=headers)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ conn.put_container(args[0], headers=headers)
+ elif len(args) == 2:
+ headers = split_headers(options.meta, 'X-Object-Meta-', error_queue)
+ try:
+ conn.post_object(args[0], args[1], headers=headers)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Object %s not found' %
+ repr('%s/%s' % (args[0], args[1])))
+ else:
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_post_help))
+
+
+st_upload_help = '''
+upload [options] container file_or_directory [file_or_directory] [...]
+ Uploads to the given container the files and directories specified by the
+ remaining args. -c or --changed is an option that will only upload files
+ that have changed since the last upload. -S <size> or --segment-size <size>
+ and --leave-segments are options as well (see --help for more).
+'''.strip('\n')
+
+
+def st_upload(parser, args, print_queue, error_queue):
+ parser.add_option('-c', '--changed', action='store_true', dest='changed',
+ default=False, help='Will only upload files that have changed since '
+ 'the last upload')
+ parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
+ 'upload files in segments no larger than <size> and then create a '
+ '"manifest" file that will download all the segments as if it were '
+ 'the original file. The segments will be uploaded to a '
+ '<container>_segments container so as to not pollute the main '
+ '<container> listings.')
+ parser.add_option('', '--leave-segments', action='store_true',
+ dest='leave_segments', default=False, help='Indicates that you want '
+ 'the older segments of manifest objects left alone (in the case of '
+ 'overwrites)')
+ (options, args) = parse_args(parser, args)
+ args = args[1:]
+ if len(args) < 2:
+ error_queue.put('Usage: %s [options] %s' %
+ (basename(argv[0]), st_upload_help))
+ return
+ object_queue = Queue(10000)
+
+ def _segment_job(job, conn):
+ if job.get('delete', False):
+ conn.delete_object(job['container'], job['obj'])
+ else:
+ fp = open(job['path'], 'rb')
+ fp.seek(job['segment_start'])
+ conn.put_object(job.get('container', args[0] + '_segments'),
+ job['obj'], fp, content_length=job['segment_size'])
+ if options.verbose and 'log_line' in job:
+ if conn.attempts > 1:
+ print_queue.put('%s [after %d attempts]' %
+ (job['log_line'], conn.attempts))
+ else:
+ print_queue.put(job['log_line'])
+
+ def _object_job(job, conn):
+ path = job['path']
+ container = job.get('container', args[0])
+ dir_marker = job.get('dir_marker', False)
+ try:
+ obj = path
+ if obj.startswith('./') or obj.startswith('.\\'):
+ obj = obj[2:]
+ if obj.startswith('/'):
+ obj = obj[1:]
+ put_headers = {'x-object-meta-mtime': str(getmtime(path))}
+ if dir_marker:
+ if options.changed:
+ try:
+ headers = conn.head_object(container, obj)
+ ct = headers.get('content-type')
+ cl = int(headers.get('content-length'))
+ et = headers.get('etag')
+ mt = headers.get('x-object-meta-mtime')
+ if ct.split(';', 1)[0] == 'text/directory' and \
+ cl == 0 and \
+ et == 'd41d8cd98f00b204e9800998ecf8427e' and \
+ mt == put_headers['x-object-meta-mtime']:
+ return
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ conn.put_object(container, obj, '', content_length=0,
+ content_type='text/directory',
+ headers=put_headers)
+ else:
+ # We need to HEAD all objects now in case we're overwriting a
+ # manifest object and need to delete the old segments
+ # ourselves.
+ old_manifest = None
+ if options.changed or not options.leave_segments:
+ try:
+ headers = conn.head_object(container, obj)
+ cl = int(headers.get('content-length'))
+ mt = headers.get('x-object-meta-mtime')
+ if options.changed and cl == getsize(path) and \
+ mt == put_headers['x-object-meta-mtime']:
+ return
+ if not options.leave_segments:
+ old_manifest = headers.get('x-object-manifest')
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ if options.segment_size and \
+                        getsize(path) > int(options.segment_size):
+ full_size = getsize(path)
+ segment_queue = Queue(10000)
+ segment_threads = [QueueFunctionThread(segment_queue,
+ _segment_job, create_connection()) for _junk in
+ xrange(10)]
+ for thread in segment_threads:
+ thread.start()
+ segment = 0
+ segment_start = 0
+ while segment_start < full_size:
+ segment_size = int(options.segment_size)
+ if segment_start + segment_size > full_size:
+ segment_size = full_size - segment_start
+ segment_queue.put({'path': path,
+ 'obj': '%s/%s/%s/%08d' % (obj,
+ put_headers['x-object-meta-mtime'], full_size,
+ segment),
+ 'segment_start': segment_start,
+ 'segment_size': segment_size,
+ 'log_line': '%s segment %s' % (obj, segment)})
+ segment += 1
+ segment_start += segment_size
+ while not segment_queue.empty():
+ sleep(0.01)
+ for thread in segment_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ if put_errors_from_threads(segment_threads, error_queue):
+ raise ClientException('Aborting manifest creation '
+ 'because not all segments could be uploaded. %s/%s'
+ % (container, obj))
+ new_object_manifest = '%s_segments/%s/%s/%s/' % (
+ container, obj, put_headers['x-object-meta-mtime'],
+ full_size)
+ if old_manifest == new_object_manifest:
+ old_manifest = None
+ put_headers['x-object-manifest'] = new_object_manifest
+ conn.put_object(container, obj, '', content_length=0,
+ headers=put_headers)
+ else:
+ conn.put_object(container, obj, open(path, 'rb'),
+ content_length=getsize(path), headers=put_headers)
+ if old_manifest:
+ segment_queue = Queue(10000)
+ scontainer, sprefix = old_manifest.split('/', 1)
+ for delobj in conn.get_container(scontainer,
+ prefix=sprefix)[1]:
+ segment_queue.put({'delete': True,
+ 'container': scontainer, 'obj': delobj['name']})
+ if not segment_queue.empty():
+ segment_threads = [QueueFunctionThread(segment_queue,
+ _segment_job, create_connection()) for _junk in
+ xrange(10)]
+ for thread in segment_threads:
+ thread.start()
+ while not segment_queue.empty():
+ sleep(0.01)
+ for thread in segment_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(segment_threads, error_queue)
+ if options.verbose:
+ if conn.attempts > 1:
+ print_queue.put(
+ '%s [after %d attempts]' % (obj, conn.attempts))
+ else:
+ print_queue.put(obj)
+ except OSError, err:
+ if err.errno != ENOENT:
+ raise
+ error_queue.put('Local file %s not found' % repr(path))
+
+ def _upload_dir(path):
+ names = listdir(path)
+ if not names:
+ object_queue.put({'path': path, 'dir_marker': True})
+ else:
+ for name in listdir(path):
+ subpath = join(path, name)
+ if isdir(subpath):
+ _upload_dir(subpath)
+ else:
+ object_queue.put({'path': subpath})
+
+ create_connection = lambda: get_conn(options)
+ object_threads = [QueueFunctionThread(object_queue, _object_job,
+ create_connection()) for _junk in xrange(10)]
+ for thread in object_threads:
+ thread.start()
+ conn = create_connection()
+ # Try to create the container, just in case it doesn't exist. If this
+ # fails, it might just be because the user doesn't have container PUT
+ # permissions, so we'll ignore any error. If there's really a problem,
+ # it'll surface on the first object PUT.
+ try:
+ conn.put_container(args[0])
+ if options.segment_size is not None:
+ conn.put_container(args[0] + '_segments')
+ except ClientException, err:
+ msg = ' '.join(str(x) for x in (err.http_status, err.http_reason))
+ if err.http_response_content:
+ if msg:
+ msg += ': '
+ msg += err.http_response_content[:60]
+ error_queue.put(
+ 'Error trying to create container %r: %s' % (args[0], msg))
+ except Exception, err:
+ error_queue.put(
+ 'Error trying to create container %r: %s' % (args[0], err))
+ try:
+ for arg in args[1:]:
+ if isdir(arg):
+ _upload_dir(arg)
+ else:
+ object_queue.put({'path': arg})
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(object_threads, error_queue)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+
+
+def split_headers(options, prefix='', error_queue=None):
+ """
+ Splits 'Key: Value' strings and returns them as a dictionary.
+
+ :param options: An array of 'Key: Value' strings
+ :param prefix: String to prepend to all of the keys in the dictionary.
+ :param error_queue: Queue for thread safe error reporting.
+ """
+ headers = {}
+ for item in options:
+ split_item = item.split(':', 1)
+ if len(split_item) == 2:
+ headers[prefix + split_item[0]] = split_item[1]
+ else:
+ error_string = "Metadata parameter %s must contain a ':'.\n%s" \
+ % (item, st_post_help)
+ if error_queue != None:
+ error_queue.put(error_string)
+ else:
+ exit(error_string)
+ return headers
+
+
+def parse_args(parser, args, enforce_requires=True):
+ if not args:
+ args = ['-h']
+ (options, args) = parser.parse_args(args)
+
+ if (not (options.auth and options.user and options.key) or
+ options.os_auth_url):
+    # Use 2.0 auth if any old-style arg is missing or an OS auth URL is given
+ options.auth_version = "2.0"
+
+ if options.auth_version == "2.0" and not options.os_tenant_name and \
+        options.os_username and ':' in options.os_username:
+ (options.os_tenant_name,
+ options.os_username) = options.os_username.split(':')
+
+ # Use new-style args if old ones not present
+ if not options.auth and options.os_auth_url:
+ options.auth = options.os_auth_url
+ if not options.user and options.os_username:
+ options.user = options.os_username
+ if not options.key and options.os_password:
+ options.key = options.os_password
+
+ # Handle trailing '/' in URL
+ if options.auth and not options.auth.endswith('/'):
+ options.auth += '/'
+
+ if enforce_requires and \
+ not (options.auth and options.user and options.key):
+ exit('''
+Requires ST_AUTH, ST_USER, and ST_KEY environment variables be set or
+overridden with -A, -U, or -K.'''.strip('\n'))
+ return options, args
+
+
+if __name__ == '__main__':
+ parser = OptionParser(version='%prog 1.0', usage='''
+Usage: %%prog <command> [options] [args]
+
+Commands:
+ %(st_stat_help)s
+ %(st_list_help)s
+ %(st_upload_help)s
+ %(st_post_help)s
+ %(st_download_help)s
+ %(st_delete_help)s
+
+Example:
+ %%prog -A https://auth.api.rackspacecloud.com/v1.0 -U user -K key stat
+'''.strip('\n') % globals())
+ parser.add_option('-s', '--snet', action='store_true', dest='snet',
+ default=False, help='Use SERVICENET internal network')
+ parser.add_option('-v', '--verbose', action='count', dest='verbose',
+ default=1, help='Print more info')
+ parser.add_option('-q', '--quiet', action='store_const', dest='verbose',
+ const=0, default=1, help='Suppress status output')
+ parser.add_option('-A', '--auth', dest='auth',
+ default=environ.get('ST_AUTH'),
+ help='URL for obtaining an auth token')
+ parser.add_option('-V', '--auth-version',
+ dest='auth_version',
+ default=environ.get('ST_AUTH_VERSION', '1.0'),
+ type=str,
+                      help='Specify a version for authentication '\
+ '(default: 1.0)')
+ parser.add_option('-U', '--user', dest='user',
+ default=environ.get('ST_USER'),
+ help='User name for obtaining an auth token')
+ parser.add_option('-K', '--key', dest='key',
+ default=environ.get('ST_KEY'),
+ help='Key for obtaining an auth token')
+ parser.add_option('--os_auth_url', dest='os_auth_url',
+ default=environ.get('OS_AUTH_URL'),
+ help=SUPPRESS_HELP)
+ parser.add_option('--os_username', dest='os_username',
+ default=environ.get('OS_USERNAME'),
+ help=SUPPRESS_HELP)
+ parser.add_option('--os_tenant_name', dest='os_tenant_name',
+ default=environ.get('OS_TENANT_NAME'),
+ help=SUPPRESS_HELP)
+ parser.add_option('--os_password', dest='os_password',
+ default=environ.get('OS_PASSWORD'),
+ help=SUPPRESS_HELP)
+ parser.disable_interspersed_args()
+ (options, args) = parse_args(parser, argv[1:], enforce_requires=False)
+ parser.enable_interspersed_args()
+
+ commands = ('delete', 'download', 'list', 'post', 'stat', 'upload')
+ if not args or args[0] not in commands:
+ parser.print_usage()
+ if args:
+ exit('no such command: %s' % args[0])
+ exit()
+
+ print_queue = Queue(10000)
+
+ def _print(item):
+ if isinstance(item, unicode):
+ item = item.encode('utf8')
+ print item
+
+ print_thread = QueueFunctionThread(print_queue, _print)
+ print_thread.start()
+
+ error_queue = Queue(10000)
+
+ def _error(item):
+ if isinstance(item, unicode):
+ item = item.encode('utf8')
+ print >> stderr, item
+
+ error_thread = QueueFunctionThread(error_queue, _error)
+ error_thread.start()
+
+ try:
+ parser.usage = globals()['st_%s_help' % args[0]]
+ try:
+ globals()['st_%s' % args[0]](parser, argv[1:], print_queue,
+ error_queue)
+ except (ClientException, HTTPException, socket.error), err:
+ error_queue.put(str(err))
+ while not print_queue.empty():
+ sleep(0.01)
+ print_thread.abort = True
+ while print_thread.isAlive():
+ print_thread.join(0.01)
+ while not error_queue.empty():
+ sleep(0.01)
+ error_thread.abort = True
+ while error_thread.isAlive():
+ error_thread.join(0.01)
+ except (SystemExit, Exception):
+ for thread in threading_enumerate():
+ thread.abort = True
+ raise
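Every subcommand above fans its work out the same way: a bounded Queue, ten QueueFunctionThread workers applying a job function to each queued item, and a drain/abort/join loop at the end. A stripped-down sketch of that pattern follows; the job function is a hypothetical stand-in, and QueueFunctionThread and put_errors_from_threads are the helpers defined near the top of this script (real callers pass _delete_object, _download_object, or _object_job).

    from Queue import Queue
    from time import sleep

    def _noop_job(item, tag):
        # hypothetical job; prints each queued item with a tag
        print '%s: %s' % (tag, item)

    work_queue = Queue(10000)
    error_queue = Queue(10000)
    workers = [QueueFunctionThread(work_queue, _noop_job, 'worker')
               for _junk in xrange(10)]
    for thread in workers:
        thread.start()
    for item in ('a', 'b', 'c'):
        work_queue.put(item)
    while not work_queue.empty():    # wait for the queue to drain
        sleep(0.01)
    for thread in workers:           # ask each worker to exit, then join it
        thread.abort = True
        while thread.isAlive():
            thread.join(0.01)
    put_errors_from_threads(workers, error_queue)   # gather any worker exceptions

The same drain/abort/join sequence appears verbatim at the end of st_delete, st_download, and st_upload above.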
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..309a241
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,44 @@
+# -*- encoding: utf-8 -*-
+import os
+import setuptools
+import sys
+
+# TODO: Figure out how we are going to do the versioning
+# (if any).
+version = '0.1'
+name = 'python-swiftclient'
+requires = []
+
+
+def read(fname):
+ return open(os.path.join(os.path.dirname(__file__), fname)).read()
+
+if sys.version_info < (2, 6):
+ requires.append('simplejson')
+
+setuptools.setup(
+ name=name,
+ version=version,
+ description='Client Library for OpenStack Object Storage API',
+ long_description=read('README.rst'),
+ url='https://github.com/chmouel/python-swiftclient',
+ license='Apache License (2.0)',
+ author='OpenStack, LLC.',
+ author_email='openstack-admins@lists.launchpad.net',
+ packages=setuptools.find_packages(exclude=['tests', 'tests.*']),
+ install_requires=requires,
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Console',
+ 'Intended Audience :: Developers',
+ 'Intended Audience :: Information Technology',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python :: 2.6',
+ 'Environment :: No Input/Output (Daemon)',
+ ],
+ test_suite='nose.collector',
+ scripts=[
+ 'bin/swift',
+ ],
+)
diff --git a/swiftclient/__init__.py b/swiftclient/__init__.py
new file mode 100644
index 0000000..9eedf9b
--- /dev/null
+++ b/swiftclient/__init__.py
@@ -0,0 +1,2 @@
+# -*- encoding: utf-8 -*-
+from client import *
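Because the package __init__ simply re-exports everything from client.py, the names bin/swift imports at the top of the script are reachable two ways. A tiny sketch, using nothing beyond what the two modules already define:

    from swiftclient import Connection, ClientException        # via the * re-export
    from swiftclient.client import Connection as _Connection   # direct module path
    assert Connection is _Connection

This is why bin/swift can do "from swiftclient import Connection, ClientException, HTTPException" without touching swiftclient.client directly.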
diff --git a/swiftclient/client.py b/swiftclient/client.py
new file mode 100644
index 0000000..5b9351e
--- /dev/null
+++ b/swiftclient/client.py
@@ -0,0 +1,950 @@
+# Copyright (c) 2010-2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Cloud Files client library used internally
+"""
+
+import socket
+from urllib import quote as _quote
+from urlparse import urlparse, urlunparse, urljoin
+
+try:
+ from eventlet.green.httplib import HTTPException, HTTPSConnection
+except ImportError:
+ from httplib import HTTPException, HTTPSConnection
+
+try:
+ from eventlet import sleep
+except ImportError:
+ from time import sleep
+
+try:
+ from swift.common.bufferedhttp \
+ import BufferedHTTPConnection as HTTPConnection
+except ImportError:
+ try:
+ from eventlet.green.httplib import HTTPConnection
+ except ImportError:
+ from httplib import HTTPConnection
+
+
+def quote(value, safe='/'):
+ """
+ Patched version of urllib.quote that encodes utf8 strings before quoting
+ """
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+
+
+# look for a real json parser first
+try:
+ # simplejson is popular and pretty good
+ from simplejson import loads as json_loads
+ from simplejson import dumps as json_dumps
+except ImportError:
+ # 2.6 will have a json module in the stdlib
+ from json import loads as json_loads
+ from json import dumps as json_dumps
+
+
+class ClientException(Exception):
+
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device='', http_response_content=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+ self.http_response_content = http_response_content
+
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ if self.http_response_content:
+ if len(self.http_response_content) <= 60:
+ b += ' %s' % self.http_response_content
+ else:
+ b += ' [first 60 chars of response] %s' % \
+ self.http_response_content[:60]
+ return b and '%s: %s' % (a, b) or a
+
+
+def http_connection(url, proxy=None):
+ """
+ Make an HTTPConnection or HTTPSConnection
+
+ :param url: url to connect to
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :returns: tuple of (parsed url, connection object)
+ :raises ClientException: Unable to handle protocol scheme
+ """
+ parsed = urlparse(url)
+ proxy_parsed = urlparse(proxy) if proxy else None
+ if parsed.scheme == 'http':
+ conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc)
+ elif parsed.scheme == 'https':
+ conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc)
+ else:
+ raise ClientException('Cannot handle protocol scheme %s for url %s' %
+ (parsed.scheme, repr(url)))
+ if proxy:
+ conn._set_tunnel(parsed.hostname, parsed.port)
+ return parsed, conn
+
+
+def json_request(method, url, **kwargs):
+ """Takes a request in json parse it and return in json"""
+ kwargs.setdefault('headers', {})
+ if 'body' in kwargs:
+ kwargs['headers']['Content-Type'] = 'application/json'
+ kwargs['body'] = json_dumps(kwargs['body'])
+ parsed, conn = http_connection(url)
+ conn.request(method, parsed.path, **kwargs)
+ resp = conn.getresponse()
+ body = resp.read()
+ if body:
+ try:
+ body = json_loads(body)
+ except ValueError:
+ body = None
+ if not body or resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host,
+ http_port=conn.port,
+ http_path=parsed.path,
+ http_status=resp.status,
+ http_reason=resp.reason)
+ return resp, body
+
+
+def _get_auth_v1_0(url, user, key, snet):
+ parsed, conn = http_connection(url)
+ conn.request('GET', parsed.path, '',
+ {'X-Auth-User': user, 'X-Auth-Key': key})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ url = resp.getheader('x-storage-url')
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ netloc = parsed[1]
+ parsed[1] = 'snet-' + netloc
+ url = urlunparse(parsed)
+ return url, resp.getheader('x-storage-token',
+ resp.getheader('x-auth-token'))
+
+
+def _get_auth_v2_0(url, user, tenant_name, key, snet):
+ body = {'auth': {'passwordCredentials':
+ {'password': key, 'username': user},
+ 'tenantName': tenant_name}}
+ token_url = urljoin(url, "tokens")
+ resp, body = json_request("POST", token_url, body=body)
+ token_id = None
+ try:
+ url = None
+ catalogs = body['access']['serviceCatalog']
+ for service in catalogs:
+ if service['type'] == 'object-store':
+ url = service['endpoints'][0]['publicURL']
+ token_id = body['access']['token']['id']
+ if not url:
+ raise ClientException("There is no object-store endpoint " \
+ "on this auth server.")
+ except(KeyError, IndexError):
+ raise ClientException("Error while getting answers from auth server")
+
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ parsed[1] = 'snet-' + parsed[1]
+ url = urlunparse(parsed)
+
+ return url, token_id
+
+
+def get_auth(url, user, key, snet=False, tenant_name=None, auth_version="1.0"):
+ """
+ Get authentication/authorization credentials.
+
+ The snet parameter is used for Rackspace's ServiceNet internal network
+ implementation. In this function, it simply adds *snet-* to the beginning
+ of the host name for the returned storage URL. With Rackspace Cloud Files,
+ use of this network path causes no bandwidth charges but requires the
+ client to be running on Rackspace's ServiceNet network.
+
+ :param url: authentication/authorization URL
+ :param user: user to authenticate as
+ :param key: key or password for authorization
+ :param snet: use SERVICENET internal network (see above), default is False
+ :param auth_version: OpenStack auth version, default is 1.0
+ :param tenant_name: The tenant/account name, required when connecting
+                        to an auth 2.0 system.
+ :returns: tuple of (storage URL, auth token)
+    :raises ClientException: HTTP GET request to auth URL failed
+ """
+ if auth_version in ["1.0", "1"]:
+ return _get_auth_v1_0(url, user, key, snet)
+ elif auth_version in ["2.0", "2"]:
+ if not tenant_name:
+ raise ClientException('No tenant specified')
+ return _get_auth_v2_0(url, user, tenant_name, key, snet)
+
+
+def get_account(url, token, marker=None, limit=None, prefix=None,
+ http_conn=None, full_listing=False):
+ """
+ Get a listing of containers for the account.
+
+ :param url: storage URL
+ :param token: auth token
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a tuple of (response headers, a list of containers) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = get_account(url, token, marker, limit, prefix, http_conn)
+ listing = rv[1]
+ while listing:
+ marker = listing[-1]['name']
+ listing = \
+ get_account(url, token, marker, limit, prefix, http_conn)[1]
+ if listing:
+ rv[1].extend(listing)
+ return rv
+ parsed, conn = http_conn
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ conn.request('GET', '%s?%s' % (parsed.path, qs), '',
+ {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ if resp.status < 200 or resp.status >= 300:
+ body = resp.read()
+ raise ClientException('Account GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_query=qs, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+ if resp.status == 204:
+ resp.read()
+ return resp_headers, []
+ return resp_headers, json_loads(resp.read())
+
+
+def head_account(url, token, http_conn=None):
+ """
+ Get account stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
+
+def post_account(url, token, headers, http_conn=None):
+ """
+ Update an account's metadata.
+
+ :param url: storage URL
+ :param token: auth token
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ headers['X-Auth-Token'] = token
+ conn.request('POST', parsed.path, '', headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account POST failed',
+ http_scheme=parsed.scheme,
+ http_host=conn.host,
+ http_port=conn.port,
+ http_path=parsed.path,
+ http_status=resp.status,
+ http_reason=resp.reason,
+ http_response_content=body)
+
+
+def get_container(url, token, container, marker=None, limit=None,
+ prefix=None, delimiter=None, http_conn=None,
+ full_listing=False):
+ """
+ Get a listing of objects for the container.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get a listing for
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+    :param delimiter: string to delimit the queries on
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a tuple of (response headers, a list of objects) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = get_container(url, token, container, marker, limit, prefix,
+ delimiter, http_conn)
+ listing = rv[1]
+ while listing:
+ if not delimiter:
+ marker = listing[-1]['name']
+ else:
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ listing = get_container(url, token, container, marker, limit,
+ prefix, delimiter, http_conn)[1]
+ if listing:
+ rv[1].extend(listing)
+ return rv
+ parsed, conn = http_conn
+ path = '%s/%s' % (parsed.path, quote(container))
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ body = resp.read()
+ raise ClientException('Container GET failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_query=qs,
+ http_status=resp.status, http_reason=resp.reason,
+ http_response_content=body)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ if resp.status == 204:
+ resp.read()
+ return resp_headers, []
+ return resp_headers, json_loads(resp.read())
+
+
+def head_container(url, token, container, http_conn=None, headers=None):
+ """
+ Get container stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get stats for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ req_headers = {'X-Auth-Token': token}
+ if headers:
+ req_headers.update(headers)
+ conn.request('HEAD', path, '', req_headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container HEAD failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
+
+def put_container(url, token, container, headers=None, http_conn=None):
+ """
+ Create a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to create
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ if not headers:
+ headers = {}
+ headers['X-Auth-Token'] = token
+ conn.request('PUT', path, '', headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container PUT failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+
+
+def post_container(url, token, container, headers, http_conn=None):
+ """
+ Update a container's metadata.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to update
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ headers['X-Auth-Token'] = token
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container POST failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+
+
+def delete_container(url, token, container, http_conn=None):
+ """
+ Delete a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+
+
+def get_object(url, token, container, name, http_conn=None,
+ resp_chunk_size=None):
+ """
+ Get an object
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
+ you specify a resp_chunk_size you must fully read
+ the object's contents before making another
+ request.
+ :returns: a tuple of (response headers, the object's contents) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ body = resp.read()
+ raise ClientException('Object GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason,
+ http_response_content=body)
+ if resp_chunk_size:
+
+ def _object_body():
+ buf = resp.read(resp_chunk_size)
+ while buf:
+ yield buf
+ buf = resp.read(resp_chunk_size)
+ object_body = _object_body()
+ else:
+ object_body = resp.read()
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers, object_body
+
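+# Illustrative sketch only (container and object names are placeholders):
+# streaming a large object to disk in resp_chunk_size pieces instead of
+# buffering the whole body in memory:
+#
+#     headers, body = get_object(storage_url, token, 'photos', 'pic.jpg',
+#                                resp_chunk_size=65536)
+#     with open('pic.jpg', 'wb') as out:
+#         for chunk in body:
+#             out.write(chunk)
+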
+
+def head_object(url, token, container, name, http_conn=None):
+ """
+ Get object info
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get info for
+    :param http_conn: HTTP connection object (if None, a new connection will
+                      be created)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason,
+ http_response_content=body)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
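+# Illustrative sketch only: header names in the returned dict are
+# lowercased, so standard attributes are read as, for example:
+#
+#     headers = head_object(storage_url, token, 'photos', 'pic.jpg')
+#     size = int(headers['content-length'])
+#     etag = headers['etag']
+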
+
+def put_object(url, token=None, container=None, name=None, contents=None,
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None, headers=None, http_conn=None, proxy=None):
+ """
+ Put an object
+
+ :param url: storage URL
+ :param token: auth token; if None, no token will be sent
+ :param container: container name that the object is in; if None, the
+ container name is expected to be part of the url
+ :param name: object name to put; if None, the object name is expected to be
+ part of the url
+    :param contents: a string or a file-like object to read object data from;
+                     if None, a zero-byte put will be done
+    :param content_length: value to send as content-length header; also limits
+                           the amount read from contents; if None, it will be
+                           computed from the contents, or chunked transfer
+                           encoding will be used
+ :param etag: etag of contents; if None, no etag will be sent
+ :param chunk_size: chunk size of data to write; default 65536
+ :param content_type: value to send as content-type header; if None, no
+ content-type will be set (remote end will likely try
+ to auto-detect it)
+ :param headers: additional headers to include in the request, if any
+    :param http_conn: HTTP connection object (if None, a new connection will
+                      be created)
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :returns: etag from server response
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url, proxy=proxy)
+ path = parsed.path
+ if container:
+ path = '%s/%s' % (path.rstrip('/'), quote(container))
+ if name:
+ path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if headers:
+ headers = dict(headers)
+ else:
+ headers = {}
+ if token:
+ headers['X-Auth-Token'] = token
+ if etag:
+ headers['ETag'] = etag.strip('"')
+ if content_length is not None:
+ headers['Content-Length'] = str(content_length)
+ else:
+ for n, v in headers.iteritems():
+ if n.lower() == 'content-length':
+ content_length = int(v)
+ if content_type is not None:
+ headers['Content-Type'] = content_type
+ if not contents:
+ headers['Content-Length'] = '0'
+ if hasattr(contents, 'read'):
+ conn.putrequest('PUT', path)
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ if content_length is None:
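+            # No length given: stream the body with chunked transfer
+            # encoding ("<hex size>\r\n<data>\r\n" frames, terminated by
+            # "0\r\n\r\n").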
+ conn.putheader('Transfer-Encoding', 'chunked')
+ conn.endheaders()
+ chunk = contents.read(chunk_size)
+ while chunk:
+ conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ chunk = contents.read(chunk_size)
+ conn.send('0\r\n\r\n')
+ else:
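+            # Length known: send exactly content_length bytes, reading at
+            # most chunk_size bytes at a time.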
+ conn.endheaders()
+ left = content_length
+ while left > 0:
+ size = chunk_size
+ if size > left:
+ size = left
+ chunk = contents.read(size)
+ conn.send(chunk)
+ left -= len(chunk)
+ else:
+ conn.request('PUT', path, contents, headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason,
+ http_response_content=body)
+ return resp.getheader('etag', '').strip('"')
+
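+# Illustrative sketch only (paths and names are placeholders): uploading
+# from a file object without a content_length, which makes put_object()
+# fall back to chunked transfer encoding:
+#
+#     with open('pic.jpg', 'rb') as src:
+#         etag = put_object(storage_url, token, 'photos', 'pic.jpg',
+#                           contents=src, content_type='image/jpeg')
+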
+
+def post_object(url, token, container, name, headers, http_conn=None):
+ """
+ Update object metadata
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: name of the object to update
+ :param headers: additional headers to include in the request
+    :param http_conn: HTTP connection object (if None, a new connection will
+                      be created)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers['X-Auth-Token'] = token
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object POST failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason,
+ http_response_content=body)
+
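+# Illustrative sketch only: a Swift object POST replaces the object's user
+# metadata as a whole rather than merging with it, so resend any headers
+# that should be kept:
+#
+#     post_object(storage_url, token, 'photos', 'pic.jpg',
+#                 {'X-Object-Meta-Owner': 'alice'})
+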
+
+def delete_object(url, token=None, container=None, name=None, http_conn=None,
+ headers=None, proxy=None):
+ """
+ Delete object
+
+ :param url: storage URL
+ :param token: auth token; if None, no token will be sent
+ :param container: container name that the object is in; if None, the
+ container name is expected to be part of the url
+ :param name: object name to delete; if None, the object name is expected to
+ be part of the url
+    :param http_conn: HTTP connection object (if None, a new connection will
+                      be created)
+ :param headers: additional headers to include in the request
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url, proxy=proxy)
+ path = parsed.path
+ if container:
+ path = '%s/%s' % (path.rstrip('/'), quote(container))
+ if name:
+ path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if headers:
+ headers = dict(headers)
+ else:
+ headers = {}
+ if token:
+ headers['X-Auth-Token'] = token
+ conn.request('DELETE', path, '', headers)
+ resp = conn.getresponse()
+ body = resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason, http_response_content=body)
+
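+# Illustrative sketch only, assuming storage_url and token came from
+# get_auth():
+#
+#     delete_object(storage_url, token, 'photos', 'pic.jpg')
+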
+
+class Connection(object):
+ """Convenience class to make requests that will also retry the request"""
+
+ def __init__(self, authurl, user, key, retries=5, preauthurl=None,
+ preauthtoken=None, snet=False, starting_backoff=1,
+ tenant_name=None,
+ auth_version="1"):
+ """
+ :param authurl: authentication URL
+ :param user: user name to authenticate as
+ :param key: key/password to authenticate with
+ :param retries: Number of times to retry the request before failing
+ :param preauthurl: storage URL (if you have already authenticated)
+ :param preauthtoken: authentication token (if you have already
+ authenticated)
+        :param snet: use SERVICENET internal network; default is False
+ :param auth_version: OpenStack auth version, default is 1.0
+        :param tenant_name: The tenant/account name, required when connecting
+                            to an auth 2.0 system.
+ """
+ self.authurl = authurl
+ self.user = user
+ self.key = key
+ self.retries = retries
+ self.http_conn = None
+ self.url = preauthurl
+ self.token = preauthtoken
+ self.attempts = 0
+ self.snet = snet
+ self.starting_backoff = starting_backoff
+ self.auth_version = auth_version
+ self.tenant_name = tenant_name
+
+ def get_auth(self):
+ return get_auth(self.authurl, self.user,
+ self.key, snet=self.snet,
+ tenant_name=self.tenant_name,
+ auth_version=self.auth_version)
+
+ def http_connection(self):
+ return http_connection(self.url)
+
+ def _retry(self, reset_func, func, *args, **kwargs):
+ self.attempts = 0
+ backoff = self.starting_backoff
+ while self.attempts <= self.retries:
+ self.attempts += 1
+ try:
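+                # Authenticate lazily: only fetch a storage URL and token
+                # when none are cached.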
+ if not self.url or not self.token:
+ self.url, self.token = self.get_auth()
+ self.http_conn = None
+ if not self.http_conn:
+ self.http_conn = self.http_connection()
+ kwargs['http_conn'] = self.http_conn
+ rv = func(self.url, self.token, *args, **kwargs)
+ return rv
+ except (socket.error, HTTPException):
+ if self.attempts > self.retries:
+ raise
+ self.http_conn = None
+ except ClientException, err:
+ if self.attempts > self.retries:
+ raise
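+                # A 401 means the cached token is no longer valid; drop the
+                # cached auth so the next attempt re-authenticates, but give
+                # up if this was already a retry.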
+ if err.http_status == 401:
+ self.url = self.token = None
+ if self.attempts > 1:
+ raise
+ elif err.http_status == 408:
+ self.http_conn = None
+ elif 500 <= err.http_status <= 599:
+ pass
+ else:
+ raise
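+            # Back off exponentially between attempts, starting at
+            # starting_backoff seconds and doubling each time.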
+ sleep(backoff)
+ backoff *= 2
+ if reset_func:
+ reset_func(func, *args, **kwargs)
+
+ def head_account(self):
+ """Wrapper for :func:`head_account`"""
+ return self._retry(None, head_account)
+
+ def get_account(self, marker=None, limit=None, prefix=None,
+ full_listing=False):
+ """Wrapper for :func:`get_account`"""
+ # TODO(unknown): With full_listing=True this will restart the entire
+ # listing with each retry. Need to make a better version that just
+ # retries where it left off.
+ return self._retry(None, get_account, marker=marker, limit=limit,
+ prefix=prefix, full_listing=full_listing)
+
+ def post_account(self, headers):
+ """Wrapper for :func:`post_account`"""
+ return self._retry(None, post_account, headers)
+
+ def head_container(self, container):
+ """Wrapper for :func:`head_container`"""
+ return self._retry(None, head_container, container)
+
+ def get_container(self, container, marker=None, limit=None, prefix=None,
+ delimiter=None, full_listing=False):
+ """Wrapper for :func:`get_container`"""
+ # TODO(unknown): With full_listing=True this will restart the entire
+ # listing with each retry. Need to make a better version that just
+ # retries where it left off.
+ return self._retry(None, get_container, container, marker=marker,
+ limit=limit, prefix=prefix, delimiter=delimiter,
+ full_listing=full_listing)
+
+ def put_container(self, container, headers=None):
+ """Wrapper for :func:`put_container`"""
+ return self._retry(None, put_container, container, headers=headers)
+
+ def post_container(self, container, headers):
+ """Wrapper for :func:`post_container`"""
+ return self._retry(None, post_container, container, headers)
+
+ def delete_container(self, container):
+ """Wrapper for :func:`delete_container`"""
+ return self._retry(None, delete_container, container)
+
+ def head_object(self, container, obj):
+ """Wrapper for :func:`head_object`"""
+ return self._retry(None, head_object, container, obj)
+
+ def get_object(self, container, obj, resp_chunk_size=None):
+ """Wrapper for :func:`get_object`"""
+ return self._retry(None, get_object, container, obj,
+ resp_chunk_size=resp_chunk_size)
+
+ def put_object(self, container, obj, contents, content_length=None,
+ etag=None, chunk_size=65536, content_type=None,
+ headers=None):
+ """Wrapper for :func:`put_object`"""
+
+ def _default_reset(*args, **kwargs):
+ raise ClientException('put_object(%r, %r, ...) failure and no '
+ 'ability to reset contents for reupload.' % (container, obj))
+
+ reset_func = _default_reset
+ tell = getattr(contents, 'tell', None)
+ seek = getattr(contents, 'seek', None)
+ if tell and seek:
+ orig_pos = tell()
+ reset_func = lambda *a, **k: seek(orig_pos)
+ elif not contents:
+ reset_func = lambda *a, **k: None
+
+ return self._retry(reset_func, put_object, container, obj, contents,
+ content_length=content_length, etag=etag, chunk_size=chunk_size,
+ content_type=content_type, headers=headers)
+
+ def post_object(self, container, obj, headers):
+ """Wrapper for :func:`post_object`"""
+ return self._retry(None, post_object, container, obj, headers)
+
+ def delete_object(self, container, obj):
+ """Wrapper for :func:`delete_object`"""
+ return self._retry(None, delete_object, container, obj)
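+
+
+# Illustrative sketch only (the auth URL, account and key are placeholders):
+# the Connection class wraps the module-level helpers above, handling
+# authentication and retries automatically:
+#
+#     conn = Connection('http://127.0.0.1:8080/auth/v1.0',
+#                       'account:user', 'secretkey', retries=3)
+#     conn.put_container('photos')
+#     with open('pic.jpg', 'rb') as src:
+#         conn.put_object('photos', 'pic.jpg', src,
+#                         content_type='image/jpeg')
+#     headers, body = conn.get_object('photos', 'pic.jpg')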