diff options
author | Michael Merickel <michael@merickel.org> | 2019-03-28 01:57:42 -0500 |
---|---|---|
committer | Michael Merickel <michael@merickel.org> | 2019-04-05 13:00:08 -0500 |
commit | 610544cfd7374d0c0067ba1dec217bb7b4c9092d (patch) | |
tree | 88258b6ea079e290c534f1053c5a589103fd0a03 | |
parent | 5bd69e9545718913bd2be0b4ab04f7da3d9f74c8 (diff) | |
download | waitress-610544cfd7374d0c0067ba1dec217bb7b4c9092d.tar.gz |
add backpressure on the app_iter if the socket can't send data quickly enough
-rw-r--r-- | CHANGES.txt | 18 | ||||
-rw-r--r-- | docs/api.rst | 2 | ||||
-rw-r--r-- | docs/arguments.rst | 7 | ||||
-rw-r--r-- | docs/runner.rst | 5 | ||||
-rw-r--r-- | waitress/adjustments.py | 5 | ||||
-rw-r--r-- | waitress/channel.py | 40 | ||||
-rw-r--r-- | waitress/runner.py | 5 | ||||
-rw-r--r-- | waitress/tests/test_channel.py | 71 |
8 files changed, 144 insertions, 9 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index be24559..d5c6274 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,15 @@ Deprecations Features ~~~~~~~~ +- Add a new ``outbuf_high_watermark`` adjustment which is used to apply + backpressure on the ``app_iter`` to avoid letting it spin faster than data + can be written to the socket. This stabilizes responses that iterate quickly + with a lot of data. + See https://github.com/Pylons/waitress/pull/242 + +Bugfixes +~~~~~~~~ + - Stop early and close the ``app_iter`` when attempting to write to a closed socket due to a client disconnect. This should notify a long-lived streaming response when a client hangs up. @@ -26,9 +35,6 @@ Features still be flushed efficiently. See https://github.com/Pylons/waitress/pull/246 -Bugfixes -~~~~~~~~ - - When a client closes a socket unexpectedly there was potential for memory leaks in which data was written to the buffers after they were closed, causing them to reopen. @@ -43,6 +49,12 @@ Bugfixes the server will die before benefiting from these changes. See https://github.com/Pylons/waitress/pull/245 +- Fix a bug in which an ``app_iter`` that emits data chunks quicker than can + be written to a socket may never cleanup data that has already been sent. + This would cause buffers in waitress to grow without bounds. These buffers + now properly reclaim their space as data is written to the socket. + See https://github.com/Pylons/waitress/pull/242 + 1.2.1 (2019-01-25) ------------------ diff --git a/docs/api.rst b/docs/api.rst index 70c174c..a921a1b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -5,6 +5,6 @@ .. module:: waitress -.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) +.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, outbuf_high_watermark=16777216, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) See :ref:`arguments` for more information. diff --git a/docs/arguments.rst b/docs/arguments.rst index 8bacc44..22535a8 100644 --- a/docs/arguments.rst +++ b/docs/arguments.rst @@ -200,6 +200,13 @@ outbuf_overflow Default: ``1048576`` (1MB) +outbuf_high_watermark + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. + + Default: ``16777216`` (16MB) + inbuf_overflow A tempfile should be created if the pending input is larger than inbuf_overflow, which is measured in bytes. The default is conservative. diff --git a/docs/runner.rst b/docs/runner.rst index 0b61307..2776e44 100644 --- a/docs/runner.rst +++ b/docs/runner.rst @@ -152,6 +152,11 @@ Tuning options: A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). +``--outbuf-high-watermark=INT`` + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + ``--inbuf-overflow=INT`` A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/adjustments.py b/waitress/adjustments.py index 3b0b364..5c1879b 100644 --- a/waitress/adjustments.py +++ b/waitress/adjustments.py @@ -112,6 +112,7 @@ class Adjustments(object): ('recv_bytes', int), ('send_bytes', int), ('outbuf_overflow', int), + ('outbuf_high_watermark', int), ('inbuf_overflow', int), ('connection_limit', int), ('cleanup_interval', int), @@ -204,6 +205,10 @@ class Adjustments(object): # is conservative. outbuf_overflow = 1048576 + # The app_iter will pause when pending output is larger than this value + # in bytes. + outbuf_high_watermark = 16777216 + # A tempfile should be created if the pending input is larger than # inbuf_overflow, which is measured in bytes. The default is 512K. This # is conservative. diff --git a/waitress/channel.py b/waitress/channel.py index 8fee59c..298188a 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -75,8 +75,8 @@ class HTTPChannel(wasyncore.dispatcher, object): # task_lock used to push/pop requests self.task_lock = threading.Lock() - # outbuf_lock used to access any outbuf - self.outbuf_lock = threading.RLock() + # outbuf_lock used to access any outbuf (expected to use an RLock) + self.outbuf_lock = threading.Condition() wasyncore.dispatcher.__init__(self, sock, map=map) @@ -205,6 +205,9 @@ class HTTPChannel(wasyncore.dispatcher, object): if self.outbuf_lock.acquire(False): try: self._flush_some() + + if self.total_outbufs_len < self.adj.outbuf_high_watermark: + self.outbuf_lock.notify() finally: self.outbuf_lock.release() @@ -265,6 +268,7 @@ class HTTPChannel(wasyncore.dispatcher, object): 'Unknown exception while trying to close outbuf') self.total_outbufs_len = 0 self.connected = False + self.outbuf_lock.notify() wasyncore.dispatcher.close(self) def add_channel(self, map=None): @@ -299,9 +303,8 @@ class HTTPChannel(wasyncore.dispatcher, object): # the async mainloop might be popping data off outbuf; we can # block here waiting for it because we're in a task thread with self.outbuf_lock: - # check again after acquiring the lock to ensure we the - # outbufs are not closed - if not self.connected: # pragma: no cover + overflowed = self._flush_outbufs_below_high_watermark() + if not self.connected: raise ClientDisconnected if data.__class__ is ReadOnlyFileBasedBuffer: # they used wsgi.file_wrapper @@ -309,6 +312,12 @@ class HTTPChannel(wasyncore.dispatcher, object): nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) else: + # if we overflowed then start a new buffer to ensure + # the original eventually gets pruned otherwise it may + # grow unbounded + if overflowed: + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) self.outbufs[-1].append(data) num_bytes = len(data) self.total_outbufs_len += num_bytes @@ -319,6 +328,18 @@ class HTTPChannel(wasyncore.dispatcher, object): return num_bytes return 0 + def _flush_outbufs_below_high_watermark(self): + overflowed = self.total_outbufs_len > self.adj.outbuf_high_watermark + # check first to avoid locking if possible + if overflowed: + with self.outbuf_lock: + while ( + self.connected and + self.total_outbufs_len > self.adj.outbuf_high_watermark + ): + self.outbuf_lock.wait() + return overflowed + def service(self): """Execute all pending requests """ with self.task_lock: @@ -370,6 +391,15 @@ class HTTPChannel(wasyncore.dispatcher, object): request.close() self.requests = [] else: + # before processing a new request, ensure there is not too + # much data in the outbufs waiting to be flushed + # NB: currently readable() returns False while we are + # flushing data so we know no new requests will come in + # that we need to account for, otherwise it'd be better + # to do this check at the start of the request instead of + # at the end to account for consecutive service() calls + if len(self.requests) > 1: + self._flush_outbufs_below_high_watermark() request = self.requests.pop(0) request.close() diff --git a/waitress/runner.py b/waitress/runner.py index abdb38e..6797276 100644 --- a/waitress/runner.py +++ b/waitress/runner.py @@ -126,6 +126,11 @@ Tuning options: A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). + --outbuf-high-watermark=INT + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + --inbuf-overflow=INT A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index f66766b..ff3bba6 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -228,6 +228,65 @@ class TestHTTPChannel(unittest.TestCase): inst.connected = False self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + def test_write_soon_disconnected_while_over_watermark(self): + from waitress.channel import ClientDisconnected + inst, sock, map = self._makeOneWithMap() + def dummy_flush(): + inst.connected = False + inst._flush_outbufs_below_high_watermark = dummy_flush + self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + + def test_write_soon_rotates_outbuf_on_overflow(self): + inst, sock, map = self._makeOneWithMap() + inst.adj.outbuf_high_watermark = 3 + inst.outbufs = [DummyBuffer(b'abcd')] + inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + class Lock(DummyLock): + def wait(self): + inst.outbufs[0].prune() + inst.total_outbufs_len = sum(len(x) for x in inst.outbufs) + super(Lock, self).wait() + inst.outbuf_lock = Lock() + wrote = inst.write_soon(b'xyz') + self.assertEqual(wrote, 3) + self.assertEqual(len(inst.outbufs), 2) + self.assertTrue(inst.outbufs[0].pruned) + self.assertEqual(inst.outbufs[1].get(), b'xyz') + self.assertTrue(inst.outbuf_lock.waited) + + def test_handle_write_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 5 + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertTrue(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'abc') + + def test_handle_write_no_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 2 + sock.send = lambda x: False + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertFalse(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'') + def test__flush_some_empty_outbuf(self): inst, sock, map = self._makeOneWithMap() result = inst._flush_some() @@ -652,6 +711,7 @@ class DummySock(object): return len(data) class DummyLock(object): + notified = False def __init__(self, acquirable=True): self.acquirable = acquirable @@ -664,6 +724,12 @@ class DummyLock(object): def release(self): self.released = True + def notify(self): + self.notified = True + + def wait(self): + self.waited = True + def __exit__(self, type, val, traceback): self.acquire(True) @@ -687,6 +753,10 @@ class DummyBuffer(object): def skip(self, num, x): self.skipped = num + def prune(self): + self.pruned = True + self.data = b'' + def __len__(self): return len(self.data) @@ -695,6 +765,7 @@ class DummyBuffer(object): class DummyAdjustments(object): outbuf_overflow = 1048576 + outbuf_high_watermark = 1048576 inbuf_overflow = 512000 cleanup_interval = 900 url_scheme = 'http' |