diff options
Diffstat (limited to 'waitress/channel.py')
-rw-r--r-- | waitress/channel.py | 338 |
1 files changed, 178 insertions, 160 deletions
diff --git a/waitress/channel.py b/waitress/channel.py index 2a629b5..3bdfa17 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -15,14 +15,14 @@ """ import asyncore import socket -import sys import time import traceback -from waitress.compat import thread from waitress.buffers import OverflowableBuffer from waitress.parser import HTTPRequestParser +from waitress.compat import thread + from waitress.task import ( ErrorTask, WSGITask, @@ -36,22 +36,23 @@ from waitress.utilities import ( class HTTPChannel(logging_dispatcher, object): """Channel that switches between asynchronous and synchronous mode. - Set self.task = sometask before using a channel in a thread other than - the thread handling the main loop. + Set self.requests = [somerequest] before using a channel in a thread other + than the thread handling the main loop. - Set self.task = None to give the channel back to the thread handling + Set self.requests = [] to give the channel back to the thread handling the main loop. """ task_class = WSGITask error_task_class = ErrorTask parser_class = HTTPRequestParser - task_lock = thread.allocate_lock() # syncs access to task-related attrs - - request = None # A request parser instance - last_activity = 0 # Time of last activity - will_close = False # will_close is set to True to close the socket. - task = None # currently running task + request = None # A request parser instance + last_activity = 0 # Time of last activity + will_close = False # set to True to close the socket. + requests = () # currently pending requests + sent_continue = False # used as a latch after sending 100 continue + task_lock = thread.allocate_lock() # lock used to push/pop requests + force_flush = False # indicates a need to flush the outbuf # # ASYNCHRONOUS METHODS (including __init__) @@ -69,43 +70,147 @@ class HTTPChannel(logging_dispatcher, object): self.addr = addr self.adj = adj self.outbuf = OverflowableBuffer(adj.outbuf_overflow) - self.inbuf = OverflowableBuffer(adj.inbuf_overflow) self.creation_time = self.last_activity = time.time() asyncore.dispatcher.__init__(self, sock, map=map) def writable(self): - if self.task is not None: - return False - return self.will_close or self.outbuf + return bool(self.outbuf) def handle_write(self): - if self.task is not None: + # Precondition: there's data in the out buffer to be sent + if not self.connected: return - if self.outbuf: + + if not self.requests: + # 1. There are no running tasks, so we don't need to try to lock + # the outbuf before sending + # 2. The data in the out buffer should be sent as soon as possible + # because it's either data left over from task output + # or a 100 Continue line sent within "received". + flush = self._flush_some + elif self.force_flush: + # 1. There's a running task, so we need to try to lock + # the outbuf before sending + # 2. This is the last chunk sent by the Nth of M tasks in a + # sequence on this channel, so flush it regardless of whether + # it's >= self.adj.send_bytes. We need to do this now, or it + # won't get done. + flush = self._flush_some_if_lockable + self.force_flush = False + elif (len(self.outbuf) >= self.adj.send_bytes): + # 1. There's a running task, so we need to try to lock + # the outbuf before sending + # 2. Only try to send if the data in the out buffer is larger + # than self.adj_bytes to avoid TCP fragmentation + flush = self._flush_some_if_lockable + self.force_flush = False + else: + # 1. There's not enough data in the out buffer to bother to send + # right now. + flush = None + + if flush: try: - self._flush_some() + flush() except socket.error: - self.handle_comm_error() - elif self.will_close: + if self.adj.log_socket_errors: + self.logger.exception('Socket error') + self.will_close = True + if self.will_close: self.handle_close() - self.last_activity = time.time() def readable(self): - if self.task is not None or self.will_close: - return False - if self.inbuf: - self.received() - return not self.will_close + # We might want to create a new task. We can only do this if: + # 1. We're not already about to close the connection. + # 2. There's no already currently running task(s). + # 3. There's no data in the output buffer that needs to be sent + # before we potentially create a new task. + return not (self.will_close or self.requests or self.outbuf) def handle_read(self): try: data = self.recv(self.adj.recv_bytes) except socket.error: - self.handle_comm_error() + if self.adj.log_socket_errors: + self.logger.exception('Socket error') + self.handle_close() return - self.last_activity = time.time() if data: - self.inbuf.append(data) + self.last_activity = time.time() + self.received(data) + + def received(self, data): + """ + Receives input asynchronously and assigns a task to the channel. + """ + # Preconditions: there's no task(s) already running + request = self.request + requests = [] + + if not data: + return False + + while data: + if request is None: + request = self.parser_class(self.adj) + n = request.received(data) + if request.expect_continue and request.headers_finished: + # guaranteed by parser to be a 1.1 request + request.expect_continue = False + if not self.sent_continue: + # there's no current task, so we don't need to try to + # lock the outbuf to append to it. + self.outbuf.append(b'HTTP/1.1 100 Continue\r\n\r\n') + self.sent_expect_continue = True + self._flush_some() + request.completed = False + if request.completed: + # The request (with the body) is ready to use. + self.request = None + if not request.empty: + requests.append(request) + request = None + else: + self.request = request + if n >= len(data): + break + data = data[n:] + + if requests: + self.requests = requests + self.server.add_task(self) + + return True + + def _flush_some_if_lockable(self): + # Since our task may be appending to the outbuf, we try to acquire + # the lock, but we don't block if we can't. + outbuf = self.outbuf + locked = outbuf.lock.acquire(0) + if locked: + try: + self._flush_some() + finally: + outbuf.lock.release() + + def _flush_some(self): + # Send as much data as possible to our client + outbuf = self.outbuf + outbuflen = len(outbuf) + while outbuflen > 0: + chunk = outbuf.get(self.adj.send_bytes) + num_sent = self.send(chunk) + if num_sent: + outbuf.skip(num_sent, True) + outbuflen -= num_sent + else: + return False + self.last_activity = time.time() + return True + + def handle_close(self): + self.connected = False + asyncore.dispatcher.close(self) def add_channel(self, map=None): """See asyncore.dispatcher @@ -126,149 +231,62 @@ class HTTPChannel(logging_dispatcher, object): if fd in ac: del ac[fd] - def received(self): - """ - Receives input asynchronously and assigns a task to the channel. - """ - if self.task is not None: - return False - chunk = self.inbuf.get(self.adj.recv_bytes) - if not chunk: - return False - if self.request is None: - self.request = self.parser_class(self.adj) - request = self.request - n = request.received(chunk) - if n: - self.inbuf.skip(n, True) - if request.expect_continue and request.headers_finished: - # guaranteed by parser to be a 1.1 request - self.write(b'HTTP/1.1 100 Continue\r\n\r\n') - request.expect_continue = False - if request.completed: - # The request (with the body) is ready to use. - if request.connection_close and self.inbuf: - self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow) - self.request = None - if not request.empty: - if request.error: - self.inbuf = OverflowableBuffer(self.adj.inbuf_overflow) - task = self.error_task_class(self, request) - else: - task = self.task_class(self, request) - self.task = task - self.server.add_task(self) - return - if self.inbuf: - self.server.pull_trigger() - - def handle_error(self, exc_info=None): # exc_info for tests - """See async.dispatcher - - Handles program errors (not communication errors) - """ - if exc_info is None: # pragma: no cover - t, v = sys.exc_info()[:2] - else: - t, v = exc_info[:2] - if t is SystemExit or t is KeyboardInterrupt: - raise t(v) - asyncore.dispatcher.handle_error(self) - - def handle_comm_error(self): - """ - Handles communication errors (not program errors) - """ - if self.adj.log_socket_errors: - # handle_error calls close - self.handle_error() - else: - # Ignore socket errors. - self.handle_close() - # - # METHODS USED IN BOTH MODES + # SYNCHRONOUS METHODS # - def handle_close(self): - # Always close in asynchronous mode. If the connection is - # closed in a thread, the main loop can end up with a bad file - # descriptor. - # XXX this method is probably called in a thread by virtue of - # "handle_comm_error" - if self.task is not None: - self.will_close = True - return - self.connected = False - asyncore.dispatcher.close(self) - - def write(self, data): - wrote = 0 + def write_soon(self, data): if data: - self.outbuf.append(data) - wrote = len(data) - - while len(self.outbuf) >= self.adj.send_bytes: - # Send what we can without blocking. - # We propagate errors to the application on purpose - # (to stop the application if the connection closes). - if not self._flush_some(): # pragma: no cover (coverage bug?) - break - - return wrote - - def _flush_some(self): - """Flushes data. - - Returns True if some data was sent.""" - outbuf = self.outbuf - if outbuf and self.connected: - chunk = outbuf.get(self.adj.send_bytes) - num_sent = self.send(chunk) - if num_sent: - outbuf.skip(num_sent, True) - return True - return False - - # - # ITask implementation. Delegates to the queued tasks. - # + # the async mainloop might be popping data off outbuf; we can + # block here waiting for it because we're in a thread + with self.outbuf.lock: + self.outbuf.append(data) + return len(data) + return 0 def service(self): """Execute a pending task""" - if self.task is None: - return - task = self.task - try: - task.service() - except: - self.logger.exception('Exception when serving %s' % - task.request.uri) - if not task.wrote_header: - if self.adj.expose_tracebacks: - body = traceback.format_exc() + with self.task_lock: + while self.requests: + request = self.requests[0] + if request.error: + task = self.error_task_class(self, request) else: - body = ('The server encountered an unexpected internal ' - 'server error') - request = self.parser_class(self.adj) - request.error = InternalServerError(body) - task = self.error_task_class(self, request) - task.service() # must not fail - else: - task.close_on_finish = True - while self._flush_some(): - pass - self.task = None - if task.close_on_finish: - self.will_close = True + task = self.task_class(self, request) + try: + task.service() + except: + self.logger.exception('Exception when serving %s' % + task.request.uri) + if not task.wrote_header: + if self.adj.expose_tracebacks: + body = traceback.format_exc() + else: + body = ('The server encountered an unexpected ' + 'internal server error') + request = self.parser_class(self.adj) + request.error = InternalServerError(body) + task = self.error_task_class(self, request) + task.service() # must not fail + else: + task.close_on_finish = True + # we cannot allow self.requests to drop to empty til + # here; otherwise the mainloop gets confused + if task.close_on_finish: + self.will_close = True + self.requests = [] + else: + self.requests.pop(0) + + self.force_flush = True self.server.pull_trigger() self.last_activity = time.time() def cancel(self): - """Cancels all pending tasks""" - if self.task is not None: - self.task.cancel() - self.task = None + """ Cancels all pending requests """ + self.force_flush = True + self.last_activity = time.time() + self.requests = [] def defer(self): pass |