diff options
author | Michael Merickel <github@m.merickel.org> | 2019-04-02 23:50:01 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-02 23:50:01 -0500 |
commit | 055b226f7cc93e673a966c8b826e1b5eaee05756 (patch) | |
tree | 74dd74575d9dfec95167ec84a528d8a9ddc3118a | |
parent | c083d0b7a645fe75c6a0ab0393be844cf2caf046 (diff) | |
parent | 69c929ff00ba43638439772fdb5bfc983d445848 (diff) | |
download | waitress-055b226f7cc93e673a966c8b826e1b5eaee05756.tar.gz |
Merge pull request #244 from Pylons/queue-depth-warnings-int
Queue depth warnings (using integer inc/dec for active)
-rw-r--r-- | waitress/channel.py | 1 | ||||
-rw-r--r-- | waitress/task.py | 39 | ||||
-rw-r--r-- | waitress/tests/test_task.py | 20 |
3 files changed, 33 insertions, 27 deletions
diff --git a/waitress/channel.py b/waitress/channel.py index fd5a3f5..199ce2a 100644 --- a/waitress/channel.py +++ b/waitress/channel.py @@ -270,7 +270,6 @@ class HTTPChannel(wasyncore.dispatcher, object): return False def handle_close(self): - # NB: default to True for when asyncore calls this function directly with self.outbuf_lock: for outbuf in self.outbufs: try: diff --git a/waitress/task.py b/waitress/task.py index a252de5..315a9c2 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -47,18 +47,16 @@ hop_by_hop = frozenset(( )) -ThreadIdle = 1 -ThreadBusy = 2 - class ThreadedTaskDispatcher(object): """A Task Dispatcher that creates a thread for each task. """ - stop_count = 0 # Number of threads that will stop soon. + stop_count = 0 # Number of threads that will stop soon. + active = 0 # Number of currently active threads logger = logger queue_logger = queue_logger def __init__(self): - self.threads = {} # { thread number -> ThreadIdle or ThreadBusy } + self.threads = set() self.queue = deque() self.queue_lock = threading.Condition(threading.Lock()) @@ -68,29 +66,38 @@ class ThreadedTaskDispatcher(object): t.start() def handler_thread(self, thread_no): - threads = self.threads try: + # Upon starting this thread, mark ourselves as active + with self.queue_lock: + self.active += 1 + while True: with self.queue_lock: - while not self.queue and threads.get(thread_no): - threads[thread_no] = ThreadIdle + while not self.queue and thread_no in self.threads: + # Mark ourselves as not active before waiting to be + # woken up, then we will once again be active + self.active -= 1 self.queue_lock.wait() - if not threads.get(thread_no): + self.active += 1 + + if thread_no not in self.threads: break + task = self.queue.popleft() + if task is None: # Special value: kill this thread. break - threads[thread_no] = ThreadBusy try: task.service() - except Exception as e: + except Exception: self.logger.exception( - 'Exception when servicing %r' % task) + 'Exception when servicing %r', task) finally: with self.queue_lock: + self.active -= 1 self.stop_count -= 1 - threads.pop(thread_no, None) + self.threads.discard(thread_no) def set_thread_count(self, count): with self.queue_lock: @@ -101,7 +108,7 @@ class ThreadedTaskDispatcher(object): # Start threads. while thread_no in threads: thread_no = thread_no + 1 - threads[thread_no] = ThreadIdle + threads.add(thread_no) running += 1 self.start_new_thread(self.handler_thread, (thread_no,)) thread_no = thread_no + 1 @@ -123,9 +130,9 @@ class ThreadedTaskDispatcher(object): with self.queue_lock: self.queue.append(task) self.queue_lock.notify() - if not any(x == ThreadIdle for x in self.threads.values()): + if self.active >= len(self.threads): self.queue_logger.warning( - "Task queue depth is %d" % + "Task queue depth is %d", len(self.queue)) def shutdown(self, cancel_pending=True, timeout=5): diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index 6045bf7..f3f14d0 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -9,15 +9,15 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_handler_thread_task_is_None(self): inst = self._makeOne() - inst.threads[0] = 1 + inst.threads.add(0) inst.queue.append(None) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) - self.assertEqual(inst.threads, {}) + self.assertEqual(inst.threads, set()) def test_handler_thread_task_raises(self): inst = self._makeOne() - inst.threads[0] = 1 + inst.threads.add(0) inst.logger = DummyLogger() class BadDummyTask(DummyTask): def service(self): @@ -29,7 +29,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst.queue.append(task) inst.handler_thread(0) self.assertEqual(inst.stop_count, -1) - self.assertEqual(inst.threads, {}) + self.assertEqual(inst.threads, set()) self.assertEqual(len(inst.logger.logged), 1) def test_handler_thread_exits_if_threadno_cleared(self): @@ -47,14 +47,14 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_set_thread_count_increase_with_existing(self): inst = self._makeOne() L = [] - inst.threads = {0: 1} + inst.threads = {0} inst.start_new_thread = lambda *x: L.append(x) inst.set_thread_count(2) self.assertEqual(L, [(inst.handler_thread, (1,))]) def test_set_thread_count_decrease(self): inst = self._makeOne() - inst.threads = {'a': 1, 'b': 2} + inst.threads = {0, 1} inst.set_thread_count(1) self.assertEqual(len(inst.queue), 1) self.assertEqual(inst.queue.popleft(), None) @@ -63,7 +63,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst = self._makeOne() L = [] inst.start_new_thread = lambda *x: L.append(x) - inst.threads = {0: 1} + inst.threads = {0} inst.set_thread_count(1) self.assertEqual(L, []) @@ -97,7 +97,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): def test_shutdown_one_thread(self): inst = self._makeOne() - inst.threads[0] = 1 + inst.threads.add(0) inst.logger = DummyLogger() task = DummyTask() inst.queue.append(task) @@ -1555,5 +1555,5 @@ class DummyLogger(object): def warning(self, msg, *args): self.logged.append(msg % args) - def exception(self, msg): - self.logged.append(msg) + def exception(self, msg, *args): + self.logged.append(msg % args) |