summaryrefslogtreecommitdiff
path: root/waitress/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'waitress/task.py')
-rw-r--r--waitress/task.py61
1 files changed, 37 insertions, 24 deletions
diff --git a/waitress/task.py b/waitress/task.py
index dc283ee..f0372aa 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -12,13 +12,14 @@
#
##############################################################################
+from collections import deque
import socket
import sys
import threading
import time
from .buffers import ReadOnlyFileBasedBuffer
-from .compat import Empty, Queue, reraise, tobytes
+from .compat import reraise, tobytes
from .utilities import (
Forwarded,
PROXY_HEADERS,
@@ -49,6 +50,9 @@ hop_by_hop = frozenset((
class JustTesting(Exception):
pass
+ThreadIdle = 1
+ThreadBusy = 2
+
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
@@ -57,9 +61,9 @@ class ThreadedTaskDispatcher(object):
queue_logger = queue_logger
def __init__(self):
- self.threads = {} # { thread number -> 1 }
- self.queue = Queue()
- self.thread_mgmt_lock = threading.Lock()
+ self.threads = {} # { thread number -> ThreadIdle or ThreadBusy }
+ self.queue = deque()
+ self.queue_lock = threading.Condition(threading.Lock())
def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
@@ -69,11 +73,18 @@ class ThreadedTaskDispatcher(object):
def handler_thread(self, thread_no):
threads = self.threads
try:
- while threads.get(thread_no):
- task = self.queue.get()
- if task is None:
- # Special value: kill this thread.
- break
+ while True:
+ with self.queue_lock:
+ while not self.queue and threads.get(thread_no):
+ threads[thread_no] = ThreadIdle
+ self.queue_lock.wait()
+ if not threads.get(thread_no):
+ break
+ task = self.queue.popleft()
+ if task is None:
+ # Special value: kill this thread.
+ break
+ threads[thread_no] = ThreadBusy
try:
task.service()
except Exception as e:
@@ -82,12 +93,12 @@ class ThreadedTaskDispatcher(object):
if isinstance(e, JustTesting):
break
finally:
- with self.thread_mgmt_lock:
+ with self.queue_lock:
self.stop_count -= 1
threads.pop(thread_no, None)
def set_thread_count(self, count):
- with self.thread_mgmt_lock:
+ with self.queue_lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
@@ -95,7 +106,7 @@ class ThreadedTaskDispatcher(object):
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
- threads[thread_no] = 1
+ threads[thread_no] = ThreadIdle
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
@@ -104,21 +115,23 @@ class ThreadedTaskDispatcher(object):
to_stop = running - count
self.stop_count += to_stop
for n in range(to_stop):
- self.queue.put(None)
+ self.queue.append(None)
running -= 1
+ self.queue_lock.notify(to_stop)
def add_task(self, task):
- queue_depth = self.queue.qsize()
- if queue_depth > 0:
- self.queue_logger.warning(
- "Task queue depth is %d" %
- queue_depth)
try:
task.defer()
- self.queue.put(task)
except:
task.cancel()
raise
+ with self.queue_lock:
+ self.queue.append(task)
+ self.queue_lock.notify()
+ if not any(x == ThreadIdle for x in self.threads.values()):
+ self.queue_logger.warning(
+ "Task queue depth is %d" %
+ len(self.queue))
def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
@@ -134,14 +147,14 @@ class ThreadedTaskDispatcher(object):
time.sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
- try:
+ with self.queue_lock:
queue = self.queue
- while not queue.empty():
- task = queue.get()
+ while queue:
+ task = queue.popleft()
if task is not None:
task.cancel()
- except Empty: # pragma: no cover
- pass
+ threads.clear()
+ self.queue_lock.notify_all()
return True
return False