Diffstat (limited to 'waitress/channel.py')
-rw-r--r--  waitress/channel.py  338
1 file changed, 178 insertions, 160 deletions
diff --git a/waitress/channel.py b/waitress/channel.py
index 2a629b5..3bdfa17 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -15,14 +15,14 @@
"""
import asyncore
import socket
-import sys
import time
import traceback
-from waitress.compat import thread
from waitress.buffers import OverflowableBuffer
from waitress.parser import HTTPRequestParser
+from waitress.compat import thread
+
from waitress.task import (
ErrorTask,
WSGITask,
@@ -36,22 +36,23 @@ from waitress.utilities import (
class HTTPChannel(logging_dispatcher, object):
"""Channel that switches between asynchronous and synchronous mode.
- Set self.task = sometask before using a channel in a thread other than
- the thread handling the main loop.
+ Set self.requests = [somerequest] before using a channel in a thread other
+ than the thread handling the main loop.
- Set self.task = None to give the channel back to the thread handling
+ Set self.requests = [] to give the channel back to the thread handling
the main loop.
"""
task_class = WSGITask
error_task_class = ErrorTask
parser_class = HTTPRequestParser
- task_lock = thread.allocate_lock() # syncs access to task-related attrs
-
- request = None # A request parser instance
- last_activity = 0 # Time of last activity
- will_close = False # will_close is set to True to close the socket.
- task = None # currently running task
+ request = None # A request parser instance
+ last_activity = 0 # Time of last activity
+ will_close = False # set to True to close the socket.
+ requests = () # currently pending requests
+ sent_continue = False # used as a latch after sending 100 continue
+ task_lock = thread.allocate_lock() # lock used to push/pop requests
+ force_flush = False # indicates a need to flush the outbuf
#
# ASYNCHRONOUS METHODS (including __init__)
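
For orientation, the ownership protocol described by the revised docstring and the new requests/task_lock attributes can be pictured with a minimal sketch (not part of this patch): the async main loop assigns a non-empty list to channel.requests and hands the channel to the server's dispatcher; a worker thread then calls channel.service(), which resets requests to [] once it has drained them. The ToyDispatcher name below is hypothetical and merely stands in for waitress's threaded task dispatcher.

import queue
import threading

class ToyDispatcher:
    # Hypothetical stand-in for the server-side dispatcher: worker threads
    # pull channels off a queue and run their pending requests.
    def __init__(self, threads=2):
        self.queue = queue.Queue()
        for _ in range(threads):
            threading.Thread(target=self.handler, daemon=True).start()

    def add_task(self, channel):
        # Called from the async main loop only after channel.requests has
        # been set to a non-empty list; a worker now owns the channel.
        self.queue.put(channel)

    def handler(self):
        while True:
            channel = self.queue.get()
            # service() runs every queued request and finishes by setting
            # channel.requests = [], returning the channel to the main loop.
            channel.service()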
@@ -69,43 +70,147 @@ class HTTPChannel(logging_dispatcher, object):
self.addr = addr
self.adj = adj
self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
- self.inbuf = OverflowableBuffer(adj.inbuf_overflow)
self.creation_time = self.last_activity = time.time()
asyncore.dispatcher.__init__(self, sock, map=map)
def writable(self):
- if self.task is not None:
- return False
- return self.will_close or self.outbuf
+ return bool(self.outbuf)
def handle_write(self):
- if self.task is not None:
+ # Precondition: there's data in the out buffer to be sent
+ if not self.connected:
return
- if self.outbuf:
+
+ if not self.requests:
+ # 1. There are no running tasks, so we don't need to try to lock
+ # the outbuf before sending
+ # 2. The data in the out buffer should be sent as soon as possible
+ # because it's either data left over from task output
+ # or a 100 Continue line sent within "received".
+ flush = self._flush_some
+ elif self.force_flush:
+ # 1. There's a running task, so we need to try to lock
+ # the outbuf before sending
+ # 2. This is the last chunk sent by the Nth of M tasks in a
+ # sequence on this channel, so flush it regardless of whether
+ # it's >= self.adj.send_bytes. We need to do this now, or it
+ # won't get done.
+ flush = self._flush_some_if_lockable
+ self.force_flush = False
+ elif (len(self.outbuf) >= self.adj.send_bytes):
+ # 1. There's a running task, so we need to try to lock
+ # the outbuf before sending
+ # 2. Only try to send if the data in the out buffer is larger
+ # than self.adj.send_bytes to avoid TCP fragmentation
+ flush = self._flush_some_if_lockable
+ self.force_flush = False
+ else:
+ # 1. There's not enough data in the out buffer to bother to send
+ # right now.
+ flush = None
+
+ if flush:
try:
- self._flush_some()
+ flush()
except socket.error:
- self.handle_comm_error()
- elif self.will_close:
+ if self.adj.log_socket_errors:
+ self.logger.exception('Socket error')
+ self.will_close = True
+ if self.will_close:
self.handle_close()
- self.last_activity = time.time()
def readable(self):
- if self.task is not None or self.will_close:
- return False
- if self.inbuf:
- self.received()
- return not self.will_close
+ # We might want to create a new task. We can only do this if:
+ # 1. We're not already about to close the connection.
+ # 2. There are no tasks currently running.
+ # 3. There's no data in the output buffer that needs to be sent
+ # before we potentially create a new task.
+ return not (self.will_close or self.requests or self.outbuf)
def handle_read(self):
try:
data = self.recv(self.adj.recv_bytes)
except socket.error:
- self.handle_comm_error()
+ if self.adj.log_socket_errors:
+ self.logger.exception('Socket error')
+ self.handle_close()
return
- self.last_activity = time.time()
if data:
- self.inbuf.append(data)
+ self.last_activity = time.time()
+ self.received(data)
+
+ def received(self, data):
+ """
+ Receives input asynchronously and assigns one or more requests to the channel.
+ """
+ # Preconditions: there's no task(s) already running
+ request = self.request
+ requests = []
+
+ if not data:
+ return False
+
+ while data:
+ if request is None:
+ request = self.parser_class(self.adj)
+ n = request.received(data)
+ if request.expect_continue and request.headers_finished:
+ # guaranteed by parser to be a 1.1 request
+ request.expect_continue = False
+ if not self.sent_continue:
+ # there's no current task, so we don't need to try to
+ # lock the outbuf to append to it.
+ self.outbuf.append(b'HTTP/1.1 100 Continue\r\n\r\n')
+ self.sent_continue = True
+ self._flush_some()
+ request.completed = False
+ if request.completed:
+ # The request (with the body) is ready to use.
+ self.request = None
+ if not request.empty:
+ requests.append(request)
+ request = None
+ else:
+ self.request = request
+ if n >= len(data):
+ break
+ data = data[n:]
+
+ if requests:
+ self.requests = requests
+ self.server.add_task(self)
+
+ return True
+
+ def _flush_some_if_lockable(self):
+ # Since our task may be appending to the outbuf, we try to acquire
+ # the lock, but we don't block if we can't.
+ outbuf = self.outbuf
+ locked = outbuf.lock.acquire(0)
+ if locked:
+ try:
+ self._flush_some()
+ finally:
+ outbuf.lock.release()
+
+ def _flush_some(self):
+ # Send as much data as possible to our client
+ outbuf = self.outbuf
+ outbuflen = len(outbuf)
+ while outbuflen > 0:
+ chunk = outbuf.get(self.adj.send_bytes)
+ num_sent = self.send(chunk)
+ if num_sent:
+ outbuf.skip(num_sent, True)
+ outbuflen -= num_sent
+ else:
+ return False
+ self.last_activity = time.time()
+ return True
+
+ def handle_close(self):
+ self.connected = False
+ asyncore.dispatcher.close(self)
def add_channel(self, map=None):
"""See asyncore.dispatcher
@@ -126,149 +231,62 @@ class HTTPChannel(logging_dispatcher, object):
if fd in ac:
del ac[fd]
- def received(self):
- """
- Receives input asynchronously and assigns a task to the channel.
- """
- if self.task is not None:
- return False
- chunk = self.inbuf.get(self.adj.recv_bytes)
- if not chunk:
- return False
- if self.request is None:
- self.request = self.parser_class(self.adj)
- request = self.request
- n = request.received(chunk)
- if n:
- self.inbuf.skip(n, True)
- if request.expect_continue and request.headers_finished:
- # guaranteed by parser to be a 1.1 request
- self.write(b'HTTP/1.1 100 Continue\r\n\r\n')
- request.expect_continue = False
- if request.completed:
- # The request (with the body) is ready to use.
- if request.connection_close and self.inbuf:
- self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow)
- self.request = None
- if not request.empty:
- if request.error:
- self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow)
- task = self.error_task_class(self, request)
- else:
- task = self.task_class(self, request)
- self.task = task
- self.server.add_task(self)
- return
- if self.inbuf:
- self.server.pull_trigger()
-
- def handle_error(self, exc_info=None): # exc_info for tests
- """See async.dispatcher
-
- Handles program errors (not communication errors)
- """
- if exc_info is None: # pragma: no cover
- t, v = sys.exc_info()[:2]
- else:
- t, v = exc_info[:2]
- if t is SystemExit or t is KeyboardInterrupt:
- raise t(v)
- asyncore.dispatcher.handle_error(self)
-
- def handle_comm_error(self):
- """
- Handles communication errors (not program errors)
- """
- if self.adj.log_socket_errors:
- # handle_error calls close
- self.handle_error()
- else:
- # Ignore socket errors.
- self.handle_close()
-
#
- # METHODS USED IN BOTH MODES
+ # SYNCHRONOUS METHODS
#
- def handle_close(self):
- # Always close in asynchronous mode. If the connection is
- # closed in a thread, the main loop can end up with a bad file
- # descriptor.
- # XXX this method is probably called in a thread by virtue of
- # "handle_comm_error"
- if self.task is not None:
- self.will_close = True
- return
- self.connected = False
- asyncore.dispatcher.close(self)
-
- def write(self, data):
- wrote = 0
+ def write_soon(self, data):
if data:
- self.outbuf.append(data)
- wrote = len(data)
-
- while len(self.outbuf) >= self.adj.send_bytes:
- # Send what we can without blocking.
- # We propagate errors to the application on purpose
- # (to stop the application if the connection closes).
- if not self._flush_some(): # pragma: no cover (coverage bug?)
- break
-
- return wrote
-
- def _flush_some(self):
- """Flushes data.
-
- Returns True if some data was sent."""
- outbuf = self.outbuf
- if outbuf and self.connected:
- chunk = outbuf.get(self.adj.send_bytes)
- num_sent = self.send(chunk)
- if num_sent:
- outbuf.skip(num_sent, True)
- return True
- return False
-
- #
- # ITask implementation. Delegates to the queued tasks.
- #
+ # the async mainloop might be popping data off outbuf; we can
+ # block here waiting for it because we're in a thread
+ with self.outbuf.lock:
+ self.outbuf.append(data)
+ return len(data)
+ return 0
def service(self):
"""Execute a pending task"""
- if self.task is None:
- return
- task = self.task
- try:
- task.service()
- except:
- self.logger.exception('Exception when serving %s' %
- task.request.uri)
- if not task.wrote_header:
- if self.adj.expose_tracebacks:
- body = traceback.format_exc()
+ with self.task_lock:
+ while self.requests:
+ request = self.requests[0]
+ if request.error:
+ task = self.error_task_class(self, request)
else:
- body = ('The server encountered an unexpected internal '
- 'server error')
- request = self.parser_class(self.adj)
- request.error = InternalServerError(body)
- task = self.error_task_class(self, request)
- task.service() # must not fail
- else:
- task.close_on_finish = True
- while self._flush_some():
- pass
- self.task = None
- if task.close_on_finish:
- self.will_close = True
+ task = self.task_class(self, request)
+ try:
+ task.service()
+ except:
+ self.logger.exception('Exception when serving %s' %
+ task.request.uri)
+ if not task.wrote_header:
+ if self.adj.expose_tracebacks:
+ body = traceback.format_exc()
+ else:
+ body = ('The server encountered an unexpected '
+ 'internal server error')
+ request = self.parser_class(self.adj)
+ request.error = InternalServerError(body)
+ task = self.error_task_class(self, request)
+ task.service() # must not fail
+ else:
+ task.close_on_finish = True
+ # we cannot allow self.requests to drop to empty until
+ # here; otherwise the mainloop gets confused
+ if task.close_on_finish:
+ self.will_close = True
+ self.requests = []
+ else:
+ self.requests.pop(0)
+
+ self.force_flush = True
self.server.pull_trigger()
self.last_activity = time.time()
def cancel(self):
- """Cancels all pending tasks"""
- if self.task is not None:
- self.task.cancel()
- self.task = None
+ """ Cancels all pending requests """
+ self.force_flush = True
+ self.last_activity = time.time()
+ self.requests = []
def defer(self):
pass
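
Finally, the parse loop in the new received() relies on the parser reporting how many bytes it consumed, so a single recv() can yield several pipelined requests. A minimal standalone sketch of that slicing logic, assuming a hypothetical parser_factory whose objects expose received(data) returning the number of bytes consumed and a completed flag (split_requests itself is also a hypothetical name):

def split_requests(data, parser_factory, pending=None):
    # Hypothetical helper mirroring the loop in HTTPChannel.received():
    # feed bytes to the current parser, collect it once it is complete,
    # and slice off whatever it consumed before continuing.
    requests = []
    parser = pending
    while data:
        if parser is None:
            parser = parser_factory()
        n = parser.received(data)
        if parser.completed:
            requests.append(parser)
            parser = None
        if n >= len(data):
            break
        data = data[n:]
    # Return the unfinished parser (if any) so the caller can keep it for
    # the next recv(), just as the channel keeps self.request.
    return requests, parser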