summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-04-04 23:08:25 +0000
committerGerrit Code Review <review@openstack.org>2013-04-04 23:08:25 +0000
commit2d97609a52bef2f437c15ff72dced29663c570ff (patch)
tree76d77ecbd0d7da9bfa9fdedeba73f08f63f66e38
parentfab61c8275893c7ab7279336b9d64d1154264059 (diff)
parent2b3d1719073fa58b651ca82f64a366e3f737d71a (diff)
downloadpython-swiftclient-1.4.0.tar.gz
Merge "Static large object support."1.4.0
-rwxr-xr-xbin/swift139
-rw-r--r--swiftclient/client.py37
-rw-r--r--tests/test_swiftclient.py23
3 files changed, 154 insertions, 45 deletions
diff --git a/bin/swift b/bin/swift
index 6746c25..c94be8c 100755
--- a/bin/swift
+++ b/bin/swift
@@ -30,6 +30,11 @@ from time import sleep, time
from traceback import format_exception
from urllib import quote, unquote
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
from swiftclient import Connection, ClientException, HTTPException, utils
from swiftclient.version import version_info
@@ -111,6 +116,8 @@ class QueueFunctionThread(Thread):
self.args = args
self.kwargs = kwargs
self.exc_infos = []
+ self.results = []
+ self.store_results = kwargs.pop('store_results', False)
def run(self):
while True:
@@ -123,7 +130,9 @@ class QueueFunctionThread(Thread):
else:
try:
if not self.abort:
- self.func(item, *self.args, **self.kwargs)
+ res = self.func(item, *self.args, **self.kwargs)
+ if self.store_results:
+ self.results.append(res)
except Exception:
self.exc_infos.append(exc_info())
finally:
@@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_object((container, obj), conn):
try:
old_manifest = None
+ query_string = None
if not options.leave_segments:
try:
- old_manifest = conn.head_object(container, obj).get(
- 'x-object-manifest')
+ headers = conn.head_object(container, obj)
+ old_manifest = headers.get('x-object-manifest')
+ if utils.config_true_value(
+ headers.get('x-static-large-object')):
+ query_string = 'multipart-manifest=delete'
except ClientException, err:
if err.http_status != 404:
raise
- conn.delete_object(container, obj)
+ conn.delete_object(container, obj, query_string=query_string)
if old_manifest:
segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1)
scontainer = unquote(scontainer)
- sprefix = unquote(sprefix)
+ sprefix = unquote(sprefix).rstrip('/') + '/'
for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
@@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue):
default=[], help='Set request headers with the syntax header:value. '
' This option may be repeated. Example -H content-type:text/plain '
'-H "Content-Length: 4000"')
-
+ parser.add_option('', '--use-slo', action='store_true', default=False,
+ help='When used in conjuction with --segment-size will '
+ 'create a Static Large Object instead of the default '
+ 'Dynamic Large Object.')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
@@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = args[0] +'_segments'
if options.segment_container:
seg_container = options.segment_container
- conn.put_object(job.get('container', seg_container),
+ etag = conn.put_object(job.get('container', seg_container),
job['obj'], fp, content_length=job['segment_size'])
+ job['segment_location'] = '/%s/%s' % (seg_container, job['obj'])
+ job['segment_etag'] = etag
if options.verbose and 'log_line' in job:
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts))
else:
print_queue.put(job['log_line'])
+ return job
def _object_job(job, conn):
path = job['path']
@@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue):
# manifest object and need to delete the old segments
# ourselves.
old_manifest = None
+ old_slo_manifest_paths = []
+ new_slo_manifest_paths = set()
if options.changed or not options.leave_segments:
try:
headers = conn.head_object(container, obj)
@@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue):
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
+ if utils.config_true_value(
+ headers.get('x-static-large-object')):
+ headers, manifest_data = conn.get_object(
+ container, obj,
+ query_string='multipart-manifest=get')
+ for old_seg in json.loads(manifest_data):
+ seg_path = old_seg['name'].lstrip('/')
+ if isinstance(seg_path, unicode):
+ seg_path = seg_path.encode('utf-8')
+ old_slo_manifest_paths.append(seg_path)
except ClientException, err:
if err.http_status != 404:
raise
@@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue):
seg_container = options.segment_container
full_size = getsize(path)
segment_queue = Queue(10000)
- segment_threads = [QueueFunctionThread(segment_queue,
- _segment_job, create_connection()) for _junk in
- xrange(options.segment_threads)]
+ segment_threads = [
+ QueueFunctionThread(segment_queue,
+ _segment_job, create_connection(), store_results=True)
+ for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
segment = 0
@@ -890,13 +922,20 @@ def st_upload(parser, args, print_queue, error_queue):
segment_size = int(options.segment_size)
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
- segment_queue.put({'path': path,
- 'obj': '%s/%s/%s/%s/%08d' % (obj,
- put_headers['x-object-meta-mtime'], full_size,
- options.segment_size, segment),
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'log_line': '%s segment %s' % (obj, segment)})
+ if options.use_slo:
+ segment_name = '%s/slo/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ else:
+ segment_name = '%s/%s/%s/%s/%08d' % (
+ obj, put_headers['x-object-meta-mtime'],
+ full_size, options.segment_size, segment)
+ segment_queue.put(
+ {'path': path, 'obj': segment_name,
+ 'segment_start': segment_start,
+ 'segment_size': segment_size,
+ 'segment_index': segment,
+ 'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
while not segment_queue.empty():
@@ -909,27 +948,59 @@ def st_upload(parser, args, print_queue, error_queue):
raise ClientException('Aborting manifest creation '
'because not all segments could be uploaded. %s/%s'
% (container, obj))
- new_object_manifest = '%s/%s/%s/%s/%s' % (
- quote(seg_container), quote(obj),
- put_headers['x-object-meta-mtime'], full_size,
- options.segment_size)
- if old_manifest == new_object_manifest:
- old_manifest = None
- put_headers['x-object-manifest'] = new_object_manifest
- conn.put_object(container, obj, '', content_length=0,
- headers=put_headers)
+ if options.use_slo:
+ slo_segments = []
+ for thread in segment_threads:
+ slo_segments += thread.results
+ slo_segments.sort(key=lambda d: d['segment_index'])
+ for seg in slo_segments:
+ seg_loc = seg['segment_location'].lstrip('/')
+ if isinstance(seg_loc, unicode):
+ seg_loc = seg_loc.encode('utf-8')
+ new_slo_manifest_paths.add(seg_loc)
+
+ manifest_data = json.dumps([
+ {'path': d['segment_location'],
+ 'etag': d['segment_etag'],
+ 'size_bytes': d['segment_size']}
+ for d in slo_segments])
+
+ put_headers['x-static-large-object'] = 'true'
+ conn.put_object(container, obj, manifest_data,
+ headers=put_headers,
+ query_string='multipart-manifest=put')
+ else:
+ new_object_manifest = '%s/%s/%s/%s/%s/' % (
+ quote(seg_container), quote(obj),
+ put_headers['x-object-meta-mtime'], full_size,
+ options.segment_size)
+ if old_manifest and old_manifest.rstrip('/') == \
+ new_object_manifest.rstrip('/'):
+ old_manifest = None
+ put_headers['x-object-manifest'] = new_object_manifest
+ conn.put_object(container, obj, '', content_length=0,
+ headers=put_headers)
else:
conn.put_object(container, obj, open(path, 'rb'),
content_length=getsize(path), headers=put_headers)
- if old_manifest:
+ if old_manifest or old_slo_manifest_paths:
segment_queue = Queue(10000)
- scontainer, sprefix = old_manifest.split('/', 1)
- scontainer = unquote(scontainer)
- sprefix = unquote(sprefix)
- for delobj in conn.get_container(scontainer,
- prefix=sprefix)[1]:
- segment_queue.put({'delete': True,
- 'container': scontainer, 'obj': delobj['name']})
+ if old_manifest:
+ scontainer, sprefix = old_manifest.split('/', 1)
+ scontainer = unquote(scontainer)
+ sprefix = unquote(sprefix).rstrip('/') + '/'
+ for delobj in conn.get_container(scontainer,
+ prefix=sprefix)[1]:
+ segment_queue.put({'delete': True,
+ 'container': scontainer, 'obj': delobj['name']})
+ if old_slo_manifest_paths:
+ for seg_to_delete in old_slo_manifest_paths:
+ if seg_to_delete in new_slo_manifest_paths:
+ continue
+ scont, sobj = \
+ seg_to_delete.split('/', 1)
+ segment_queue.put({'delete': True,
+ 'container': scont, 'obj': sobj})
if not segment_queue.empty():
segment_threads = [QueueFunctionThread(segment_queue,
_segment_job, create_connection()) for _junk in
diff --git a/swiftclient/client.py b/swiftclient/client.py
index 3005a57..a607898 100644
--- a/swiftclient/client.py
+++ b/swiftclient/client.py
@@ -672,7 +672,7 @@ def delete_container(url, token, container, http_conn=None):
def get_object(url, token, container, name, http_conn=None,
- resp_chunk_size=None):
+ resp_chunk_size=None, query_string=None):
"""
Get an object
@@ -686,6 +686,7 @@ def get_object(url, token, container, name, http_conn=None,
you specify a resp_chunk_size you must fully read
the object's contents before making another
request.
+ :param query_string: if set will be appended with '?' to generated path
:returns: a tuple of (response headers, the object's contents) The response
headers will be a dict and all header names will be lowercase.
:raises ClientException: HTTP GET request failed
@@ -695,6 +696,8 @@ def get_object(url, token, container, name, http_conn=None,
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ if query_string:
+ path += '?' + query_string
method = 'GET'
headers = {'X-Auth-Token': token}
conn.request(method, path, '', headers)
@@ -766,7 +769,8 @@ def head_object(url, token, container, name, http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=None,
- content_type=None, headers=None, http_conn=None, proxy=None):
+ content_type=None, headers=None, http_conn=None, proxy=None,
+ query_string=None):
"""
Put an object
@@ -794,6 +798,7 @@ def put_object(url, token=None, container=None, name=None, contents=None,
conn object)
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
+ :param query_string: if set will be appended with '?' to generated path
:returns: etag from server response
:raises ClientException: HTTP PUT request failed
"""
@@ -806,6 +811,8 @@ def put_object(url, token=None, container=None, name=None, contents=None,
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if query_string:
+ path += '?' + query_string
if headers:
headers = dict(headers)
else:
@@ -901,7 +908,7 @@ def post_object(url, token, container, name, headers, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None,
- headers=None, proxy=None):
+ headers=None, proxy=None, query_string=None):
"""
Delete object
@@ -916,6 +923,7 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None,
:param headers: additional headers to include in the request
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
+ :param query_string: if set will be appended with '?' to generated path
:raises ClientException: HTTP DELETE request failed
"""
if http_conn:
@@ -927,6 +935,8 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None,
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if query_string:
+ path += '?' + query_string
if headers:
headers = dict(headers)
else:
@@ -1085,14 +1095,16 @@ class Connection(object):
"""Wrapper for :func:`head_object`"""
return self._retry(None, head_object, container, obj)
- def get_object(self, container, obj, resp_chunk_size=None):
+ def get_object(self, container, obj, resp_chunk_size=None,
+ query_string=None):
"""Wrapper for :func:`get_object`"""
return self._retry(None, get_object, container, obj,
- resp_chunk_size=resp_chunk_size)
+ resp_chunk_size=resp_chunk_size,
+ query_string=query_string)
def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=None, content_type=None,
- headers=None):
+ headers=None, query_string=None):
"""Wrapper for :func:`put_object`"""
def _default_reset(*args, **kwargs):
@@ -1100,7 +1112,11 @@ class Connection(object):
'ability to reset contents for reupload.'
% (container, obj))
- reset_func = _default_reset
+ if isinstance(contents, str):
+ # if its a str then you can retry as much as you want
+ reset_func = None
+ else:
+ reset_func = _default_reset
tell = getattr(contents, 'tell', None)
seek = getattr(contents, 'seek', None)
if tell and seek:
@@ -1112,12 +1128,13 @@ class Connection(object):
return self._retry(reset_func, put_object, container, obj, contents,
content_length=content_length, etag=etag,
chunk_size=chunk_size, content_type=content_type,
- headers=headers)
+ headers=headers, query_string=query_string)
def post_object(self, container, obj, headers):
"""Wrapper for :func:`post_object`"""
return self._retry(None, post_object, container, obj, headers)
- def delete_object(self, container, obj):
+ def delete_object(self, container, obj, query_string=None):
"""Wrapper for :func:`delete_object`"""
- return self._retry(None, delete_object, container, obj)
+ return self._retry(None, delete_object, container, obj,
+ query_string=query_string)
diff --git a/tests/test_swiftclient.py b/tests/test_swiftclient.py
index 8bc8861..5fd1e28 100644
--- a/tests/test_swiftclient.py
+++ b/tests/test_swiftclient.py
@@ -121,12 +121,15 @@ class MockHttpTest(testtools.TestCase):
def fake_http_connection(*args, **kwargs):
_orig_http_connection = c.http_connection
return_read = kwargs.get('return_read')
+ query_string = kwargs.get('query_string')
def wrapper(url, proxy=None):
parsed, _conn = _orig_http_connection(url, proxy=proxy)
conn = fake_http_connect(*args, **kwargs)()
- def request(*args, **kwargs):
+ def request(method, url, *args, **kwargs):
+ if query_string:
+ self.assert_(url.endswith('?' + query_string))
return
conn.request = request
@@ -430,6 +433,12 @@ class TestGetObject(MockHttpTest):
self.assertRaises(c.ClientException, c.get_object,
'http://www.test.com', 'asdf', 'asdf', 'asdf')
+ def test_query_string(self):
+ c.http_connection = self.fake_http_connection(200,
+ query_string="hello=20")
+ c.get_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
+ query_string="hello=20")
+
class TestHeadObject(MockHttpTest):
@@ -496,6 +505,12 @@ class TestPutObject(MockHttpTest):
except c.ClientException as e:
self.assertEquals(e.http_response_content, body)
+ def test_query_string(self):
+ c.http_connection = self.fake_http_connection(200,
+ query_string="hello=20")
+ c.put_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
+ query_string="hello=20")
+
class TestPostObject(MockHttpTest):
@@ -543,6 +558,12 @@ class TestDeleteObject(MockHttpTest):
self.assertRaises(c.ClientException, c.delete_object,
'http://www.test.com', 'asdf', 'asdf', 'asdf')
+ def test_query_string(self):
+ c.http_connection = self.fake_http_connection(200,
+ query_string="hello=20")
+ c.delete_object('http://www.test.com', 'asdf', 'asdf', 'asdf',
+ query_string="hello=20")
+
class TestConnection(MockHttpTest):