summaryrefslogtreecommitdiff
path: root/waitress/channel.py
blob: b09733b401cb986a126711c406b5e77d5d0532d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Dual-mode channel
"""
import asyncore
import socket
import sys
import time

from waitress.compat import thread
from waitress.adjustments import Adjustments
from waitress.buffers import OverflowableBuffer
from waitress.parser import HTTPRequestParser
from waitress.task import HTTPTask

class HTTPServerChannel(asyncore.dispatcher, object):
    """Channel that switches between asynchronous and synchronous mode.

    Call set_sync() before using a channel in a thread other than
    the thread handling the main loop.

    Call set_async() to give the channel back to the thread handling
    the main loop.
    """
    task_class = HTTPTask
    parser_class = HTTPRequestParser

    task_lock = thread.allocate_lock() # syncs access to task-related attrs

    active_channels = {}        # Class-specific channel tracker
    next_channel_cleanup = [0]  # Class-specific cleanup time
    proto_request = None        # A request parser instance
    last_activity = 0           # Time of last activity
    tasks = None                # List of channel-related tasks to execute
    running_tasks = False       # True when another thread is running tasks
    will_close = False          # will_close is set to True to close the socket.
    async_mode = True           # boolean: async or sync mode

    #
    # ASYNCHRONOUS METHODS (including __init__)
    #

    def __init__(
            self,
            server,
            sock,
            addr,
            adj=None,
            map=None,
            ):
        self.addr = addr
        if adj is None:
            adj = Adjustments()
        self.adj = adj
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time.time()
        asyncore.dispatcher.__init__(self, sock, map=map)
        self.server = server
        self.last_activity = t = self.creation_time
        self.check_maintenance(t)

    def handle_close(self):
        self.close()

    def writable(self):
        if not self.async_mode:
            return False
        return self.will_close or self.outbuf

    def handle_write(self):
        if not self.async_mode:
            return
        if self.outbuf:
            try:
                self._flush_some()
            except socket.error:
                self.handle_comm_error()
        elif self.will_close:
            self.close()
        self.last_activity = time.time()

    def readable(self):
        self.check_maintenance(time.time())
        if not self.async_mode:
            return False
        return not self.will_close

    def handle_read(self):
        if not self.async_mode or self.will_close:
            return
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
            return
        self.last_activity = time.time()
        self.received(data)

    def set_sync(self):
        """Switches to synchronous mode.

        The main thread will stop calling received().
        """
        self.async_mode = False

    def add_channel(self, map=None):
        """See asyncore.dispatcher

        This hook keeps track of opened channels.
        """
        asyncore.dispatcher.add_channel(self, map)
        self.__class__.active_channels[self._fileno] = self

    def del_channel(self, map=None):
        """See asyncore.dispatcher

        This hook keeps track of closed channels.
        """
        fd = self._fileno # next line sets this to None
        asyncore.dispatcher.del_channel(self, map)
        ac = self.__class__.active_channels
        if fd in ac:
            del ac[fd]

    def check_maintenance(self, now):
        """
        Performs maintenance if necessary.
        """
        ncc = self.__class__.next_channel_cleanup
        if now < ncc[0]:
            return False
        ncc[0] = now + self.adj.cleanup_interval
        return self.maintenance()

    def maintenance(self):
        """
        Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).

        CM: the maintenance method never closes the channel upon which it is
        called, it closes any other channel that is a) not running a task and
        b) has exceeded its inactivity timeout.  Since channel maintenance is
        only done at channel creation time, this means that an inactive
        channel will not be closed until at least one other channel is
        created.
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        for channel in self.__class__.active_channels.values():
            if (channel is not self and not channel.running_tasks and
                channel.last_activity < cutoff):
                channel.close()

    def received(self, data):
        """
        Receives input asynchronously and send requests to
        handle_request().
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)
            if preq.expect_continue and preq.headers_finished:
                # guaranteed by parser to be a 1.1 request
                self.write(b'HTTP/1.1 100 Continue\r\n\r\n')
                preq.expect_continue = False
            if preq.completed:
                # The request (with the body) is ready to use.
                self.proto_request = None
                if not preq.empty:
                    self.handle_request(preq)
                preq = None
            else:
                self.proto_request = preq
            if n >= len(data):
                break
            data = data[n:]

    def handle_request(self, req):
        """Creates and queues a task for processing a request.

        Subclasses may override this method to handle some requests
        immediately in the main async thread.
        """
        task = self.task_class(self, req)
        self.queue_task(task)

    def handle_error(self, exc_info=None): # exc_info for tests
        """See async.dispatcher

        Handles program errors (not communication errors)
        """
        if exc_info is None: # pragma: no cover
            t, v = sys.exc_info()[:2]
        else:
            t, v = exc_info[:2]
        if t is SystemExit or t is KeyboardInterrupt:
            raise t(v)
        asyncore.dispatcher.handle_error(self)

    def handle_comm_error(self):
        """
        Handles communication errors (not program errors)
        """
        if self.adj.log_socket_errors:
            # handle_error calls close
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()


    #
    # SYNCHRONOUS METHODS
    #

    def set_async(self):
        """Switches to asynchronous mode.

        The main thread will begin calling received() again.
        """
        self.async_mode = True
        self.server.pull_trigger()
        self.last_activity = time.time()

    #
    # METHODS USED IN BOTH MODES
    #

    def write(self, data):
        wrote = 0
        if isinstance(data, bytes):
            if data:
                self.outbuf.append(data)
                wrote = len(data)
        else:
            for v in data:
                if v:
                    self.outbuf.append(v)
                    wrote += len(v)

        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to stop the application if the connection closes).
            if not self._flush_some(): # pragma: no cover (coverage bug?)
                break

        return wrote

    def _flush_some(self):
        """Flushes data.

        Returns 1 if some data was sent."""
        outbuf = self.outbuf
        if outbuf and self.connected:
            chunk = outbuf.get(self.adj.send_bytes)
            num_sent = self.send(chunk)
            if num_sent:
                outbuf.skip(num_sent, 1)
                return True
        return False

    def close_when_done(self):
        # Flush all possible.
        while self._flush_some():
            pass
        self.will_close = True
        if not self.async_mode:
            # For safety, don't close the socket until the
            # main thread calls handle_write().
            self.async_mode = True
            self.server.pull_trigger()

    def close(self):
        # Always close in asynchronous mode.  If the connection is
        # closed in a thread, the main loop can end up with a bad file
        # descriptor.
        assert self.async_mode
        self.connected = False
        asyncore.dispatcher.close(self)

    def queue_task(self, task):
        """Queue a channel-related task to be executed in another thread."""
        start = False
        self.task_lock.acquire()
        try:
            if self.tasks is None:
                self.tasks = []
            self.tasks.append(task)
            if not self.running_tasks:
                self.running_tasks = True
                start = True
        finally:
            self.task_lock.release()
        if start:
            self.set_sync()
            self.server.add_task(self)

    #
    # ITask implementation.  Delegates to the queued tasks.
    #

    def service(self):
        """Execute all pending tasks"""
        while True:
            task = None
            self.task_lock.acquire()
            try:
                if self.tasks:
                    task = self.tasks.pop(0)
                else:
                    # No more tasks
                    self.running_tasks = False
                    self.set_async()
                    break
            finally:
                self.task_lock.release()
            try:
                task.service()
            except:
                # propagate the exception, but keep executing tasks
                self.server.add_task(self)
                raise

    def cancel(self):
        """Cancels all pending tasks"""
        self.task_lock.acquire()
        try:
            if self.tasks:
                old = self.tasks[:]
            else:
                old = []
            self.tasks = []
            self.running_tasks = False
        finally:
            self.task_lock.release()
        try:
            for task in old:
                task.cancel()
        finally:
            self.set_async()

    def defer(self):
        pass