summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert JW Regeer <xistence@0x58.com>2019-04-06 15:54:53 -0600
committerGitHub <noreply@github.com>2019-04-06 15:54:53 -0600
commit5583715063c30fda571ab0e0169b0068403dc53d (patch)
tree91021216340c576eb796d685bfe8683050946113
parent5bd69e9545718913bd2be0b4ab04f7da3d9f74c8 (diff)
parent9e1b10aa3d8787868ac8d7556b27d36a22357660 (diff)
downloadwaitress-5583715063c30fda571ab0e0169b0068403dc53d.tar.gz
Merge pull request #242 from Pylons/outbuf-high-watermark
outbuf_high_watermark
-rw-r--r--CHANGES.txt11
-rw-r--r--docs/api.rst2
-rw-r--r--docs/arguments.rst7
-rw-r--r--docs/runner.rst5
-rw-r--r--waitress/adjustments.py5
-rw-r--r--waitress/channel.py80
-rw-r--r--waitress/runner.py5
-rw-r--r--waitress/tests/test_channel.py76
8 files changed, 165 insertions, 26 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index be24559..2897939 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,12 @@ Deprecations
Features
~~~~~~~~
+- Add a new ``outbuf_high_watermark`` adjustment which is used to apply
+ backpressure on the ``app_iter`` to avoid letting it spin faster than data
+ can be written to the socket. This stabilizes responses that iterate quickly
+ with a lot of data.
+ See https://github.com/Pylons/waitress/pull/242
+
- Stop early and close the ``app_iter`` when attempting to write to a closed
socket due to a client disconnect. This should notify a long-lived streaming
response when a client hangs up.
@@ -43,6 +49,11 @@ Bugfixes
the server will die before benefiting from these changes.
See https://github.com/Pylons/waitress/pull/245
+- Fix a bug in which a streaming ``app_iter`` may never cleanup data that has
+ already been sent. This would cause buffers in waitress to grow without
+ bounds. These buffers now properly rotate and release their data.
+ See https://github.com/Pylons/waitress/pull/242
+
1.2.1 (2019-01-25)
------------------
diff --git a/docs/api.rst b/docs/api.rst
index 70c174c..a921a1b 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -5,6 +5,6 @@
.. module:: waitress
-.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)
+.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, outbuf_high_watermark=16777216, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)
See :ref:`arguments` for more information.
diff --git a/docs/arguments.rst b/docs/arguments.rst
index 8bacc44..22535a8 100644
--- a/docs/arguments.rst
+++ b/docs/arguments.rst
@@ -200,6 +200,13 @@ outbuf_overflow
Default: ``1048576`` (1MB)
+outbuf_high_watermark
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold.
+
+ Default: ``16777216`` (16MB)
+
inbuf_overflow
A tempfile should be created if the pending input is larger than
inbuf_overflow, which is measured in bytes. The default is conservative.
diff --git a/docs/runner.rst b/docs/runner.rst
index 0b61307..2776e44 100644
--- a/docs/runner.rst
+++ b/docs/runner.rst
@@ -152,6 +152,11 @@ Tuning options:
A temporary file should be created if the pending output is larger than
this. Default is 1048576 (1MB).
+``--outbuf-high-watermark=INT``
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold. Default is 16777216 (16MB).
+
``--inbuf-overflow=INT``
A temporary file should be created if the pending input is larger than
this. Default is 524288 (512KB).
diff --git a/waitress/adjustments.py b/waitress/adjustments.py
index 3b0b364..5c1879b 100644
--- a/waitress/adjustments.py
+++ b/waitress/adjustments.py
@@ -112,6 +112,7 @@ class Adjustments(object):
('recv_bytes', int),
('send_bytes', int),
('outbuf_overflow', int),
+ ('outbuf_high_watermark', int),
('inbuf_overflow', int),
('connection_limit', int),
('cleanup_interval', int),
@@ -204,6 +205,10 @@ class Adjustments(object):
# is conservative.
outbuf_overflow = 1048576
+ # The app_iter will pause when pending output is larger than this value
+ # in bytes.
+ outbuf_high_watermark = 16777216
+
# A tempfile should be created if the pending input is larger than
# inbuf_overflow, which is measured in bytes. The default is 512K. This
# is conservative.
diff --git a/waitress/channel.py b/waitress/channel.py
index 8fee59c..12e6fce 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -53,6 +53,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
close_when_flushed = False # set to True to close the socket when flushed
requests = () # currently pending requests
sent_continue = False # used as a latch after sending 100 continue
+ total_outbufs_len = 0 # total bytes ready to send
+ current_outbuf_count = 0 # total bytes written to current outbuf
#
# ASYNCHRONOUS METHODS (including __init__)
@@ -69,14 +71,13 @@ class HTTPChannel(wasyncore.dispatcher, object):
self.server = server
self.adj = adj
self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
- self.total_outbufs_len = 0
self.creation_time = self.last_activity = time.time()
self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
# task_lock used to push/pop requests
self.task_lock = threading.Lock()
- # outbuf_lock used to access any outbuf
- self.outbuf_lock = threading.RLock()
+ # outbuf_lock used to access any outbuf (expected to use an RLock)
+ self.outbuf_lock = threading.Condition()
wasyncore.dispatcher.__init__(self, sock, map=map)
@@ -122,7 +123,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
if self.adj.log_socket_errors:
self.logger.exception('Socket error')
self.will_close = True
- except:
+ except Exception:
self.logger.exception('Unexpected exception when flushing')
self.will_close = True
@@ -177,6 +178,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
# lock the outbuf to append to it.
outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n'
self.outbufs[-1].append(outbuf_payload)
+ self.current_outbuf_count += len(outbuf_payload)
self.total_outbufs_len += len(outbuf_payload)
self.sent_continue = True
self._flush_some()
@@ -205,6 +207,9 @@ class HTTPChannel(wasyncore.dispatcher, object):
if self.outbuf_lock.acquire(False):
try:
self._flush_some()
+
+ if self.total_outbufs_len < self.adj.outbuf_high_watermark:
+ self.outbuf_lock.notify()
finally:
self.outbuf_lock.release()
@@ -217,23 +222,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
while True:
outbuf = self.outbufs[0]
# use outbuf.__len__ rather than len(outbuf) FBO of not getting
- # OverflowError on Python 2
+ # OverflowError on 32-bit Python
outbuflen = outbuf.__len__()
- if outbuflen <= 0:
- # self.outbufs[-1] must always be a writable outbuf
- if len(self.outbufs) > 1:
- toclose = self.outbufs.pop(0)
- try:
- toclose.close()
- except:
- self.logger.exception(
- 'Unexpected error when closing an outbuf')
- continue # pragma: no cover (coverage bug, it is hit)
- else:
- if hasattr(outbuf, 'prune'):
- outbuf.prune()
- dobreak = True
-
while outbuflen > 0:
chunk = outbuf.get(self.sendbuf_len)
num_sent = self.send(chunk)
@@ -243,8 +233,21 @@ class HTTPChannel(wasyncore.dispatcher, object):
sent += num_sent
self.total_outbufs_len -= num_sent
else:
+ # failed to write anything, break out entirely
dobreak = True
break
+ else:
+ # self.outbufs[-1] must always be a writable outbuf
+ if len(self.outbufs) > 1:
+ toclose = self.outbufs.pop(0)
+ try:
+ toclose.close()
+ except Exception:
+ self.logger.exception(
+ 'Unexpected error when closing an outbuf')
+ else:
+ # caught up, done flushing for now
+ dobreak = True
if dobreak:
break
@@ -260,11 +263,12 @@ class HTTPChannel(wasyncore.dispatcher, object):
for outbuf in self.outbufs:
try:
outbuf.close()
- except:
+ except Exception:
self.logger.exception(
'Unknown exception while trying to close outbuf')
self.total_outbufs_len = 0
self.connected = False
+ self.outbuf_lock.notify()
wasyncore.dispatcher.close(self)
def add_channel(self, map=None):
@@ -299,18 +303,25 @@ class HTTPChannel(wasyncore.dispatcher, object):
# the async mainloop might be popping data off outbuf; we can
# block here waiting for it because we're in a task thread
with self.outbuf_lock:
- # check again after acquiring the lock to ensure we the
- # outbufs are not closed
- if not self.connected: # pragma: no cover
+ self._flush_outbufs_below_high_watermark()
+ if not self.connected:
raise ClientDisconnected
if data.__class__ is ReadOnlyFileBasedBuffer:
# they used wsgi.file_wrapper
self.outbufs.append(data)
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
+ self.current_outbuf_count = 0
else:
+ if self.current_outbuf_count > self.adj.outbuf_high_watermark:
+ # rotate to a new buffer if the current buffer has hit
+ # the watermark to avoid it growing unbounded
+ nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
+ self.outbufs.append(nextbuf)
+ self.current_outbuf_count = 0
self.outbufs[-1].append(data)
num_bytes = len(data)
+ self.current_outbuf_count += num_bytes
self.total_outbufs_len += num_bytes
# XXX We might eventually need to pull the trigger here (to
# instruct select to stop blocking), but it slows things down so
@@ -319,6 +330,16 @@ class HTTPChannel(wasyncore.dispatcher, object):
return num_bytes
return 0
+ def _flush_outbufs_below_high_watermark(self):
+ # check first to avoid locking if possible
+ if self.total_outbufs_len > self.adj.outbuf_high_watermark:
+ with self.outbuf_lock:
+ while (
+ self.connected and
+ self.total_outbufs_len > self.adj.outbuf_high_watermark
+ ):
+ self.outbuf_lock.wait()
+
def service(self):
"""Execute all pending requests """
with self.task_lock:
@@ -334,7 +355,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
self.logger.info('Client disconnected while serving %s' %
task.request.path)
task.close_on_finish = True
- except:
+ except Exception:
self.logger.exception('Exception while serving %s' %
task.request.path)
if not task.wrote_header:
@@ -370,6 +391,15 @@ class HTTPChannel(wasyncore.dispatcher, object):
request.close()
self.requests = []
else:
+ # before processing a new request, ensure there is not too
+ # much data in the outbufs waiting to be flushed
+ # NB: currently readable() returns False while we are
+ # flushing data so we know no new requests will come in
+ # that we need to account for, otherwise it'd be better
+ # to do this check at the start of the request instead of
+ # at the end to account for consecutive service() calls
+ if len(self.requests) > 1:
+ self._flush_outbufs_below_high_watermark()
request = self.requests.pop(0)
request.close()
diff --git a/waitress/runner.py b/waitress/runner.py
index abdb38e..6797276 100644
--- a/waitress/runner.py
+++ b/waitress/runner.py
@@ -126,6 +126,11 @@ Tuning options:
A temporary file should be created if the pending output is larger
than this. Default is 1048576 (1MB).
+ --outbuf-high-watermark=INT
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold. Default is 16777216 (16MB).
+
--inbuf-overflow=INT
A temporary file should be created if the pending input is larger
than this. Default is 524288 (512KB).
diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py
index f66766b..35fccf6 100644
--- a/waitress/tests/test_channel.py
+++ b/waitress/tests/test_channel.py
@@ -228,6 +228,74 @@ class TestHTTPChannel(unittest.TestCase):
inst.connected = False
self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff'))
+ def test_write_soon_disconnected_while_over_watermark(self):
+ from waitress.channel import ClientDisconnected
+ inst, sock, map = self._makeOneWithMap()
+ def dummy_flush():
+ inst.connected = False
+ inst._flush_outbufs_below_high_watermark = dummy_flush
+ self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff'))
+
+ def test_write_soon_rotates_outbuf_on_overflow(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.adj.outbuf_high_watermark = 3
+ inst.current_outbuf_count = 4
+ wrote = inst.write_soon(b'xyz')
+ self.assertEqual(wrote, 3)
+ self.assertEqual(len(inst.outbufs), 2)
+ self.assertEqual(inst.outbufs[0].get(), b'')
+ self.assertEqual(inst.outbufs[1].get(), b'xyz')
+
+ def test_write_soon_waits_on_backpressure(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.adj.outbuf_high_watermark = 3
+ inst.total_outbufs_len = 4
+ inst.current_outbuf_count = 4
+ class Lock(DummyLock):
+ def wait(self):
+ inst.total_outbufs_len = 0
+ super(Lock, self).wait()
+ inst.outbuf_lock = Lock()
+ wrote = inst.write_soon(b'xyz')
+ self.assertEqual(wrote, 3)
+ self.assertEqual(len(inst.outbufs), 2)
+ self.assertEqual(inst.outbufs[0].get(), b'')
+ self.assertEqual(inst.outbufs[1].get(), b'xyz')
+ self.assertTrue(inst.outbuf_lock.waited)
+
+ def test_handle_write_notify_after_flush(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.requests = [True]
+ inst.outbufs = [DummyBuffer(b'abc')]
+ inst.total_outbufs_len = len(inst.outbufs[0])
+ inst.adj.send_bytes = 1
+ inst.adj.outbuf_high_watermark = 5
+ inst.will_close = False
+ inst.last_activity = 0
+ result = inst.handle_write()
+ self.assertEqual(result, None)
+ self.assertEqual(inst.will_close, False)
+ self.assertTrue(inst.outbuf_lock.acquired)
+ self.assertTrue(inst.outbuf_lock.notified)
+ self.assertEqual(sock.sent, b'abc')
+
+ def test_handle_write_no_notify_after_flush(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.requests = [True]
+ inst.outbufs = [DummyBuffer(b'abc')]
+ inst.total_outbufs_len = len(inst.outbufs[0])
+ inst.adj.send_bytes = 1
+ inst.adj.outbuf_high_watermark = 2
+ sock.send = lambda x: False
+ inst.will_close = False
+ inst.last_activity = 0
+ result = inst.handle_write()
+ self.assertEqual(result, None)
+ self.assertEqual(inst.will_close, False)
+ self.assertTrue(inst.outbuf_lock.acquired)
+ self.assertFalse(inst.outbuf_lock.notified)
+ self.assertEqual(sock.sent, b'')
+
def test__flush_some_empty_outbuf(self):
inst, sock, map = self._makeOneWithMap()
result = inst._flush_some()
@@ -652,6 +720,7 @@ class DummySock(object):
return len(data)
class DummyLock(object):
+ notified = False
def __init__(self, acquirable=True):
self.acquirable = acquirable
@@ -664,6 +733,12 @@ class DummyLock(object):
def release(self):
self.released = True
+ def notify(self):
+ self.notified = True
+
+ def wait(self):
+ self.waited = True
+
def __exit__(self, type, val, traceback):
self.acquire(True)
@@ -695,6 +770,7 @@ class DummyBuffer(object):
class DummyAdjustments(object):
outbuf_overflow = 1048576
+ outbuf_high_watermark = 1048576
inbuf_overflow = 512000
cleanup_interval = 900
url_scheme = 'http'