diff options
author | Michael Merickel <michael@merickel.org> | 2019-04-05 11:07:43 -0500 |
---|---|---|
committer | Michael Merickel <michael@merickel.org> | 2019-04-05 11:19:33 -0500 |
commit | c1b681d177db6019fea189d122cb51cdea489eae (patch) | |
tree | a9bac56b707d623c94f3de12ebd70e0ec9533614 | |
parent | b1b1d3e14bfc4cabf1b6993b2ce2bd374f8ee32e (diff) | |
download | waitress-c1b681d177db6019fea189d122cb51cdea489eae.tar.gz |
warn if there are more pending jobs than idle threads
-rw-r--r-- | CHANGES.txt | 1 | ||||
-rw-r--r-- | waitress/task.py | 113 | ||||
-rw-r--r-- | waitress/tests/test_task.py | 34 |
3 files changed, 67 insertions, 81 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 5a3b0c7..be24559 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -36,6 +36,7 @@ Bugfixes - Fix the queue depth warnings to only show when all threads are busy. See https://github.com/Pylons/waitress/pull/243 + and https://github.com/Pylons/waitress/pull/247 - Trigger the ``app_iter`` to close as part of shutdown. This will only be noticeable for users of the internal server api. In more typical operations diff --git a/waitress/task.py b/waitress/task.py index 15aa771..81e512f 100644 --- a/waitress/task.py +++ b/waitress/task.py @@ -51,14 +51,16 @@ class ThreadedTaskDispatcher(object): """A Task Dispatcher that creates a thread for each task. """ stop_count = 0 # Number of threads that will stop soon. - active = 0 # Number of currently active threads + active_count = 0 # Number of currently active threads logger = logger queue_logger = queue_logger def __init__(self): self.threads = set() self.queue = deque() - self.queue_lock = threading.Condition(threading.Lock()) + self.lock = threading.Lock() + self.queue_cv = threading.Condition(self.lock) + self.thread_exit_cv = threading.Condition(self.lock) def start_new_thread(self, target, args): t = threading.Thread(target=target, name='waitress', args=args) @@ -66,41 +68,31 @@ class ThreadedTaskDispatcher(object): t.start() def handler_thread(self, thread_no): - try: - # Upon starting this thread, mark ourselves as active - with self.queue_lock: - self.active += 1 - - while True: - with self.queue_lock: - while not self.queue and thread_no in self.threads: - # Mark ourselves as not active before waiting to be - # woken up, then we will once again be active - self.active -= 1 - self.queue_lock.wait() - self.active += 1 - - if thread_no not in self.threads: - break - - task = self.queue.popleft() - - if task is None: - # Special value: kill this thread. - break - try: - task.service() - except Exception: - self.logger.exception( - 'Exception when servicing %r', task) - finally: - with self.queue_lock: - self.active -= 1 - self.stop_count -= 1 - self.threads.discard(thread_no) + while True: + with self.lock: + while not self.queue and self.stop_count == 0: + # Mark ourselves as idle before waiting to be + # woken up, then we will once again be active + self.active_count -= 1 + self.queue_cv.wait() + self.active_count += 1 + + if self.stop_count > 0: + self.active_count -= 1 + self.stop_count -= 1 + self.threads.discard(thread_no) + self.thread_exit_cv.notify() + break + + task = self.queue.popleft() + try: + task.service() + except BaseException: + self.logger.exception( + 'Exception when servicing %r', task) def set_thread_count(self, count): - with self.queue_lock: + with self.lock: threads = self.threads thread_no = 0 running = len(threads) - self.stop_count @@ -111,48 +103,47 @@ class ThreadedTaskDispatcher(object): threads.add(thread_no) running += 1 self.start_new_thread(self.handler_thread, (thread_no,)) + self.active_count += 1 thread_no = thread_no + 1 if running > count: # Stop threads. - to_stop = running - count - self.stop_count += to_stop - for n in range(to_stop): - self.queue.append(None) - running -= 1 - self.queue_lock.notify(to_stop) + self.stop_count += running - count + self.queue_cv.notify_all() def add_task(self, task): - with self.queue_lock: + with self.lock: self.queue.append(task) - self.queue_lock.notify() - if self.active >= len(self.threads): + self.queue_cv.notify() + queue_size = len(self.queue) + idle_threads = ( + len(self.threads) - self.stop_count - self.active_count) + if queue_size > idle_threads: self.queue_logger.warning( - "Task queue depth is %d", - len(self.queue)) + "Task queue depth is %d", queue_size - idle_threads) def shutdown(self, cancel_pending=True, timeout=5): self.set_thread_count(0) # Ensure the threads shut down. threads = self.threads expiration = time.time() + timeout - while threads: - if time.time() >= expiration: - self.logger.warning( - "%d thread(s) still running" % - len(threads)) - break - time.sleep(0.1) - if cancel_pending: - # Cancel remaining tasks. - with self.queue_lock: + with self.lock: + while threads: + if time.time() >= expiration: + self.logger.warning( + "%d thread(s) still running", len(threads)) + break + self.thread_exit_cv.wait(0.1) + if cancel_pending: + # Cancel remaining tasks. queue = self.queue + if len(queue) > 0: + self.logger.warning( + "Canceling %d pending task(s)", len(queue)) while queue: task = queue.popleft() - if task is not None: - task.cancel() - threads.clear() - self.queue_lock.notify_all() - return True + task.cancel() + self.queue_cv.notify_all() + return True return False class Task(object): diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py index e1415ef..ffc34b7 100644 --- a/waitress/tests/test_task.py +++ b/waitress/tests/test_task.py @@ -7,14 +7,6 @@ class TestThreadedTaskDispatcher(unittest.TestCase): from waitress.task import ThreadedTaskDispatcher return ThreadedTaskDispatcher() - def test_handler_thread_task_is_None(self): - inst = self._makeOne() - inst.threads.add(0) - inst.queue.append(None) - inst.handler_thread(0) - self.assertEqual(inst.stop_count, -1) - self.assertEqual(inst.threads, set()) - def test_handler_thread_task_raises(self): inst = self._makeOne() inst.threads.add(0) @@ -22,21 +14,18 @@ class TestThreadedTaskDispatcher(unittest.TestCase): class BadDummyTask(DummyTask): def service(self): super(BadDummyTask, self).service() - inst.threads.clear() + inst.stop_count += 1 raise Exception task = BadDummyTask() inst.logger = DummyLogger() inst.queue.append(task) + inst.active_count += 1 inst.handler_thread(0) - self.assertEqual(inst.stop_count, -1) + self.assertEqual(inst.stop_count, 0) + self.assertEqual(inst.active_count, 0) self.assertEqual(inst.threads, set()) self.assertEqual(len(inst.logger.logged), 1) - def test_handler_thread_exits_if_threadno_cleared(self): - inst = self._makeOne() - inst.handler_thread(0) - self.assertEqual(inst.stop_count, -1) - def test_set_thread_count_increase(self): inst = self._makeOne() L = [] @@ -56,8 +45,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst = self._makeOne() inst.threads = {0, 1} inst.set_thread_count(1) - self.assertEqual(len(inst.queue), 1) - self.assertEqual(inst.queue.popleft(), None) + self.assertEqual(inst.stop_count, 1) def test_set_thread_count_same(self): inst = self._makeOne() @@ -67,13 +55,16 @@ class TestThreadedTaskDispatcher(unittest.TestCase): inst.set_thread_count(1) self.assertEqual(L, []) - def test_add_task(self): + def test_add_task_with_idle_threads(self): task = DummyTask() inst = self._makeOne() + inst.threads.add(0) + inst.queue_logger = DummyLogger() inst.add_task(task) self.assertEqual(len(inst.queue), 1) + self.assertEqual(len(inst.queue_logger.logged), 0) - def test_log_queue_depth(self): + def test_add_task_with_all_busy_threads(self): task = DummyTask() inst = self._makeOne() inst.queue_logger = DummyLogger() @@ -89,7 +80,10 @@ class TestThreadedTaskDispatcher(unittest.TestCase): task = DummyTask() inst.queue.append(task) self.assertEqual(inst.shutdown(timeout=.01), True) - self.assertEqual(inst.logger.logged, ['1 thread(s) still running']) + self.assertEqual(inst.logger.logged, [ + '1 thread(s) still running', + 'Canceling 1 pending task(s)', + ]) self.assertEqual(task.cancelled, True) def test_shutdown_no_threads(self): |