Diffstat (limited to 'swift/proxy/controllers')
-rw-r--r--  swift/proxy/controllers/base.py       |  23
-rw-r--r--  swift/proxy/controllers/container.py  | 140
-rw-r--r--  swift/proxy/controllers/obj.py        | 311
3 files changed, 260 insertions(+), 214 deletions(-)
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 758aed72b..93bb056d2 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -615,10 +615,7 @@ def get_cache_key(account, container=None, obj=None, shard=None):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
- if shard == 'updating':
- cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
- else:
- cache_key = 'shard-%s/%s/%s' % (shard, account, container)
+ cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
elif obj:
if not (account and container):
raise ValueError('Object cache key requires account and container')
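# Editor's illustrative sketch, not part of the patch: with the change above,
# every shard cache key now carries the '-v2' suffix, not only the 'updating'
# variant. A minimal stand-alone mirror of the shard branch:
def example_shard_cache_key(account, container, shard):
    return 'shard-%s-v2/%s/%s' % (shard, account, container)

assert example_shard_cache_key('AUTH_test', 'c', 'listing') == \
    'shard-listing-v2/AUTH_test/c'
assert example_shard_cache_key('AUTH_test', 'c', 'updating') == \
    'shard-updating-v2/AUTH_test/c'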
@@ -1848,16 +1845,22 @@ class Controller(object):
:param transfer: If True, transfer headers from original client request
:returns: a dictionary of headers
"""
- # Use the additional headers first so they don't overwrite the headers
- # we require.
- headers = HeaderKeyDict(additional) if additional else HeaderKeyDict()
- if transfer:
- self.transfer_headers(orig_req.headers, headers)
- headers.setdefault('x-timestamp', Timestamp.now().internal)
+ headers = HeaderKeyDict()
if orig_req:
+ headers.update((k.lower(), v)
+ for k, v in orig_req.headers.items()
+ if k.lower().startswith('x-backend-'))
referer = orig_req.as_referer()
else:
referer = ''
+ # additional headers can override x-backend-* headers from orig_req
+ if additional:
+ headers.update(additional)
+ if orig_req and transfer:
+ # transfer headers from orig_req can override additional headers
+ self.transfer_headers(orig_req.headers, headers)
+ headers.setdefault('x-timestamp', Timestamp.now().internal)
+ # orig_req and additional headers cannot override the following...
headers['x-trans-id'] = self.trans_id
headers['connection'] = 'close'
headers['user-agent'] = self.app.backend_user_agent
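# Editor's illustrative sketch, not part of the patch, using plain dicts in
# place of HeaderKeyDict: the reordered code above gives x-backend-* headers
# copied from the original request the lowest precedence, then 'additional'
# headers, then transferred client headers, while values such as x-trans-id
# are set last and cannot be overridden.
def example_backend_headers(orig_headers, additional, transferred, trans_id):
    headers = {k.lower(): v for k, v in orig_headers.items()
               if k.lower().startswith('x-backend-')}
    headers.update(additional or {})   # may override x-backend-* values
    headers.update(transferred or {})  # transferred client headers win next
    headers['x-trans-id'] = trans_id   # forced, never overridden
    return headers

print(example_backend_headers(
    {'X-Backend-Storage-Policy-Index': '1', 'X-Trans-Id': 'spoofed'},
    {'x-backend-storage-policy-index': '2'}, {}, 'tx123'))
# -> the 'additional' policy index '2' wins; x-trans-id is forced to 'tx123'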
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index 4102d652a..fe8480ba3 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -21,7 +21,8 @@ from six.moves.urllib.parse import unquote
from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
- config_true_value, ShardRange, cache_from_env, filter_shard_ranges
+ config_true_value, ShardRange, cache_from_env, filter_namespaces, \
+ NamespaceBoundList
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
@@ -109,25 +110,42 @@ class ContainerController(Controller):
req.swift_entity_path, concurrency)
return resp
- def _make_shard_ranges_response_body(self, req, shard_range_dicts):
- # filter shard ranges according to request constraints and return a
- # serialised list of shard ranges
+ def _make_namespaces_response_body(self, req, ns_bound_list):
+ """
+ Filter namespaces according to request constraints and return a
+ serialised list of namespaces.
+
+ :param req: the request object.
+ :param ns_bound_list: an instance of
+ :class:`~swift.common.utils.NamespaceBoundList`.
+ :return: a serialised list of namespaces.
+ """
marker = get_param(req, 'marker', '')
end_marker = get_param(req, 'end_marker')
includes = get_param(req, 'includes')
reverse = config_true_value(get_param(req, 'reverse'))
if reverse:
marker, end_marker = end_marker, marker
- shard_ranges = [
- ShardRange.from_dict(shard_range)
- for shard_range in shard_range_dicts]
- shard_ranges = filter_shard_ranges(shard_ranges, includes, marker,
- end_marker)
+ namespaces = ns_bound_list.get_namespaces()
+ namespaces = filter_namespaces(
+ namespaces, includes, marker, end_marker)
if reverse:
- shard_ranges.reverse()
- return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
+ namespaces.reverse()
+ return json.dumps([dict(ns) for ns in namespaces]).encode('ascii')
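# Editor's illustrative sketch, not part of the patch; assumes a Swift
# development environment for the swift.common.utils imports referenced in
# this hunk. The cached form is just a list of [lower_bound, shard_name]
# pairs; get_namespaces() expands it back into contiguous Namespace objects
# that can be filtered and serialised as in the method above.
import json
from swift.common.utils import NamespaceBoundList, filter_namespaces

bounds = [['', '.shards_a/c_0'], ['m', '.shards_a/c_1']]
ns_bound_list = NamespaceBoundList(bounds)
namespaces = ns_bound_list.get_namespaces()
namespaces = filter_namespaces(namespaces, None, 'n', None)
# (includes=None, marker='n', end_marker=None, as in the method above)
body = json.dumps([dict(ns) for ns in namespaces]).encode('ascii')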
def _get_shard_ranges_from_cache(self, req, headers):
+ """
+ Try to fetch shard namespace data from cache and, if successful, return
+ a response. Also return the cache state.
+
+ The response body will be a list of dicts each of which describes
+ a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``).
+
+ :param req: an instance of ``swob.Request``.
+ :param headers: Headers to be sent with request.
+ :return: a tuple comprising (an instance of ``swob.Response`` or
+ ``None`` if no namespaces were found in cache, the cache state).
+ """
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cache_key = get_cache_key(self.account_name,
@@ -135,11 +153,10 @@ class ContainerController(Controller):
shard='listing')
resp_body = None
- cached_range_dicts = infocache.get(cache_key)
- if cached_range_dicts:
+ ns_bound_list = infocache.get(cache_key)
+ if ns_bound_list:
cache_state = 'infocache_hit'
- resp_body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ resp_body = self._make_namespaces_response_body(req, ns_bound_list)
elif memcache:
skip_chance = \
self.app.container_listing_shard_ranges_skip_cache
@@ -147,12 +164,20 @@ class ContainerController(Controller):
cache_state = 'skip'
else:
try:
- cached_range_dicts = memcache.get(
+ cached_namespaces = memcache.get(
cache_key, raise_on_error=True)
- if cached_range_dicts:
+ if cached_namespaces:
cache_state = 'hit'
- resp_body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ if six.PY2:
+ # json.loads() in memcache.get will convert json
+ # 'string' to 'unicode' with python2, here we cast
+ # 'unicode' back to 'str'
+ cached_namespaces = [
+ [lower.encode('utf-8'), name.encode('utf-8')]
+ for lower, name in cached_namespaces]
+ ns_bound_list = NamespaceBoundList(cached_namespaces)
+ resp_body = self._make_namespaces_response_body(
+ req, ns_bound_list)
else:
cache_state = 'miss'
except MemcacheConnectionError:
@@ -162,9 +187,9 @@ class ContainerController(Controller):
resp = None
else:
# shard ranges can be returned from cache
- infocache[cache_key] = tuple(cached_range_dicts)
+ infocache[cache_key] = ns_bound_list
self.logger.debug('Found %d shards in cache for %s',
- len(cached_range_dicts), req.path_qs)
+ len(ns_bound_list.bounds), req.path_qs)
headers.update({'x-backend-record-type': 'shard',
'x-backend-cached-results': 'true'})
# mimic GetOrHeadHandler.get_working_response...
@@ -180,36 +205,62 @@ class ContainerController(Controller):
return resp, cache_state
def _store_shard_ranges_in_cache(self, req, resp):
- # parse shard ranges returned from backend, store them in infocache and
- # memcache, and return a list of dicts
- cache_key = get_cache_key(self.account_name, self.container_name,
- shard='listing')
+ """
+ Parse shard ranges returned from backend, store them in both infocache
+ and memcache.
+
+ :param req: the request object.
+ :param resp: the response object for the shard range listing.
+ :return: an instance of
+ :class:`~swift.common.utils.NamespaceBoundList`.
+ """
+ # Note: Any gaps in the response's shard ranges will be 'lost' as a
+ # result of compacting the list of shard ranges to a
+ # NamespaceBoundList. That is ok. When the cached NamespaceBoundList is
+ # transformed back to shard range Namespaces to perform a listing, the
+ # Namespace before each gap will have expanded to include the gap,
+ # which means that the backend GET to that shard will have an
+ # end_marker beyond that shard's upper bound, and equal to the next
+ # available shard's lower. At worst, some misplaced objects, in the gap
+ # above the shard's upper, may be included in the shard's response.
data = self._parse_listing_response(req, resp)
backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
if backend_shard_ranges is None:
return None
- cached_range_dicts = [dict(sr) for sr in backend_shard_ranges]
+ ns_bound_list = NamespaceBoundList.parse(backend_shard_ranges)
if resp.headers.get('x-backend-sharding-state') == 'sharded':
# cache in infocache even if no shard ranges returned; this
# is unexpected but use that result for this request
infocache = req.environ.setdefault('swift.infocache', {})
- infocache[cache_key] = tuple(cached_range_dicts)
+ cache_key = get_cache_key(
+ self.account_name, self.container_name, shard='listing')
+ infocache[cache_key] = ns_bound_list
memcache = cache_from_env(req.environ, True)
- if memcache and cached_range_dicts:
+ if memcache and ns_bound_list:
# cache in memcache only if shard ranges as expected
self.logger.debug('Caching %d shards for %s',
- len(cached_range_dicts), req.path_qs)
- memcache.set(cache_key, cached_range_dicts,
+ len(ns_bound_list.bounds), req.path_qs)
+ memcache.set(cache_key, ns_bound_list.bounds,
time=self.app.recheck_listing_shard_ranges)
- return cached_range_dicts
+ return ns_bound_list
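# Editor's illustrative sketch, not part of the patch; assumes a Swift
# development environment. Two shard ranges with a gap between 'g' and 'm':
# after compacting to a NamespaceBoundList and expanding back, the first
# namespace's upper bound grows to 'm', absorbing the gap as described in
# the note above.
from swift.common.utils import ShardRange, NamespaceBoundList

shard_ranges = [ShardRange('.shards_a/c_0', 0, lower='', upper='g'),
                ShardRange('.shards_a/c_1', 0, lower='m', upper='')]
ns_bound_list = NamespaceBoundList.parse(shard_ranges)
print(ns_bound_list.bounds)
# expected: [['', '.shards_a/c_0'], ['m', '.shards_a/c_1']]
print([dict(ns) for ns in ns_bound_list.get_namespaces()])
# the first namespace now reports upper='m', i.e. the next shard's lower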
def _get_shard_ranges_from_backend(self, req):
- # Make a backend request for shard ranges. The response is cached and
- # then returned as a list of dicts.
+ """
+ Make a backend request for shard ranges and return a response.
+
+ The response body will be a list of dicts each of which describes
+ a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``).
+ If the response headers indicate that the response body contains a
+ complete list of shard ranges for a sharded container then the response
+ body will be transformed to a ``NamespaceBoundList`` and cached.
+
+ :param req: an instance of ``swob.Request``.
+ :return: an instance of ``swob.Response``.
+ """
# Note: We instruct the backend server to ignore name constraints in
# request params if returning shard ranges so that the response can
- # potentially be cached. Only do this if the container state is
+ # potentially be cached, but we only cache it if the container state is
# 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
# container as they may include the container itself as a 'gap filler'
# for shard ranges that have not yet cleaved; listings from 'gap
@@ -232,10 +283,10 @@ class ContainerController(Controller):
if (resp_record_type == 'shard' and
sharding_state == 'sharded' and
complete_listing):
- cached_range_dicts = self._store_shard_ranges_in_cache(req, resp)
- if cached_range_dicts:
- resp.body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ ns_bound_list = self._store_shard_ranges_in_cache(req, resp)
+ if ns_bound_list:
+ resp.body = self._make_namespaces_response_body(
+ req, ns_bound_list)
return resp
def _record_shard_listing_cache_metrics(
@@ -334,7 +385,6 @@ class ContainerController(Controller):
params['states'] = 'listing'
req.params = params
- memcache = cache_from_env(req.environ, True)
if (req.method == 'GET'
and get_param(req, 'states') == 'listing'
and record_type != 'object'):
@@ -346,6 +396,7 @@ class ContainerController(Controller):
info = None
may_get_listing_shards = False
+ memcache = cache_from_env(req.environ, True)
sr_cache_state = None
if (may_get_listing_shards and
self.app.recheck_listing_shard_ranges > 0
@@ -424,8 +475,15 @@ class ContainerController(Controller):
# 'X-Backend-Storage-Policy-Index'.
req.headers[policy_key] = resp.headers[policy_key]
shard_listing_history.append((self.account_name, self.container_name))
- shard_ranges = [ShardRange.from_dict(data)
- for data in json.loads(resp.body)]
+ # Note: when the response body has been synthesised from cached data,
+ # each item in the list only has 'name', 'lower' and 'upper' keys. We
+ # therefore cannot use ShardRange.from_dict(), and the ShardRange
+ # instances constructed here will only have 'name', 'lower' and 'upper'
+ # attributes set.
+ # Ideally we would construct Namespace objects here, but later we use
+ # the ShardRange account and container properties to access parsed
+ # parts of the name.
+ shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)]
self.logger.debug('GET listing from %s shards for: %s',
len(shard_ranges), req.path_qs)
if not shard_ranges:
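# Editor's illustrative sketch, not part of the patch; assumes a Swift
# development environment. An item synthesised from cached data carries only
# 'name', 'lower' and 'upper', which is still enough to build a ShardRange
# whose account/container properties parse the two halves of the name, as
# the note above relies on.
from swift.common.utils import ShardRange

data = {'name': '.shards_a/c_1', 'lower': 'g', 'upper': 'm'}
sr = ShardRange(**data)
print(sr.account, sr.container)  # -> .shards_a c_1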
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index b69631538..fc0f8a6d1 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
- ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
+ ShardRange, find_namespace, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@@ -388,7 +388,7 @@ class BaseObjectController(Controller):
memcache.set(
cache_key, cached_namespaces.bounds,
time=self.app.recheck_updating_shard_ranges)
- update_shard = find_shard_range(obj, shard_ranges or [])
+ update_shard = find_namespace(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
return update_shard
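# Editor's illustrative sketch, not part of the patch; assumes a Swift
# development environment and the Namespace(name, lower, upper) constructor.
# find_namespace() replaces find_shard_range() here and returns whichever
# namespace contains the given object name.
from swift.common.utils import Namespace, find_namespace

namespaces = [Namespace('.shards_a/c_0', '', 'm'),
              Namespace('.shards_a/c_1', 'm', '')]
print(find_namespace('kiwi', namespaces).name)  # -> .shards_a/c_0
print(find_namespace('pear', namespaces).name)  # -> .shards_a/c_1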
@@ -1518,7 +1518,7 @@ class ECAppIter(object):
except ChunkWriteTimeout:
# slow client disconnect
self.logger.exception(
- "ChunkWriteTimeout fetching fragments for %r",
+ "ChunkWriteTimeout feeding fragments for %r",
quote(self.path))
except: # noqa
self.logger.exception("Exception fetching fragments for %r",
@@ -2497,10 +2497,10 @@ class ECFragGetter(object):
self.backend_headers = backend_headers
self.header_provider = header_provider
self.req_query_string = req.query_string
- self.client_chunk_size = policy.fragment_size
+ self.fragment_size = policy.fragment_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
- self.source = None
+ self.source = self.node = None
self.logger_thread_locals = logger_thread_locals
self.logger = logger
@@ -2578,8 +2578,8 @@ class ECFragGetter(object):
def learn_size_from_content_range(self, start, end, length):
"""
- If client_chunk_size is set, makes sure we yield things starting on
- chunk boundaries based on the Content-Range header in the response.
+ Make sure we yield things starting on fragment boundaries based on the
+ Content-Range header in the response.
Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
@@ -2593,8 +2593,7 @@ class ECFragGetter(object):
if length == 0:
return
- if self.client_chunk_size:
- self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+ self.skip_bytes = bytes_to_skip(self.fragment_size, start)
if 'Range' in self.backend_headers:
try:
@@ -2620,170 +2619,155 @@ class ECFragGetter(object):
it = self._get_response_parts_iter(req)
return it
- def _get_response_parts_iter(self, req):
- try:
- client_chunk_size = self.client_chunk_size
- node_timeout = self.app.recoverable_node_timeout
-
- # This is safe; it sets up a generator but does not call next()
- # on it, so no IO is performed.
- parts_iter = [
- http_response_to_document_iters(
- self.source, read_chunk_size=self.app.object_chunk_size)]
+ def get_next_doc_part(self):
+ node_timeout = self.app.recoverable_node_timeout
- def get_next_doc_part():
- while True:
- # the loop here is to resume if trying to parse
- # multipart/byteranges response raises a ChunkReadTimeout
- # and resets the parts_iter
- try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- # If we don't have a multipart/byteranges response,
- # but just a 200 or a single-range 206, then this
- # performs no IO, and just returns source (or
- # raises StopIteration).
- # Otherwise, this call to next() performs IO when
- # we have a multipart/byteranges response; as it
- # will read the MIME boundary and part headers.
- start_byte, end_byte, length, headers, part = next(
- parts_iter[0])
- return (start_byte, end_byte, length, headers, part)
- except ChunkReadTimeout:
- new_source, new_node = self._dig_for_source_and_node()
- if not new_source:
- raise
- self.app.error_occurred(
- self.node, 'Trying to read next part of '
- 'EC multi-part GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it sets up a generator but does
- # not call next() on it, so no IO is performed.
- parts_iter[0] = http_response_to_document_iters(
+ while True:
+ # the loop here is to resume if trying to parse
+ # multipart/byteranges response raises a ChunkReadTimeout
+ # and resets the source_parts_iter
+ try:
+ with WatchdogTimeout(self.app.watchdog, node_timeout,
+ ChunkReadTimeout):
+ # If we don't have a multipart/byteranges response,
+ # but just a 200 or a single-range 206, then this
+ # performs no IO, and just returns source (or
+ # raises StopIteration).
+ # Otherwise, this call to next() performs IO when
+ # we have a multipart/byteranges response; as it
+ # will read the MIME boundary and part headers.
+ start_byte, end_byte, length, headers, part = next(
+ self.source_parts_iter)
+ return (start_byte, end_byte, length, headers, part)
+ except ChunkReadTimeout:
+ new_source, new_node = self._dig_for_source_and_node()
+ if not new_source:
+ raise
+ self.app.error_occurred(
+ self.node, 'Trying to read next part of '
+ 'EC multi-part GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it sets up a generator but does
+ # not call next() on it, so no IO is performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
+ new_source,
+ read_chunk_size=self.app.object_chunk_size)
+
+ def iter_bytes_from_response_part(self, part_file, nbytes):
+ nchunks = 0
+ buf = b''
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ while True:
+ try:
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.recoverable_node_timeout,
+ ChunkReadTimeout):
+ chunk = part_file.read(self.app.object_chunk_size)
+ nchunks += 1
+ # NB: this append must be *inside* the context
+ # manager for test.unit.SlowBody to do its thing
+ buf += chunk
+ if nbytes is not None:
+ nbytes -= len(chunk)
+ except (ChunkReadTimeout, ShortReadError):
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ try:
+ self.fast_forward(self.bytes_used_from_backend)
+ except (HTTPException, ValueError):
+ self.logger.exception('Unable to fast forward')
+ six.reraise(exc_type, exc_value, exc_traceback)
+ except RangeAlreadyComplete:
+ break
+ buf = b''
+ old_node = self.node
+ new_source, new_node = self._dig_for_source_and_node()
+ if new_source:
+ self.app.error_occurred(
+ old_node, 'Trying to read EC fragment '
+ 'during GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it just sets up a generator but
+ # does not call next() on it, so no IO is
+ # performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
-
- def iter_bytes_from_response_part(part_file, nbytes):
- nchunks = 0
- buf = b''
- part_file = ByteCountEnforcer(part_file, nbytes)
- while True:
try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- chunk = part_file.read(self.app.object_chunk_size)
- nchunks += 1
- # NB: this append must be *inside* the context
- # manager for test.unit.SlowBody to do its thing
- buf += chunk
- if nbytes is not None:
- nbytes -= len(chunk)
- except (ChunkReadTimeout, ShortReadError):
- exc_type, exc_value, exc_traceback = sys.exc_info()
- try:
- self.fast_forward(self.bytes_used_from_backend)
- except (HTTPException, ValueError):
- self.logger.exception('Unable to fast forward')
- six.reraise(exc_type, exc_value, exc_traceback)
- except RangeAlreadyComplete:
- break
- buf = b''
- old_node = self.node
- new_source, new_node = self._dig_for_source_and_node()
- if new_source:
- self.app.error_occurred(
- old_node, 'Trying to read EC fragment '
- 'during GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it just sets up a generator but
- # does not call next() on it, so no IO is
- # performed.
- parts_iter[0] = http_response_to_document_iters(
- new_source,
- read_chunk_size=self.app.object_chunk_size)
- try:
- _junk, _junk, _junk, _junk, part_file = \
- get_next_doc_part()
- except StopIteration:
- # it's not clear to me how to make
- # get_next_doc_part raise StopIteration for the
- # first doc part of a new request
- six.reraise(exc_type, exc_value, exc_traceback)
- part_file = ByteCountEnforcer(part_file, nbytes)
- else:
- six.reraise(exc_type, exc_value, exc_traceback)
+ _junk, _junk, _junk, _junk, part_file = \
+ self.get_next_doc_part()
+ except StopIteration:
+ # it's not clear to me how to make
+ # get_next_doc_part raise StopIteration for the
+ # first doc part of a new request
+ six.reraise(exc_type, exc_value, exc_traceback)
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ else:
+ six.reraise(exc_type, exc_value, exc_traceback)
+ else:
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ self.bytes_used_from_backend += self.skip_bytes
+ self.skip_bytes = 0
else:
- if buf and self.skip_bytes:
- if self.skip_bytes < len(buf):
- buf = buf[self.skip_bytes:]
- self.bytes_used_from_backend += self.skip_bytes
- self.skip_bytes = 0
- else:
- self.skip_bytes -= len(buf)
- self.bytes_used_from_backend += len(buf)
- buf = b''
-
- if not chunk:
- if buf:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
- break
-
- if client_chunk_size is not None:
- while len(buf) >= client_chunk_size:
- client_chunk = buf[:client_chunk_size]
- buf = buf[client_chunk_size:]
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += \
- len(client_chunk)
- yield client_chunk
- else:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
-
- # This is for fairness; if the network is outpacing
- # the CPU, we'll always be able to read and write
- # data without encountering an EWOULDBLOCK, and so
- # eventlet will not switch greenthreads on its own.
- # We do it manually so that clients don't starve.
- #
- # The number 5 here was chosen by making stuff up.
- # It's not every single chunk, but it's not too big
- # either, so it seemed like it would probably be an
- # okay choice.
- #
- # Note that we may trampoline to other greenthreads
- # more often than once every 5 chunks, depending on
- # how blocking our network IO is; the explicit sleep
- # here simply provides a lower bound on the rate of
- # trampolining.
- if nchunks % 5 == 0:
- sleep()
+ self.skip_bytes -= len(buf)
+ self.bytes_used_from_backend += len(buf)
+ buf = b''
+
+ while buf and (len(buf) >= self.fragment_size or not chunk):
+ client_chunk = buf[:self.fragment_size]
+ buf = buf[self.fragment_size:]
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += len(client_chunk)
+ yield client_chunk
+
+ if not chunk:
+ break
+
+ # This is for fairness; if the network is outpacing
+ # the CPU, we'll always be able to read and write
+ # data without encountering an EWOULDBLOCK, and so
+ # eventlet will not switch greenthreads on its own.
+ # We do it manually so that clients don't starve.
+ #
+ # The number 5 here was chosen by making stuff up.
+ # It's not every single chunk, but it's not too big
+ # either, so it seemed like it would probably be an
+ # okay choice.
+ #
+ # Note that we may trampoline to other greenthreads
+ # more often than once every 5 chunks, depending on
+ # how blocking our network IO is; the explicit sleep
+ # here simply provides a lower bound on the rate of
+ # trampolining.
+ if nchunks % 5 == 0:
+ sleep()
+
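# Editor's illustrative sketch, not part of the patch: the rearranged loop
# above buffers backend reads and yields whole fragment_size pieces, flushing
# any remainder once the final (empty) chunk is read. A stripped-down version
# of the same buffering logic:
import io

def yield_fragments(src, fragment_size, read_size):
    buf = b''
    while True:
        chunk = src.read(read_size)
        buf += chunk
        while buf and (len(buf) >= fragment_size or not chunk):
            yield buf[:fragment_size]
            buf = buf[fragment_size:]
        if not chunk:
            break

print(list(yield_fragments(io.BytesIO(b'abcdefgh'), 3, 2)))
# -> [b'abc', b'def', b'gh']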
+ def _get_response_parts_iter(self, req):
+ try:
+ # This is safe; it sets up a generator but does not call next()
+ # on it, so no IO is performed.
+ self.source_parts_iter = http_response_to_document_iters(
+ self.source, read_chunk_size=self.app.object_chunk_size)
part_iter = None
try:
while True:
try:
start_byte, end_byte, length, headers, part = \
- get_next_doc_part()
+ self.get_next_doc_part()
except StopIteration:
# it seems this is the only way out of the loop; not
# sure why the req.environ update is always needed
@@ -2800,7 +2784,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
- part_iter = iter_bytes_from_response_part(part, byte_count)
+ part_iter = self.iter_bytes_from_response_part(
+ part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
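# Editor's illustrative sketch, not part of the patch: each item yielded
# above describes one document part; a consumer can drain it roughly like
# this before the parts are reassembled into a client response body.
def drain_parts(parts_iter):
    for part in parts_iter:
        span = (part['start_byte'], part['end_byte'], part['entity_length'])
        for chunk in part['part_iter']:
            yield span, chunk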