summaryrefslogtreecommitdiff
path: root/src/waitress/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/waitress/channel.py')
-rw-r--r--src/waitress/channel.py414
1 files changed, 414 insertions, 0 deletions
diff --git a/src/waitress/channel.py b/src/waitress/channel.py
new file mode 100644
index 0000000..a8bc76f
--- /dev/null
+++ b/src/waitress/channel.py
@@ -0,0 +1,414 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import socket
+import threading
+import time
+import traceback
+
+from waitress.buffers import (
+ OverflowableBuffer,
+ ReadOnlyFileBasedBuffer,
+)
+
+from waitress.parser import HTTPRequestParser
+
+from waitress.task import (
+ ErrorTask,
+ WSGITask,
+)
+
+from waitress.utilities import InternalServerError
+
+from . import wasyncore
+
+
+class ClientDisconnected(Exception):
+ """ Raised when attempting to write to a closed socket."""
+
+
+class HTTPChannel(wasyncore.dispatcher, object):
+ """
+ Setting self.requests = [somerequest] prevents more requests from being
+ received until the out buffers have been flushed.
+
+ Setting self.requests = [] allows more requests to be received.
+ """
+
+ task_class = WSGITask
+ error_task_class = ErrorTask
+ parser_class = HTTPRequestParser
+
+ request = None # A request parser instance
+ last_activity = 0 # Time of last activity
+ will_close = False # set to True to close the socket.
+ close_when_flushed = False # set to True to close the socket when flushed
+ requests = () # currently pending requests
+ sent_continue = False # used as a latch after sending 100 continue
+ total_outbufs_len = 0 # total bytes ready to send
+ current_outbuf_count = 0 # total bytes written to current outbuf
+
+ #
+ # ASYNCHRONOUS METHODS (including __init__)
+ #
+
+ def __init__(
+ self, server, sock, addr, adj, map=None,
+ ):
+ self.server = server
+ self.adj = adj
+ self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
+ self.creation_time = self.last_activity = time.time()
+ self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
+
+ # task_lock used to push/pop requests
+ self.task_lock = threading.Lock()
+ # outbuf_lock used to access any outbuf (expected to use an RLock)
+ self.outbuf_lock = threading.Condition()
+
+ wasyncore.dispatcher.__init__(self, sock, map=map)
+
+ # Don't let wasyncore.dispatcher throttle self.addr on us.
+ self.addr = addr
+
+ def writable(self):
+ # if there's data in the out buffer or we've been instructed to close
+ # the channel (possibly by our server maintenance logic), run
+ # handle_write
+ return self.total_outbufs_len or self.will_close or self.close_when_flushed
+
+ def handle_write(self):
+ # Precondition: there's data in the out buffer to be sent, or
+ # there's a pending will_close request
+ if not self.connected:
+ # we dont want to close the channel twice
+ return
+
+ # try to flush any pending output
+ if not self.requests:
+ # 1. There are no running tasks, so we don't need to try to lock
+ # the outbuf before sending
+ # 2. The data in the out buffer should be sent as soon as possible
+ # because it's either data left over from task output
+ # or a 100 Continue line sent within "received".
+ flush = self._flush_some
+ elif self.total_outbufs_len >= self.adj.send_bytes:
+ # 1. There's a running task, so we need to try to lock
+ # the outbuf before sending
+ # 2. Only try to send if the data in the out buffer is larger
+ # than self.adj_bytes to avoid TCP fragmentation
+ flush = self._flush_some_if_lockable
+ else:
+ # 1. There's not enough data in the out buffer to bother to send
+ # right now.
+ flush = None
+
+ if flush:
+ try:
+ flush()
+ except socket.error:
+ if self.adj.log_socket_errors:
+ self.logger.exception("Socket error")
+ self.will_close = True
+ except Exception:
+ self.logger.exception("Unexpected exception when flushing")
+ self.will_close = True
+
+ if self.close_when_flushed and not self.total_outbufs_len:
+ self.close_when_flushed = False
+ self.will_close = True
+
+ if self.will_close:
+ self.handle_close()
+
+ def readable(self):
+ # We might want to create a new task. We can only do this if:
+ # 1. We're not already about to close the connection.
+ # 2. There's no already currently running task(s).
+ # 3. There's no data in the output buffer that needs to be sent
+ # before we potentially create a new task.
+ return not (self.will_close or self.requests or self.total_outbufs_len)
+
+ def handle_read(self):
+ try:
+ data = self.recv(self.adj.recv_bytes)
+ except socket.error:
+ if self.adj.log_socket_errors:
+ self.logger.exception("Socket error")
+ self.handle_close()
+ return
+ if data:
+ self.last_activity = time.time()
+ self.received(data)
+
+ def received(self, data):
+ """
+ Receives input asynchronously and assigns one or more requests to the
+ channel.
+ """
+ # Preconditions: there's no task(s) already running
+ request = self.request
+ requests = []
+
+ if not data:
+ return False
+
+ while data:
+ if request is None:
+ request = self.parser_class(self.adj)
+ n = request.received(data)
+ if request.expect_continue and request.headers_finished:
+ # guaranteed by parser to be a 1.1 request
+ request.expect_continue = False
+ if not self.sent_continue:
+ # there's no current task, so we don't need to try to
+ # lock the outbuf to append to it.
+ outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n"
+ self.outbufs[-1].append(outbuf_payload)
+ self.current_outbuf_count += len(outbuf_payload)
+ self.total_outbufs_len += len(outbuf_payload)
+ self.sent_continue = True
+ self._flush_some()
+ request.completed = False
+ if request.completed:
+ # The request (with the body) is ready to use.
+ self.request = None
+ if not request.empty:
+ requests.append(request)
+ request = None
+ else:
+ self.request = request
+ if n >= len(data):
+ break
+ data = data[n:]
+
+ if requests:
+ self.requests = requests
+ self.server.add_task(self)
+
+ return True
+
+ def _flush_some_if_lockable(self):
+ # Since our task may be appending to the outbuf, we try to acquire
+ # the lock, but we don't block if we can't.
+ if self.outbuf_lock.acquire(False):
+ try:
+ self._flush_some()
+
+ if self.total_outbufs_len < self.adj.outbuf_high_watermark:
+ self.outbuf_lock.notify()
+ finally:
+ self.outbuf_lock.release()
+
+ def _flush_some(self):
+ # Send as much data as possible to our client
+
+ sent = 0
+ dobreak = False
+
+ while True:
+ outbuf = self.outbufs[0]
+ # use outbuf.__len__ rather than len(outbuf) FBO of not getting
+ # OverflowError on 32-bit Python
+ outbuflen = outbuf.__len__()
+ while outbuflen > 0:
+ chunk = outbuf.get(self.sendbuf_len)
+ num_sent = self.send(chunk)
+ if num_sent:
+ outbuf.skip(num_sent, True)
+ outbuflen -= num_sent
+ sent += num_sent
+ self.total_outbufs_len -= num_sent
+ else:
+ # failed to write anything, break out entirely
+ dobreak = True
+ break
+ else:
+ # self.outbufs[-1] must always be a writable outbuf
+ if len(self.outbufs) > 1:
+ toclose = self.outbufs.pop(0)
+ try:
+ toclose.close()
+ except Exception:
+ self.logger.exception("Unexpected error when closing an outbuf")
+ else:
+ # caught up, done flushing for now
+ dobreak = True
+
+ if dobreak:
+ break
+
+ if sent:
+ self.last_activity = time.time()
+ return True
+
+ return False
+
+ def handle_close(self):
+ with self.outbuf_lock:
+ for outbuf in self.outbufs:
+ try:
+ outbuf.close()
+ except Exception:
+ self.logger.exception(
+ "Unknown exception while trying to close outbuf"
+ )
+ self.total_outbufs_len = 0
+ self.connected = False
+ self.outbuf_lock.notify()
+ wasyncore.dispatcher.close(self)
+
+ def add_channel(self, map=None):
+ """See wasyncore.dispatcher
+
+ This hook keeps track of opened channels.
+ """
+ wasyncore.dispatcher.add_channel(self, map)
+ self.server.active_channels[self._fileno] = self
+
+ def del_channel(self, map=None):
+ """See wasyncore.dispatcher
+
+ This hook keeps track of closed channels.
+ """
+ fd = self._fileno # next line sets this to None
+ wasyncore.dispatcher.del_channel(self, map)
+ ac = self.server.active_channels
+ if fd in ac:
+ del ac[fd]
+
+ #
+ # SYNCHRONOUS METHODS
+ #
+
+ def write_soon(self, data):
+ if not self.connected:
+ # if the socket is closed then interrupt the task so that it
+ # can cleanup possibly before the app_iter is exhausted
+ raise ClientDisconnected
+ if data:
+ # the async mainloop might be popping data off outbuf; we can
+ # block here waiting for it because we're in a task thread
+ with self.outbuf_lock:
+ self._flush_outbufs_below_high_watermark()
+ if not self.connected:
+ raise ClientDisconnected
+ num_bytes = len(data)
+ if data.__class__ is ReadOnlyFileBasedBuffer:
+ # they used wsgi.file_wrapper
+ self.outbufs.append(data)
+ nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
+ self.outbufs.append(nextbuf)
+ self.current_outbuf_count = 0
+ else:
+ if self.current_outbuf_count > self.adj.outbuf_high_watermark:
+ # rotate to a new buffer if the current buffer has hit
+ # the watermark to avoid it growing unbounded
+ nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
+ self.outbufs.append(nextbuf)
+ self.current_outbuf_count = 0
+ self.outbufs[-1].append(data)
+ self.current_outbuf_count += num_bytes
+ self.total_outbufs_len += num_bytes
+ if self.total_outbufs_len >= self.adj.send_bytes:
+ self.server.pull_trigger()
+ return num_bytes
+ return 0
+
+ def _flush_outbufs_below_high_watermark(self):
+ # check first to avoid locking if possible
+ if self.total_outbufs_len > self.adj.outbuf_high_watermark:
+ with self.outbuf_lock:
+ while (
+ self.connected
+ and self.total_outbufs_len > self.adj.outbuf_high_watermark
+ ):
+ self.server.pull_trigger()
+ self.outbuf_lock.wait()
+
+ def service(self):
+ """Execute all pending requests """
+ with self.task_lock:
+ while self.requests:
+ request = self.requests[0]
+ if request.error:
+ task = self.error_task_class(self, request)
+ else:
+ task = self.task_class(self, request)
+ try:
+ task.service()
+ except ClientDisconnected:
+ self.logger.info(
+ "Client disconnected while serving %s" % task.request.path
+ )
+ task.close_on_finish = True
+ except Exception:
+ self.logger.exception(
+ "Exception while serving %s" % task.request.path
+ )
+ if not task.wrote_header:
+ if self.adj.expose_tracebacks:
+ body = traceback.format_exc()
+ else:
+ body = (
+ "The server encountered an unexpected "
+ "internal server error"
+ )
+ req_version = request.version
+ req_headers = request.headers
+ request = self.parser_class(self.adj)
+ request.error = InternalServerError(body)
+ # copy some original request attributes to fulfill
+ # HTTP 1.1 requirements
+ request.version = req_version
+ try:
+ request.headers["CONNECTION"] = req_headers["CONNECTION"]
+ except KeyError:
+ pass
+ task = self.error_task_class(self, request)
+ try:
+ task.service() # must not fail
+ except ClientDisconnected:
+ task.close_on_finish = True
+ else:
+ task.close_on_finish = True
+ # we cannot allow self.requests to drop to empty til
+ # here; otherwise the mainloop gets confused
+ if task.close_on_finish:
+ self.close_when_flushed = True
+ for request in self.requests:
+ request.close()
+ self.requests = []
+ else:
+ # before processing a new request, ensure there is not too
+ # much data in the outbufs waiting to be flushed
+ # NB: currently readable() returns False while we are
+ # flushing data so we know no new requests will come in
+ # that we need to account for, otherwise it'd be better
+ # to do this check at the start of the request instead of
+ # at the end to account for consecutive service() calls
+ if len(self.requests) > 1:
+ self._flush_outbufs_below_high_watermark()
+ request = self.requests.pop(0)
+ request.close()
+
+ if self.connected:
+ self.server.pull_trigger()
+ self.last_activity = time.time()
+
+ def cancel(self):
+ """ Cancels all pending / active requests """
+ self.will_close = True
+ self.connected = False
+ self.last_activity = time.time()
+ self.requests = []