summary refs log tree commit diff
path: root/swift/proxy/controllers/obj.py
diff options
context:
space:
mode:
Diffstat (limited to 'swift/proxy/controllers/obj.py')
-rw-r--r--  swift/proxy/controllers/obj.py  310
1 files changed, 156 insertions, 154 deletions
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 7149ecfb7..057c77ffa 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -2620,171 +2620,172 @@ class ECFragGetter(object):
it = self._get_response_parts_iter(req)
return it
+ def get_next_doc_part(self):
+ node_timeout = self.app.recoverable_node_timeout
+
+ while True:
+ # the loop here is to resume if trying to parse
+ # multipart/byteranges response raises a ChunkReadTimeout
+ # and resets the source_parts_iter
+ try:
+ with WatchdogTimeout(self.app.watchdog, node_timeout,
+ ChunkReadTimeout):
+ # If we don't have a multipart/byteranges response,
+ # but just a 200 or a single-range 206, then this
+ # performs no IO, and just returns source (or
+ # raises StopIteration).
+ # Otherwise, this call to next() performs IO when
+ # we have a multipart/byteranges response; as it
+ # will read the MIME boundary and part headers.
+ start_byte, end_byte, length, headers, part = next(
+ self.source_parts_iter)
+ return (start_byte, end_byte, length, headers, part)
+ except ChunkReadTimeout:
+ new_source, new_node = self._dig_for_source_and_node()
+ if not new_source:
+ raise
+ self.app.error_occurred(
+ self.node, 'Trying to read next part of '
+ 'EC multi-part GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it sets up a generator but does
+ # not call next() on it, so no IO is performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
+ new_source,
+ read_chunk_size=self.app.object_chunk_size)
+
+ def iter_bytes_from_response_part(self, part_file, nbytes):
+ client_chunk_size = self.client_chunk_size
+ node_timeout = self.app.recoverable_node_timeout
+ nchunks = 0
+ buf = b''
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ while True:
+ try:
+ with WatchdogTimeout(self.app.watchdog, node_timeout,
+ ChunkReadTimeout):
+ chunk = part_file.read(self.app.object_chunk_size)
+ nchunks += 1
+ # NB: this append must be *inside* the context
+ # manager for test.unit.SlowBody to do its thing
+ buf += chunk
+ if nbytes is not None:
+ nbytes -= len(chunk)
+ except (ChunkReadTimeout, ShortReadError):
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ try:
+ self.fast_forward(self.bytes_used_from_backend)
+ except (HTTPException, ValueError):
+ self.logger.exception('Unable to fast forward')
+ six.reraise(exc_type, exc_value, exc_traceback)
+ except RangeAlreadyComplete:
+ break
+ buf = b''
+ old_node = self.node
+ new_source, new_node = self._dig_for_source_and_node()
+ if new_source:
+ self.app.error_occurred(
+ old_node, 'Trying to read EC fragment '
+ 'during GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it just sets up a generator but
+ # does not call next() on it, so no IO is
+ # performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
+ new_source,
+ read_chunk_size=self.app.object_chunk_size)
+ try:
+ _junk, _junk, _junk, _junk, part_file = \
+ self.get_next_doc_part()
+ except StopIteration:
+ # it's not clear to me how to make
+ # get_next_doc_part raise StopIteration for the
+ # first doc part of a new request
+ six.reraise(exc_type, exc_value, exc_traceback)
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ else:
+ six.reraise(exc_type, exc_value, exc_traceback)
+ else:
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ self.bytes_used_from_backend += self.skip_bytes
+ self.skip_bytes = 0
+ else:
+ self.skip_bytes -= len(buf)
+ self.bytes_used_from_backend += len(buf)
+ buf = b''
+
+ if not chunk:
+ if buf:
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += len(buf)
+ yield buf
+ buf = b''
+ break
+
+ if client_chunk_size is not None:
+ while len(buf) >= client_chunk_size:
+ client_chunk = buf[:client_chunk_size]
+ buf = buf[client_chunk_size:]
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += \
+ len(client_chunk)
+ yield client_chunk
+ else:
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += len(buf)
+ yield buf
+ buf = b''
+
+ # This is for fairness; if the network is outpacing
+ # the CPU, we'll always be able to read and write
+ # data without encountering an EWOULDBLOCK, and so
+ # eventlet will not switch greenthreads on its own.
+ # We do it manually so that clients don't starve.
+ #
+ # The number 5 here was chosen by making stuff up.
+ # It's not every single chunk, but it's not too big
+ # either, so it seemed like it would probably be an
+ # okay choice.
+ #
+ # Note that we may trampoline to other greenthreads
+ # more often than once every 5 chunks, depending on
+ # how blocking our network IO is; the explicit sleep
+ # here simply provides a lower bound on the rate of
+ # trampolining.
+ if nchunks % 5 == 0:
+ sleep()
+
def _get_response_parts_iter(self, req):
try:
- client_chunk_size = self.client_chunk_size
- node_timeout = self.app.recoverable_node_timeout
-
# This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed.
self.source_parts_iter = http_response_to_document_iters(
self.source, read_chunk_size=self.app.object_chunk_size)
- def get_next_doc_part():
- while True:
- # the loop here is to resume if trying to parse
- # multipart/byteranges response raises a ChunkReadTimeout
- # and resets the source_parts_iter
- try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- # If we don't have a multipart/byteranges response,
- # but just a 200 or a single-range 206, then this
- # performs no IO, and just returns source (or
- # raises StopIteration).
- # Otherwise, this call to next() performs IO when
- # we have a multipart/byteranges response; as it
- # will read the MIME boundary and part headers.
- start_byte, end_byte, length, headers, part = next(
- self.source_parts_iter)
- return (start_byte, end_byte, length, headers, part)
- except ChunkReadTimeout:
- new_source, new_node = self._dig_for_source_and_node()
- if not new_source:
- raise
- self.app.error_occurred(
- self.node, 'Trying to read next part of '
- 'EC multi-part GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it sets up a generator but does
- # not call next() on it, so no IO is performed.
- self.source_parts_iter = \
- http_response_to_document_iters(
- new_source,
- read_chunk_size=self.app.object_chunk_size)
-
- def iter_bytes_from_response_part(part_file, nbytes):
- nchunks = 0
- buf = b''
- part_file = ByteCountEnforcer(part_file, nbytes)
- while True:
- try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- chunk = part_file.read(self.app.object_chunk_size)
- nchunks += 1
- # NB: this append must be *inside* the context
- # manager for test.unit.SlowBody to do its thing
- buf += chunk
- if nbytes is not None:
- nbytes -= len(chunk)
- except (ChunkReadTimeout, ShortReadError):
- exc_type, exc_value, exc_traceback = sys.exc_info()
- try:
- self.fast_forward(self.bytes_used_from_backend)
- except (HTTPException, ValueError):
- self.logger.exception('Unable to fast forward')
- six.reraise(exc_type, exc_value, exc_traceback)
- except RangeAlreadyComplete:
- break
- buf = b''
- old_node = self.node
- new_source, new_node = self._dig_for_source_and_node()
- if new_source:
- self.app.error_occurred(
- old_node, 'Trying to read EC fragment '
- 'during GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it just sets up a generator but
- # does not call next() on it, so no IO is
- # performed.
- self.source_parts_iter = \
- http_response_to_document_iters(
- new_source,
- read_chunk_size=self.app.object_chunk_size)
- try:
- _junk, _junk, _junk, _junk, part_file = \
- get_next_doc_part()
- except StopIteration:
- # it's not clear to me how to make
- # get_next_doc_part raise StopIteration for the
- # first doc part of a new request
- six.reraise(exc_type, exc_value, exc_traceback)
- part_file = ByteCountEnforcer(part_file, nbytes)
- else:
- six.reraise(exc_type, exc_value, exc_traceback)
- else:
- if buf and self.skip_bytes:
- if self.skip_bytes < len(buf):
- buf = buf[self.skip_bytes:]
- self.bytes_used_from_backend += self.skip_bytes
- self.skip_bytes = 0
- else:
- self.skip_bytes -= len(buf)
- self.bytes_used_from_backend += len(buf)
- buf = b''
-
- if not chunk:
- if buf:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
- break
-
- if client_chunk_size is not None:
- while len(buf) >= client_chunk_size:
- client_chunk = buf[:client_chunk_size]
- buf = buf[client_chunk_size:]
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += \
- len(client_chunk)
- yield client_chunk
- else:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
-
- # This is for fairness; if the network is outpacing
- # the CPU, we'll always be able to read and write
- # data without encountering an EWOULDBLOCK, and so
- # eventlet will not switch greenthreads on its own.
- # We do it manually so that clients don't starve.
- #
- # The number 5 here was chosen by making stuff up.
- # It's not every single chunk, but it's not too big
- # either, so it seemed like it would probably be an
- # okay choice.
- #
- # Note that we may trampoline to other greenthreads
- # more often than once every 5 chunks, depending on
- # how blocking our network IO is; the explicit sleep
- # here simply provides a lower bound on the rate of
- # trampolining.
- if nchunks % 5 == 0:
- sleep()
-
part_iter = None
try:
while True:
try:
start_byte, end_byte, length, headers, part = \
- get_next_doc_part()
+ self.get_next_doc_part()
except StopIteration:
# it seems this is the only way out of the loop; not
# sure why the req.environ update is always needed
@@ -2801,7 +2802,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
- part_iter = iter_bytes_from_response_part(part, byte_count)
+ part_iter = self.iter_bytes_from_response_part(
+ part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}