diff options
Diffstat (limited to 'waitress/channel.py')
-rw-r--r-- | waitress/channel.py | 104 |
1 files changed, 73 insertions, 31 deletions
diff --git a/waitress/channel.py b/waitress/channel.py index 5bacaa0..c53e4fe 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -18,6 +18,7 @@ import time import traceback from waitress.buffers import ( + BytesIOBasedBuffer, OverflowableBuffer, ReadOnlyFileBasedBuffer, ) @@ -54,7 +55,9 @@ class HTTPChannel(wasyncore.dispatcher, object): close_when_flushed = False # set to True to close the socket when flushed requests = () # currently pending requests sent_continue = False # used as a latch after sending 100 continue - total_outbufs_len = 0 # total bytes ready to send + known_outbufs_len = 0 # total known bytes ready to send + has_unseekable_outbufs = False # any unseekable data to send + has_outbuf_data = False # any data to write including unseekable current_outbuf_count = 0 # total bytes written to current outbuf # @@ -90,7 +93,7 @@ class HTTPChannel(wasyncore.dispatcher, object): # if there's data in the out buffer or we've been instructed to close # the channel (possibly by our server maintenance logic), run # handle_write - return self.total_outbufs_len or self.will_close + return self.has_outbuf_data or self.will_close def handle_write(self): # Precondition: there's data in the out buffer to be sent, or @@ -107,7 +110,10 @@ class HTTPChannel(wasyncore.dispatcher, object): # because it's either data left over from task output # or a 100 Continue line sent within "received". flush = self._flush_some - elif self.total_outbufs_len >= self.adj.send_bytes: + elif ( + self.known_outbufs_len >= self.adj.send_bytes + or self.has_unseekable_outbufs + ): # 1. There's a running task, so we need to try to lock # the outbuf before sending # 2. Only try to send if the data in the out buffer is larger @@ -129,7 +135,7 @@ class HTTPChannel(wasyncore.dispatcher, object): self.logger.exception('Unexpected exception when flushing') self.will_close = True - if self.close_when_flushed and not self.total_outbufs_len: + if self.close_when_flushed and not self.has_outbuf_data: self.close_when_flushed = False self.will_close = True @@ -142,7 +148,7 @@ class HTTPChannel(wasyncore.dispatcher, object): # 2. There's no already currently running task(s). # 3. There's no data in the output buffer that needs to be sent # before we potentially create a new task. - return not (self.will_close or self.requests or self.total_outbufs_len) + return not (self.will_close or self.requests or self.has_outbuf_data) def handle_read(self): try: @@ -181,7 +187,8 @@ class HTTPChannel(wasyncore.dispatcher, object): outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n' self.outbufs[-1].append(outbuf_payload) self.current_outbuf_count += len(outbuf_payload) - self.total_outbufs_len += len(outbuf_payload) + self.known_outbufs_len += len(outbuf_payload) + self.has_outbuf_data = True self.sent_continue = True self._flush_some() request.completed = False @@ -210,7 +217,7 @@ class HTTPChannel(wasyncore.dispatcher, object): try: self._flush_some() - if self.total_outbufs_len < self.adj.outbuf_high_watermark: + if self.known_outbufs_len < self.adj.outbuf_high_watermark: self.outbuf_lock.notify() finally: self.outbuf_lock.release() @@ -220,28 +227,38 @@ class HTTPChannel(wasyncore.dispatcher, object): sent = 0 dobreak = False + outbufs = self.outbufs while True: - outbuf = self.outbufs[0] - # use outbuf.__len__ rather than len(outbuf) FBO of not getting - # OverflowError on 32-bit Python - outbuflen = outbuf.__len__() - while outbuflen > 0: - chunk = outbuf.get(self.sendbuf_len) + outbuf = outbufs[0] + # remaining might be -1 for an unseekable ROFBB + # so we perform a read and assume that the ROFBB will update + # remaining when it knows it's empty + while outbuf.remaining != 0: + chunk = outbuf.read(self.sendbuf_len) + num_tosend = len(chunk) num_sent = self.send(chunk) - if num_sent: - outbuf.skip(num_sent, True) - outbuflen -= num_sent - sent += num_sent - self.total_outbufs_len -= num_sent - else: + # handle_close may have been called by send() so be careful + # about mutating state below if num_sent is 0 + sent += num_sent + if num_sent < num_tosend and self.connected: + # failed to write all of the data, so either put the + # remaining amount into a new buffer to be used on the + # next write or rollback the pointer to only skip what was + # successfully written + if outbuf.seekable: + outbuf.rollback(num_tosend - num_sent) + else: + outbuf = BytesIOBasedBuffer(chunk[num_sent:]) + outbufs.appendleft(outbuf) + if not num_sent: # failed to write anything, break out entirely dobreak = True break else: # self.outbufs[-1] must always be a writable outbuf - if len(self.outbufs) > 1: - toclose = self.outbufs.popleft() + if len(outbufs) > 1: + toclose = outbufs.popleft() try: toclose.close() except Exception: @@ -254,25 +271,44 @@ class HTTPChannel(wasyncore.dispatcher, object): if dobreak: break + # refresh the outbuf statistics after a write + self._scan_outbufs() + if sent: self.last_activity = time.time() return True return False + def _scan_outbufs(self): + self.has_unseekable_outbufs = False + self.known_outbufs_len = 0 + for o in self.outbufs: + if o.seekable: + self.known_outbufs_len += o.remaining + else: + self.has_unseekable_outbufs = True + self.has_outbuf_data = ( + self.known_outbufs_len or self.has_unseekable_outbufs + ) + def handle_close(self): with self.outbuf_lock: - while self.outbufs: - outbuf = self.outbufs.popleft() + outbufs = self.outbufs + while outbufs: + toclose = outbufs.popleft() try: - outbuf.close() + toclose.close() except Exception: self.logger.exception( 'Unknown exception while trying to close outbuf') - self.total_outbufs_len = 0 + self.known_outbufs_len = 0 + self.has_outbuf_data = False + self.has_unseekable_outbufs = False + self.current_outbuf_count = 0 self.connected = False self.outbuf_lock.notify() - wasyncore.dispatcher.close(self) + self.close() def add_channel(self, map=None): """See wasyncore.dispatcher @@ -315,6 +351,11 @@ class HTTPChannel(wasyncore.dispatcher, object): nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) self.current_outbuf_count = 0 + num_bytes = data.remaining + if num_bytes == -1: + self.has_unseekable_outbufs = True + else: + self.known_outbufs_len += num_bytes else: if self.current_outbuf_count > self.adj.outbuf_high_watermark: # rotate to a new buffer if the current buffer has hit @@ -323,9 +364,10 @@ class HTTPChannel(wasyncore.dispatcher, object): self.outbufs.append(nextbuf) self.current_outbuf_count = 0 self.outbufs[-1].append(data) - num_bytes = len(data) - self.current_outbuf_count += num_bytes - self.total_outbufs_len += num_bytes + num_bytes = len(data) + self.current_outbuf_count += num_bytes + self.known_outbufs_len += num_bytes + self.has_outbuf_data = True # XXX We might eventually need to pull the trigger here (to # instruct select to stop blocking), but it slows things down so # much that I'll hold off for now; "server push" on otherwise @@ -335,11 +377,11 @@ class HTTPChannel(wasyncore.dispatcher, object): def _flush_outbufs_below_high_watermark(self): # check first to avoid locking if possible - if self.total_outbufs_len > self.adj.outbuf_high_watermark: + if self.known_outbufs_len > self.adj.outbuf_high_watermark: with self.outbuf_lock: while ( self.connected and - self.total_outbufs_len > self.adj.outbuf_high_watermark + self.known_outbufs_len > self.adj.outbuf_high_watermark ): self.outbuf_lock.wait() |