diff options
Diffstat (limited to 'src/waitress/channel.py')
-rw-r--r-- | src/waitress/channel.py | 414 |
1 files changed, 414 insertions, 0 deletions
diff --git a/src/waitress/channel.py b/src/waitress/channel.py new file mode 100644 index 0000000..a8bc76f --- /dev/null +++ b/src/waitress/channel.py @@ -0,0 +1,414 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +import socket +import threading +import time +import traceback + +from waitress.buffers import ( + OverflowableBuffer, + ReadOnlyFileBasedBuffer, +) + +from waitress.parser import HTTPRequestParser + +from waitress.task import ( + ErrorTask, + WSGITask, +) + +from waitress.utilities import InternalServerError + +from . import wasyncore + + +class ClientDisconnected(Exception): + """ Raised when attempting to write to a closed socket.""" + + +class HTTPChannel(wasyncore.dispatcher, object): + """ + Setting self.requests = [somerequest] prevents more requests from being + received until the out buffers have been flushed. + + Setting self.requests = [] allows more requests to be received. + """ + + task_class = WSGITask + error_task_class = ErrorTask + parser_class = HTTPRequestParser + + request = None # A request parser instance + last_activity = 0 # Time of last activity + will_close = False # set to True to close the socket. + close_when_flushed = False # set to True to close the socket when flushed + requests = () # currently pending requests + sent_continue = False # used as a latch after sending 100 continue + total_outbufs_len = 0 # total bytes ready to send + current_outbuf_count = 0 # total bytes written to current outbuf + + # + # ASYNCHRONOUS METHODS (including __init__) + # + + def __init__( + self, server, sock, addr, adj, map=None, + ): + self.server = server + self.adj = adj + self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)] + self.creation_time = self.last_activity = time.time() + self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) + + # task_lock used to push/pop requests + self.task_lock = threading.Lock() + # outbuf_lock used to access any outbuf (expected to use an RLock) + self.outbuf_lock = threading.Condition() + + wasyncore.dispatcher.__init__(self, sock, map=map) + + # Don't let wasyncore.dispatcher throttle self.addr on us. + self.addr = addr + + def writable(self): + # if there's data in the out buffer or we've been instructed to close + # the channel (possibly by our server maintenance logic), run + # handle_write + return self.total_outbufs_len or self.will_close or self.close_when_flushed + + def handle_write(self): + # Precondition: there's data in the out buffer to be sent, or + # there's a pending will_close request + if not self.connected: + # we dont want to close the channel twice + return + + # try to flush any pending output + if not self.requests: + # 1. There are no running tasks, so we don't need to try to lock + # the outbuf before sending + # 2. The data in the out buffer should be sent as soon as possible + # because it's either data left over from task output + # or a 100 Continue line sent within "received". + flush = self._flush_some + elif self.total_outbufs_len >= self.adj.send_bytes: + # 1. There's a running task, so we need to try to lock + # the outbuf before sending + # 2. Only try to send if the data in the out buffer is larger + # than self.adj_bytes to avoid TCP fragmentation + flush = self._flush_some_if_lockable + else: + # 1. There's not enough data in the out buffer to bother to send + # right now. + flush = None + + if flush: + try: + flush() + except socket.error: + if self.adj.log_socket_errors: + self.logger.exception("Socket error") + self.will_close = True + except Exception: + self.logger.exception("Unexpected exception when flushing") + self.will_close = True + + if self.close_when_flushed and not self.total_outbufs_len: + self.close_when_flushed = False + self.will_close = True + + if self.will_close: + self.handle_close() + + def readable(self): + # We might want to create a new task. We can only do this if: + # 1. We're not already about to close the connection. + # 2. There's no already currently running task(s). + # 3. There's no data in the output buffer that needs to be sent + # before we potentially create a new task. + return not (self.will_close or self.requests or self.total_outbufs_len) + + def handle_read(self): + try: + data = self.recv(self.adj.recv_bytes) + except socket.error: + if self.adj.log_socket_errors: + self.logger.exception("Socket error") + self.handle_close() + return + if data: + self.last_activity = time.time() + self.received(data) + + def received(self, data): + """ + Receives input asynchronously and assigns one or more requests to the + channel. + """ + # Preconditions: there's no task(s) already running + request = self.request + requests = [] + + if not data: + return False + + while data: + if request is None: + request = self.parser_class(self.adj) + n = request.received(data) + if request.expect_continue and request.headers_finished: + # guaranteed by parser to be a 1.1 request + request.expect_continue = False + if not self.sent_continue: + # there's no current task, so we don't need to try to + # lock the outbuf to append to it. + outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n" + self.outbufs[-1].append(outbuf_payload) + self.current_outbuf_count += len(outbuf_payload) + self.total_outbufs_len += len(outbuf_payload) + self.sent_continue = True + self._flush_some() + request.completed = False + if request.completed: + # The request (with the body) is ready to use. + self.request = None + if not request.empty: + requests.append(request) + request = None + else: + self.request = request + if n >= len(data): + break + data = data[n:] + + if requests: + self.requests = requests + self.server.add_task(self) + + return True + + def _flush_some_if_lockable(self): + # Since our task may be appending to the outbuf, we try to acquire + # the lock, but we don't block if we can't. + if self.outbuf_lock.acquire(False): + try: + self._flush_some() + + if self.total_outbufs_len < self.adj.outbuf_high_watermark: + self.outbuf_lock.notify() + finally: + self.outbuf_lock.release() + + def _flush_some(self): + # Send as much data as possible to our client + + sent = 0 + dobreak = False + + while True: + outbuf = self.outbufs[0] + # use outbuf.__len__ rather than len(outbuf) FBO of not getting + # OverflowError on 32-bit Python + outbuflen = outbuf.__len__() + while outbuflen > 0: + chunk = outbuf.get(self.sendbuf_len) + num_sent = self.send(chunk) + if num_sent: + outbuf.skip(num_sent, True) + outbuflen -= num_sent + sent += num_sent + self.total_outbufs_len -= num_sent + else: + # failed to write anything, break out entirely + dobreak = True + break + else: + # self.outbufs[-1] must always be a writable outbuf + if len(self.outbufs) > 1: + toclose = self.outbufs.pop(0) + try: + toclose.close() + except Exception: + self.logger.exception("Unexpected error when closing an outbuf") + else: + # caught up, done flushing for now + dobreak = True + + if dobreak: + break + + if sent: + self.last_activity = time.time() + return True + + return False + + def handle_close(self): + with self.outbuf_lock: + for outbuf in self.outbufs: + try: + outbuf.close() + except Exception: + self.logger.exception( + "Unknown exception while trying to close outbuf" + ) + self.total_outbufs_len = 0 + self.connected = False + self.outbuf_lock.notify() + wasyncore.dispatcher.close(self) + + def add_channel(self, map=None): + """See wasyncore.dispatcher + + This hook keeps track of opened channels. + """ + wasyncore.dispatcher.add_channel(self, map) + self.server.active_channels[self._fileno] = self + + def del_channel(self, map=None): + """See wasyncore.dispatcher + + This hook keeps track of closed channels. + """ + fd = self._fileno # next line sets this to None + wasyncore.dispatcher.del_channel(self, map) + ac = self.server.active_channels + if fd in ac: + del ac[fd] + + # + # SYNCHRONOUS METHODS + # + + def write_soon(self, data): + if not self.connected: + # if the socket is closed then interrupt the task so that it + # can cleanup possibly before the app_iter is exhausted + raise ClientDisconnected + if data: + # the async mainloop might be popping data off outbuf; we can + # block here waiting for it because we're in a task thread + with self.outbuf_lock: + self._flush_outbufs_below_high_watermark() + if not self.connected: + raise ClientDisconnected + num_bytes = len(data) + if data.__class__ is ReadOnlyFileBasedBuffer: + # they used wsgi.file_wrapper + self.outbufs.append(data) + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 + else: + if self.current_outbuf_count > self.adj.outbuf_high_watermark: + # rotate to a new buffer if the current buffer has hit + # the watermark to avoid it growing unbounded + nextbuf = OverflowableBuffer(self.adj.outbuf_overflow) + self.outbufs.append(nextbuf) + self.current_outbuf_count = 0 + self.outbufs[-1].append(data) + self.current_outbuf_count += num_bytes + self.total_outbufs_len += num_bytes + if self.total_outbufs_len >= self.adj.send_bytes: + self.server.pull_trigger() + return num_bytes + return 0 + + def _flush_outbufs_below_high_watermark(self): + # check first to avoid locking if possible + if self.total_outbufs_len > self.adj.outbuf_high_watermark: + with self.outbuf_lock: + while ( + self.connected + and self.total_outbufs_len > self.adj.outbuf_high_watermark + ): + self.server.pull_trigger() + self.outbuf_lock.wait() + + def service(self): + """Execute all pending requests """ + with self.task_lock: + while self.requests: + request = self.requests[0] + if request.error: + task = self.error_task_class(self, request) + else: + task = self.task_class(self, request) + try: + task.service() + except ClientDisconnected: + self.logger.info( + "Client disconnected while serving %s" % task.request.path + ) + task.close_on_finish = True + except Exception: + self.logger.exception( + "Exception while serving %s" % task.request.path + ) + if not task.wrote_header: + if self.adj.expose_tracebacks: + body = traceback.format_exc() + else: + body = ( + "The server encountered an unexpected " + "internal server error" + ) + req_version = request.version + req_headers = request.headers + request = self.parser_class(self.adj) + request.error = InternalServerError(body) + # copy some original request attributes to fulfill + # HTTP 1.1 requirements + request.version = req_version + try: + request.headers["CONNECTION"] = req_headers["CONNECTION"] + except KeyError: + pass + task = self.error_task_class(self, request) + try: + task.service() # must not fail + except ClientDisconnected: + task.close_on_finish = True + else: + task.close_on_finish = True + # we cannot allow self.requests to drop to empty til + # here; otherwise the mainloop gets confused + if task.close_on_finish: + self.close_when_flushed = True + for request in self.requests: + request.close() + self.requests = [] + else: + # before processing a new request, ensure there is not too + # much data in the outbufs waiting to be flushed + # NB: currently readable() returns False while we are + # flushing data so we know no new requests will come in + # that we need to account for, otherwise it'd be better + # to do this check at the start of the request instead of + # at the end to account for consecutive service() calls + if len(self.requests) > 1: + self._flush_outbufs_below_high_watermark() + request = self.requests.pop(0) + request.close() + + if self.connected: + self.server.pull_trigger() + self.last_activity = time.time() + + def cancel(self): + """ Cancels all pending / active requests """ + self.will_close = True + self.connected = False + self.last_activity = time.time() + self.requests = [] |