summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Merickel <github@m.merickel.org>2019-04-02 23:50:01 -0500
committerGitHub <noreply@github.com>2019-04-02 23:50:01 -0500
commit055b226f7cc93e673a966c8b826e1b5eaee05756 (patch)
tree74dd74575d9dfec95167ec84a528d8a9ddc3118a
parentc083d0b7a645fe75c6a0ab0393be844cf2caf046 (diff)
parent69c929ff00ba43638439772fdb5bfc983d445848 (diff)
downloadwaitress-055b226f7cc93e673a966c8b826e1b5eaee05756.tar.gz
Merge pull request #244 from Pylons/queue-depth-warnings-int
Queue depth warnings (using integer inc/dec for active)
-rw-r--r--waitress/channel.py1
-rw-r--r--waitress/task.py39
-rw-r--r--waitress/tests/test_task.py20
3 files changed, 33 insertions, 27 deletions
diff --git a/waitress/channel.py b/waitress/channel.py
index fd5a3f5..199ce2a 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -270,7 +270,6 @@ class HTTPChannel(wasyncore.dispatcher, object):
return False
def handle_close(self):
- # NB: default to True for when asyncore calls this function directly
with self.outbuf_lock:
for outbuf in self.outbufs:
try:
diff --git a/waitress/task.py b/waitress/task.py
index a252de5..315a9c2 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -47,18 +47,16 @@ hop_by_hop = frozenset((
))
-ThreadIdle = 1
-ThreadBusy = 2
-
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
- stop_count = 0 # Number of threads that will stop soon.
+ stop_count = 0 # Number of threads that will stop soon.
+ active = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger
def __init__(self):
- self.threads = {} # { thread number -> ThreadIdle or ThreadBusy }
+ self.threads = set()
self.queue = deque()
self.queue_lock = threading.Condition(threading.Lock())
@@ -68,29 +66,38 @@ class ThreadedTaskDispatcher(object):
t.start()
def handler_thread(self, thread_no):
- threads = self.threads
try:
+ # Upon starting this thread, mark ourselves as active
+ with self.queue_lock:
+ self.active += 1
+
while True:
with self.queue_lock:
- while not self.queue and threads.get(thread_no):
- threads[thread_no] = ThreadIdle
+ while not self.queue and thread_no in self.threads:
+ # Mark ourselves as not active before waiting to be
+ # woken up, then we will once again be active
+ self.active -= 1
self.queue_lock.wait()
- if not threads.get(thread_no):
+ self.active += 1
+
+ if thread_no not in self.threads:
break
+
task = self.queue.popleft()
+
if task is None:
# Special value: kill this thread.
break
- threads[thread_no] = ThreadBusy
try:
task.service()
- except Exception as e:
+ except Exception:
self.logger.exception(
- 'Exception when servicing %r' % task)
+ 'Exception when servicing %r', task)
finally:
with self.queue_lock:
+ self.active -= 1
self.stop_count -= 1
- threads.pop(thread_no, None)
+ self.threads.discard(thread_no)
def set_thread_count(self, count):
with self.queue_lock:
@@ -101,7 +108,7 @@ class ThreadedTaskDispatcher(object):
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
- threads[thread_no] = ThreadIdle
+ threads.add(thread_no)
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
@@ -123,9 +130,9 @@ class ThreadedTaskDispatcher(object):
with self.queue_lock:
self.queue.append(task)
self.queue_lock.notify()
- if not any(x == ThreadIdle for x in self.threads.values()):
+ if self.active >= len(self.threads):
self.queue_logger.warning(
- "Task queue depth is %d" %
+ "Task queue depth is %d",
len(self.queue))
def shutdown(self, cancel_pending=True, timeout=5):
diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py
index 6045bf7..f3f14d0 100644
--- a/waitress/tests/test_task.py
+++ b/waitress/tests/test_task.py
@@ -9,15 +9,15 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_handler_thread_task_is_None(self):
inst = self._makeOne()
- inst.threads[0] = 1
+ inst.threads.add(0)
inst.queue.append(None)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
- self.assertEqual(inst.threads, {})
+ self.assertEqual(inst.threads, set())
def test_handler_thread_task_raises(self):
inst = self._makeOne()
- inst.threads[0] = 1
+ inst.threads.add(0)
inst.logger = DummyLogger()
class BadDummyTask(DummyTask):
def service(self):
@@ -29,7 +29,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst.queue.append(task)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
- self.assertEqual(inst.threads, {})
+ self.assertEqual(inst.threads, set())
self.assertEqual(len(inst.logger.logged), 1)
def test_handler_thread_exits_if_threadno_cleared(self):
@@ -47,14 +47,14 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_set_thread_count_increase_with_existing(self):
inst = self._makeOne()
L = []
- inst.threads = {0: 1}
+ inst.threads = {0}
inst.start_new_thread = lambda *x: L.append(x)
inst.set_thread_count(2)
self.assertEqual(L, [(inst.handler_thread, (1,))])
def test_set_thread_count_decrease(self):
inst = self._makeOne()
- inst.threads = {'a': 1, 'b': 2}
+ inst.threads = {0, 1}
inst.set_thread_count(1)
self.assertEqual(len(inst.queue), 1)
self.assertEqual(inst.queue.popleft(), None)
@@ -63,7 +63,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst = self._makeOne()
L = []
inst.start_new_thread = lambda *x: L.append(x)
- inst.threads = {0: 1}
+ inst.threads = {0}
inst.set_thread_count(1)
self.assertEqual(L, [])
@@ -97,7 +97,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_shutdown_one_thread(self):
inst = self._makeOne()
- inst.threads[0] = 1
+ inst.threads.add(0)
inst.logger = DummyLogger()
task = DummyTask()
inst.queue.append(task)
@@ -1555,5 +1555,5 @@ class DummyLogger(object):
def warning(self, msg, *args):
self.logged.append(msg % args)
- def exception(self, msg):
- self.logged.append(msg)
+ def exception(self, msg, *args):
+ self.logged.append(msg % args)