diff options
author | Bert JW Regeer <xistence@0x58.com> | 2019-04-02 23:53:29 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-02 23:53:29 -0600 |
commit | d060d2424292a5bf566e091baad9974a0835896b (patch) | |
tree | 74dd74575d9dfec95167ec84a528d8a9ddc3118a | |
parent | a4135c0b2b36be2a224a1155defa486d9871ffc4 (diff) | |
parent | 055b226f7cc93e673a966c8b826e1b5eaee05756 (diff) | |
download | waitress-d060d2424292a5bf566e091baad9974a0835896b.tar.gz |
Merge pull request #243 from Pylons/queue-depth-warnings
adjust queue depth warnings to emit when all threads are busy
-rw-r--r-- | CHANGES.txt | 3 | ||||
-rw-r--r-- | waitress/channel.py | 1 | ||||
-rw-r--r-- | waitress/compat.py | 11 | ||||
-rw-r--r-- | waitress/task.py | 83 | ||||
-rw-r--r-- | waitress/tests/test_task.py | 64 |
5 files changed, 87 insertions, 75 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 4c583e2..cbba247 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,9 @@ Bugfixes causing them to reopen. See https://github.com/Pylons/waitress/pull/239 +- Fix the queue depth warnings to only show when all threads are busy. + See https://github.com/Pylons/waitress/pull/243 + 1.2.1 (2019-01-25) ------------------ diff --git a/waitress/channel.py b/waitress/channel.py index fd5a3f5..199ce2a 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -270,7 +270,6 @@ class HTTPChannel(wasyncore.dispatcher, object): return False def handle_close(self): - # NB: default to True for when asyncore calls this function directly with self.outbuf_lock: for outbuf in self.outbufs: try: diff --git a/waitress/compat.py b/waitress/compat.py index c758b89..913ee5c 100644 --- a/waitress/compat.py +++ b/waitress/compat.py @@ -66,17 +66,6 @@ else: def tobytes(s): return s -try: - from Queue import ( - Queue, - Empty, - ) -except ImportError: # pragma: no cover - from queue import ( - Queue, - Empty, - ) - if PY3: # pragma: no cover import builtins exec_ = getattr(builtins, "exec") diff --git a/waitress/task.py b/waitress/task.py index dc283ee..315a9c2 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -12,13 +12,14 @@ # ############################################################################## +from collections import deque import socket import sys import threading import time from .buffers import ReadOnlyFileBasedBuffer -from .compat import Empty, Queue, reraise, tobytes +from .compat import reraise, tobytes from .utilities import ( Forwarded, PROXY_HEADERS, @@ -46,20 +47,18 @@ hop_by_hop = frozenset(( )) -class JustTesting(Exception): - pass - class ThreadedTaskDispatcher(object): """A Task Dispatcher that creates a thread for each task. """ - stop_count = 0 # Number of threads that will stop soon. + stop_count = 0 # Number of threads that will stop soon. + active = 0 # Number of currently active threads logger = logger queue_logger = queue_logger def __init__(self): - self.threads = {} # { thread number -> 1 } - self.queue = Queue() - self.thread_mgmt_lock = threading.Lock() + self.threads = set() + self.queue = deque() + self.queue_lock = threading.Condition(threading.Lock()) def start_new_thread(self, target, args): t = threading.Thread(target=target, name='waitress', args=args) @@ -67,27 +66,41 @@ class ThreadedTaskDispatcher(object): t.start() def handler_thread(self, thread_no): - threads = self.threads try: - while threads.get(thread_no): - task = self.queue.get() - if task is None: - # Special value: kill this thread. - break + # Upon starting this thread, mark ourselves as active + with self.queue_lock: + self.active += 1 + + while True: + with self.queue_lock: + while not self.queue and thread_no in self.threads: + # Mark ourselves as not active before waiting to be + # woken up, then we will once again be active + self.active -= 1 + self.queue_lock.wait() + self.active += 1 + + if thread_no not in self.threads: + break + + task = self.queue.popleft() + + if task is None: + # Special value: kill this thread. + break try: task.service() - except Exception as e: + except Exception: self.logger.exception( - 'Exception when servicing %r' % task) - if isinstance(e, JustTesting): - break + 'Exception when servicing %r', task) finally: - with self.thread_mgmt_lock: + with self.queue_lock: + self.active -= 1 self.stop_count -= 1 - threads.pop(thread_no, None) + self.threads.discard(thread_no) def set_thread_count(self, count): - with self.thread_mgmt_lock: + with self.queue_lock: threads = self.threads thread_no = 0 running = len(threads) - self.stop_count @@ -95,7 +108,7 @@ class ThreadedTaskDispatcher(object): # Start threads. while thread_no in threads: thread_no = thread_no + 1 - threads[thread_no] = 1 + threads.add(thread_no) running += 1 self.start_new_thread(self.handler_thread, (thread_no,)) thread_no = thread_no + 1 @@ -104,21 +117,23 @@ class ThreadedTaskDispatcher(object): to_stop = running - count self.stop_count += to_stop for n in range(to_stop): - self.queue.put(None) + self.queue.append(None) running -= 1 + self.queue_lock.notify(to_stop) def add_task(self, task): - queue_depth = self.queue.qsize() - if queue_depth > 0: - self.queue_logger.warning( - "Task queue depth is %d" % - queue_depth) try: task.defer() - self.queue.put(task) except: task.cancel() raise + with self.queue_lock: + self.queue.append(task) + self.queue_lock.notify() + if self.active >= len(self.threads): + self.queue_logger.warning( + "Task queue depth is %d", + len(self.queue)) def shutdown(self, cancel_pending=True, timeout=5): self.set_thread_count(0) @@ -134,14 +149,14 @@ class ThreadedTaskDispatcher(object): time.sleep(0.1) if cancel_pending: # Cancel remaining tasks. - try: + with self.queue_lock: queue = self.queue - while not queue.empty(): - task = queue.get() + while queue: + task = queue.popleft() if task is not None: task.cancel() - except Empty: # pragma: no cover - pass + threads.clear() + self.queue_lock.notify_all() return True return False diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index 23d92dc..f3f14d0 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -9,25 +9,34 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_handler_thread_task_is_None(self): inst = self._makeOne() - inst.threads[0] = True - inst.queue.put(None) + inst.threads.add(0) + inst.queue.append(None) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) - self.assertEqual(inst.threads, {}) + self.assertEqual(inst.threads, set()) def test_handler_thread_task_raises(self): - from waitress.task import JustTesting inst = self._makeOne() - inst.threads[0] = True + inst.threads.add(0) inst.logger = DummyLogger() - task = DummyTask(JustTesting) + class BadDummyTask(DummyTask): + def service(self): + super(BadDummyTask, self).service() + inst.threads.clear() + raise Exception + task = BadDummyTask() inst.logger = DummyLogger() - inst.queue.put(task) + inst.queue.append(task) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) - self.assertEqual(inst.threads, {}) + self.assertEqual(inst.threads, set()) self.assertEqual(len(inst.logger.logged), 1) + def test_handler_thread_exits_if_threadno_cleared(self): + inst = self._makeOne() + inst.handler_thread(0) + self.assertEqual(inst.stop_count, -1) + def test_set_thread_count_increase(self): inst = self._makeOne() L = [] @@ -38,23 +47,23 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_set_thread_count_increase_with_existing(self): inst = self._makeOne() L = [] - inst.threads = {0: 1} + inst.threads = {0} inst.start_new_thread = lambda *x: L.append(x) inst.set_thread_count(2) self.assertEqual(L, [(inst.handler_thread, (1,))]) def test_set_thread_count_decrease(self): inst = self._makeOne() - inst.threads = {'a': 1, 'b': 2} + inst.threads = {0, 1} inst.set_thread_count(1) - self.assertEqual(inst.queue.qsize(), 1) - self.assertEqual(inst.queue.get(), None) + self.assertEqual(len(inst.queue), 1) + self.assertEqual(inst.queue.popleft(), None) def test_set_thread_count_same(self): inst = self._makeOne() L = [] inst.start_new_thread = lambda *x: L.append(x) - inst.threads = {0: 1} + inst.threads = {0} inst.set_thread_count(1) self.assertEqual(L, []) @@ -62,7 +71,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): task = DummyTask() inst = self._makeOne() inst.add_task(task) - self.assertEqual(inst.queue.qsize(), 1) + self.assertEqual(len(inst.queue), 1) self.assertTrue(task.deferred) def test_log_queue_depth(self): @@ -70,24 +79,28 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst = self._makeOne() inst.queue_logger = DummyLogger() inst.add_task(task) - self.assertEqual(len(inst.queue_logger.logged), 0) - inst.add_task(task) self.assertEqual(len(inst.queue_logger.logged), 1) + inst.add_task(task) + self.assertEqual(len(inst.queue_logger.logged), 2) def test_add_task_defer_raises(self): - task = DummyTask(ValueError) + class BadDummyTask(DummyTask): + def defer(self): + super(BadDummyTask, self).defer() + raise ValueError + task = BadDummyTask() inst = self._makeOne() self.assertRaises(ValueError, inst.add_task, task) - self.assertEqual(inst.queue.qsize(), 0) + self.assertEqual(len(inst.queue), 0) self.assertTrue(task.deferred) self.assertTrue(task.cancelled) def test_shutdown_one_thread(self): inst = self._makeOne() - inst.threads[0] = 1 + inst.threads.add(0) inst.logger = DummyLogger() task = DummyTask() - inst.queue.put(task) + inst.queue.append(task) self.assertEqual(inst.shutdown(timeout=.01), True) self.assertEqual(inst.logger.logged, ['1 thread(s) still running']) self.assertEqual(task.cancelled, True) @@ -1468,18 +1481,11 @@ class DummyTask(object): deferred = False cancelled = False - def __init__(self, toraise=None): - self.toraise = toraise - def service(self): self.serviced = True - if self.toraise: - raise self.toraise def defer(self): self.deferred = True - if self.toraise: - raise self.toraise def cancel(self): self.cancelled = True @@ -1549,5 +1555,5 @@ class DummyLogger(object): def warning(self, msg, *args): self.logged.append(msg % args) - def exception(self, msg): - self.logged.append(msg) + def exception(self, msg, *args): + self.logged.append(msg % args) |