diff options
Diffstat (limited to 'waitress/task.py')
-rw-r--r-- | waitress/task.py | 61 |
1 files changed, 37 insertions, 24 deletions
diff --git a/waitress/task.py b/waitress/task.py index dc283ee..f0372aa 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -12,13 +12,14 @@ # ############################################################################## +from collections import deque import socket import sys import threading import time from .buffers import ReadOnlyFileBasedBuffer -from .compat import Empty, Queue, reraise, tobytes +from .compat import reraise, tobytes from .utilities import ( Forwarded, PROXY_HEADERS, @@ -49,6 +50,9 @@ hop_by_hop = frozenset(( class JustTesting(Exception): pass +ThreadIdle = 1 +ThreadBusy = 2 + class ThreadedTaskDispatcher(object): """A Task Dispatcher that creates a thread for each task. """ @@ -57,9 +61,9 @@ class ThreadedTaskDispatcher(object): queue_logger = queue_logger def __init__(self): - self.threads = {} # { thread number -> 1 } - self.queue = Queue() - self.thread_mgmt_lock = threading.Lock() + self.threads = {} # { thread number -> ThreadIdle or ThreadBusy } + self.queue = deque() + self.queue_lock = threading.Condition(threading.Lock()) def start_new_thread(self, target, args): t = threading.Thread(target=target, name='waitress', args=args) @@ -69,11 +73,18 @@ class ThreadedTaskDispatcher(object): def handler_thread(self, thread_no): threads = self.threads try: - while threads.get(thread_no): - task = self.queue.get() - if task is None: - # Special value: kill this thread. - break + while True: + with self.queue_lock: + while not self.queue and threads.get(thread_no): + threads[thread_no] = ThreadIdle + self.queue_lock.wait() + if not threads.get(thread_no): + break + task = self.queue.popleft() + if task is None: + # Special value: kill this thread. + break + threads[thread_no] = ThreadBusy try: task.service() except Exception as e: @@ -82,12 +93,12 @@ class ThreadedTaskDispatcher(object): if isinstance(e, JustTesting): break finally: - with self.thread_mgmt_lock: + with self.queue_lock: self.stop_count -= 1 threads.pop(thread_no, None) def set_thread_count(self, count): - with self.thread_mgmt_lock: + with self.queue_lock: threads = self.threads thread_no = 0 running = len(threads) - self.stop_count @@ -95,7 +106,7 @@ class ThreadedTaskDispatcher(object): # Start threads. while thread_no in threads: thread_no = thread_no + 1 - threads[thread_no] = 1 + threads[thread_no] = ThreadIdle running += 1 self.start_new_thread(self.handler_thread, (thread_no,)) thread_no = thread_no + 1 @@ -104,21 +115,23 @@ class ThreadedTaskDispatcher(object): to_stop = running - count self.stop_count += to_stop for n in range(to_stop): - self.queue.put(None) + self.queue.append(None) running -= 1 + self.queue_lock.notify(to_stop) def add_task(self, task): - queue_depth = self.queue.qsize() - if queue_depth > 0: - self.queue_logger.warning( - "Task queue depth is %d" % - queue_depth) try: task.defer() - self.queue.put(task) except: task.cancel() raise + with self.queue_lock: + self.queue.append(task) + self.queue_lock.notify() + if not any(x == ThreadIdle for x in self.threads.values()): + self.queue_logger.warning( + "Task queue depth is %d" % + len(self.queue)) def shutdown(self, cancel_pending=True, timeout=5): self.set_thread_count(0) @@ -134,14 +147,14 @@ class ThreadedTaskDispatcher(object): time.sleep(0.1) if cancel_pending: # Cancel remaining tasks. - try: + with self.queue_lock: queue = self.queue - while not queue.empty(): - task = queue.get() + while queue: + task = queue.popleft() if task is not None: task.cancel() - except Empty: # pragma: no cover - pass + threads.clear() + self.queue_lock.notify_all() return True return False |