author     Bert JW Regeer <xistence@0x58.com>         2020-10-31 00:03:50 -0700
committer  GitHub <noreply@github.com>                2020-10-31 00:03:50 -0700
commit     31d7498c84cf0041f37beb503fd0ddf78d9d41e2 (patch)
tree       c2ce19d7fef77c748fb7246c28a951f7829c6a9e /src
parent     5faf6989f4ff019940f41e2ee4855615f41afcc5 (diff)
parent     a514a47e55f0a663477831d083f608a7d2db035b (diff)
download   waitress-31d7498c84cf0041f37beb503fd0ddf78d9d41e2.tar.gz
Merge pull request #310 from perfact/notify-client-close
Notify client close
Diffstat (limited to 'src')
-rw-r--r--  src/waitress/adjustments.py |   8
-rw-r--r--  src/waitress/channel.py     | 277
-rw-r--r--  src/waitress/runner.py      |   6
-rw-r--r--  src/waitress/task.py        |   5
4 files changed, 175 insertions, 121 deletions
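
The behaviour added here is opt-in through the new `channel_request_lookahead` adjustment (see `adjustments.py` and the `--channel-request-lookahead` option in `runner.py` below). A minimal sketch of enabling it from Python, not part of this commit; the application module, bind address, and lookahead value of 5 are placeholder assumptions:

```python
# Hypothetical usage sketch: turn on request lookahead so that client
# disconnects can be observed while a request is still being serviced.
from waitress import serve

from myapp import application  # placeholder WSGI app, not part of this commit

# A lookahead greater than zero keeps each channel reading ahead while a
# request is running, which is what makes disconnect detection possible.
serve(application, listen="127.0.0.1:8080", channel_request_lookahead=5)
```

The command-line equivalent added in `runner.py` would be roughly `waitress-serve --channel-request-lookahead=5 --listen=127.0.0.1:8080 myapp:application`.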
diff --git a/src/waitress/adjustments.py b/src/waitress/adjustments.py
index 45ac41b..42d2bc0 100644
--- a/src/waitress/adjustments.py
+++ b/src/waitress/adjustments.py
@@ -135,6 +135,7 @@ class Adjustments:
         ("unix_socket", str),
         ("unix_socket_perms", asoctal),
         ("sockets", as_socket_list),
+        ("channel_request_lookahead", int),
     )

     _param_map = dict(_params)
@@ -280,6 +281,13 @@ class Adjustments:
     # be used for e.g. socket activation
     sockets = []

+    # By setting this to a value larger than zero, each channel stays readable
+    # and continues to read requests from the client even if a request is still
+    # running, until the number of buffered requests exceeds this value.
+    # This allows detecting if a client closed the connection while its request
+    # is being processed.
+    channel_request_lookahead = 0
+
     def __init__(self, **kw):

         if "listen" in kw and ("host" in kw or "port" in kw):
diff --git a/src/waitress/channel.py b/src/waitress/channel.py
index 65bc87f..296a16a 100644
--- a/src/waitress/channel.py
+++ b/src/waitress/channel.py
@@ -40,11 +40,11 @@ class HTTPChannel(wasyncore.dispatcher):
     error_task_class = ErrorTask
     parser_class = HTTPRequestParser

-    request = None  # A request parser instance
+    # A request that has not been received yet completely is stored here
+    request = None
     last_activity = 0  # Time of last activity
     will_close = False  # set to True to close the socket.
     close_when_flushed = False  # set to True to close the socket when flushed
-    requests = ()  # currently pending requests
     sent_continue = False  # used as a latch after sending 100 continue
     total_outbufs_len = 0  # total bytes ready to send
     current_outbuf_count = 0  # total bytes written to current outbuf
@@ -60,8 +60,9 @@ class HTTPChannel(wasyncore.dispatcher):
         self.creation_time = self.last_activity = time.time()
         self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)

-        # task_lock used to push/pop requests
-        self.task_lock = threading.Lock()
+        # requests_lock used to push/pop requests and modify the request that is
+        # currently being created
+        self.requests_lock = threading.Lock()
         # outbuf_lock used to access any outbuf (expected to use an RLock)
         self.outbuf_lock = threading.Condition()
@@ -69,6 +70,15 @@ class HTTPChannel(wasyncore.dispatcher):
         # Don't let wasyncore.dispatcher throttle self.addr on us.
         self.addr = addr
+        self.requests = []
+
+    def check_client_disconnected(self):
+        """
+        This method is inserted into the environment of any created task so it
+        may occasionally check if the client has disconnected and interrupt
+        execution.
+        """
+        return not self.connected

     def writable(self):
         # if there's data in the out buffer or we've been instructed to close
@@ -125,18 +135,18 @@ class HTTPChannel(wasyncore.dispatcher):
             self.handle_close()

     def readable(self):
-        # We might want to create a new task. We can only do this if:
+        # We might want to read more requests. We can only do this if:
         # 1. We're not already about to close the connection.
         # 2. We're not waiting to flush remaining data before closing the
         #    connection
-        # 3. There's no already currently running task(s).
+        # 3. There are not too many tasks already queued
         # 4. There's no data in the output buffer that needs to be sent
         #    before we potentially create a new task.

         return not (
             self.will_close
             or self.close_when_flushed
-            or self.requests
+            or len(self.requests) > self.adj.channel_request_lookahead
             or self.total_outbufs_len
         )
@@ -153,57 +163,69 @@ class HTTPChannel(wasyncore.dispatcher):
         if data:
             self.last_activity = time.time()
             self.received(data)
+        else:
+            # Client disconnected.
+            self.connected = False
+
+    def send_continue(self):
+        """
+        Send a 100-Continue header to the client. This is either called from
+        receive (if no requests are running and the client expects it) or at
+        the end of service (if no more requests are queued and a request has
+        been read partially that expects it).
+        """
+        self.request.expect_continue = False
+        outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n"
+        num_bytes = len(outbuf_payload)
+        with self.outbuf_lock:
+            self.outbufs[-1].append(outbuf_payload)
+            self.current_outbuf_count += num_bytes
+            self.total_outbufs_len += num_bytes
+            self.sent_continue = True
+            self._flush_some()
+        self.request.completed = False

     def received(self, data):
         """
         Receives input asynchronously and assigns one or more requests to the
         channel.
         """
-        # Preconditions: there's no task(s) already running
-        request = self.request
-        requests = []
-
         if not data:
             return False

-        while data:
-            if request is None:
-                request = self.parser_class(self.adj)
-            n = request.received(data)
-
-            if request.expect_continue and request.headers_finished:
-                # guaranteed by parser to be a 1.1 request
-                request.expect_continue = False
-
-                if not self.sent_continue:
-                    # there's no current task, so we don't need to try to
-                    # lock the outbuf to append to it.
-                    outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n"
-                    num_bytes = len(outbuf_payload)
-                    self.outbufs[-1].append(outbuf_payload)
-                    self.current_outbuf_count += num_bytes
-                    self.total_outbufs_len += num_bytes
-                    self.sent_continue = True
-                    self._flush_some()
-                    request.completed = False
-
-            if request.completed:
-                # The request (with the body) is ready to use.
-                self.request = None
-
-                if not request.empty:
-                    requests.append(request)
-                request = None
-            else:
-                self.request = request
-
-            if n >= len(data):
-                break
-            data = data[n:]
-
-        if requests:
-            self.requests = requests
-            self.server.add_task(self)
+        with self.requests_lock:
+            while data:
+                if self.request is None:
+                    self.request = self.parser_class(self.adj)
+                n = self.request.received(data)
+
+                # if there are requests queued, we can not send the continue
+                # header yet since the responses need to be kept in order
+                if (
+                    self.request.expect_continue
+                    and self.request.headers_finished
+                    and not self.requests
+                    and not self.sent_continue
+                ):
+                    self.send_continue()
+
+                if self.request.completed:
+                    # The request (with the body) is ready to use.
+                    self.sent_continue = False
+
+                    if not self.request.empty:
+                        self.requests.append(self.request)
+                        if len(self.requests) == 1:
+                            # self.requests was empty before so the main thread
+                            # is in charge of starting the task. Otherwise,
+                            # service() will add a new task after each request
+                            # has been processed
+                            self.server.add_task(self)
+                    self.request = None
+
+                if n >= len(data):
+                    break
+                data = data[n:]

         return True
@@ -360,88 +382,101 @@ class HTTPChannel(wasyncore.dispatcher):
                 self.outbuf_lock.wait()

     def service(self):
-        """Execute all pending requests """
-        with self.task_lock:
-            while self.requests:
-                request = self.requests[0]
+        """Execute one request. If there are more, we add another task to the
+        server at the end."""
+
+        request = self.requests[0]
+
+        if request.error:
+            task = self.error_task_class(self, request)
+        else:
+            task = self.task_class(self, request)

-                if request.error:
-                    task = self.error_task_class(self, request)
+        try:
+            if self.connected:
+                task.service()
+            else:
+                task.close_on_finish = True
+        except ClientDisconnected:
+            self.logger.info("Client disconnected while serving %s" % task.request.path)
+            task.close_on_finish = True
+        except Exception:
+            self.logger.exception("Exception while serving %s" % task.request.path)
+
+            if not task.wrote_header:
+                if self.adj.expose_tracebacks:
+                    body = traceback.format_exc()
                 else:
-                    task = self.task_class(self, request)
+                    body = "The server encountered an unexpected internal server error"
+                req_version = request.version
+                req_headers = request.headers
+                err_request = self.parser_class(self.adj)
+                err_request.error = InternalServerError(body)
+                # copy some original request attributes to fulfill
+                # HTTP 1.1 requirements
+                err_request.version = req_version
                 try:
-                    task.service()
+                    err_request.headers["CONNECTION"] = req_headers["CONNECTION"]
+                except KeyError:
+                    pass
+                task = self.error_task_class(self, err_request)
+                try:
+                    task.service()  # must not fail
                 except ClientDisconnected:
-                    self.logger.info(
-                        "Client disconnected while serving %s" % task.request.path
-                    )
                     task.close_on_finish = True
-                except Exception:
-                    self.logger.exception(
-                        "Exception while serving %s" % task.request.path
-                    )
+            else:
+                task.close_on_finish = True

-                    if not task.wrote_header:
-                        if self.adj.expose_tracebacks:
-                            body = traceback.format_exc()
-                        else:
-                            body = (
-                                "The server encountered an unexpected "
-                                "internal server error"
-                            )
-                        req_version = request.version
-                        req_headers = request.headers
-                        request = self.parser_class(self.adj)
-                        request.error = InternalServerError(body)
-                        # copy some original request attributes to fulfill
-                        # HTTP 1.1 requirements
-                        request.version = req_version
-                        try:
-                            request.headers["CONNECTION"] = req_headers["CONNECTION"]
-                        except KeyError:
-                            pass
-                        task = self.error_task_class(self, request)
-                        try:
-                            task.service()  # must not fail
-                        except ClientDisconnected:
-                            task.close_on_finish = True
-                    else:
-                        task.close_on_finish = True
-                # we cannot allow self.requests to drop to empty til
-                # here; otherwise the mainloop gets confused
-
-                if task.close_on_finish:
-                    self.close_when_flushed = True
-
-                    for request in self.requests:
-                        request.close()
-                    self.requests = []
-                else:
-                    # before processing a new request, ensure there is not too
-                    # much data in the outbufs waiting to be flushed
-                    # NB: currently readable() returns False while we are
-                    # flushing data so we know no new requests will come in
-                    # that we need to account for, otherwise it'd be better
-                    # to do this check at the start of the request instead of
-                    # at the end to account for consecutive service() calls
-
-                    if len(self.requests) > 1:
-                        self._flush_outbufs_below_high_watermark()
-
-                    # this is a little hacky but basically it's forcing the
-                    # next request to create a new outbuf to avoid sharing
-                    # outbufs across requests which can cause outbufs to
-                    # not be deallocated regularly when a connection is open
-                    # for a long time
-
-                    if self.current_outbuf_count > 0:
-                        self.current_outbuf_count = self.adj.outbuf_high_watermark
-
-                    request = self.requests.pop(0)
+        if task.close_on_finish:
+            with self.requests_lock:
+                self.close_when_flushed = True
+
+                for request in self.requests:
                     request.close()
+                self.requests = []
+        else:
+            # before processing a new request, ensure there is not too
+            # much data in the outbufs waiting to be flushed
+            # NB: currently readable() returns False while we are
+            # flushing data so we know no new requests will come in
+            # that we need to account for, otherwise it'd be better
+            # to do this check at the start of the request instead of
+            # at the end to account for consecutive service() calls
+
+            if len(self.requests) > 1:
+                self._flush_outbufs_below_high_watermark()
+
+            # this is a little hacky but basically it's forcing the
+            # next request to create a new outbuf to avoid sharing
+            # outbufs across requests which can cause outbufs to
+            # not be deallocated regularly when a connection is open
+            # for a long time
+
+            if self.current_outbuf_count > 0:
+                self.current_outbuf_count = self.adj.outbuf_high_watermark
+
+            request.close()
+
+        # Add new task to process the next request
+        with self.requests_lock:
+            self.requests.pop(0)
+            if self.connected and self.requests:
+                self.server.add_task(self)
+            elif (
+                self.connected
+                and self.request is not None
+                and self.request.expect_continue
+                and self.request.headers_finished
+                and not self.sent_continue
+            ):
+                # A request waits for a signal to continue, but we could
+                # not send it until now because requests were being
+                # processed and the output needs to be kept in order
+                self.send_continue()

         if self.connected:
             self.server.pull_trigger()
+
         self.last_activity = time.time()

     def cancel(self):
diff --git a/src/waitress/runner.py b/src/waitress/runner.py
index 4fb3e6b..c23ca0e 100644
--- a/src/waitress/runner.py
+++ b/src/waitress/runner.py
@@ -169,6 +169,12 @@ Tuning options:
         The use_poll argument passed to ``asyncore.loop()``. Helps overcome
         open file descriptors limit. Default is False.

+    --channel-request-lookahead=INT
+        Allows channels to stay readable and buffer more requests up to the
+        given maximum even if a request is already being processed. This allows
+        detecting if a client closed the connection while its request is being
+        processed. Default is 0.
+
 """

 RUNNER_PATTERN = re.compile(
diff --git a/src/waitress/task.py b/src/waitress/task.py
index 3a7cf17..2ac8f4c 100644
--- a/src/waitress/task.py
+++ b/src/waitress/task.py
@@ -560,6 +560,11 @@ class WSGITask(Task):
             if mykey not in environ:
                 environ[mykey] = value

+        # Insert a callable into the environment that allows the application to
+        # check if the client disconnected. Only works with
+        # channel_request_lookahead larger than 0.
+        environ["waitress.client_disconnected"] = self.channel.check_client_disconnected
+
         # cache the environ for this request
        self.environ = environ
        return environ
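
For reference, a sketch of how a WSGI application might consume the `waitress.client_disconnected` callable that `task.py` now places in the environ. The work loop, sleep interval, and response body are illustrative assumptions only; the callable reflects disconnects only while `channel_request_lookahead` is greater than zero:

```python
import time


def application(environ, start_response):
    # Callable injected into the WSGI environ by this change; it returns True
    # once the client has closed its connection.
    client_disconnected = environ.get(
        "waitress.client_disconnected", lambda: False
    )

    for _ in range(30):  # stand-in for a long-running piece of work
        if client_disconnected():
            # The client is gone; stop doing work on its behalf.
            break
        time.sleep(1)

    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"done\n"]
```

Serving this application with the lookahead configuration shown after the diffstat above lets the loop stop as soon as the peer goes away, instead of running to completion for a response nobody will read.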