summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert JW Regeer <xistence@0x58.com>2019-04-02 23:53:29 -0600
committerGitHub <noreply@github.com>2019-04-02 23:53:29 -0600
commitd060d2424292a5bf566e091baad9974a0835896b (patch)
tree74dd74575d9dfec95167ec84a528d8a9ddc3118a
parenta4135c0b2b36be2a224a1155defa486d9871ffc4 (diff)
parent055b226f7cc93e673a966c8b826e1b5eaee05756 (diff)
downloadwaitress-d060d2424292a5bf566e091baad9974a0835896b.tar.gz
Merge pull request #243 from Pylons/queue-depth-warnings
adjust queue depth warnings to emit when all threads are busy
-rw-r--r--CHANGES.txt3
-rw-r--r--waitress/channel.py1
-rw-r--r--waitress/compat.py11
-rw-r--r--waitress/task.py83
-rw-r--r--waitress/tests/test_task.py64
5 files changed, 87 insertions, 75 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c583e2..cbba247 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Bugfixes
causing them to reopen.
See https://github.com/Pylons/waitress/pull/239
+- Fix the queue depth warnings to only show when all threads are busy.
+ See https://github.com/Pylons/waitress/pull/243
+
1.2.1 (2019-01-25)
------------------
diff --git a/waitress/channel.py b/waitress/channel.py
index fd5a3f5..199ce2a 100644
--- a/waitress/channel.py
+++ b/waitress/channel.py
@@ -270,7 +270,6 @@ class HTTPChannel(wasyncore.dispatcher, object):
return False
def handle_close(self):
- # NB: default to True for when asyncore calls this function directly
with self.outbuf_lock:
for outbuf in self.outbufs:
try:
diff --git a/waitress/compat.py b/waitress/compat.py
index c758b89..913ee5c 100644
--- a/waitress/compat.py
+++ b/waitress/compat.py
@@ -66,17 +66,6 @@ else:
def tobytes(s):
return s
-try:
- from Queue import (
- Queue,
- Empty,
- )
-except ImportError: # pragma: no cover
- from queue import (
- Queue,
- Empty,
- )
-
if PY3: # pragma: no cover
import builtins
exec_ = getattr(builtins, "exec")
diff --git a/waitress/task.py b/waitress/task.py
index dc283ee..315a9c2 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -12,13 +12,14 @@
#
##############################################################################
+from collections import deque
import socket
import sys
import threading
import time
from .buffers import ReadOnlyFileBasedBuffer
-from .compat import Empty, Queue, reraise, tobytes
+from .compat import reraise, tobytes
from .utilities import (
Forwarded,
PROXY_HEADERS,
@@ -46,20 +47,18 @@ hop_by_hop = frozenset((
))
-class JustTesting(Exception):
- pass
-
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
- stop_count = 0 # Number of threads that will stop soon.
+ stop_count = 0 # Number of threads that will stop soon.
+ active = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger
def __init__(self):
- self.threads = {} # { thread number -> 1 }
- self.queue = Queue()
- self.thread_mgmt_lock = threading.Lock()
+ self.threads = set()
+ self.queue = deque()
+ self.queue_lock = threading.Condition(threading.Lock())
def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
@@ -67,27 +66,41 @@ class ThreadedTaskDispatcher(object):
t.start()
def handler_thread(self, thread_no):
- threads = self.threads
try:
- while threads.get(thread_no):
- task = self.queue.get()
- if task is None:
- # Special value: kill this thread.
- break
+ # Upon starting this thread, mark ourselves as active
+ with self.queue_lock:
+ self.active += 1
+
+ while True:
+ with self.queue_lock:
+ while not self.queue and thread_no in self.threads:
+ # Mark ourselves as not active before waiting to be
+ # woken up, then we will once again be active
+ self.active -= 1
+ self.queue_lock.wait()
+ self.active += 1
+
+ if thread_no not in self.threads:
+ break
+
+ task = self.queue.popleft()
+
+ if task is None:
+ # Special value: kill this thread.
+ break
try:
task.service()
- except Exception as e:
+ except Exception:
self.logger.exception(
- 'Exception when servicing %r' % task)
- if isinstance(e, JustTesting):
- break
+ 'Exception when servicing %r', task)
finally:
- with self.thread_mgmt_lock:
+ with self.queue_lock:
+ self.active -= 1
self.stop_count -= 1
- threads.pop(thread_no, None)
+ self.threads.discard(thread_no)
def set_thread_count(self, count):
- with self.thread_mgmt_lock:
+ with self.queue_lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
@@ -95,7 +108,7 @@ class ThreadedTaskDispatcher(object):
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
- threads[thread_no] = 1
+ threads.add(thread_no)
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
@@ -104,21 +117,23 @@ class ThreadedTaskDispatcher(object):
to_stop = running - count
self.stop_count += to_stop
for n in range(to_stop):
- self.queue.put(None)
+ self.queue.append(None)
running -= 1
+ self.queue_lock.notify(to_stop)
def add_task(self, task):
- queue_depth = self.queue.qsize()
- if queue_depth > 0:
- self.queue_logger.warning(
- "Task queue depth is %d" %
- queue_depth)
try:
task.defer()
- self.queue.put(task)
except:
task.cancel()
raise
+ with self.queue_lock:
+ self.queue.append(task)
+ self.queue_lock.notify()
+ if self.active >= len(self.threads):
+ self.queue_logger.warning(
+ "Task queue depth is %d",
+ len(self.queue))
def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
@@ -134,14 +149,14 @@ class ThreadedTaskDispatcher(object):
time.sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
- try:
+ with self.queue_lock:
queue = self.queue
- while not queue.empty():
- task = queue.get()
+ while queue:
+ task = queue.popleft()
if task is not None:
task.cancel()
- except Empty: # pragma: no cover
- pass
+ threads.clear()
+ self.queue_lock.notify_all()
return True
return False
diff --git a/waitress/tests/test_task.py b/waitress/tests/test_task.py
index 23d92dc..f3f14d0 100644
--- a/waitress/tests/test_task.py
+++ b/waitress/tests/test_task.py
@@ -9,25 +9,34 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_handler_thread_task_is_None(self):
inst = self._makeOne()
- inst.threads[0] = True
- inst.queue.put(None)
+ inst.threads.add(0)
+ inst.queue.append(None)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
- self.assertEqual(inst.threads, {})
+ self.assertEqual(inst.threads, set())
def test_handler_thread_task_raises(self):
- from waitress.task import JustTesting
inst = self._makeOne()
- inst.threads[0] = True
+ inst.threads.add(0)
inst.logger = DummyLogger()
- task = DummyTask(JustTesting)
+ class BadDummyTask(DummyTask):
+ def service(self):
+ super(BadDummyTask, self).service()
+ inst.threads.clear()
+ raise Exception
+ task = BadDummyTask()
inst.logger = DummyLogger()
- inst.queue.put(task)
+ inst.queue.append(task)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
- self.assertEqual(inst.threads, {})
+ self.assertEqual(inst.threads, set())
self.assertEqual(len(inst.logger.logged), 1)
+ def test_handler_thread_exits_if_threadno_cleared(self):
+ inst = self._makeOne()
+ inst.handler_thread(0)
+ self.assertEqual(inst.stop_count, -1)
+
def test_set_thread_count_increase(self):
inst = self._makeOne()
L = []
@@ -38,23 +47,23 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
def test_set_thread_count_increase_with_existing(self):
inst = self._makeOne()
L = []
- inst.threads = {0: 1}
+ inst.threads = {0}
inst.start_new_thread = lambda *x: L.append(x)
inst.set_thread_count(2)
self.assertEqual(L, [(inst.handler_thread, (1,))])
def test_set_thread_count_decrease(self):
inst = self._makeOne()
- inst.threads = {'a': 1, 'b': 2}
+ inst.threads = {0, 1}
inst.set_thread_count(1)
- self.assertEqual(inst.queue.qsize(), 1)
- self.assertEqual(inst.queue.get(), None)
+ self.assertEqual(len(inst.queue), 1)
+ self.assertEqual(inst.queue.popleft(), None)
def test_set_thread_count_same(self):
inst = self._makeOne()
L = []
inst.start_new_thread = lambda *x: L.append(x)
- inst.threads = {0: 1}
+ inst.threads = {0}
inst.set_thread_count(1)
self.assertEqual(L, [])
@@ -62,7 +71,7 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
task = DummyTask()
inst = self._makeOne()
inst.add_task(task)
- self.assertEqual(inst.queue.qsize(), 1)
+ self.assertEqual(len(inst.queue), 1)
self.assertTrue(task.deferred)
def test_log_queue_depth(self):
@@ -70,24 +79,28 @@ class TestThreadedTaskDispatcher(unittest.TestCase):
inst = self._makeOne()
inst.queue_logger = DummyLogger()
inst.add_task(task)
- self.assertEqual(len(inst.queue_logger.logged), 0)
- inst.add_task(task)
self.assertEqual(len(inst.queue_logger.logged), 1)
+ inst.add_task(task)
+ self.assertEqual(len(inst.queue_logger.logged), 2)
def test_add_task_defer_raises(self):
- task = DummyTask(ValueError)
+ class BadDummyTask(DummyTask):
+ def defer(self):
+ super(BadDummyTask, self).defer()
+ raise ValueError
+ task = BadDummyTask()
inst = self._makeOne()
self.assertRaises(ValueError, inst.add_task, task)
- self.assertEqual(inst.queue.qsize(), 0)
+ self.assertEqual(len(inst.queue), 0)
self.assertTrue(task.deferred)
self.assertTrue(task.cancelled)
def test_shutdown_one_thread(self):
inst = self._makeOne()
- inst.threads[0] = 1
+ inst.threads.add(0)
inst.logger = DummyLogger()
task = DummyTask()
- inst.queue.put(task)
+ inst.queue.append(task)
self.assertEqual(inst.shutdown(timeout=.01), True)
self.assertEqual(inst.logger.logged, ['1 thread(s) still running'])
self.assertEqual(task.cancelled, True)
@@ -1468,18 +1481,11 @@ class DummyTask(object):
deferred = False
cancelled = False
- def __init__(self, toraise=None):
- self.toraise = toraise
-
def service(self):
self.serviced = True
- if self.toraise:
- raise self.toraise
def defer(self):
self.deferred = True
- if self.toraise:
- raise self.toraise
def cancel(self):
self.cancelled = True
@@ -1549,5 +1555,5 @@ class DummyLogger(object):
def warning(self, msg, *args):
self.logged.append(msg % args)
- def exception(self, msg):
- self.logged.append(msg)
+ def exception(self, msg, *args):
+ self.logged.append(msg % args)