summaryrefslogtreecommitdiff
path: root/swift/proxy/controllers/obj.py
diff options
context:
space:
mode:
Diffstat (limited to 'swift/proxy/controllers/obj.py')
-rw-r--r--swift/proxy/controllers/obj.py311
1 files changed, 148 insertions, 163 deletions
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index b69631538..fc0f8a6d1 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
- ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
+ ShardRange, find_namespace, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@@ -388,7 +388,7 @@ class BaseObjectController(Controller):
memcache.set(
cache_key, cached_namespaces.bounds,
time=self.app.recheck_updating_shard_ranges)
- update_shard = find_shard_range(obj, shard_ranges or [])
+ update_shard = find_namespace(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
return update_shard
@@ -1518,7 +1518,7 @@ class ECAppIter(object):
except ChunkWriteTimeout:
# slow client disconnect
self.logger.exception(
- "ChunkWriteTimeout fetching fragments for %r",
+ "ChunkWriteTimeout feeding fragments for %r",
quote(self.path))
except: # noqa
self.logger.exception("Exception fetching fragments for %r",
@@ -2497,10 +2497,10 @@ class ECFragGetter(object):
self.backend_headers = backend_headers
self.header_provider = header_provider
self.req_query_string = req.query_string
- self.client_chunk_size = policy.fragment_size
+ self.fragment_size = policy.fragment_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
- self.source = None
+ self.source = self.node = None
self.logger_thread_locals = logger_thread_locals
self.logger = logger
@@ -2578,8 +2578,8 @@ class ECFragGetter(object):
def learn_size_from_content_range(self, start, end, length):
"""
- If client_chunk_size is set, makes sure we yield things starting on
- chunk boundaries based on the Content-Range header in the response.
+ Make sure we yield things starting on fragment boundaries based on the
+ Content-Range header in the response.
Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
@@ -2593,8 +2593,7 @@ class ECFragGetter(object):
if length == 0:
return
- if self.client_chunk_size:
- self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+ self.skip_bytes = bytes_to_skip(self.fragment_size, start)
if 'Range' in self.backend_headers:
try:
@@ -2620,170 +2619,155 @@ class ECFragGetter(object):
it = self._get_response_parts_iter(req)
return it
- def _get_response_parts_iter(self, req):
- try:
- client_chunk_size = self.client_chunk_size
- node_timeout = self.app.recoverable_node_timeout
-
- # This is safe; it sets up a generator but does not call next()
- # on it, so no IO is performed.
- parts_iter = [
- http_response_to_document_iters(
- self.source, read_chunk_size=self.app.object_chunk_size)]
+ def get_next_doc_part(self):
+ node_timeout = self.app.recoverable_node_timeout
- def get_next_doc_part():
- while True:
- # the loop here is to resume if trying to parse
- # multipart/byteranges response raises a ChunkReadTimeout
- # and resets the parts_iter
- try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- # If we don't have a multipart/byteranges response,
- # but just a 200 or a single-range 206, then this
- # performs no IO, and just returns source (or
- # raises StopIteration).
- # Otherwise, this call to next() performs IO when
- # we have a multipart/byteranges response; as it
- # will read the MIME boundary and part headers.
- start_byte, end_byte, length, headers, part = next(
- parts_iter[0])
- return (start_byte, end_byte, length, headers, part)
- except ChunkReadTimeout:
- new_source, new_node = self._dig_for_source_and_node()
- if not new_source:
- raise
- self.app.error_occurred(
- self.node, 'Trying to read next part of '
- 'EC multi-part GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it sets up a generator but does
- # not call next() on it, so no IO is performed.
- parts_iter[0] = http_response_to_document_iters(
+ while True:
+ # the loop here is to resume if trying to parse
+ # multipart/byteranges response raises a ChunkReadTimeout
+ # and resets the source_parts_iter
+ try:
+ with WatchdogTimeout(self.app.watchdog, node_timeout,
+ ChunkReadTimeout):
+ # If we don't have a multipart/byteranges response,
+ # but just a 200 or a single-range 206, then this
+ # performs no IO, and just returns source (or
+ # raises StopIteration).
+ # Otherwise, this call to next() performs IO when
+ # we have a multipart/byteranges response; as it
+ # will read the MIME boundary and part headers.
+ start_byte, end_byte, length, headers, part = next(
+ self.source_parts_iter)
+ return (start_byte, end_byte, length, headers, part)
+ except ChunkReadTimeout:
+ new_source, new_node = self._dig_for_source_and_node()
+ if not new_source:
+ raise
+ self.app.error_occurred(
+ self.node, 'Trying to read next part of '
+ 'EC multi-part GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it sets up a generator but does
+ # not call next() on it, so no IO is performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
+ new_source,
+ read_chunk_size=self.app.object_chunk_size)
+
+ def iter_bytes_from_response_part(self, part_file, nbytes):
+ nchunks = 0
+ buf = b''
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ while True:
+ try:
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.recoverable_node_timeout,
+ ChunkReadTimeout):
+ chunk = part_file.read(self.app.object_chunk_size)
+ nchunks += 1
+ # NB: this append must be *inside* the context
+ # manager for test.unit.SlowBody to do its thing
+ buf += chunk
+ if nbytes is not None:
+ nbytes -= len(chunk)
+ except (ChunkReadTimeout, ShortReadError):
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ try:
+ self.fast_forward(self.bytes_used_from_backend)
+ except (HTTPException, ValueError):
+ self.logger.exception('Unable to fast forward')
+ six.reraise(exc_type, exc_value, exc_traceback)
+ except RangeAlreadyComplete:
+ break
+ buf = b''
+ old_node = self.node
+ new_source, new_node = self._dig_for_source_and_node()
+ if new_source:
+ self.app.error_occurred(
+ old_node, 'Trying to read EC fragment '
+ 'during GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it just sets up a generator but
+ # does not call next() on it, so no IO is
+ # performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
-
- def iter_bytes_from_response_part(part_file, nbytes):
- nchunks = 0
- buf = b''
- part_file = ByteCountEnforcer(part_file, nbytes)
- while True:
try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- chunk = part_file.read(self.app.object_chunk_size)
- nchunks += 1
- # NB: this append must be *inside* the context
- # manager for test.unit.SlowBody to do its thing
- buf += chunk
- if nbytes is not None:
- nbytes -= len(chunk)
- except (ChunkReadTimeout, ShortReadError):
- exc_type, exc_value, exc_traceback = sys.exc_info()
- try:
- self.fast_forward(self.bytes_used_from_backend)
- except (HTTPException, ValueError):
- self.logger.exception('Unable to fast forward')
- six.reraise(exc_type, exc_value, exc_traceback)
- except RangeAlreadyComplete:
- break
- buf = b''
- old_node = self.node
- new_source, new_node = self._dig_for_source_and_node()
- if new_source:
- self.app.error_occurred(
- old_node, 'Trying to read EC fragment '
- 'during GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it just sets up a generator but
- # does not call next() on it, so no IO is
- # performed.
- parts_iter[0] = http_response_to_document_iters(
- new_source,
- read_chunk_size=self.app.object_chunk_size)
- try:
- _junk, _junk, _junk, _junk, part_file = \
- get_next_doc_part()
- except StopIteration:
- # it's not clear to me how to make
- # get_next_doc_part raise StopIteration for the
- # first doc part of a new request
- six.reraise(exc_type, exc_value, exc_traceback)
- part_file = ByteCountEnforcer(part_file, nbytes)
- else:
- six.reraise(exc_type, exc_value, exc_traceback)
+ _junk, _junk, _junk, _junk, part_file = \
+ self.get_next_doc_part()
+ except StopIteration:
+ # it's not clear to me how to make
+ # get_next_doc_part raise StopIteration for the
+ # first doc part of a new request
+ six.reraise(exc_type, exc_value, exc_traceback)
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ else:
+ six.reraise(exc_type, exc_value, exc_traceback)
+ else:
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ self.bytes_used_from_backend += self.skip_bytes
+ self.skip_bytes = 0
else:
- if buf and self.skip_bytes:
- if self.skip_bytes < len(buf):
- buf = buf[self.skip_bytes:]
- self.bytes_used_from_backend += self.skip_bytes
- self.skip_bytes = 0
- else:
- self.skip_bytes -= len(buf)
- self.bytes_used_from_backend += len(buf)
- buf = b''
-
- if not chunk:
- if buf:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
- break
-
- if client_chunk_size is not None:
- while len(buf) >= client_chunk_size:
- client_chunk = buf[:client_chunk_size]
- buf = buf[client_chunk_size:]
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += \
- len(client_chunk)
- yield client_chunk
- else:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
-
- # This is for fairness; if the network is outpacing
- # the CPU, we'll always be able to read and write
- # data without encountering an EWOULDBLOCK, and so
- # eventlet will not switch greenthreads on its own.
- # We do it manually so that clients don't starve.
- #
- # The number 5 here was chosen by making stuff up.
- # It's not every single chunk, but it's not too big
- # either, so it seemed like it would probably be an
- # okay choice.
- #
- # Note that we may trampoline to other greenthreads
- # more often than once every 5 chunks, depending on
- # how blocking our network IO is; the explicit sleep
- # here simply provides a lower bound on the rate of
- # trampolining.
- if nchunks % 5 == 0:
- sleep()
+ self.skip_bytes -= len(buf)
+ self.bytes_used_from_backend += len(buf)
+ buf = b''
+
+ while buf and (len(buf) >= self.fragment_size or not chunk):
+ client_chunk = buf[:self.fragment_size]
+ buf = buf[self.fragment_size:]
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += len(client_chunk)
+ yield client_chunk
+
+ if not chunk:
+ break
+
+ # This is for fairness; if the network is outpacing
+ # the CPU, we'll always be able to read and write
+ # data without encountering an EWOULDBLOCK, and so
+ # eventlet will not switch greenthreads on its own.
+ # We do it manually so that clients don't starve.
+ #
+ # The number 5 here was chosen by making stuff up.
+ # It's not every single chunk, but it's not too big
+ # either, so it seemed like it would probably be an
+ # okay choice.
+ #
+ # Note that we may trampoline to other greenthreads
+ # more often than once every 5 chunks, depending on
+ # how blocking our network IO is; the explicit sleep
+ # here simply provides a lower bound on the rate of
+ # trampolining.
+ if nchunks % 5 == 0:
+ sleep()
+
+ def _get_response_parts_iter(self, req):
+ try:
+ # This is safe; it sets up a generator but does not call next()
+ # on it, so no IO is performed.
+ self.source_parts_iter = http_response_to_document_iters(
+ self.source, read_chunk_size=self.app.object_chunk_size)
part_iter = None
try:
while True:
try:
start_byte, end_byte, length, headers, part = \
- get_next_doc_part()
+ self.get_next_doc_part()
except StopIteration:
# it seems this is the only way out of the loop; not
# sure why the req.environ update is always needed
@@ -2800,7 +2784,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
- part_iter = iter_bytes_from_response_part(part, byte_count)
+ part_iter = self.iter_bytes_from_response_part(
+ part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}