summaryrefslogtreecommitdiff
path: root/waitress/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'waitress/channel.py')
-rw-r--r--waitress/channel.py104
1 files changed, 73 insertions, 31 deletions
diff --git a/waitress/channel.py b/waitress/channel.py
index 5bacaa0..c53e4fe 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -18,6 +18,7 @@ import time
import traceback
from waitress.buffers import (
+ BytesIOBasedBuffer,
OverflowableBuffer,
ReadOnlyFileBasedBuffer,
)
@@ -54,7 +55,9 @@ class HTTPChannel(wasyncore.dispatcher, object):
close_when_flushed = False # set to True to close the socket when flushed
requests = () # currently pending requests
sent_continue = False # used as a latch after sending 100 continue
- total_outbufs_len = 0 # total bytes ready to send
+ known_outbufs_len = 0 # total known bytes ready to send
+ has_unseekable_outbufs = False # any unseekable data to send
+ has_outbuf_data = False # any data to write including unseekable
current_outbuf_count = 0 # total bytes written to current outbuf
#
@@ -90,7 +93,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
# if there's data in the out buffer or we've been instructed to close
# the channel (possibly by our server maintenance logic), run
# handle_write
- return self.total_outbufs_len or self.will_close
+ return self.has_outbuf_data or self.will_close
def handle_write(self):
# Precondition: there's data in the out buffer to be sent, or
@@ -107,7 +110,10 @@ class HTTPChannel(wasyncore.dispatcher, object):
# because it's either data left over from task output
# or a 100 Continue line sent within "received".
flush = self._flush_some
- elif self.total_outbufs_len >= self.adj.send_bytes:
+ elif (
+ self.known_outbufs_len >= self.adj.send_bytes
+ or self.has_unseekable_outbufs
+ ):
# 1. There's a running task, so we need to try to lock
# the outbuf before sending
# 2. Only try to send if the data in the out buffer is larger
@@ -129,7 +135,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
self.logger.exception('Unexpected exception when flushing')
self.will_close = True
- if self.close_when_flushed and not self.total_outbufs_len:
+ if self.close_when_flushed and not self.has_outbuf_data:
self.close_when_flushed = False
self.will_close = True
@@ -142,7 +148,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
# 2. There's no already currently running task(s).
# 3. There's no data in the output buffer that needs to be sent
# before we potentially create a new task.
- return not (self.will_close or self.requests or self.total_outbufs_len)
+ return not (self.will_close or self.requests or self.has_outbuf_data)
def handle_read(self):
try:
@@ -181,7 +187,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n'
self.outbufs[-1].append(outbuf_payload)
self.current_outbuf_count += len(outbuf_payload)
- self.total_outbufs_len += len(outbuf_payload)
+ self.known_outbufs_len += len(outbuf_payload)
+ self.has_outbuf_data = True
self.sent_continue = True
self._flush_some()
request.completed = False
@@ -210,7 +217,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
try:
self._flush_some()
- if self.total_outbufs_len < self.adj.outbuf_high_watermark:
+ if self.known_outbufs_len < self.adj.outbuf_high_watermark:
self.outbuf_lock.notify()
finally:
self.outbuf_lock.release()
@@ -220,28 +227,38 @@ class HTTPChannel(wasyncore.dispatcher, object):
sent = 0
dobreak = False
+ outbufs = self.outbufs
while True:
- outbuf = self.outbufs[0]
- # use outbuf.__len__ rather than len(outbuf) FBO of not getting
- # OverflowError on 32-bit Python
- outbuflen = outbuf.__len__()
- while outbuflen > 0:
- chunk = outbuf.get(self.sendbuf_len)
+ outbuf = outbufs[0]
+ # remaining might be -1 for an unseekable ROFBB
+ # so we perform a read and assume that the ROFBB will update
+ # remaining when it knows it's empty
+ while outbuf.remaining != 0:
+ chunk = outbuf.read(self.sendbuf_len)
+ num_tosend = len(chunk)
num_sent = self.send(chunk)
- if num_sent:
- outbuf.skip(num_sent, True)
- outbuflen -= num_sent
- sent += num_sent
- self.total_outbufs_len -= num_sent
- else:
+ # handle_close may have been called by send() so be careful
+ # about mutating state below if num_sent is 0
+ sent += num_sent
+ if num_sent < num_tosend and self.connected:
+ # failed to write all of the data, so either put the
+ # remaining amount into a new buffer to be used on the
+ # next write or rollback the pointer to only skip what was
+ # successfully written
+ if outbuf.seekable:
+ outbuf.rollback(num_tosend - num_sent)
+ else:
+ outbuf = BytesIOBasedBuffer(chunk[num_sent:])
+ outbufs.appendleft(outbuf)
+ if not num_sent:
# failed to write anything, break out entirely
dobreak = True
break
else:
# self.outbufs[-1] must always be a writable outbuf
- if len(self.outbufs) > 1:
- toclose = self.outbufs.popleft()
+ if len(outbufs) > 1:
+ toclose = outbufs.popleft()
try:
toclose.close()
except Exception:
@@ -254,25 +271,44 @@ class HTTPChannel(wasyncore.dispatcher, object):
if dobreak:
break
+ # refresh the outbuf statistics after a write
+ self._scan_outbufs()
+
if sent:
self.last_activity = time.time()
return True
return False
+ def _scan_outbufs(self):
+ self.has_unseekable_outbufs = False
+ self.known_outbufs_len = 0
+ for o in self.outbufs:
+ if o.seekable:
+ self.known_outbufs_len += o.remaining
+ else:
+ self.has_unseekable_outbufs = True
+ self.has_outbuf_data = (
+ self.known_outbufs_len or self.has_unseekable_outbufs
+ )
+
def handle_close(self):
with self.outbuf_lock:
- while self.outbufs:
- outbuf = self.outbufs.popleft()
+ outbufs = self.outbufs
+ while outbufs:
+ toclose = outbufs.popleft()
try:
- outbuf.close()
+ toclose.close()
except Exception:
self.logger.exception(
'Unknown exception while trying to close outbuf')
- self.total_outbufs_len = 0
+ self.known_outbufs_len = 0
+ self.has_outbuf_data = False
+ self.has_unseekable_outbufs = False
+ self.current_outbuf_count = 0
self.connected = False
self.outbuf_lock.notify()
- wasyncore.dispatcher.close(self)
+ self.close()
def add_channel(self, map=None):
"""See wasyncore.dispatcher
@@ -315,6 +351,11 @@ class HTTPChannel(wasyncore.dispatcher, object):
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
+ num_bytes = data.remaining
+ if num_bytes == -1:
+ self.has_unseekable_outbufs = True
+ else:
+ self.known_outbufs_len += num_bytes
else:
if self.current_outbuf_count > self.adj.outbuf_high_watermark:
# rotate to a new buffer if the current buffer has hit
@@ -323,9 +364,10 @@ class HTTPChannel(wasyncore.dispatcher, object):
self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
self.outbufs[-1].append(data)
- num_bytes = len(data)
- self.current_outbuf_count += num_bytes
- self.total_outbufs_len += num_bytes
+ num_bytes = len(data)
+ self.current_outbuf_count += num_bytes
+ self.known_outbufs_len += num_bytes
+ self.has_outbuf_data = True
# XXX We might eventually need to pull the trigger here (to
# instruct select to stop blocking), but it slows things down so
# much that I'll hold off for now; "server push" on otherwise
@@ -335,11 +377,11 @@ class HTTPChannel(wasyncore.dispatcher, object):
def _flush_outbufs_below_high_watermark(self):
# check first to avoid locking if possible
- if self.total_outbufs_len > self.adj.outbuf_high_watermark:
+ if self.known_outbufs_len > self.adj.outbuf_high_watermark:
with self.outbuf_lock:
while (
self.connected and
- self.total_outbufs_len > self.adj.outbuf_high_watermark
+ self.known_outbufs_len > self.adj.outbuf_high_watermark
):
self.outbuf_lock.wait()