summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Merickel <michael@merickel.org>2019-04-02 20:57:27 -0500
committerMichael Merickel <michael@merickel.org>2019-04-02 21:46:49 -0500
commit34af957c1472aca30650f75698da467ca037d8f1 (patch)
tree5c9670c8803a6fd6e2a533d52ca8cf38fabc11f5
parenta4135c0b2b36be2a224a1155defa486d9871ffc4 (diff)
downloadwaitress-34af957c1472aca30650f75698da467ca037d8f1.tar.gz
adjust queue depth warnings to emit when all threads are busy
-rw-r--r--CHANGES.txt3
-rw-r--r--waitress/compat.py11
-rw-r--r--waitress/task.py61
-rw-r--r--waitress/tests/test_task.py23
4 files changed, 54 insertions, 44 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c583e2..cbba247 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Bugfixes
causing them to reopen.
See https://github.com/Pylons/waitress/pull/239
+- Fix the queue depth warnings to only show when all threads are busy.
+ See https://github.com/Pylons/waitress/pull/243
+
1.2.1 (2019-01-25)
------------------
diff --git a/waitress/compat.py b/waitress/compat.py
index c758b89..913ee5c 100644
--- a/waitress/compat.py
+++ b/waitress/compat.py
@@ -66,17 +66,6 @@ else:
def tobytes(s):
return s
-try:
- from Queue import (
- Queue,
- Empty,
- )
-except ImportError: # pragma: no cover
- from queue import (
- Queue,
- Empty,
- )
-
if PY3: # pragma: no cover
import builtins
exec_ = getattr(builtins, "exec")
diff --git a/waitress/task.py b/waitress/task.py
index dc283ee..f0372aa 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -12,13 +12,14 @@
#
##############################################################################
+from collections import deque
import socket
import sys
import threading
import time
from .buffers import ReadOnlyFileBasedBuffer
-from .compat import Empty, Queue, reraise, tobytes
+from .compat import reraise, tobytes
from .utilities import (
Forwarded,
PROXY_HEADERS,
@@ -49,6 +50,9 @@ hop_by_hop = frozenset((
class JustTesting(Exception):
pass
+ThreadIdle = 1
+ThreadBusy = 2
+
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
@@ -57,9 +61,9 @@ class ThreadedTaskDispatcher(object):
queue_logger = queue_logger
def __init__(self):
- self.threads = {} # { thread number -> 1 }
- self.queue = Queue()
- self.thread_mgmt_lock = threading.Lock()
+ self.threads = {} # { thread number -> ThreadIdle or ThreadBusy }
+ self.queue = deque()
+ self.queue_lock = threading.Condition(threading.Lock())
def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
@@ -69,11 +73,18 @@ class ThreadedTaskDispatcher(object):
def handler_thread(self, thread_no):
threads = self.threads
try:
- while threads.get(thread_no):
- task = self.queue.get()
- if task is None:
- # Special value: kill this thread.
- break
+ while True:
+ with self.queue_lock:
+ while not self.queue and threads.get(thread_no):
+ threads[thread_no] = ThreadIdle
+ self.queue_lock.wait()
+ if not threads.get(thread_no):
+ break
+ task = self.queue.popleft()
+ if task is None:
+ # Special value: kill this thread.
+ break
+ threads[thread_no] = ThreadBusy
try:
task.service()
except Exception as e:
@@ -82,12 +93,12 @@ class ThreadedTaskDispatcher(object):
if isinstance(e, JustTesting):
break
finally:
- with self.thread_mgmt_lock:
+ with self.queue_lock:
self.stop_count -= 1
threads.pop(thread_no, None)
def set_thread_count(self, count):
- with self.thread_mgmt_lock:
+ with self.queue_lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
@@ -95,7 +106,7 @@ class ThreadedTaskDispatcher(object):
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
- threads[thread_no] = 1
+ threads[thread_no] = ThreadIdle
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
@@ -104,21 +115,23 @@ class ThreadedTaskDispatcher(object):
to_stop = running - count
self.stop_count += to_stop
for n in range(to_stop):
- self.queue.put(None)
+ self.queue.append(None)
running -= 1
+ self.queue_lock.notify(to_stop)
def add_task(self, task):
- queue_depth = self.queue.qsize()
- if queue_depth > 0:
- self.queue_logger.warning(
- "Task queue depth is %d" %
- queue_depth)
try:
task.defer()
- self.queue.put(task)
except:
task.cancel()
raise
+ with self.queue_lock:
+ self.queue.append(task)
+ self.queue_lock.notify()
+ if not any(x == ThreadIdle for x in self.threads.values()):
+ self.queue_logger.warning(
+ "Task queue depth is %d" %
+ len(self.queue))
def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
@@ -134,14 +147,14 @@ class ThreadedTaskDispatcher(object):
time.sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
- try:
+ with self.queue_lock:
queue = self.queue
- while not queue.empty():
- task = queue.get()
+ while queue:
+ task = queue.popleft()
if task is not None:
task.cancel()
- except Empty: # pragma: no cover
- pass
+ threads.clear()
+ self.queue_lock.notify_all()
return True
return False
diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py
index 23d92dc..ecb0b52 100644
--- a/waitress/tests/test_task.py
+++ b/waitress/tests/test_task.py
@@ -10,7 +10,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_handler_thread_task_is_None(self):
inst = self._makeOne()
inst.threads[0] = True
- inst.queue.put(None)
+ inst.queue.append(None)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
self.assertEqual(inst.threads, {})
@@ -22,12 +22,17 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst.logger = DummyLogger()
task = DummyTask(JustTesting)
inst.logger = DummyLogger()
- inst.queue.put(task)
+ inst.queue.append(task)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
self.assertEqual(inst.threads, {})
self.assertEqual(len(inst.logger.logged), 1)
+ def test_handler_thread_exits_if_threadno_cleared(self):
+ inst = self._makeOne()
+ inst.handler_thread(0)
+ self.assertEqual(inst.stop_count, -1)
+
def test_set_thread_count_increase(self):
inst = self._makeOne()
L = []
@@ -47,8 +52,8 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst = self._makeOne()
inst.threads = {'a': 1, 'b': 2}
inst.set_thread_count(1)
- self.assertEqual(inst.queue.qsize(), 1)
- self.assertEqual(inst.queue.get(), None)
+ self.assertEqual(len(inst.queue), 1)
+ self.assertEqual(inst.queue.popleft(), None)
def test_set_thread_count_same(self):
inst = self._makeOne()
@@ -62,7 +67,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
task = DummyTask()
inst = self._makeOne()
inst.add_task(task)
- self.assertEqual(inst.queue.qsize(), 1)
+ self.assertEqual(len(inst.queue), 1)
self.assertTrue(task.deferred)
def test_log_queue_depth(self):
@@ -70,15 +75,15 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst = self._makeOne()
inst.queue_logger = DummyLogger()
inst.add_task(task)
- self.assertEqual(len(inst.queue_logger.logged), 0)
- inst.add_task(task)
self.assertEqual(len(inst.queue_logger.logged), 1)
+ inst.add_task(task)
+ self.assertEqual(len(inst.queue_logger.logged), 2)
def test_add_task_defer_raises(self):
task = DummyTask(ValueError)
inst = self._makeOne()
self.assertRaises(ValueError, inst.add_task, task)
- self.assertEqual(inst.queue.qsize(), 0)
+ self.assertEqual(len(inst.queue), 0)
self.assertTrue(task.deferred)
self.assertTrue(task.cancelled)
@@ -87,7 +92,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst.threads[0] = 1
inst.logger = DummyLogger()
task = DummyTask()
- inst.queue.put(task)
+ inst.queue.append(task)
self.assertEqual(inst.shutdown(timeout=.01), True)
self.assertEqual(inst.logger.logged, ['1 thread(s) still running'])
self.assertEqual(task.cancelled, True)