diff options
author | Anand Chitipothu <anandology@gmail.com> | 2009-05-29 19:49:45 +0530 |
---|---|---|
committer | Anand Chitipothu <anandology@gmail.com> | 2009-05-29 19:49:45 +0530 |
commit | 868db452f7f6095aa6466f6801b318ee2f5b6b0c (patch) | |
tree | 4fbc57dd76985d025c05631801733f7e7eb42ada | |
parent | 0f97c5e0ab7d9827506120efc22af3a9c21d1d70 (diff) | |
download | flup-868db452f7f6095aa6466f6801b318ee2f5b6b0c.tar.gz |
Let all the active requests to finish before quitting
-rw-r--r-- | flup/server/fcgi.py | 1 | ||||
-rw-r--r-- | flup/server/threadedserver.py | 4 | ||||
-rw-r--r-- | flup/server/threadpool.py | 77 |
3 files changed, 55 insertions, 27 deletions
diff --git a/flup/server/fcgi.py b/flup/server/fcgi.py index ab160e9..b8441c9 100644 --- a/flup/server/fcgi.py +++ b/flup/server/fcgi.py @@ -114,6 +114,7 @@ class WSGIServer(BaseFCGIServer, ThreadedServer): ret = ThreadedServer.run(self, sock) self._cleanupSocket(sock) + self.shutdown() return ret diff --git a/flup/server/threadedserver.py b/flup/server/threadedserver.py index c232347..628db78 100644 --- a/flup/server/threadedserver.py +++ b/flup/server/threadedserver.py @@ -109,6 +109,10 @@ class ThreadedServer(object): # Return bool based on whether or not SIGHUP was received. return self._hupReceived + + def shutdown(self): + """Wait for running threads to finish.""" + self._threadPool.shutdown() def _mainloopPeriodic(self): """ diff --git a/flup/server/threadpool.py b/flup/server/threadpool.py index a61885d..5d4f0be 100644 --- a/flup/server/threadpool.py +++ b/flup/server/threadpool.py @@ -28,7 +28,6 @@ __author__ = 'Allan Saddi <allan@saddi.com>' __version__ = '$Revision$' import sys -import thread import threading class ThreadPool(object): @@ -47,9 +46,29 @@ class ThreadPool(object): self._workQueue = [] self._idleCount = self._workerCount = maxSpare + self._threads = [] + self._stop = False + # Start the minimum number of worker threads. for i in range(maxSpare): - thread.start_new_thread(self._worker, ()) + self._start_new_thread() + + def _start_new_thread(self): + t = threading.Thread(target=self._worker) + self._threads.append(t) + t.start() + return t + + def shutdown(self): + """shutdown all workers.""" + self._lock.acquire() + self._stop = True + self._lock.notifyAll() + self._lock.release() + + # wait for all threads to finish + for t in self._threads: + t.join() def addJob(self, job, allowQueuing=True): """ @@ -71,7 +90,7 @@ class ThreadPool(object): self._workerCount < self._maxThreads: self._workerCount += 1 self._idleCount += 1 - thread.start_new_thread(self._worker, ()) + self._start_new_thread() # Hand off the job. if self._idleCount or allowQueuing: @@ -88,34 +107,38 @@ class ThreadPool(object): Worker thread routine. Waits for a job, executes it, repeat. """ self._lock.acquire() - while True: - while not self._workQueue: - self._lock.wait() + try: + while True: + while not self._workQueue and not self._stop: + self._lock.wait() + + if self._stop: + return - # We have a job to do... - job = self._workQueue.pop(0) + # We have a job to do... + job = self._workQueue.pop(0) - assert self._idleCount > 0 - self._idleCount -= 1 + assert self._idleCount > 0 + self._idleCount -= 1 - self._lock.release() + self._lock.release() - try: - job.run() - except: - # FIXME: This should really be reported somewhere. - # But we can't simply report it to stderr because of fcgi - pass + try: + job.run() + except: + # FIXME: This should really be reported somewhere. + # But we can't simply report it to stderr because of fcgi + pass - self._lock.acquire() + self._lock.acquire() - if self._idleCount == self._maxSpare: - break # NB: lock still held - self._idleCount += 1 - assert self._idleCount <= self._maxSpare - - # Die off... - assert self._workerCount > self._maxSpare - self._workerCount -= 1 + if self._idleCount == self._maxSpare: + break # NB: lock still held + self._idleCount += 1 + assert self._idleCount <= self._maxSpare - self._lock.release() + # Die off... + assert self._workerCount > self._maxSpare + self._workerCount -= 1 + finally: + self._lock.release() |