summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Merickel <michael@merickel.org>2019-03-28 01:57:42 -0500
committerMichael Merickel <michael@merickel.org>2019-04-05 13:00:08 -0500
commit610544cfd7374d0c0067ba1dec217bb7b4c9092d (patch)
tree88258b6ea079e290c534f1053c5a589103fd0a03
parent5bd69e9545718913bd2be0b4ab04f7da3d9f74c8 (diff)
downloadwaitress-610544cfd7374d0c0067ba1dec217bb7b4c9092d.tar.gz
add backpressure on the app_iter if the socket can't send data quickly enough
-rw-r--r--CHANGES.txt18
-rw-r--r--docs/api.rst2
-rw-r--r--docs/arguments.rst7
-rw-r--r--docs/runner.rst5
-rw-r--r--waitress/adjustments.py5
-rw-r--r--waitress/channel.py40
-rw-r--r--waitress/runner.py5
-rw-r--r--waitress/tests/test_channel.py71
8 files changed, 144 insertions, 9 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index be24559..d5c6274 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,15 @@ Deprecations
Features
~~~~~~~~
+- Add a new ``outbuf_high_watermark`` adjustment which is used to apply
+ backpressure on the ``app_iter`` to avoid letting it spin faster than data
+ can be written to the socket. This stabilizes responses that iterate quickly
+ with a lot of data.
+ See https://github.com/Pylons/waitress/pull/242
+
+Bugfixes
+~~~~~~~~
+
- Stop early and close the ``app_iter`` when attempting to write to a closed
socket due to a client disconnect. This should notify a long-lived streaming
response when a client hangs up.
@@ -26,9 +35,6 @@ Features
still be flushed efficiently.
See https://github.com/Pylons/waitress/pull/246
-Bugfixes
-~~~~~~~~
-
- When a client closes a socket unexpectedly there was potential for memory
leaks in which data was written to the buffers after they were closed,
causing them to reopen.
@@ -43,6 +49,12 @@ Bugfixes
the server will die before benefiting from these changes.
See https://github.com/Pylons/waitress/pull/245
+- Fix a bug in which an ``app_iter`` that emits data chunks quicker than can
+ be written to a socket may never cleanup data that has already been sent.
+ This would cause buffers in waitress to grow without bounds. These buffers
+ now properly reclaim their space as data is written to the socket.
+ See https://github.com/Pylons/waitress/pull/242
+
1.2.1 (2019-01-25)
------------------
diff --git a/docs/api.rst b/docs/api.rst
index 70c174c..a921a1b 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -5,6 +5,6 @@
.. module:: waitress
-.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)
+.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, outbuf_high_watermark=16777216, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False)
See :ref:`arguments` for more information.
diff --git a/docs/arguments.rst b/docs/arguments.rst
index 8bacc44..22535a8 100644
--- a/docs/arguments.rst
+++ b/docs/arguments.rst
@@ -200,6 +200,13 @@ outbuf_overflow
Default: ``1048576`` (1MB)
+outbuf_high_watermark
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold.
+
+ Default: ``16777216`` (16MB)
+
inbuf_overflow
A tempfile should be created if the pending input is larger than
inbuf_overflow, which is measured in bytes. The default is conservative.
diff --git a/docs/runner.rst b/docs/runner.rst
index 0b61307..2776e44 100644
--- a/docs/runner.rst
+++ b/docs/runner.rst
@@ -152,6 +152,11 @@ Tuning options:
A temporary file should be created if the pending output is larger than
this. Default is 1048576 (1MB).
+``--outbuf-high-watermark=INT``
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold. Default is 16777216 (16MB).
+
``--inbuf-overflow=INT``
A temporary file should be created if the pending input is larger than
this. Default is 524288 (512KB).
diff --git a/waitress/adjustments.py b/waitress/adjustments.py
index 3b0b364..5c1879b 100644
--- a/waitress/adjustments.py
+++ b/waitress/adjustments.py
@@ -112,6 +112,7 @@ class Adjustments(object):
('recv_bytes', int),
('send_bytes', int),
('outbuf_overflow', int),
+ ('outbuf_high_watermark', int),
('inbuf_overflow', int),
('connection_limit', int),
('cleanup_interval', int),
@@ -204,6 +205,10 @@ class Adjustments(object):
# is conservative.
outbuf_overflow = 1048576
+ # The app_iter will pause when pending output is larger than this value
+ # in bytes.
+ outbuf_high_watermark = 16777216
+
# A tempfile should be created if the pending input is larger than
# inbuf_overflow, which is measured in bytes. The default is 512K. This
# is conservative.
diff --git a/waitress/channel.py b/waitress/channel.py
index 8fee59c..298188a 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -75,8 +75,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
# task_lock used to push/pop requests
self.task_lock = threading.Lock()
- # outbuf_lock used to access any outbuf
- self.outbuf_lock = threading.RLock()
+ # outbuf_lock used to access any outbuf (expected to use an RLock)
+ self.outbuf_lock = threading.Condition()
wasyncore.dispatcher.__init__(self, sock, map=map)
@@ -205,6 +205,9 @@ class HTTPChannel(wasyncore.dispatcher, object):
if self.outbuf_lock.acquire(False):
try:
self._flush_some()
+
+ if self.total_outbufs_len < self.adj.outbuf_high_watermark:
+ self.outbuf_lock.notify()
finally:
self.outbuf_lock.release()
@@ -265,6 +268,7 @@ class HTTPChannel(wasyncore.dispatcher, object):
'Unknown exception while trying to close outbuf')
self.total_outbufs_len = 0
self.connected = False
+ self.outbuf_lock.notify()
wasyncore.dispatcher.close(self)
def add_channel(self, map=None):
@@ -299,9 +303,8 @@ class HTTPChannel(wasyncore.dispatcher, object):
# the async mainloop might be popping data off outbuf; we can
# block here waiting for it because we're in a task thread
with self.outbuf_lock:
- # check again after acquiring the lock to ensure we the
- # outbufs are not closed
- if not self.connected: # pragma: no cover
+ overflowed = self._flush_outbufs_below_high_watermark()
+ if not self.connected:
raise ClientDisconnected
if data.__class__ is ReadOnlyFileBasedBuffer:
# they used wsgi.file_wrapper
@@ -309,6 +312,12 @@ class HTTPChannel(wasyncore.dispatcher, object):
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
else:
+ # if we overflowed then start a new buffer to ensure
+ # the original eventually gets pruned otherwise it may
+ # grow unbounded
+ if overflowed:
+ nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
+ self.outbufs.append(nextbuf)
self.outbufs[-1].append(data)
num_bytes = len(data)
self.total_outbufs_len += num_bytes
@@ -319,6 +328,18 @@ class HTTPChannel(wasyncore.dispatcher, object):
return num_bytes
return 0
+ def _flush_outbufs_below_high_watermark(self):
+ overflowed = self.total_outbufs_len > self.adj.outbuf_high_watermark
+ # check first to avoid locking if possible
+ if overflowed:
+ with self.outbuf_lock:
+ while (
+ self.connected and
+ self.total_outbufs_len > self.adj.outbuf_high_watermark
+ ):
+ self.outbuf_lock.wait()
+ return overflowed
+
def service(self):
"""Execute all pending requests """
with self.task_lock:
@@ -370,6 +391,15 @@ class HTTPChannel(wasyncore.dispatcher, object):
request.close()
self.requests = []
else:
+ # before processing a new request, ensure there is not too
+ # much data in the outbufs waiting to be flushed
+ # NB: currently readable() returns False while we are
+ # flushing data so we know no new requests will come in
+ # that we need to account for, otherwise it'd be better
+ # to do this check at the start of the request instead of
+ # at the end to account for consecutive service() calls
+ if len(self.requests) > 1:
+ self._flush_outbufs_below_high_watermark()
request = self.requests.pop(0)
request.close()
diff --git a/waitress/runner.py b/waitress/runner.py
index abdb38e..6797276 100644
--- a/waitress/runner.py
+++ b/waitress/runner.py
@@ -126,6 +126,11 @@ Tuning options:
A temporary file should be created if the pending output is larger
than this. Default is 1048576 (1MB).
+ --outbuf-high-watermark=INT
+ The app_iter will pause when pending output is larger than this value
+ and will resume once enough data is written to the socket to fall below
+ this threshold. Default is 16777216 (16MB).
+
--inbuf-overflow=INT
A temporary file should be created if the pending input is larger
than this. Default is 524288 (512KB).
diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py
index f66766b..ff3bba6 100644
--- a/waitress/tests/test_channel.py
+++ b/waitress/tests/test_channel.py
@@ -228,6 +228,65 @@ class TestHTTPChannel(unittest.TestCase):
inst.connected = False
self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff'))
+ def test_write_soon_disconnected_while_over_watermark(self):
+ from waitress.channel import ClientDisconnected
+ inst, sock, map = self._makeOneWithMap()
+ def dummy_flush():
+ inst.connected = False
+ inst._flush_outbufs_below_high_watermark = dummy_flush
+ self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff'))
+
+ def test_write_soon_rotates_outbuf_on_overflow(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.adj.outbuf_high_watermark = 3
+ inst.outbufs = [DummyBuffer(b'abcd')]
+ inst.total_outbufs_len = sum(len(x) for x in inst.outbufs)
+ class Lock(DummyLock):
+ def wait(self):
+ inst.outbufs[0].prune()
+ inst.total_outbufs_len = sum(len(x) for x in inst.outbufs)
+ super(Lock, self).wait()
+ inst.outbuf_lock = Lock()
+ wrote = inst.write_soon(b'xyz')
+ self.assertEqual(wrote, 3)
+ self.assertEqual(len(inst.outbufs), 2)
+ self.assertTrue(inst.outbufs[0].pruned)
+ self.assertEqual(inst.outbufs[1].get(), b'xyz')
+ self.assertTrue(inst.outbuf_lock.waited)
+
+ def test_handle_write_notify_after_flush(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.requests = [True]
+ inst.outbufs = [DummyBuffer(b'abc')]
+ inst.total_outbufs_len = len(inst.outbufs[0])
+ inst.adj.send_bytes = 1
+ inst.adj.outbuf_high_watermark = 5
+ inst.will_close = False
+ inst.last_activity = 0
+ result = inst.handle_write()
+ self.assertEqual(result, None)
+ self.assertEqual(inst.will_close, False)
+ self.assertTrue(inst.outbuf_lock.acquired)
+ self.assertTrue(inst.outbuf_lock.notified)
+ self.assertEqual(sock.sent, b'abc')
+
+ def test_handle_write_no_notify_after_flush(self):
+ inst, sock, map = self._makeOneWithMap()
+ inst.requests = [True]
+ inst.outbufs = [DummyBuffer(b'abc')]
+ inst.total_outbufs_len = len(inst.outbufs[0])
+ inst.adj.send_bytes = 1
+ inst.adj.outbuf_high_watermark = 2
+ sock.send = lambda x: False
+ inst.will_close = False
+ inst.last_activity = 0
+ result = inst.handle_write()
+ self.assertEqual(result, None)
+ self.assertEqual(inst.will_close, False)
+ self.assertTrue(inst.outbuf_lock.acquired)
+ self.assertFalse(inst.outbuf_lock.notified)
+ self.assertEqual(sock.sent, b'')
+
def test__flush_some_empty_outbuf(self):
inst, sock, map = self._makeOneWithMap()
result = inst._flush_some()
@@ -652,6 +711,7 @@ class DummySock(object):
return len(data)
class DummyLock(object):
+ notified = False
def __init__(self, acquirable=True):
self.acquirable = acquirable
@@ -664,6 +724,12 @@ class DummyLock(object):
def release(self):
self.released = True
+ def notify(self):
+ self.notified = True
+
+ def wait(self):
+ self.waited = True
+
def __exit__(self, type, val, traceback):
self.acquire(True)
@@ -687,6 +753,10 @@ class DummyBuffer(object):
def skip(self, num, x):
self.skipped = num
+ def prune(self):
+ self.pruned = True
+ self.data = b''
+
def __len__(self):
return len(self.data)
@@ -695,6 +765,7 @@ class DummyBuffer(object):
class DummyAdjustments(object):
outbuf_overflow = 1048576
+ outbuf_high_watermark = 1048576
inbuf_overflow = 512000
cleanup_interval = 900
url_scheme = 'http'