summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert JW Regeer <bertjw@regeer.org>2019-04-02 21:51:06 -0600
committerBert JW Regeer <bertjw@regeer.org>2019-04-02 22:47:58 -0600
commitf72bd6236edd30bf81c4ecf44f14e6081e851ecc (patch)
tree7535dda25065e09013a4328d18f1888ee9d29bbc
parentc083d0b7a645fe75c6a0ab0393be844cf2caf046 (diff)
downloadwaitress-f72bd6236edd30bf81c4ecf44f14e6081e851ecc.tar.gz
Instead of iterating over dictionary, use integer
This avoids looping over the dictionary in `add_task` for the extra complexity of keeping a simple counter of which threads are active or not activate.
-rw-r--r--waitress/task.py39
1 files changed, 23 insertions, 16 deletions
diff --git a/waitress/task.py b/waitress/task.py
index a252de5..315a9c2 100644
--- a/waitress/task.py
+++ b/waitress/task.py
@@ -47,18 +47,16 @@ hop_by_hop = frozenset((
))
-ThreadIdle = 1
-ThreadBusy = 2
-
class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
- stop_count = 0 # Number of threads that will stop soon.
+ stop_count = 0 # Number of threads that will stop soon.
+ active = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger
def __init__(self):
- self.threads = {} # { thread number -> ThreadIdle or ThreadBusy }
+ self.threads = set()
self.queue = deque()
self.queue_lock = threading.Condition(threading.Lock())
@@ -68,29 +66,38 @@ class ThreadedTaskDispatcher(object):
t.start()
def handler_thread(self, thread_no):
- threads = self.threads
try:
+ # Upon starting this thread, mark ourselves as active
+ with self.queue_lock:
+ self.active += 1
+
while True:
with self.queue_lock:
- while not self.queue and threads.get(thread_no):
- threads[thread_no] = ThreadIdle
+ while not self.queue and thread_no in self.threads:
+ # Mark ourselves as not active before waiting to be
+ # woken up, then we will once again be active
+ self.active -= 1
self.queue_lock.wait()
- if not threads.get(thread_no):
+ self.active += 1
+
+ if thread_no not in self.threads:
break
+
task = self.queue.popleft()
+
if task is None:
# Special value: kill this thread.
break
- threads[thread_no] = ThreadBusy
try:
task.service()
- except Exception as e:
+ except Exception:
self.logger.exception(
- 'Exception when servicing %r' % task)
+ 'Exception when servicing %r', task)
finally:
with self.queue_lock:
+ self.active -= 1
self.stop_count -= 1
- threads.pop(thread_no, None)
+ self.threads.discard(thread_no)
def set_thread_count(self, count):
with self.queue_lock:
@@ -101,7 +108,7 @@ class ThreadedTaskDispatcher(object):
# Start threads.
while thread_no in threads:
thread_no = thread_no + 1
- threads[thread_no] = ThreadIdle
+ threads.add(thread_no)
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
thread_no = thread_no + 1
@@ -123,9 +130,9 @@ class ThreadedTaskDispatcher(object):
with self.queue_lock:
self.queue.append(task)
self.queue_lock.notify()
- if not any(x == ThreadIdle for x in self.threads.values()):
+ if self.active >= len(self.threads):
self.queue_logger.warning(
- "Task queue depth is %d" %
+ "Task queue depth is %d",
len(self.queue))
def shutdown(self, cancel_pending=True, timeout=5):