summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert JW Regeer <xistence@0x58.com>2019-04-05 10:29:29 -0600
committerGitHub <noreply@github.com>2019-04-05 10:29:29 -0600
commit5bd69e9545718913bd2be0b4ab04f7da3d9f74c8 (patch)
treea9bac56b707d623c94f3de12ebd70e0ec9533614
parentb1b1d3e14bfc4cabf1b6993b2ce2bd374f8ee32e (diff)
parentc1b681d177db6019fea189d122cb51cdea489eae (diff)
downloadwaitress-5bd69e9545718913bd2be0b4ab04f7da3d9f74c8.tar.gz
Merge pull request #247 from Pylons/idle-warning-redux-2
warn if there are more pending jobs than idle threads
-rw-r--r--CHANGES.txt1
-rw-r--r--waitress/task.py113
-rw-r--r--waitress/tests/test_task.py34
3 files changed, 67 insertions, 81 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a3b0c7..be24559 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,7 @@ Bugfixes
- Fix the queue depth warnings to only show when all threads are busy.
See https://github.com/Pylons/waitress/pull/243
+ and https://github.com/Pylons/waitress/pull/247
- Trigger the ``app_iter`` to close as part of shutdown. This will only be
noticeable for users of the internal server api. In more typical operations
diff --git a/waitress/task.py b/waitress/task.py
index 15aa771..81e512f 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -51,14 +51,16 @@ class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
stop_count = 0 # Number of threads that will stop soon.
- active = 0 # Number of currently active threads
+ active_count = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger
def __init__(self):
self.threads = set()
self.queue = deque()
- self.queue_lock = threading.Condition(threading.Lock())
+ self.lock = threading.Lock()
+ self.queue_cv = threading.Condition(self.lock)
+ self.thread_exit_cv = threading.Condition(self.lock)
def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
@@ -66,41 +68,31 @@ class ThreadedTaskDispatcher(object):
t.start()
def handler_thread(self, thread_no):
- try:
- # Upon starting this thread, mark ourselves as active
- with self.queue_lock:
- self.active += 1
-
- while True:
- with self.queue_lock:
- while not self.queue and thread_no in self.threads:
- # Mark ourselves as not active before waiting to be
- # woken up, then we will once again be active
- self.active -= 1
- self.queue_lock.wait()
- self.active += 1
-
- if thread_no not in self.threads:
- break
-
- task = self.queue.popleft()
-
- if task is None:
- # Special value: kill this thread.
- break
- try:
- task.service()
- except Exception:
- self.logger.exception(
- 'Exception when servicing %r', task)
- finally:
- with self.queue_lock:
- self.active -= 1
- self.stop_count -= 1
- self.threads.discard(thread_no)
+ while True:
+ with self.lock:
+ while not self.queue and self.stop_count == 0:
+ # Mark ourselves as idle before waiting to be
+ # woken up, then we will once again be active
+ self.active_count -= 1
+ self.queue_cv.wait()
+ self.active_count += 1
+
+ if self.stop_count > 0:
+ self.active_count -= 1
+ self.stop_count -= 1
+ self.threads.discard(thread_no)
+ self.thread_exit_cv.notify()
+ break
+
+ task = self.queue.popleft()
+ try:
+ task.service()
+ except BaseException:
+ self.logger.exception(
+ 'Exception when servicing %r', task)
def set_thread_count(self, count):
- with self.queue_lock:
+ with self.lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
@@ -111,48 +103,47 @@ class ThreadedTaskDispatcher(object):
threads.add(thread_no)
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
+ self.active_count += 1
thread_no = thread_no + 1
if running > count:
# Stop threads.
- to_stop = running - count
- self.stop_count += to_stop
- for n in range(to_stop):
- self.queue.append(None)
- running -= 1
- self.queue_lock.notify(to_stop)
+ self.stop_count += running - count
+ self.queue_cv.notify_all()
def add_task(self, task):
- with self.queue_lock:
+ with self.lock:
self.queue.append(task)
- self.queue_lock.notify()
- if self.active >= len(self.threads):
+ self.queue_cv.notify()
+ queue_size = len(self.queue)
+ idle_threads = (
+ len(self.threads) - self.stop_count - self.active_count)
+ if queue_size > idle_threads:
self.queue_logger.warning(
- "Task queue depth is %d",
- len(self.queue))
+ "Task queue depth is %d", queue_size - idle_threads)
def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
# Ensure the threads shut down.
threads = self.threads
expiration = time.time() + timeout
- while threads:
- if time.time() >= expiration:
- self.logger.warning(
- "%d thread(s) still running" %
- len(threads))
- break
- time.sleep(0.1)
- if cancel_pending:
- # Cancel remaining tasks.
- with self.queue_lock:
+ with self.lock:
+ while threads:
+ if time.time() >= expiration:
+ self.logger.warning(
+ "%d thread(s) still running", len(threads))
+ break
+ self.thread_exit_cv.wait(0.1)
+ if cancel_pending:
+ # Cancel remaining tasks.
queue = self.queue
+ if len(queue) > 0:
+ self.logger.warning(
+ "Canceling %d pending task(s)", len(queue))
while queue:
task = queue.popleft()
- if task is not None:
- task.cancel()
- threads.clear()
- self.queue_lock.notify_all()
- return True
+ task.cancel()
+ self.queue_cv.notify_all()
+ return True
return False
class Task(object):
diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py
index e1415ef..ffc34b7 100644
--- a/waitress/tests/test_task.py
+++ b/waitress/tests/test_task.py
@@ -7,14 +7,6 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
from waitress.task import ThreadedTaskDispatcher
return ThreadedTaskDispatcher()
- def test_handler_thread_task_is_None(self):
- inst = self._makeOne()
- inst.threads.add(0)
- inst.queue.append(None)
- inst.handler_thread(0)
- self.assertEqual(inst.stop_count, -1)
- self.assertEqual(inst.threads, set())
-
def test_handler_thread_task_raises(self):
inst = self._makeOne()
inst.threads.add(0)
@@ -22,21 +14,18 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
class BadDummyTask(DummyTask):
def service(self):
super(BadDummyTask, self).service()
- inst.threads.clear()
+ inst.stop_count += 1
raise Exception
task = BadDummyTask()
inst.logger = DummyLogger()
inst.queue.append(task)
+ inst.active_count += 1
inst.handler_thread(0)
- self.assertEqual(inst.stop_count, -1)
+ self.assertEqual(inst.stop_count, 0)
+ self.assertEqual(inst.active_count, 0)
self.assertEqual(inst.threads, set())
self.assertEqual(len(inst.logger.logged), 1)
- def test_handler_thread_exits_if_threadno_cleared(self):
- inst = self._makeOne()
- inst.handler_thread(0)
- self.assertEqual(inst.stop_count, -1)
-
def test_set_thread_count_increase(self):
inst = self._makeOne()
L = []
@@ -56,8 +45,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst = self._makeOne()
inst.threads = {0, 1}
inst.set_thread_count(1)
- self.assertEqual(len(inst.queue), 1)
- self.assertEqual(inst.queue.popleft(), None)
+ self.assertEqual(inst.stop_count, 1)
def test_set_thread_count_same(self):
inst = self._makeOne()
@@ -67,13 +55,16 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst.set_thread_count(1)
self.assertEqual(L, [])
- def test_add_task(self):
+ def test_add_task_with_idle_threads(self):
task = DummyTask()
inst = self._makeOne()
+ inst.threads.add(0)
+ inst.queue_logger = DummyLogger()
inst.add_task(task)
self.assertEqual(len(inst.queue), 1)
+ self.assertEqual(len(inst.queue_logger.logged), 0)
- def test_log_queue_depth(self):
+ def test_add_task_with_all_busy_threads(self):
task = DummyTask()
inst = self._makeOne()
inst.queue_logger = DummyLogger()
@@ -89,7 +80,10 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
task = DummyTask()
inst.queue.append(task)
self.assertEqual(inst.shutdown(timeout=.01), True)
- self.assertEqual(inst.logger.logged, ['1 thread(s) still running'])
+ self.assertEqual(inst.logger.logged, [
+ '1 thread(s) still running',
+ 'Canceling 1 pending task(s)',
+ ])
self.assertEqual(task.cancelled, True)
def test_shutdown_no_threads(self):