diff options
author | Gordon Sim <gsim@apache.org> | 2009-01-12 19:57:30 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-01-12 19:57:30 +0000 |
commit | 081b523538d7731c4c0128f90a6393720a7d4ecc (patch) | |
tree | e714059b75941dde709417a530984a1e52e79446 /cpp | |
parent | feb2c38b24deb8fcacdb11cdd4920a61d0c88ee0 (diff) | |
download | qpid-python-081b523538d7731c4c0128f90a6393720a7d4ecc.tar.gz |
Allow any remaining events in PollableQueue to be processed after Poller threads return from poll loop.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@733881 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 7f11cc35a9..5dd2483256 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -26,6 +26,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> @@ -71,12 +72,20 @@ class PollableQueue { size_t size() { ScopedLock l(lock); return queue.size(); } bool empty() { ScopedLock l(lock); return queue.empty(); } + + /** + * Allow any queued events to be processed; intended for calling + * after all dispatch threads exit the Poller loop in order to + * ensure clean shutdown with no events left on the queue. + */ + void shutdown(); private: typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(sys::DispatchHandle&); + void process(); mutable sys::Monitor lock; Callback callback; @@ -119,6 +128,14 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); + process(); + dispatcher = Thread(); + if (queue.empty()) condition.clear(); + if (stopped) lock.notifyAll(); + else h.rewatch(); +} + +template <class T> void PollableQueue<T>::process() { while (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); @@ -131,10 +148,11 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.clear(); } } - dispatcher = Thread(); - if (queue.empty()) condition.clear(); - if (stopped) lock.notifyAll(); - else h.rewatch(); +} + +template <class T> void PollableQueue<T>::shutdown() { + ScopedLock l(lock); + process(); } template <class T> void PollableQueue<T>::stop() { |