From 34af957c1472aca30650f75698da467ca037d8f1 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 2 Apr 2019 20:57:27 -0500 Subject: adjust queue depth warnings to emit when all threads are busy --- CHANGES.txt | 3 +++ waitress/compat.py | 11 -------- waitress/task.py | 61 +++++++++++++++++++++++++++------------------ waitress/tests/test_task.py | 23 ++++++++++------- 4 files changed, 54 insertions(+), 44 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4c583e2..cbba247 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,9 @@ Bugfixes causing them to reopen. See https://github.com/Pylons/waitress/pull/239 +- Fix the queue depth warnings to only show when all threads are busy. + See https://github.com/Pylons/waitress/pull/243 + 1.2.1 (2019-01-25) ------------------ diff --git a/waitress/compat.py b/waitress/compat.py index c758b89..913ee5c 100644 --- a/waitress/compat.py +++ b/waitress/compat.py @@ -66,17 +66,6 @@ else: def tobytes(s): return s -try: - from Queue import ( - Queue, - Empty, - ) -except ImportError: # pragma: no cover - from queue import ( - Queue, - Empty, - ) - if PY3: # pragma: no cover import builtins exec_ = getattr(builtins, "exec") diff --git a/waitress/task.py b/waitress/task.py index dc283ee..f0372aa 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -12,13 +12,14 @@ # ############################################################################## +from collections import deque import socket import sys import threading import time from .buffers import ReadOnlyFileBasedBuffer -from .compat import Empty, Queue, reraise, tobytes +from .compat import reraise, tobytes from .utilities import ( Forwarded, PROXY_HEADERS, @@ -49,6 +50,9 @@ hop_by_hop = frozenset(( class JustTesting(Exception): pass +ThreadIdle = 1 +ThreadBusy = 2 + class ThreadedTaskDispatcher(object): """A Task Dispatcher that creates a thread for each task. """ @@ -57,9 +61,9 @@ class ThreadedTaskDispatcher(object): queue_logger = queue_logger def __init__(self): - self.threads = {} # { thread number -> 1 } - self.queue = Queue() - self.thread_mgmt_lock = threading.Lock() + self.threads = {} # { thread number -> ThreadIdle or ThreadBusy } + self.queue = deque() + self.queue_lock = threading.Condition(threading.Lock()) def start_new_thread(self, target, args): t = threading.Thread(target=target, name='waitress', args=args) @@ -69,11 +73,18 @@ class ThreadedTaskDispatcher(object): def handler_thread(self, thread_no): threads = self.threads try: - while threads.get(thread_no): - task = self.queue.get() - if task is None: - # Special value: kill this thread. - break + while True: + with self.queue_lock: + while not self.queue and threads.get(thread_no): + threads[thread_no] = ThreadIdle + self.queue_lock.wait() + if not threads.get(thread_no): + break + task = self.queue.popleft() + if task is None: + # Special value: kill this thread. + break + threads[thread_no] = ThreadBusy try: task.service() except Exception as e: @@ -82,12 +93,12 @@ class ThreadedTaskDispatcher(object): if isinstance(e, JustTesting): break finally: - with self.thread_mgmt_lock: + with self.queue_lock: self.stop_count -= 1 threads.pop(thread_no, None) def set_thread_count(self, count): - with self.thread_mgmt_lock: + with self.queue_lock: threads = self.threads thread_no = 0 running = len(threads) - self.stop_count @@ -95,7 +106,7 @@ class ThreadedTaskDispatcher(object): # Start threads. while thread_no in threads: thread_no = thread_no + 1 - threads[thread_no] = 1 + threads[thread_no] = ThreadIdle running += 1 self.start_new_thread(self.handler_thread, (thread_no,)) thread_no = thread_no + 1 @@ -104,21 +115,23 @@ class ThreadedTaskDispatcher(object): to_stop = running - count self.stop_count += to_stop for n in range(to_stop): - self.queue.put(None) + self.queue.append(None) running -= 1 + self.queue_lock.notify(to_stop) def add_task(self, task): - queue_depth = self.queue.qsize() - if queue_depth > 0: - self.queue_logger.warning( - "Task queue depth is %d" % - queue_depth) try: task.defer() - self.queue.put(task) except: task.cancel() raise + with self.queue_lock: + self.queue.append(task) + self.queue_lock.notify() + if not any(x == ThreadIdle for x in self.threads.values()): + self.queue_logger.warning( + "Task queue depth is %d" % + len(self.queue)) def shutdown(self, cancel_pending=True, timeout=5): self.set_thread_count(0) @@ -134,14 +147,14 @@ class ThreadedTaskDispatcher(object): time.sleep(0.1) if cancel_pending: # Cancel remaining tasks. - try: + with self.queue_lock: queue = self.queue - while not queue.empty(): - task = queue.get() + while queue: + task = queue.popleft() if task is not None: task.cancel() - except Empty: # pragma: no cover - pass + threads.clear() + self.queue_lock.notify_all() return True return False diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index 23d92dc..ecb0b52 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -10,7 +10,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_handler_thread_task_is_None(self): inst = self._makeOne() inst.threads[0] = True - inst.queue.put(None) + inst.queue.append(None) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) self.assertEqual(inst.threads, {}) @@ -22,12 +22,17 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst.logger = DummyLogger() task = DummyTask(JustTesting) inst.logger = DummyLogger() - inst.queue.put(task) + inst.queue.append(task) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) self.assertEqual(inst.threads, {}) self.assertEqual(len(inst.logger.logged), 1) + def test_handler_thread_exits_if_threadno_cleared(self): + inst = self._makeOne() + inst.handler_thread(0) + self.assertEqual(inst.stop_count, -1) + def test_set_thread_count_increase(self): inst = self._makeOne() L = [] @@ -47,8 +52,8 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst = self._makeOne() inst.threads = {'a': 1, 'b': 2} inst.set_thread_count(1) - self.assertEqual(inst.queue.qsize(), 1) - self.assertEqual(inst.queue.get(), None) + self.assertEqual(len(inst.queue), 1) + self.assertEqual(inst.queue.popleft(), None) def test_set_thread_count_same(self): inst = self._makeOne() @@ -62,7 +67,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): task = DummyTask() inst = self._makeOne() inst.add_task(task) - self.assertEqual(inst.queue.qsize(), 1) + self.assertEqual(len(inst.queue), 1) self.assertTrue(task.deferred) def test_log_queue_depth(self): @@ -70,15 +75,15 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst = self._makeOne() inst.queue_logger = DummyLogger() inst.add_task(task) - self.assertEqual(len(inst.queue_logger.logged), 0) - inst.add_task(task) self.assertEqual(len(inst.queue_logger.logged), 1) + inst.add_task(task) + self.assertEqual(len(inst.queue_logger.logged), 2) def test_add_task_defer_raises(self): task = DummyTask(ValueError) inst = self._makeOne() self.assertRaises(ValueError, inst.add_task, task) - self.assertEqual(inst.queue.qsize(), 0) + self.assertEqual(len(inst.queue), 0) self.assertTrue(task.deferred) self.assertTrue(task.cancelled) @@ -87,7 +92,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst.threads[0] = 1 inst.logger = DummyLogger() task = DummyTask() - inst.queue.put(task) + inst.queue.append(task) self.assertEqual(inst.shutdown(timeout=.01), True) self.assertEqual(inst.logger.logged, ['1 thread(s) still running']) self.assertEqual(task.cancelled, True) -- cgit v1.2.1