diff options
Diffstat (limited to 'swift/proxy/controllers/obj.py')
-rw-r--r-- | swift/proxy/controllers/obj.py | 311 |
1 files changed, 148 insertions, 163 deletions
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index b69631538..fc0f8a6d1 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -48,7 +48,7 @@ from swift.common.utils import ( normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, quorum_size, reiterate, close_if_possible, safe_json_loads, md5, - ShardRange, find_shard_range, cache_from_env, NamespaceBoundList) + ShardRange, find_namespace, cache_from_env, NamespaceBoundList) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -388,7 +388,7 @@ class BaseObjectController(Controller): memcache.set( cache_key, cached_namespaces.bounds, time=self.app.recheck_updating_shard_ranges) - update_shard = find_shard_range(obj, shard_ranges or []) + update_shard = find_namespace(obj, shard_ranges or []) record_cache_op_metrics( self.logger, 'shard_updating', cache_state, response) return update_shard @@ -1518,7 +1518,7 @@ class ECAppIter(object): except ChunkWriteTimeout: # slow client disconnect self.logger.exception( - "ChunkWriteTimeout fetching fragments for %r", + "ChunkWriteTimeout feeding fragments for %r", quote(self.path)) except: # noqa self.logger.exception("Exception fetching fragments for %r", @@ -2497,10 +2497,10 @@ class ECFragGetter(object): self.backend_headers = backend_headers self.header_provider = header_provider self.req_query_string = req.query_string - self.client_chunk_size = policy.fragment_size + self.fragment_size = policy.fragment_size self.skip_bytes = 0 self.bytes_used_from_backend = 0 - self.source = None + self.source = self.node = None self.logger_thread_locals = logger_thread_locals self.logger = logger @@ -2578,8 +2578,8 @@ class ECFragGetter(object): def learn_size_from_content_range(self, start, end, length): """ - If client_chunk_size is set, makes sure we yield things starting on - chunk boundaries based on the Content-Range header in the response. + Make sure we yield things starting on fragment boundaries based on the + Content-Range header in the response. Sets our Range header's first byterange to the value learned from the Content-Range header in the response; if we were given a @@ -2593,8 +2593,7 @@ class ECFragGetter(object): if length == 0: return - if self.client_chunk_size: - self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + self.skip_bytes = bytes_to_skip(self.fragment_size, start) if 'Range' in self.backend_headers: try: @@ -2620,170 +2619,155 @@ class ECFragGetter(object): it = self._get_response_parts_iter(req) return it - def _get_response_parts_iter(self, req): - try: - client_chunk_size = self.client_chunk_size - node_timeout = self.app.recoverable_node_timeout - - # This is safe; it sets up a generator but does not call next() - # on it, so no IO is performed. - parts_iter = [ - http_response_to_document_iters( - self.source, read_chunk_size=self.app.object_chunk_size)] + def get_next_doc_part(self): + node_timeout = self.app.recoverable_node_timeout - def get_next_doc_part(): - while True: - # the loop here is to resume if trying to parse - # multipart/byteranges response raises a ChunkReadTimeout - # and resets the parts_iter - try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and just returns source (or - # raises StopIteration). - # Otherwise, this call to next() performs IO when - # we have a multipart/byteranges response; as it - # will read the MIME boundary and part headers. - start_byte, end_byte, length, headers, part = next( - parts_iter[0]) - return (start_byte, end_byte, length, headers, part) - except ChunkReadTimeout: - new_source, new_node = self._dig_for_source_and_node() - if not new_source: - raise - self.app.error_occurred( - self.node, 'Trying to read next part of ' - 'EC multi-part GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - parts_iter[0] = http_response_to_document_iters( + while True: + # the loop here is to resume if trying to parse + # multipart/byteranges response raises a ChunkReadTimeout + # and resets the source_parts_iter + try: + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and just returns source (or + # raises StopIteration). + # Otherwise, this call to next() performs IO when + # we have a multipart/byteranges response; as it + # will read the MIME boundary and part headers. + start_byte, end_byte, length, headers, part = next( + self.source_parts_iter) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._dig_for_source_and_node() + if not new_source: + raise + self.app.error_occurred( + self.node, 'Trying to read next part of ' + 'EC multi-part GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + self.source_parts_iter = \ + http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + + def iter_bytes_from_response_part(self, part_file, nbytes): + nchunks = 0 + buf = b'' + part_file = ByteCountEnforcer(part_file, nbytes) + while True: + try: + with WatchdogTimeout(self.app.watchdog, + self.app.recoverable_node_timeout, + ChunkReadTimeout): + chunk = part_file.read(self.app.object_chunk_size) + nchunks += 1 + # NB: this append must be *inside* the context + # manager for test.unit.SlowBody to do its thing + buf += chunk + if nbytes is not None: + nbytes -= len(chunk) + except (ChunkReadTimeout, ShortReadError): + exc_type, exc_value, exc_traceback = sys.exc_info() + try: + self.fast_forward(self.bytes_used_from_backend) + except (HTTPException, ValueError): + self.logger.exception('Unable to fast forward') + six.reraise(exc_type, exc_value, exc_traceback) + except RangeAlreadyComplete: + break + buf = b'' + old_node = self.node + new_source, new_node = self._dig_for_source_and_node() + if new_source: + self.app.error_occurred( + old_node, 'Trying to read EC fragment ' + 'during GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + self.source_parts_iter = \ + http_response_to_document_iters( new_source, read_chunk_size=self.app.object_chunk_size) - - def iter_bytes_from_response_part(part_file, nbytes): - nchunks = 0 - buf = b'' - part_file = ByteCountEnforcer(part_file, nbytes) - while True: try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - chunk = part_file.read(self.app.object_chunk_size) - nchunks += 1 - # NB: this append must be *inside* the context - # manager for test.unit.SlowBody to do its thing - buf += chunk - if nbytes is not None: - nbytes -= len(chunk) - except (ChunkReadTimeout, ShortReadError): - exc_type, exc_value, exc_traceback = sys.exc_info() - try: - self.fast_forward(self.bytes_used_from_backend) - except (HTTPException, ValueError): - self.logger.exception('Unable to fast forward') - six.reraise(exc_type, exc_value, exc_traceback) - except RangeAlreadyComplete: - break - buf = b'' - old_node = self.node - new_source, new_node = self._dig_for_source_and_node() - if new_source: - self.app.error_occurred( - old_node, 'Trying to read EC fragment ' - 'during GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. - parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - try: - _junk, _junk, _junk, _junk, part_file = \ - get_next_doc_part() - except StopIteration: - # it's not clear to me how to make - # get_next_doc_part raise StopIteration for the - # first doc part of a new request - six.reraise(exc_type, exc_value, exc_traceback) - part_file = ByteCountEnforcer(part_file, nbytes) - else: - six.reraise(exc_type, exc_value, exc_traceback) + _junk, _junk, _junk, _junk, part_file = \ + self.get_next_doc_part() + except StopIteration: + # it's not clear to me how to make + # get_next_doc_part raise StopIteration for the + # first doc part of a new request + six.reraise(exc_type, exc_value, exc_traceback) + part_file = ByteCountEnforcer(part_file, nbytes) + else: + six.reraise(exc_type, exc_value, exc_traceback) + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + self.bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 else: - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - self.bytes_used_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - self.bytes_used_from_backend += len(buf) - buf = b'' - - if not chunk: - if buf: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - break - - if client_chunk_size is not None: - while len(buf) >= client_chunk_size: - client_chunk = buf[:client_chunk_size] - buf = buf[client_chunk_size:] - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += \ - len(client_chunk) - yield client_chunk - else: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - - # This is for fairness; if the network is outpacing - # the CPU, we'll always be able to read and write - # data without encountering an EWOULDBLOCK, and so - # eventlet will not switch greenthreads on its own. - # We do it manually so that clients don't starve. - # - # The number 5 here was chosen by making stuff up. - # It's not every single chunk, but it's not too big - # either, so it seemed like it would probably be an - # okay choice. - # - # Note that we may trampoline to other greenthreads - # more often than once every 5 chunks, depending on - # how blocking our network IO is; the explicit sleep - # here simply provides a lower bound on the rate of - # trampolining. - if nchunks % 5 == 0: - sleep() + self.skip_bytes -= len(buf) + self.bytes_used_from_backend += len(buf) + buf = b'' + + while buf and (len(buf) >= self.fragment_size or not chunk): + client_chunk = buf[:self.fragment_size] + buf = buf[self.fragment_size:] + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(client_chunk) + yield client_chunk + + if not chunk: + break + + # This is for fairness; if the network is outpacing + # the CPU, we'll always be able to read and write + # data without encountering an EWOULDBLOCK, and so + # eventlet will not switch greenthreads on its own. + # We do it manually so that clients don't starve. + # + # The number 5 here was chosen by making stuff up. + # It's not every single chunk, but it's not too big + # either, so it seemed like it would probably be an + # okay choice. + # + # Note that we may trampoline to other greenthreads + # more often than once every 5 chunks, depending on + # how blocking our network IO is; the explicit sleep + # here simply provides a lower bound on the rate of + # trampolining. + if nchunks % 5 == 0: + sleep() + + def _get_response_parts_iter(self, req): + try: + # This is safe; it sets up a generator but does not call next() + # on it, so no IO is performed. + self.source_parts_iter = http_response_to_document_iters( + self.source, read_chunk_size=self.app.object_chunk_size) part_iter = None try: while True: try: start_byte, end_byte, length, headers, part = \ - get_next_doc_part() + self.get_next_doc_part() except StopIteration: # it seems this is the only way out of the loop; not # sure why the req.environ update is always needed @@ -2800,7 +2784,8 @@ class ECFragGetter(object): if (end_byte is not None and start_byte is not None) else None) - part_iter = iter_bytes_from_response_part(part, byte_count) + part_iter = self.iter_bytes_from_response_part( + part, byte_count) yield {'start_byte': start_byte, 'end_byte': end_byte, 'entity_length': length, 'headers': headers, 'part_iter': part_iter} |