summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnand Chitipothu <anandology@gmail.com>2009-05-29 19:49:45 +0530
committerAnand Chitipothu <anandology@gmail.com>2009-05-29 19:49:45 +0530
commit868db452f7f6095aa6466f6801b318ee2f5b6b0c (patch)
tree4fbc57dd76985d025c05631801733f7e7eb42ada
parent0f97c5e0ab7d9827506120efc22af3a9c21d1d70 (diff)
downloadflup-868db452f7f6095aa6466f6801b318ee2f5b6b0c.tar.gz
Let all the active requests to finish before quitting
-rw-r--r--flup/server/fcgi.py1
-rw-r--r--flup/server/threadedserver.py4
-rw-r--r--flup/server/threadpool.py77
3 files changed, 55 insertions, 27 deletions
diff --git a/flup/server/fcgi.py b/flup/server/fcgi.py
index ab160e9..b8441c9 100644
--- a/flup/server/fcgi.py
+++ b/flup/server/fcgi.py
@@ -114,6 +114,7 @@ class WSGIServer(BaseFCGIServer, ThreadedServer):
ret = ThreadedServer.run(self, sock)
self._cleanupSocket(sock)
+ self.shutdown()
return ret
diff --git a/flup/server/threadedserver.py b/flup/server/threadedserver.py
index c232347..628db78 100644
--- a/flup/server/threadedserver.py
+++ b/flup/server/threadedserver.py
@@ -109,6 +109,10 @@ class ThreadedServer(object):
# Return bool based on whether or not SIGHUP was received.
return self._hupReceived
+
+ def shutdown(self):
+ """Wait for running threads to finish."""
+ self._threadPool.shutdown()
def _mainloopPeriodic(self):
"""
diff --git a/flup/server/threadpool.py b/flup/server/threadpool.py
index a61885d..5d4f0be 100644
--- a/flup/server/threadpool.py
+++ b/flup/server/threadpool.py
@@ -28,7 +28,6 @@ __author__ = 'Allan Saddi <allan@saddi.com>'
__version__ = '$Revision$'
import sys
-import thread
import threading
class ThreadPool(object):
@@ -47,9 +46,29 @@ class ThreadPool(object):
self._workQueue = []
self._idleCount = self._workerCount = maxSpare
+ self._threads = []
+ self._stop = False
+
# Start the minimum number of worker threads.
for i in range(maxSpare):
- thread.start_new_thread(self._worker, ())
+ self._start_new_thread()
+
+ def _start_new_thread(self):
+ t = threading.Thread(target=self._worker)
+ self._threads.append(t)
+ t.start()
+ return t
+
+ def shutdown(self):
+ """shutdown all workers."""
+ self._lock.acquire()
+ self._stop = True
+ self._lock.notifyAll()
+ self._lock.release()
+
+ # wait for all threads to finish
+ for t in self._threads:
+ t.join()
def addJob(self, job, allowQueuing=True):
"""
@@ -71,7 +90,7 @@ class ThreadPool(object):
self._workerCount < self._maxThreads:
self._workerCount += 1
self._idleCount += 1
- thread.start_new_thread(self._worker, ())
+ self._start_new_thread()
# Hand off the job.
if self._idleCount or allowQueuing:
@@ -88,34 +107,38 @@ class ThreadPool(object):
Worker thread routine. Waits for a job, executes it, repeat.
"""
self._lock.acquire()
- while True:
- while not self._workQueue:
- self._lock.wait()
+ try:
+ while True:
+ while not self._workQueue and not self._stop:
+ self._lock.wait()
+
+ if self._stop:
+ return
- # We have a job to do...
- job = self._workQueue.pop(0)
+ # We have a job to do...
+ job = self._workQueue.pop(0)
- assert self._idleCount > 0
- self._idleCount -= 1
+ assert self._idleCount > 0
+ self._idleCount -= 1
- self._lock.release()
+ self._lock.release()
- try:
- job.run()
- except:
- # FIXME: This should really be reported somewhere.
- # But we can't simply report it to stderr because of fcgi
- pass
+ try:
+ job.run()
+ except:
+ # FIXME: This should really be reported somewhere.
+ # But we can't simply report it to stderr because of fcgi
+ pass
- self._lock.acquire()
+ self._lock.acquire()
- if self._idleCount == self._maxSpare:
- break # NB: lock still held
- self._idleCount += 1
- assert self._idleCount <= self._maxSpare
-
- # Die off...
- assert self._workerCount > self._maxSpare
- self._workerCount -= 1
+ if self._idleCount == self._maxSpare:
+ break # NB: lock still held
+ self._idleCount += 1
+ assert self._idleCount <= self._maxSpare
- self._lock.release()
+ # Die off...
+ assert self._workerCount > self._maxSpare
+ self._workerCount -= 1
+ finally:
+ self._lock.release()