1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
|
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Dual-mode channel
"""
import asyncore
import socket
import sys
import time
from waitress.compat import thread
from waitress.adjustments import Adjustments
from waitress.buffers import OverflowableBuffer
from waitress.parser import HTTPRequestParser
from waitress.task import HTTPTask
class HTTPServerChannel(asyncore.dispatcher, object):
    """Channel that switches between asynchronous and synchronous mode.

    Call set_sync() before using a channel in a thread other than
    the thread handling the main loop.

    Call set_async() to give the channel back to the thread handling
    the main loop.
    """

    task_class = HTTPTask
    parser_class = HTTPRequestParser

    # The lock is created once, at class-definition time, so it is shared by
    # every channel; it syncs access to the task-related attributes.
    task_lock = thread.allocate_lock()
    active_channels = {}        # Class-wide channel tracker (fileno -> channel)
    next_channel_cleanup = [0]  # Class-wide cleanup time (1-item list, mutated in place)

    proto_request = None        # A request parser instance
    last_activity = 0           # Time of last activity
    tasks = None                # List of channel-related tasks to execute
    running_tasks = False       # True when another thread is running tasks
    will_close = False          # Set to True to have the socket closed
    async_mode = True           # boolean: async or sync mode

    #
    # ASYNCHRONOUS METHODS (including __init__)
    #

    def __init__(
            self,
            server,
            sock,
            addr,
            adj=None,
            map=None,
            ):
        """Set up the channel and register it with the asyncore map.

        ``server`` must provide ``pull_trigger()`` and ``add_task()`` (both
        are called by other methods of this class).  ``adj`` defaults to a
        fresh ``Adjustments()`` instance.  ``sock`` and ``map`` are passed
        straight to ``asyncore.dispatcher.__init__``.
        """
        self.addr = addr
        if adj is None:
            adj = Adjustments()
        self.adj = adj
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time.time()
        asyncore.dispatcher.__init__(self, sock, map=map)
        self.server = server
        self.last_activity = t = self.creation_time
        self.check_maintenance(t)

    def handle_close(self):
        """See asyncore.dispatcher; simply closes the channel."""
        self.close()

    def writable(self):
        """Return a true value when the main loop should call handle_write().

        Never writable in sync mode.  Otherwise writable when a close is
        pending or the output buffer is non-empty (the buffer object itself
        may be returned, so callers should rely on truthiness only).
        """
        if not self.async_mode:
            return False
        return self.will_close or self.outbuf

    def handle_write(self):
        """Flush part of the output buffer, or close if a close is pending."""
        if not self.async_mode:
            return
        if self.outbuf:
            try:
                self._flush_some()
            except socket.error:
                self.handle_comm_error()
        elif self.will_close:
            # Only close once the buffer has been drained.
            self.close()
        self.last_activity = time.time()

    def readable(self):
        """Return True when the main loop should call handle_read().

        Also gives channel maintenance a chance to run on every poll.
        """
        self.check_maintenance(time.time())
        if not self.async_mode:
            return False
        return not self.will_close

    def handle_read(self):
        """Read up to ``adj.recv_bytes`` from the socket and parse it."""
        if not self.async_mode or self.will_close:
            return
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
            return
        self.last_activity = time.time()
        self.received(data)

    def set_sync(self):
        """Switches to synchronous mode.

        The main thread will stop calling received().
        """
        self.async_mode = False

    def add_channel(self, map=None):
        """See asyncore.dispatcher

        This hook keeps track of opened channels.
        """
        asyncore.dispatcher.add_channel(self, map)
        self.__class__.active_channels[self._fileno] = self

    def del_channel(self, map=None):
        """See asyncore.dispatcher

        This hook keeps track of closed channels.
        """
        fd = self._fileno  # next line sets this to None
        asyncore.dispatcher.del_channel(self, map)
        # Tolerate a channel that was never (or is no longer) tracked.
        self.__class__.active_channels.pop(fd, None)

    def check_maintenance(self, now):
        """Performs maintenance if necessary.

        ``now`` is compared against the class-wide next cleanup time.
        Returns False when cleanup is not yet due; otherwise schedules the
        next cleanup ``adj.cleanup_interval`` later and returns whatever
        maintenance() returns.
        """
        ncc = self.__class__.next_channel_cleanup
        if now < ncc[0]:
            return False
        ncc[0] = now + self.adj.cleanup_interval
        return self.maintenance()

    def maintenance(self):
        """Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).

        CM: the maintenance method never closes the channel upon which it is
        called, it closes any other channel that is a) not running a task and
        b) has exceeded its inactivity timeout.  Maintenance is triggered
        through check_maintenance(), which is called when a channel is
        created and whenever readable() is called on a channel.
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        # Iterate over a snapshot: channel.close() ends up in del_channel(),
        # which removes entries from active_channels.  Iterating the live
        # dict view would raise RuntimeError on Python 3.
        for channel in list(self.__class__.active_channels.values()):
            if (channel is not self and not channel.running_tasks and
                    channel.last_activity < cutoff):
                channel.close()

    def received(self, data):
        """Receives input asynchronously and send requests to
        handle_request().

        ``data`` may hold part of a request, exactly one request or several
        pipelined requests; a partially parsed request is kept in
        ``proto_request`` until more data arrives.
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)  # number of bytes the parser consumed
            if preq.expect_continue and preq.headers_finished:
                # guaranteed by parser to be a 1.1 request
                self.write(b'HTTP/1.1 100 Continue\r\n\r\n')
                preq.expect_continue = False
            if preq.completed:
                # The request (with the body) is ready to use.
                self.proto_request = None
                if not preq.empty:
                    self.handle_request(preq)
                preq = None
            else:
                # Incomplete: remember the parser for the next read.
                self.proto_request = preq
            if n >= len(data):
                break
            data = data[n:]

    def handle_request(self, req):
        """Creates and queues a task for processing a request.

        Subclasses may override this method to handle some requests
        immediately in the main async thread.
        """
        task = self.task_class(self, req)
        self.queue_task(task)

    def handle_error(self, exc_info=None):  # exc_info for tests
        """See async.dispatcher

        Handles program errors (not communication errors).  SystemExit and
        KeyboardInterrupt are re-raised; everything else is delegated to
        asyncore.dispatcher.handle_error().
        """
        if exc_info is None:  # pragma: no cover
            t, v = sys.exc_info()[:2]
        else:
            t, v = exc_info[:2]
        if t is SystemExit or t is KeyboardInterrupt:
            if isinstance(v, BaseException):
                # Re-raise the original instance.  Wrapping it in a new one
                # (``t(v)``) would, e.g., turn SystemExit(3) into a
                # SystemExit whose exit code is an exception object.
                raise v
            raise t(v)
        asyncore.dispatcher.handle_error(self)

    def handle_comm_error(self):
        """
        Handles communication errors (not program errors)
        """
        if self.adj.log_socket_errors:
            # handle_error calls close
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()

    #
    # SYNCHRONOUS METHODS
    #

    def set_async(self):
        """Switches to asynchronous mode.

        The main thread will begin calling received() again.
        """
        self.async_mode = True
        self.server.pull_trigger()
        self.last_activity = time.time()

    #
    # METHODS USED IN BOTH MODES
    #

    def write(self, data):
        """Append ``data`` to the output buffer and flush opportunistically.

        ``data`` is either a ``bytes`` object or an iterable of chunks;
        empty chunks are skipped.  Returns the number of bytes appended to
        the buffer (not the number actually sent).  Errors raised while
        sending propagate to the caller.
        """
        wrote = 0
        if isinstance(data, bytes):
            if data:
                self.outbuf.append(data)
                wrote = len(data)
        else:
            for v in data:
                if v:
                    self.outbuf.append(v)
                    wrote += len(v)

        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to stop the application if the connection closes).
            if not self._flush_some():  # pragma: no cover (coverage bug?)
                break

        return wrote

    def _flush_some(self):
        """Flushes data.

        Sends at most ``adj.send_bytes`` from the output buffer.
        Returns True if some data was sent, False otherwise."""
        outbuf = self.outbuf
        if outbuf and self.connected:
            chunk = outbuf.get(self.adj.send_bytes)
            num_sent = self.send(chunk)
            if num_sent:
                # Drop only what the socket actually accepted.
                outbuf.skip(num_sent, 1)
                return True
        return False

    def close_when_done(self):
        """Flush what can be flushed now, then mark the channel for closing."""
        # Flush all possible.
        while self._flush_some():
            pass
        self.will_close = True
        if not self.async_mode:
            # For safety, don't close the socket until the
            # main thread calls handle_write().
            self.async_mode = True
            self.server.pull_trigger()

    def close(self):
        """Close the channel; must be called in asynchronous mode."""
        # Always close in asynchronous mode.  If the connection is
        # closed in a thread, the main loop can end up with a bad file
        # descriptor.
        assert self.async_mode
        self.connected = False
        asyncore.dispatcher.close(self)

    def queue_task(self, task):
        """Queue a channel-related task to be executed in another thread."""
        start = False
        with self.task_lock:
            if self.tasks is None:
                self.tasks = []
            self.tasks.append(task)
            if not self.running_tasks:
                self.running_tasks = True
                start = True
        if start:
            # First pending task: hand the channel over to the task thread.
            self.set_sync()
            self.server.add_task(self)

    #
    # ITask implementation.  Delegates to the queued tasks.
    #

    def service(self):
        """Execute all pending tasks"""
        while True:
            with self.task_lock:
                if self.tasks:
                    task = self.tasks.pop(0)
                else:
                    # No more tasks
                    self.running_tasks = False
                    self.set_async()
                    break
            try:
                task.service()
            except:
                # propagate the exception, but keep executing tasks
                # (the exception is always re-raised below)
                self.server.add_task(self)
                raise

    def cancel(self):
        """Cancels all pending tasks"""
        with self.task_lock:
            old = self.tasks[:] if self.tasks else []
            self.tasks = []
            self.running_tasks = False
        try:
            for task in old:
                task.cancel()
        finally:
            # Give the channel back to the main loop even if a cancel fails.
            self.set_async()

    def defer(self):
        """No-op."""
        pass
|