diff options
author | Bert JW Regeer <xistence@0x58.com> | 2019-04-06 15:54:53 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-06 15:54:53 -0600 |
commit | 5583715063c30fda571ab0e0169b0068403dc53d (patch) | |
tree | 91021216340c576eb796d685bfe8683050946113 | |
parent | 5bd69e9545718913bd2be0b4ab04f7da3d9f74c8 (diff) | |
parent | 9e1b10aa3d8787868ac8d7556b27d36a22357660 (diff) | |
download | waitress-5583715063c30fda571ab0e0169b0068403dc53d.tar.gz |
Merge pull request #242 from Pylons/outbuf-high-watermark
outbuf_high_watermark
-rw-r--r-- | CHANGES.txt | 11 | ||||
-rw-r--r-- | docs/api.rst | 2 | ||||
-rw-r--r-- | docs/arguments.rst | 7 | ||||
-rw-r--r-- | docs/runner.rst | 5 | ||||
-rw-r--r-- | waitress/adjustments.py | 5 | ||||
-rw-r--r-- | waitress/channel.py | 80 | ||||
-rw-r--r-- | waitress/runner.py | 5 | ||||
-rw-r--r-- | waitress/tests/test_channel.py | 76 |
8 files changed, 165 insertions, 26 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index be24559..2897939 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,12 @@ Deprecations Features ~~~~~~~~ +- Add a new ``outbuf_high_watermark`` adjustment which is used to apply + backpressure on the ``app_iter`` to avoid letting it spin faster than data + can be written to the socket. This stabilizes responses that iterate quickly + with a lot of data. + See https://github.com/Pylons/waitress/pull/242 + - Stop early and close the ``app_iter`` when attempting to write to a closed socket due to a client disconnect. This should notify a long-lived streaming response when a client hangs up. @@ -43,6 +49,11 @@ Bugfixes the server will die before benefiting from these changes. See https://github.com/Pylons/waitress/pull/245 +- Fix a bug in which a streaming ``app_iter`` may never cleanup data that has + already been sent. This would cause buffers in waitress to grow without + bounds. These buffers now properly rotate and release their data. + See https://github.com/Pylons/waitress/pull/242 + 1.2.1 (2019-01-25) ------------------ diff --git a/docs/api.rst b/docs/api.rst index 70c174c..a921a1b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -5,6 +5,6 @@ .. module:: waitress -.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) +.. function:: serve(app, listen='0.0.0.0:8080', unix_socket=None, unix_socket_perms='600', threads=4, url_scheme='http', url_prefix='', ident='waitress', backlog=1204, recv_bytes=8192, send_bytes=1, outbuf_overflow=104856, outbuf_high_watermark=16777216, inbuf_overflow=52488, connection_limit=1000, cleanup_interval=30, channel_timeout=120, log_socket_errors=True, max_request_header_size=262144, max_request_body_size=1073741824, expose_tracebacks=False) See :ref:`arguments` for more information. diff --git a/docs/arguments.rst b/docs/arguments.rst index 8bacc44..22535a8 100644 --- a/docs/arguments.rst +++ b/docs/arguments.rst @@ -200,6 +200,13 @@ outbuf_overflow Default: ``1048576`` (1MB) +outbuf_high_watermark + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. + + Default: ``16777216`` (16MB) + inbuf_overflow A tempfile should be created if the pending input is larger than inbuf_overflow, which is measured in bytes. The default is conservative. diff --git a/docs/runner.rst b/docs/runner.rst index 0b61307..2776e44 100644 --- a/docs/runner.rst +++ b/docs/runner.rst @@ -152,6 +152,11 @@ Tuning options: A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). +``--outbuf-high-watermark=INT`` + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + ``--inbuf-overflow=INT`` A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/adjustments.py b/waitress/adjustments.py index 3b0b364..5c1879b 100644 --- a/waitress/adjustments.py +++ b/waitress/adjustments.py @@ -112,6 +112,7 @@ class Adjustments(object): ('recv_bytes', int), ('send_bytes', int), ('outbuf_overflow', int), + ('outbuf_high_watermark', int), ('inbuf_overflow', int), ('connection_limit', int), ('cleanup_interval', int), @@ -204,6 +205,10 @@ class Adjustments(object): # is conservative. outbuf_overflow = 1048576 + # The app_iter will pause when pending output is larger than this value + # in bytes. + outbuf_high_watermark = 16777216 + # A tempfile should be created if the pending input is larger than # inbuf_overflow, which is measured in bytes. The default is 512K. This # is conservative. diff --git a/waitress/channel.py b/waitress/channel.py index 8fee59c..12e6fce 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -53,6 +53,8 @@ class HTTPChannel(wasyncore.dispatcher, object): close_when_flushed = False # set to True to close the socket when flushed requests = () # currently pending requests sent_continue = False # used as a latch after sending 100 continue + total_outbufs_len = 0 # total bytes ready to send + current_outbuf_count = 0 # total bytes written to current outbuf # # ASYNCHRONOUS METHODS (including __init__) @@ -69,14 +71,13 @@ class HTTPChannel(wasyncore.dispatcher, object): self.server = server self.adj = adj self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)] - self.total_outbufs_len = 0 self.creation_time = self.last_activity = time.time() self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) # task_lock used to push/pop requests self.task_lock = threading.Lock() - # outbuf_lock used to access any outbuf - self.outbuf_lock = threading.RLock() + # outbuf_lock used to access any outbuf (expected to use an RLock) + self.outbuf_lock = threading.Condition() wasyncore.dispatcher.__init__(self, sock, map=map) @@ -122,7 +123,7 @@ class HTTPChannel(wasyncore.dispatcher, object): if self.adj.log_socket_errors: self.logger.exception('Socket error') self.will_close = True - except: + except Exception: self.logger.exception('Unexpected exception when flushing') self.will_close = True @@ -177,6 +178,7 @@ class HTTPChannel(wasyncore.dispatcher, object): # lock the outbuf to append to it. outbuf_payload = b'HTTP/1.1 100 Continue\r\n\r\n' self.outbufs[-1].append(outbuf_payload) + self.current_outbuf_count += len(outbuf_payload) self.total_outbufs_len += len(outbuf_payload) self.sent_continue = True self._flush_some() @@ -205,6 +207,9 @@ class HTTPChannel(wasyncore.dispatcher, object): if self.outbuf_lock.acquire(False): try: self._flush_some() + + if self.total_outbufs_len < self.adj.outbuf_high_watermark: + self.outbuf_lock.notify() finally: self.outbuf_lock.release() @@ -217,23 +222,8 @@ class HTTPChannel(wasyncore.dispatcher, object): while True: outbuf = self.outbufs[0] # use outbuf.__len__ rather than len(outbuf) FBO of not getting - # OverflowError on Python 2 + # OverflowError on 32-bit Python outbuflen = outbuf.__len__() - if outbuflen <= 0: - # self.outbufs[-1] must always be a writable outbuf - if len(self.outbufs) > 1: - toclose = self.outbufs.pop(0) - try: - toclose.close() - except: - self.logger.exception( - 'Unexpected error when closing an outbuf') - continue # pragma: no cover (coverage bug, it is hit) - else: - if hasattr(outbuf, 'prune'): - outbuf.prune() - dobreak = True - while outbuflen > 0: chunk = outbuf.get(self.sendbuf_len) num_sent = self.send(chunk) @@ -243,8 +233,21 @@ class HTTPChannel(wasyncore.dispatcher, object): sent += num_sent self.total_outbufs_len -= num_sent else: + # failed to write anything, break out entirely dobreak = True break + else: + # self.outbufs[-1] must always be a writable outbuf + if len(self.outbufs) > 1: + toclose = self.outbufs.pop(0) + try: + toclose.close() + except Exception: + self.logger.exception( + 'Unexpected error when closing an outbuf') + else: + # caught up, done flushing for now + dobreak = True if dobreak: break @@ -260,11 +263,12 @@ class HTTPChannel(wasyncore.dispatcher, object): for outbuf in self.outbufs: try: outbuf.close() - except: + except Exception: self.logger.exception( 'Unknown exception while trying to close outbuf') self.total_outbufs_len = 0 self.connected = False + self.outbuf_lock.notify() wasyncore.dispatcher.close(self) def add_channel(self, map=None): @@ -299,18 +303,25 @@ class HTTPChannel(wasyncore.dispatcher, object): # the async mainloop might be popping data off outbuf; we can # block here waiting for it because we're in a task thread with self.outbuf_lock: - # check again after acquiring the lock to ensure we the - # outbufs are not closed - if not self.connected: # pragma: no cover + self._flush_outbufs_below_high_watermark() + if not self.connected: raise ClientDisconnected if data.__class__ is ReadOnlyFileBasedBuffer: # they used wsgi.file_wrapper self.outbufs.append(data) nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 else: + if self.current_outbuf_count > self.adj.outbuf_high_watermark: + # rotate to a new buffer if the current buffer has hit + # the watermark to avoid it growing unbounded + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 self.outbufs[-1].append(data) num_bytes = len(data) + self.current_outbuf_count += num_bytes self.total_outbufs_len += num_bytes # XXX We might eventually need to pull the trigger here (to # instruct select to stop blocking), but it slows things down so @@ -319,6 +330,16 @@ class HTTPChannel(wasyncore.dispatcher, object): return num_bytes return 0 + def _flush_outbufs_below_high_watermark(self): + # check first to avoid locking if possible + if self.total_outbufs_len > self.adj.outbuf_high_watermark: + with self.outbuf_lock: + while ( + self.connected and + self.total_outbufs_len > self.adj.outbuf_high_watermark + ): + self.outbuf_lock.wait() + def service(self): """Execute all pending requests """ with self.task_lock: @@ -334,7 +355,7 @@ class HTTPChannel(wasyncore.dispatcher, object): self.logger.info('Client disconnected while serving %s' % task.request.path) task.close_on_finish = True - except: + except Exception: self.logger.exception('Exception while serving %s' % task.request.path) if not task.wrote_header: @@ -370,6 +391,15 @@ class HTTPChannel(wasyncore.dispatcher, object): request.close() self.requests = [] else: + # before processing a new request, ensure there is not too + # much data in the outbufs waiting to be flushed + # NB: currently readable() returns False while we are + # flushing data so we know no new requests will come in + # that we need to account for, otherwise it'd be better + # to do this check at the start of the request instead of + # at the end to account for consecutive service() calls + if len(self.requests) > 1: + self._flush_outbufs_below_high_watermark() request = self.requests.pop(0) request.close() diff --git a/waitress/runner.py b/waitress/runner.py index abdb38e..6797276 100644 --- a/waitress/runner.py +++ b/waitress/runner.py @@ -126,6 +126,11 @@ Tuning options: A temporary file should be created if the pending output is larger than this. Default is 1048576 (1MB). + --outbuf-high-watermark=INT + The app_iter will pause when pending output is larger than this value + and will resume once enough data is written to the socket to fall below + this threshold. Default is 16777216 (16MB). + --inbuf-overflow=INT A temporary file should be created if the pending input is larger than this. Default is 524288 (512KB). diff --git a/waitress/tests/test_channel.py b/waitress/tests/test_channel.py index f66766b..35fccf6 100644 --- a/waitress/tests/test_channel.py +++ b/waitress/tests/test_channel.py @@ -228,6 +228,74 @@ class TestHTTPChannel(unittest.TestCase): inst.connected = False self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + def test_write_soon_disconnected_while_over_watermark(self): + from waitress.channel import ClientDisconnected + inst, sock, map = self._makeOneWithMap() + def dummy_flush(): + inst.connected = False + inst._flush_outbufs_below_high_watermark = dummy_flush + self.assertRaises(ClientDisconnected, lambda: inst.write_soon(b'stuff')) + + def test_write_soon_rotates_outbuf_on_overflow(self): + inst, sock, map = self._makeOneWithMap() + inst.adj.outbuf_high_watermark = 3 + inst.current_outbuf_count = 4 + wrote = inst.write_soon(b'xyz') + self.assertEqual(wrote, 3) + self.assertEqual(len(inst.outbufs), 2) + self.assertEqual(inst.outbufs[0].get(), b'') + self.assertEqual(inst.outbufs[1].get(), b'xyz') + + def test_write_soon_waits_on_backpressure(self): + inst, sock, map = self._makeOneWithMap() + inst.adj.outbuf_high_watermark = 3 + inst.total_outbufs_len = 4 + inst.current_outbuf_count = 4 + class Lock(DummyLock): + def wait(self): + inst.total_outbufs_len = 0 + super(Lock, self).wait() + inst.outbuf_lock = Lock() + wrote = inst.write_soon(b'xyz') + self.assertEqual(wrote, 3) + self.assertEqual(len(inst.outbufs), 2) + self.assertEqual(inst.outbufs[0].get(), b'') + self.assertEqual(inst.outbufs[1].get(), b'xyz') + self.assertTrue(inst.outbuf_lock.waited) + + def test_handle_write_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 5 + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertTrue(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'abc') + + def test_handle_write_no_notify_after_flush(self): + inst, sock, map = self._makeOneWithMap() + inst.requests = [True] + inst.outbufs = [DummyBuffer(b'abc')] + inst.total_outbufs_len = len(inst.outbufs[0]) + inst.adj.send_bytes = 1 + inst.adj.outbuf_high_watermark = 2 + sock.send = lambda x: False + inst.will_close = False + inst.last_activity = 0 + result = inst.handle_write() + self.assertEqual(result, None) + self.assertEqual(inst.will_close, False) + self.assertTrue(inst.outbuf_lock.acquired) + self.assertFalse(inst.outbuf_lock.notified) + self.assertEqual(sock.sent, b'') + def test__flush_some_empty_outbuf(self): inst, sock, map = self._makeOneWithMap() result = inst._flush_some() @@ -652,6 +720,7 @@ class DummySock(object): return len(data) class DummyLock(object): + notified = False def __init__(self, acquirable=True): self.acquirable = acquirable @@ -664,6 +733,12 @@ class DummyLock(object): def release(self): self.released = True + def notify(self): + self.notified = True + + def wait(self): + self.waited = True + def __exit__(self, type, val, traceback): self.acquire(True) @@ -695,6 +770,7 @@ class DummyBuffer(object): class DummyAdjustments(object): outbuf_overflow = 1048576 + outbuf_high_watermark = 1048576 inbuf_overflow = 512000 cleanup_interval = 900 url_scheme = 'http' |