diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-04-04 23:08:25 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-04-04 23:08:25 +0000 |
commit | 2d97609a52bef2f437c15ff72dced29663c570ff (patch) | |
tree | 76d77ecbd0d7da9bfa9fdedeba73f08f63f66e38 | |
parent | fab61c8275893c7ab7279336b9d64d1154264059 (diff) | |
parent | 2b3d1719073fa58b651ca82f64a366e3f737d71a (diff) | |
download | python-swiftclient-2d97609a52bef2f437c15ff72dced29663c570ff.tar.gz |
Merge "Static large object support."1.4.0
-rwxr-xr-x | bin/swift | 139 | ||||
-rw-r--r-- | swiftclient/client.py | 37 | ||||
-rw-r--r-- | tests/test_swiftclient.py | 23 |
3 files changed, 154 insertions, 45 deletions
@@ -30,6 +30,11 @@ from time import sleep, time from traceback import format_exception from urllib import quote, unquote +try: + import simplejson as json +except ImportError: + import json + from swiftclient import Connection, ClientException, HTTPException, utils from swiftclient.version import version_info @@ -111,6 +116,8 @@ class QueueFunctionThread(Thread): self.args = args self.kwargs = kwargs self.exc_infos = [] + self.results = [] + self.store_results = kwargs.pop('store_results', False) def run(self): while True: @@ -123,7 +130,9 @@ class QueueFunctionThread(Thread): else: try: if not self.abort: - self.func(item, *self.args, **self.kwargs) + res = self.func(item, *self.args, **self.kwargs) + if self.store_results: + self.results.append(res) except Exception: self.exc_infos.append(exc_info()) finally: @@ -171,19 +180,23 @@ def st_delete(parser, args, print_queue, error_queue): def _delete_object((container, obj), conn): try: old_manifest = None + query_string = None if not options.leave_segments: try: - old_manifest = conn.head_object(container, obj).get( - 'x-object-manifest') + headers = conn.head_object(container, obj) + old_manifest = headers.get('x-object-manifest') + if utils.config_true_value( + headers.get('x-static-large-object')): + query_string = 'multipart-manifest=delete' except ClientException, err: if err.http_status != 404: raise - conn.delete_object(container, obj) + conn.delete_object(container, obj, query_string=query_string) if old_manifest: segment_queue = Queue(10000) scontainer, sprefix = old_manifest.split('/', 1) scontainer = unquote(scontainer) - sprefix = unquote(sprefix) + sprefix = unquote(sprefix).rstrip('/') + '/' for delobj in conn.get_container(scontainer, prefix=sprefix)[1]: segment_queue.put((scontainer, delobj['name'])) @@ -793,7 +806,10 @@ def st_upload(parser, args, print_queue, error_queue): default=[], help='Set request headers with the syntax header:value. ' ' This option may be repeated. Example -H content-type:text/plain ' '-H "Content-Length: 4000"') - + parser.add_option('', '--use-slo', action='store_true', default=False, + help='When used in conjuction with --segment-size will ' + 'create a Static Large Object instead of the default ' + 'Dynamic Large Object.') (options, args) = parse_args(parser, args) args = args[1:] if len(args) < 2: @@ -811,14 +827,17 @@ def st_upload(parser, args, print_queue, error_queue): seg_container = args[0] +'_segments' if options.segment_container: seg_container = options.segment_container - conn.put_object(job.get('container', seg_container), + etag = conn.put_object(job.get('container', seg_container), job['obj'], fp, content_length=job['segment_size']) + job['segment_location'] = '/%s/%s' % (seg_container, job['obj']) + job['segment_etag'] = etag if options.verbose and 'log_line' in job: if conn.attempts > 1: print_queue.put('%s [after %d attempts]' % (job['log_line'], conn.attempts)) else: print_queue.put(job['log_line']) + return job def _object_job(job, conn): path = job['path'] @@ -855,6 +874,8 @@ def st_upload(parser, args, print_queue, error_queue): # manifest object and need to delete the old segments # ourselves. old_manifest = None + old_slo_manifest_paths = [] + new_slo_manifest_paths = set() if options.changed or not options.leave_segments: try: headers = conn.head_object(container, obj) @@ -865,6 +886,16 @@ def st_upload(parser, args, print_queue, error_queue): return if not options.leave_segments: old_manifest = headers.get('x-object-manifest') + if utils.config_true_value( + headers.get('x-static-large-object')): + headers, manifest_data = conn.get_object( + container, obj, + query_string='multipart-manifest=get') + for old_seg in json.loads(manifest_data): + seg_path = old_seg['name'].lstrip('/') + if isinstance(seg_path, unicode): + seg_path = seg_path.encode('utf-8') + old_slo_manifest_paths.append(seg_path) except ClientException, err: if err.http_status != 404: raise @@ -879,9 +910,10 @@ def st_upload(parser, args, print_queue, error_queue): seg_container = options.segment_container full_size = getsize(path) segment_queue = Queue(10000) - segment_threads = [QueueFunctionThread(segment_queue, - _segment_job, create_connection()) for _junk in - xrange(options.segment_threads)] + segment_threads = [ + QueueFunctionThread(segment_queue, + _segment_job, create_connection(), store_results=True) + for _junk in xrange(options.segment_threads)] for thread in segment_threads: thread.start() segment = 0 @@ -890,13 +922,20 @@ def st_upload(parser, args, print_queue, error_queue): segment_size = int(options.segment_size) if segment_start + segment_size > full_size: segment_size = full_size - segment_start - segment_queue.put({'path': path, - 'obj': '%s/%s/%s/%s/%08d' % (obj, - put_headers['x-object-meta-mtime'], full_size, - options.segment_size, segment), - 'segment_start': segment_start, - 'segment_size': segment_size, - 'log_line': '%s segment %s' % (obj, segment)}) + if options.use_slo: + segment_name = '%s/slo/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + else: + segment_name = '%s/%s/%s/%s/%08d' % ( + obj, put_headers['x-object-meta-mtime'], + full_size, options.segment_size, segment) + segment_queue.put( + {'path': path, 'obj': segment_name, + 'segment_start': segment_start, + 'segment_size': segment_size, + 'segment_index': segment, + 'log_line': '%s segment %s' % (obj, segment)}) segment += 1 segment_start += segment_size while not segment_queue.empty(): @@ -909,27 +948,59 @@ def st_upload(parser, args, print_queue, error_queue): raise ClientException('Aborting manifest creation ' 'because not all segments could be uploaded. %s/%s' % (container, obj)) - new_object_manifest = '%s/%s/%s/%s/%s' % ( - quote(seg_container), quote(obj), - put_headers['x-object-meta-mtime'], full_size, - options.segment_size) - if old_manifest == new_object_manifest: - old_manifest = None - put_headers['x-object-manifest'] = new_object_manifest - conn.put_object(container, obj, '', content_length=0, - headers=put_headers) + if options.use_slo: + slo_segments = [] + for thread in segment_threads: + slo_segments += thread.results + slo_segments.sort(key=lambda d: d['segment_index']) + for seg in slo_segments: + seg_loc = seg['segment_location'].lstrip('/') + if isinstance(seg_loc, unicode): + seg_loc = seg_loc.encode('utf-8') + new_slo_manifest_paths.add(seg_loc) + + manifest_data = json.dumps([ + {'path': d['segment_location'], + 'etag': d['segment_etag'], + 'size_bytes': d['segment_size']} + for d in slo_segments]) + + put_headers['x-static-large-object'] = 'true' + conn.put_object(container, obj, manifest_data, + headers=put_headers, + query_string='multipart-manifest=put') + else: + new_object_manifest = '%s/%s/%s/%s/%s/' % ( + quote(seg_container), quote(obj), + put_headers['x-object-meta-mtime'], full_size, + options.segment_size) + if old_manifest and old_manifest.rstrip('/') == \ + new_object_manifest.rstrip('/'): + old_manifest = None + put_headers['x-object-manifest'] = new_object_manifest + conn.put_object(container, obj, '', content_length=0, + headers=put_headers) else: conn.put_object(container, obj, open(path, 'rb'), content_length=getsize(path), headers=put_headers) - if old_manifest: + if old_manifest or old_slo_manifest_paths: segment_queue = Queue(10000) - scontainer, sprefix = old_manifest.split('/', 1) - scontainer = unquote(scontainer) - sprefix = unquote(sprefix) - for delobj in conn.get_container(scontainer, - prefix=sprefix)[1]: - segment_queue.put({'delete': True, - 'container': scontainer, 'obj': delobj['name']}) + if old_manifest: + scontainer, sprefix = old_manifest.split('/', 1) + scontainer = unquote(scontainer) + sprefix = unquote(sprefix).rstrip('/') + '/' + for delobj in conn.get_container(scontainer, + prefix=sprefix)[1]: + segment_queue.put({'delete': True, + 'container': scontainer, 'obj': delobj['name']}) + if old_slo_manifest_paths: + for seg_to_delete in old_slo_manifest_paths: + if seg_to_delete in new_slo_manifest_paths: + continue + scont, sobj = \ + seg_to_delete.split('/', 1) + segment_queue.put({'delete': True, + 'container': scont, 'obj': sobj}) if not segment_queue.empty(): segment_threads = [QueueFunctionThread(segment_queue, _segment_job, create_connection()) for _junk in diff --git a/swiftclient/client.py b/swiftclient/client.py index 3005a57..a607898 100644 --- a/swiftclient/client.py +++ b/swiftclient/client.py @@ -672,7 +672,7 @@ def delete_container(url, token, container, http_conn=None): def get_object(url, token, container, name, http_conn=None, - resp_chunk_size=None): + resp_chunk_size=None, query_string=None): """ Get an object @@ -686,6 +686,7 @@ def get_object(url, token, container, name, http_conn=None, you specify a resp_chunk_size you must fully read the object's contents before making another request. + :param query_string: if set will be appended with '?' to generated path :returns: a tuple of (response headers, the object's contents) The response headers will be a dict and all header names will be lowercase. :raises ClientException: HTTP GET request failed @@ -695,6 +696,8 @@ def get_object(url, token, container, name, http_conn=None, else: parsed, conn = http_connection(url) path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) + if query_string: + path += '?' + query_string method = 'GET' headers = {'X-Auth-Token': token} conn.request(method, path, '', headers) @@ -766,7 +769,8 @@ def head_object(url, token, container, name, http_conn=None): def put_object(url, token=None, container=None, name=None, contents=None, content_length=None, etag=None, chunk_size=None, - content_type=None, headers=None, http_conn=None, proxy=None): + content_type=None, headers=None, http_conn=None, proxy=None, + query_string=None): """ Put an object @@ -794,6 +798,7 @@ def put_object(url, token=None, container=None, name=None, contents=None, conn object) :param proxy: proxy to connect through, if any; None by default; str of the format 'http://127.0.0.1:8888' to set one + :param query_string: if set will be appended with '?' to generated path :returns: etag from server response :raises ClientException: HTTP PUT request failed """ @@ -806,6 +811,8 @@ def put_object(url, token=None, container=None, name=None, contents=None, path = '%s/%s' % (path.rstrip('/'), quote(container)) if name: path = '%s/%s' % (path.rstrip('/'), quote(name)) + if query_string: + path += '?' + query_string if headers: headers = dict(headers) else: @@ -901,7 +908,7 @@ def post_object(url, token, container, name, headers, http_conn=None): def delete_object(url, token=None, container=None, name=None, http_conn=None, - headers=None, proxy=None): + headers=None, proxy=None, query_string=None): """ Delete object @@ -916,6 +923,7 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None, :param headers: additional headers to include in the request :param proxy: proxy to connect through, if any; None by default; str of the format 'http://127.0.0.1:8888' to set one + :param query_string: if set will be appended with '?' to generated path :raises ClientException: HTTP DELETE request failed """ if http_conn: @@ -927,6 +935,8 @@ def delete_object(url, token=None, container=None, name=None, http_conn=None, path = '%s/%s' % (path.rstrip('/'), quote(container)) if name: path = '%s/%s' % (path.rstrip('/'), quote(name)) + if query_string: + path += '?' + query_string if headers: headers = dict(headers) else: @@ -1085,14 +1095,16 @@ class Connection(object): """Wrapper for :func:`head_object`""" return self._retry(None, head_object, container, obj) - def get_object(self, container, obj, resp_chunk_size=None): + def get_object(self, container, obj, resp_chunk_size=None, + query_string=None): """Wrapper for :func:`get_object`""" return self._retry(None, get_object, container, obj, - resp_chunk_size=resp_chunk_size) + resp_chunk_size=resp_chunk_size, + query_string=query_string) def put_object(self, container, obj, contents, content_length=None, etag=None, chunk_size=None, content_type=None, - headers=None): + headers=None, query_string=None): """Wrapper for :func:`put_object`""" def _default_reset(*args, **kwargs): @@ -1100,7 +1112,11 @@ class Connection(object): 'ability to reset contents for reupload.' % (container, obj)) - reset_func = _default_reset + if isinstance(contents, str): + # if its a str then you can retry as much as you want + reset_func = None + else: + reset_func = _default_reset tell = getattr(contents, 'tell', None) seek = getattr(contents, 'seek', None) if tell and seek: @@ -1112,12 +1128,13 @@ class Connection(object): return self._retry(reset_func, put_object, container, obj, contents, content_length=content_length, etag=etag, chunk_size=chunk_size, content_type=content_type, - headers=headers) + headers=headers, query_string=query_string) def post_object(self, container, obj, headers): """Wrapper for :func:`post_object`""" return self._retry(None, post_object, container, obj, headers) - def delete_object(self, container, obj): + def delete_object(self, container, obj, query_string=None): """Wrapper for :func:`delete_object`""" - return self._retry(None, delete_object, container, obj) + return self._retry(None, delete_object, container, obj, + query_string=query_string) diff --git a/tests/test_swiftclient.py b/tests/test_swiftclient.py index 8bc8861..5fd1e28 100644 --- a/tests/test_swiftclient.py +++ b/tests/test_swiftclient.py @@ -121,12 +121,15 @@ class MockHttpTest(testtools.TestCase): def fake_http_connection(*args, **kwargs): _orig_http_connection = c.http_connection return_read = kwargs.get('return_read') + query_string = kwargs.get('query_string') def wrapper(url, proxy=None): parsed, _conn = _orig_http_connection(url, proxy=proxy) conn = fake_http_connect(*args, **kwargs)() - def request(*args, **kwargs): + def request(method, url, *args, **kwargs): + if query_string: + self.assert_(url.endswith('?' + query_string)) return conn.request = request @@ -430,6 +433,12 @@ class TestGetObject(MockHttpTest): self.assertRaises(c.ClientException, c.get_object, 'http://www.test.com', 'asdf', 'asdf', 'asdf') + def test_query_string(self): + c.http_connection = self.fake_http_connection(200, + query_string="hello=20") + c.get_object('http://www.test.com', 'asdf', 'asdf', 'asdf', + query_string="hello=20") + class TestHeadObject(MockHttpTest): @@ -496,6 +505,12 @@ class TestPutObject(MockHttpTest): except c.ClientException as e: self.assertEquals(e.http_response_content, body) + def test_query_string(self): + c.http_connection = self.fake_http_connection(200, + query_string="hello=20") + c.put_object('http://www.test.com', 'asdf', 'asdf', 'asdf', + query_string="hello=20") + class TestPostObject(MockHttpTest): @@ -543,6 +558,12 @@ class TestDeleteObject(MockHttpTest): self.assertRaises(c.ClientException, c.delete_object, 'http://www.test.com', 'asdf', 'asdf', 'asdf') + def test_query_string(self): + c.http_connection = self.fake_http_connection(200, + query_string="hello=20") + c.delete_object('http://www.test.com', 'asdf', 'asdf', 'asdf', + query_string="hello=20") + class TestConnection(MockHttpTest): |