Diffstat (limited to 'swift/proxy')
-rw-r--r--   swift/proxy/controllers/base.py      |  23
-rw-r--r--   swift/proxy/controllers/container.py | 140
-rw-r--r--   swift/proxy/controllers/obj.py       | 311
3 files changed, 260 insertions, 214 deletions
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 758aed72b..93bb056d2 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -615,10 +615,7 @@ def get_cache_key(account, container=None, obj=None, shard=None):
             raise ValueError('Shard cache key requires account and container')
         if obj:
             raise ValueError('Shard cache key cannot have obj')
-        if shard == 'updating':
-            cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
-        else:
-            cache_key = 'shard-%s/%s/%s' % (shard, account, container)
+        cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
     elif obj:
         if not (account and container):
             raise ValueError('Object cache key requires account and container')
@@ -1848,16 +1845,22 @@ class Controller(object):
         :param transfer: If True, transfer headers from original client request
         :returns: a dictionary of headers
         """
-        # Use the additional headers first so they don't overwrite the headers
-        # we require.
-        headers = HeaderKeyDict(additional) if additional else HeaderKeyDict()
-        if transfer:
-            self.transfer_headers(orig_req.headers, headers)
-        headers.setdefault('x-timestamp', Timestamp.now().internal)
+        headers = HeaderKeyDict()
         if orig_req:
+            headers.update((k.lower(), v)
+                           for k, v in orig_req.headers.items()
+                           if k.lower().startswith('x-backend-'))
             referer = orig_req.as_referer()
         else:
             referer = ''
+        # additional headers can override x-backend-* headers from orig_req
+        if additional:
+            headers.update(additional)
+        if orig_req and transfer:
+            # transfer headers from orig_req can override additional headers
+            self.transfer_headers(orig_req.headers, headers)
+        headers.setdefault('x-timestamp', Timestamp.now().internal)
+        # orig_req and additional headers cannot override the following...
         headers['x-trans-id'] = self.trans_id
         headers['connection'] = 'close'
         headers['user-agent'] = self.app.backend_user_agent
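Note: the reordering above establishes an explicit precedence for backend request headers (the enclosing method, per the surrounding context, is Controller.generate_request_headers). x-backend-* headers copied from the original request form the weakest layer; 'additional' headers may override them; transferred client headers may override both; and the proxy's own headers always win. A minimal sketch of that layering, using a plain dict in place of Swift's HeaderKeyDict and invented header values:

def layered_headers(orig_headers, additional, transfer_names):
    headers = {}
    # weakest layer: x-backend-* headers forwarded from the original request
    headers.update((k.lower(), v) for k, v in orig_headers.items()
                   if k.lower().startswith('x-backend-'))
    # 'additional' headers may override the forwarded x-backend-* headers
    headers.update((k.lower(), v) for k, v in (additional or {}).items())
    # transferred client headers may override both of the above
    headers.update((k.lower(), v) for k, v in orig_headers.items()
                   if k.lower() in transfer_names)
    # the proxy's own headers always win
    headers['x-trans-id'] = 'tx-example'
    return headers

assert layered_headers(
    {'X-Backend-Storage-Policy-Index': '1'},
    {'x-backend-storage-policy-index': '2'},
    set())['x-backend-storage-policy-index'] == '2'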
+ """ marker = get_param(req, 'marker', '') end_marker = get_param(req, 'end_marker') includes = get_param(req, 'includes') reverse = config_true_value(get_param(req, 'reverse')) if reverse: marker, end_marker = end_marker, marker - shard_ranges = [ - ShardRange.from_dict(shard_range) - for shard_range in shard_range_dicts] - shard_ranges = filter_shard_ranges(shard_ranges, includes, marker, - end_marker) + namespaces = ns_bound_list.get_namespaces() + namespaces = filter_namespaces( + namespaces, includes, marker, end_marker) if reverse: - shard_ranges.reverse() - return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii') + namespaces.reverse() + return json.dumps([dict(ns) for ns in namespaces]).encode('ascii') def _get_shard_ranges_from_cache(self, req, headers): + """ + Try to fetch shard namespace data from cache and, if successful, return + a response. Also return the cache state. + + The response body will be a list of dicts each of which describes + a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``). + + :param req: an instance of ``swob.Request``. + :param headers: Headers to be sent with request. + :return: a tuple comprising (an instance of ``swob.Response``or + ``None`` if no namespaces were found in cache, the cache state). + """ infocache = req.environ.setdefault('swift.infocache', {}) memcache = cache_from_env(req.environ, True) cache_key = get_cache_key(self.account_name, @@ -135,11 +153,10 @@ class ContainerController(Controller): shard='listing') resp_body = None - cached_range_dicts = infocache.get(cache_key) - if cached_range_dicts: + ns_bound_list = infocache.get(cache_key) + if ns_bound_list: cache_state = 'infocache_hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + resp_body = self._make_namespaces_response_body(req, ns_bound_list) elif memcache: skip_chance = \ self.app.container_listing_shard_ranges_skip_cache @@ -147,12 +164,20 @@ class ContainerController(Controller): cache_state = 'skip' else: try: - cached_range_dicts = memcache.get( + cached_namespaces = memcache.get( cache_key, raise_on_error=True) - if cached_range_dicts: + if cached_namespaces: cache_state = 'hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + if six.PY2: + # json.loads() in memcache.get will convert json + # 'string' to 'unicode' with python2, here we cast + # 'unicode' back to 'str' + cached_namespaces = [ + [lower.encode('utf-8'), name.encode('utf-8')] + for lower, name in cached_namespaces] + ns_bound_list = NamespaceBoundList(cached_namespaces) + resp_body = self._make_namespaces_response_body( + req, ns_bound_list) else: cache_state = 'miss' except MemcacheConnectionError: @@ -162,9 +187,9 @@ class ContainerController(Controller): resp = None else: # shard ranges can be returned from cache - infocache[cache_key] = tuple(cached_range_dicts) + infocache[cache_key] = ns_bound_list self.logger.debug('Found %d shards in cache for %s', - len(cached_range_dicts), req.path_qs) + len(ns_bound_list.bounds), req.path_qs) headers.update({'x-backend-record-type': 'shard', 'x-backend-cached-results': 'true'}) # mimic GetOrHeadHandler.get_working_response... 
@@ -180,36 +205,62 @@ class ContainerController(Controller):
         return resp, cache_state
 
     def _store_shard_ranges_in_cache(self, req, resp):
-        # parse shard ranges returned from backend, store them in infocache and
-        # memcache, and return a list of dicts
-        cache_key = get_cache_key(self.account_name, self.container_name,
-                                  shard='listing')
+        """
+        Parse shard ranges returned from backend, store them in both infocache
+        and memcache.
+
+        :param req: the request object.
+        :param resp: the response object for the shard range listing.
+        :return: an instance of
+            :class:`~swift.common.utils.NamespaceBoundList`.
+        """
+        # Note: Any gaps in the response's shard ranges will be 'lost' as a
+        # result of compacting the list of shard ranges to a
+        # NamespaceBoundList. That is ok. When the cached NamespaceBoundList is
+        # transformed back to shard range Namespaces to perform a listing, the
+        # Namespace before each gap will have expanded to include the gap,
+        # which means that the backend GET to that shard will have an
+        # end_marker beyond that shard's upper bound, and equal to the next
+        # available shard's lower. At worst, some misplaced objects, in the gap
+        # above the shard's upper, may be included in the shard's response.
         data = self._parse_listing_response(req, resp)
         backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
         if backend_shard_ranges is None:
             return None
 
-        cached_range_dicts = [dict(sr) for sr in backend_shard_ranges]
+        ns_bound_list = NamespaceBoundList.parse(backend_shard_ranges)
         if resp.headers.get('x-backend-sharding-state') == 'sharded':
             # cache in infocache even if no shard ranges returned; this
             # is unexpected but use that result for this request
             infocache = req.environ.setdefault('swift.infocache', {})
-            infocache[cache_key] = tuple(cached_range_dicts)
+            cache_key = get_cache_key(
+                self.account_name, self.container_name, shard='listing')
+            infocache[cache_key] = ns_bound_list
             memcache = cache_from_env(req.environ, True)
-            if memcache and cached_range_dicts:
+            if memcache and ns_bound_list:
                 # cache in memcache only if shard ranges as expected
                 self.logger.debug('Caching %d shards for %s',
-                                  len(cached_range_dicts), req.path_qs)
-                memcache.set(cache_key, cached_range_dicts,
+                                  len(ns_bound_list.bounds), req.path_qs)
+                memcache.set(cache_key, ns_bound_list.bounds,
                              time=self.app.recheck_listing_shard_ranges)
-        return cached_range_dicts
+        return ns_bound_list
 
     def _get_shard_ranges_from_backend(self, req):
-        # Make a backend request for shard ranges. The response is cached and
-        # then returned as a list of dicts.
+        """
+        Make a backend request for shard ranges and return a response.
+
+        The response body will be a list of dicts each of which describes
+        a Namespace (i.e. includes the keys ``lower``, ``upper`` and
+        ``name``). If the response headers indicate that the response body
+        contains a complete list of shard ranges for a sharded container
+        then the response body will be transformed to a
+        ``NamespaceBoundsList`` and cached.
+
+        :param req: an instance of ``swob.Request``.
+        :return: an instance of ``swob.Response``.
+        """
         # Note: We instruct the backend server to ignore name constraints in
         # request params if returning shard ranges so that the response can
-        # potentially be cached. Only do this if the container state is
+        # potentially be cached, but we only cache it if the container state is
         # 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
         # container as they may include the container itself as a 'gap filler'
         # for shard ranges that have not yet cleaved; listings from 'gap
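Note: the 'gaps are lost' comment is easiest to see with a toy stand-in for NamespaceBoundList (an illustration, not Swift's implementation). Compaction keeps only each shard's lower bound and name, so an expanded namespace runs all the way to the next entry's lower bound, silently covering any gap; shard names here are invented:

class MiniBoundList(object):
    def __init__(self, bounds):
        self.bounds = bounds  # [lower, name] pairs, sorted by lower

    @classmethod
    def parse(cls, namespaces):
        # keep only lower bounds; uppers (and hence any gaps) are discarded
        return cls([[lower, name] for name, lower, _upper in namespaces])

    def get_namespaces(self):
        # each upper is the next entry's lower; the last is unbounded ('')
        return [(name, lower,
                 self.bounds[i + 1][0] if i + 1 < len(self.bounds) else '')
                for i, (lower, name) in enumerate(self.bounds)]

# two shards with a gap ('g', 'm'] between them
ns = MiniBoundList.parse(
    [('.shards_a/c-0', '', 'g'), ('.shards_a/c-1', 'm', '')])
assert ns.bounds == [['', '.shards_a/c-0'], ['m', '.shards_a/c-1']]
# on expansion the first shard absorbs the gap: its upper becomes 'm'
assert ns.get_namespaces()[0] == ('.shards_a/c-0', '', 'm')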
@@ -232,10 +283,10 @@ class ContainerController(Controller):
         if (resp_record_type == 'shard' and
                 sharding_state == 'sharded' and
                 complete_listing):
-            cached_range_dicts = self._store_shard_ranges_in_cache(req, resp)
-            if cached_range_dicts:
-                resp.body = self._make_shard_ranges_response_body(
-                    req, cached_range_dicts)
+            ns_bound_list = self._store_shard_ranges_in_cache(req, resp)
+            if ns_bound_list:
+                resp.body = self._make_namespaces_response_body(
+                    req, ns_bound_list)
         return resp
 
     def _record_shard_listing_cache_metrics(
@@ -334,7 +385,6 @@ class ContainerController(Controller):
             params['states'] = 'listing'
             req.params = params
 
-        memcache = cache_from_env(req.environ, True)
         if (req.method == 'GET'
                 and get_param(req, 'states') == 'listing'
                 and record_type != 'object'):
@@ -346,6 +396,7 @@ class ContainerController(Controller):
             info = None
             may_get_listing_shards = False
 
+        memcache = cache_from_env(req.environ, True)
         sr_cache_state = None
         if (may_get_listing_shards and
                 self.app.recheck_listing_shard_ranges > 0
@@ -424,8 +475,15 @@ class ContainerController(Controller):
             # 'X-Backend-Storage-Policy-Index'.
             req.headers[policy_key] = resp.headers[policy_key]
         shard_listing_history.append((self.account_name, self.container_name))
-        shard_ranges = [ShardRange.from_dict(data)
-                        for data in json.loads(resp.body)]
+        # Note: when the response body has been synthesised from cached data,
+        # each item in the list only has 'name', 'lower' and 'upper' keys. We
+        # therefore cannot use ShardRange.from_dict(), and the ShardRange
+        # instances constructed here will only have 'name', 'lower' and
+        # 'upper' attributes set.
+        # Ideally we would construct Namespace objects here, but later we use
+        # the ShardRange account and container properties to access parsed
+        # parts of the name.
+        shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)]
         self.logger.debug('GET listing from %s shards for: %s',
                           len(shard_ranges), req.path_qs)
         if not shard_ranges:
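Note: a short sketch of the distinction drawn in the comment above, assuming a Swift checkout so that swift.common.utils imports; the shard name and bounds are invented:

from swift.common.utils import ShardRange

cached = {'name': '.shards_a/c-1', 'lower': 'm', 'upper': 'z'}
sr = ShardRange(**cached)        # other attributes take their defaults
print(sr.account, sr.container)  # '.shards_a' 'c-1', parsed from the name
# ShardRange.from_dict(cached) is not usable here: per the comment above it
# expects the full set of shard range keys, not just these three.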
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index b69631538..fc0f8a6d1 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -48,7 +48,7 @@ from swift.common.utils import (
     normalize_delete_at_timestamp, public, get_expirer_container,
     document_iters_to_http_response_body, parse_content_range,
     quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
-    ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
+    ShardRange, find_namespace, cache_from_env, NamespaceBoundList)
 from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_metadata, check_object_creation
 from swift.common import constraints
@@ -388,7 +388,7 @@ class BaseObjectController(Controller):
                 memcache.set(
                     cache_key, cached_namespaces.bounds,
                     time=self.app.recheck_updating_shard_ranges)
-        update_shard = find_shard_range(obj, shard_ranges or [])
+        update_shard = find_namespace(obj, shard_ranges or [])
         record_cache_op_metrics(
             self.logger, 'shard_updating', cache_state, response)
         return update_shard
@@ -1518,7 +1518,7 @@ class ECAppIter(object):
         except ChunkWriteTimeout:
             # slow client disconnect
             self.logger.exception(
-                "ChunkWriteTimeout fetching fragments for %r",
+                "ChunkWriteTimeout feeding fragments for %r",
                 quote(self.path))
         except:  # noqa
             self.logger.exception("Exception fetching fragments for %r",
@@ -2497,10 +2497,10 @@ class ECFragGetter(object):
         self.backend_headers = backend_headers
         self.header_provider = header_provider
         self.req_query_string = req.query_string
-        self.client_chunk_size = policy.fragment_size
+        self.fragment_size = policy.fragment_size
         self.skip_bytes = 0
         self.bytes_used_from_backend = 0
-        self.source = None
+        self.source = self.node = None
         self.logger_thread_locals = logger_thread_locals
         self.logger = logger
 
@@ -2578,8 +2578,8 @@ class ECFragGetter(object):
     def learn_size_from_content_range(self, start, end, length):
         """
-        If client_chunk_size is set, makes sure we yield things starting on
-        chunk boundaries based on the Content-Range header in the response.
+        Make sure we yield things starting on fragment boundaries based on the
+        Content-Range header in the response.
 
         Sets our Range header's first byterange to the value learned from
         the Content-Range header in the response; if we were given a
@@ -2593,8 +2593,7 @@ class ECFragGetter(object):
         if length == 0:
             return
 
-        if self.client_chunk_size:
-            self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+        self.skip_bytes = bytes_to_skip(self.fragment_size, start)
 
         if 'Range' in self.backend_headers:
             try:
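Note: the rename from client_chunk_size to fragment_size reflects what the value always was, the EC policy's fragment size, used to realign a ranged GET onto fragment boundaries. The skip arithmetic can be sketched locally as follows (an assumption about what a helper like bytes_to_skip computes, not a copy of Swift's code):

def bytes_to_skip_sketch(fragment_size, start):
    # distance from 'start' forward to the next fragment boundary
    return (fragment_size - (start % fragment_size)) % fragment_size

# a backend 206 beginning at byte 100 with 4096-byte fragments must skip
# 3996 bytes so that yielded chunks start on a fragment boundary
assert bytes_to_skip_sketch(4096, 100) == 3996
assert bytes_to_skip_sketch(4096, 8192) == 0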
@@ -2620,170 +2619,155 @@ class ECFragGetter(object):
         it = self._get_response_parts_iter(req)
         return it
 
-    def _get_response_parts_iter(self, req):
-        try:
-            client_chunk_size = self.client_chunk_size
-            node_timeout = self.app.recoverable_node_timeout
-
-            # This is safe; it sets up a generator but does not call next()
-            # on it, so no IO is performed.
-            parts_iter = [
-                http_response_to_document_iters(
-                    self.source, read_chunk_size=self.app.object_chunk_size)]
+    def get_next_doc_part(self):
+        node_timeout = self.app.recoverable_node_timeout
 
-            def get_next_doc_part():
-                while True:
-                    # the loop here is to resume if trying to parse
-                    # multipart/byteranges response raises a ChunkReadTimeout
-                    # and resets the parts_iter
-                    try:
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            # If we don't have a multipart/byteranges response,
-                            # but just a 200 or a single-range 206, then this
-                            # performs no IO, and just returns source (or
-                            # raises StopIteration).
-                            # Otherwise, this call to next() performs IO when
-                            # we have a multipart/byteranges response; as it
-                            # will read the MIME boundary and part headers.
-                            start_byte, end_byte, length, headers, part = next(
-                                parts_iter[0])
-                            return (start_byte, end_byte, length, headers, part)
-                    except ChunkReadTimeout:
-                        new_source, new_node = self._dig_for_source_and_node()
-                        if not new_source:
-                            raise
-                        self.app.error_occurred(
-                            self.node, 'Trying to read next part of '
-                            'EC multi-part GET (retrying)')
-                        # Close-out the connection as best as possible.
-                        if getattr(self.source, 'swift_conn', None):
-                            close_swift_conn(self.source)
-                        self.source = new_source
-                        self.node = new_node
-                        # This is safe; it sets up a generator but does
-                        # not call next() on it, so no IO is performed.
-                        parts_iter[0] = http_response_to_document_iters(
-                            new_source,
-                            read_chunk_size=self.app.object_chunk_size)
+        while True:
+            # the loop here is to resume if trying to parse
+            # multipart/byteranges response raises a ChunkReadTimeout
+            # and resets the source_parts_iter
+            try:
+                with WatchdogTimeout(self.app.watchdog, node_timeout,
+                                     ChunkReadTimeout):
+                    # If we don't have a multipart/byteranges response,
+                    # but just a 200 or a single-range 206, then this
+                    # performs no IO, and just returns source (or
+                    # raises StopIteration).
+                    # Otherwise, this call to next() performs IO when
+                    # we have a multipart/byteranges response; as it
+                    # will read the MIME boundary and part headers.
+                    start_byte, end_byte, length, headers, part = next(
+                        self.source_parts_iter)
+                    return (start_byte, end_byte, length, headers, part)
+            except ChunkReadTimeout:
+                new_source, new_node = self._dig_for_source_and_node()
+                if not new_source:
+                    raise
+                self.app.error_occurred(
+                    self.node, 'Trying to read next part of '
+                    'EC multi-part GET (retrying)')
+                # Close-out the connection as best as possible.
+                if getattr(self.source, 'swift_conn', None):
+                    close_swift_conn(self.source)
+                self.source = new_source
+                self.node = new_node
+                # This is safe; it sets up a generator but does
+                # not call next() on it, so no IO is performed.
+                self.source_parts_iter = \
+                    http_response_to_document_iters(
+                        new_source,
+                        read_chunk_size=self.app.object_chunk_size)
 
-            def iter_bytes_from_response_part(part_file, nbytes):
-                nchunks = 0
-                buf = b''
-                part_file = ByteCountEnforcer(part_file, nbytes)
-                while True:
-                    try:
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            chunk = part_file.read(self.app.object_chunk_size)
-                            nchunks += 1
-                            # NB: this append must be *inside* the context
-                            # manager for test.unit.SlowBody to do its thing
-                            buf += chunk
-                            if nbytes is not None:
-                                nbytes -= len(chunk)
-                    except (ChunkReadTimeout, ShortReadError):
-                        exc_type, exc_value, exc_traceback = sys.exc_info()
-                        try:
-                            self.fast_forward(self.bytes_used_from_backend)
-                        except (HTTPException, ValueError):
-                            self.logger.exception('Unable to fast forward')
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                        except RangeAlreadyComplete:
-                            break
-                        buf = b''
-                        old_node = self.node
-                        new_source, new_node = self._dig_for_source_and_node()
-                        if new_source:
-                            self.app.error_occurred(
-                                old_node, 'Trying to read EC fragment '
-                                'during GET (retrying)')
-                            # Close-out the connection as best as possible.
-                            if getattr(self.source, 'swift_conn', None):
-                                close_swift_conn(self.source)
-                            self.source = new_source
-                            self.node = new_node
-                            # This is safe; it just sets up a generator but
-                            # does not call next() on it, so no IO is
-                            # performed.
-                            parts_iter[0] = http_response_to_document_iters(
-                                new_source,
-                                read_chunk_size=self.app.object_chunk_size)
-                            try:
-                                _junk, _junk, _junk, _junk, part_file = \
-                                    get_next_doc_part()
-                            except StopIteration:
-                                # it's not clear to me how to make
-                                # get_next_doc_part raise StopIteration for the
-                                # first doc part of a new request
-                                six.reraise(exc_type, exc_value, exc_traceback)
-                            part_file = ByteCountEnforcer(part_file, nbytes)
-                        else:
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                    else:
-                        if buf and self.skip_bytes:
-                            if self.skip_bytes < len(buf):
-                                buf = buf[self.skip_bytes:]
-                                self.bytes_used_from_backend += self.skip_bytes
-                                self.skip_bytes = 0
-                            else:
-                                self.skip_bytes -= len(buf)
-                                self.bytes_used_from_backend += len(buf)
-                                buf = b''
-
-                        if not chunk:
-                            if buf:
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += len(buf)
-                                    yield buf
-                                buf = b''
-                            break
-
-                        if client_chunk_size is not None:
-                            while len(buf) >= client_chunk_size:
-                                client_chunk = buf[:client_chunk_size]
-                                buf = buf[client_chunk_size:]
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += \
-                                        len(client_chunk)
-                                    yield client_chunk
-                        else:
-                            with WatchdogTimeout(self.app.watchdog,
-                                                 self.app.client_timeout,
-                                                 ChunkWriteTimeout):
-                                self.bytes_used_from_backend += len(buf)
-                                yield buf
-                            buf = b''
-
-                        # This is for fairness; if the network is outpacing
-                        # the CPU, we'll always be able to read and write
-                        # data without encountering an EWOULDBLOCK, and so
-                        # eventlet will not switch greenthreads on its own.
-                        # We do it manually so that clients don't starve.
-                        #
-                        # The number 5 here was chosen by making stuff up.
-                        # It's not every single chunk, but it's not too big
-                        # either, so it seemed like it would probably be an
-                        # okay choice.
-                        #
-                        # Note that we may trampoline to other greenthreads
-                        # more often than once every 5 chunks, depending on
-                        # how blocking our network IO is; the explicit sleep
-                        # here simply provides a lower bound on the rate of
-                        # trampolining.
-                        if nchunks % 5 == 0:
-                            sleep()
+    def iter_bytes_from_response_part(self, part_file, nbytes):
+        nchunks = 0
+        buf = b''
+        part_file = ByteCountEnforcer(part_file, nbytes)
+        while True:
+            try:
+                with WatchdogTimeout(self.app.watchdog,
+                                     self.app.recoverable_node_timeout,
+                                     ChunkReadTimeout):
+                    chunk = part_file.read(self.app.object_chunk_size)
+                    nchunks += 1
+                    # NB: this append must be *inside* the context
+                    # manager for test.unit.SlowBody to do its thing
+                    buf += chunk
+                    if nbytes is not None:
+                        nbytes -= len(chunk)
+            except (ChunkReadTimeout, ShortReadError):
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                try:
+                    self.fast_forward(self.bytes_used_from_backend)
+                except (HTTPException, ValueError):
+                    self.logger.exception('Unable to fast forward')
+                    six.reraise(exc_type, exc_value, exc_traceback)
+                except RangeAlreadyComplete:
+                    break
+                buf = b''
+                old_node = self.node
+                new_source, new_node = self._dig_for_source_and_node()
+                if new_source:
+                    self.app.error_occurred(
+                        old_node, 'Trying to read EC fragment '
+                        'during GET (retrying)')
+                    # Close-out the connection as best as possible.
+                    if getattr(self.source, 'swift_conn', None):
+                        close_swift_conn(self.source)
+                    self.source = new_source
+                    self.node = new_node
+                    # This is safe; it just sets up a generator but
+                    # does not call next() on it, so no IO is
+                    # performed.
+                    self.source_parts_iter = \
+                        http_response_to_document_iters(
+                            new_source,
+                            read_chunk_size=self.app.object_chunk_size)
+                    try:
+                        _junk, _junk, _junk, _junk, part_file = \
+                            self.get_next_doc_part()
+                    except StopIteration:
+                        # it's not clear to me how to make
+                        # get_next_doc_part raise StopIteration for the
+                        # first doc part of a new request
+                        six.reraise(exc_type, exc_value, exc_traceback)
+                    part_file = ByteCountEnforcer(part_file, nbytes)
+                else:
+                    six.reraise(exc_type, exc_value, exc_traceback)
+            else:
+                if buf and self.skip_bytes:
+                    if self.skip_bytes < len(buf):
+                        buf = buf[self.skip_bytes:]
+                        self.bytes_used_from_backend += self.skip_bytes
+                        self.skip_bytes = 0
+                    else:
+                        self.skip_bytes -= len(buf)
+                        self.bytes_used_from_backend += len(buf)
+                        buf = b''
+
+                while buf and (len(buf) >= self.fragment_size or not chunk):
+                    client_chunk = buf[:self.fragment_size]
+                    buf = buf[self.fragment_size:]
+                    with WatchdogTimeout(self.app.watchdog,
+                                         self.app.client_timeout,
+                                         ChunkWriteTimeout):
+                        self.bytes_used_from_backend += len(client_chunk)
+                        yield client_chunk
+
+                if not chunk:
+                    break
+
+                # This is for fairness; if the network is outpacing
+                # the CPU, we'll always be able to read and write
+                # data without encountering an EWOULDBLOCK, and so
+                # eventlet will not switch greenthreads on its own.
+                # We do it manually so that clients don't starve.
+                #
+                # The number 5 here was chosen by making stuff up.
+                # It's not every single chunk, but it's not too big
+                # either, so it seemed like it would probably be an
+                # okay choice.
+                #
+                # Note that we may trampoline to other greenthreads
+                # more often than once every 5 chunks, depending on
+                # how blocking our network IO is; the explicit sleep
+                # here simply provides a lower bound on the rate of
+                # trampolining.
+                if nchunks % 5 == 0:
+                    sleep()
+
+    def _get_response_parts_iter(self, req):
+        try:
+            # This is safe; it sets up a generator but does not call next()
+            # on it, so no IO is performed.
+            self.source_parts_iter = http_response_to_document_iters(
+                self.source, read_chunk_size=self.app.object_chunk_size)
 
             part_iter = None
             try:
                 while True:
                     try:
                         start_byte, end_byte, length, headers, part = \
-                            get_next_doc_part()
+                            self.get_next_doc_part()
                     except StopIteration:
                         # it seems this is the only way out of the loop; not
                         # sure why the req.environ update is always needed
@@ -2800,7 +2784,8 @@ class ECFragGetter(object):
                           if (end_byte is not None
                               and start_byte is not None)
                           else None)
-            part_iter = iter_bytes_from_response_part(part, byte_count)
+            part_iter = self.iter_bytes_from_response_part(
+                part, byte_count)
             yield {'start_byte': start_byte, 'end_byte': end_byte,
                    'entity_length': length, 'headers': headers,
                    'part_iter': part_iter}
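Note: the rewritten yield loop in iter_bytes_from_response_part collapses the old "if client_chunk_size is not None: ... else: ..." branches into a single condition: emit fragment_size-sized pieces while enough data is buffered, and flush whatever remains once the source is exhausted (not chunk). A standalone sketch of that buffering pattern, without eventlet or timeouts:

def rechunk(source_chunks, fragment_size):
    # yield fixed-size pieces; flush the remainder when the source ends
    buf = b''
    for chunk in source_chunks + [b'']:  # b'' signals end-of-source
        buf += chunk
        while buf and (len(buf) >= fragment_size or not chunk):
            piece, buf = buf[:fragment_size], buf[fragment_size:]
            yield piece
        if not chunk:
            break

assert list(rechunk([b'abcde', b'fg'], 4)) == [b'abcd', b'efg']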