diff options
author | Gordon Sim <gsim@apache.org> | 2009-01-12 19:57:30 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-01-12 19:57:30 +0000 |
commit | e223147aaeb402d5e38eeeca23045d6ddb9ec51d (patch) | |
tree | 585af4aae560440c9da4ab43717bb32d0b894fca | |
parent | b33b48b479b81a15b2ffa9dd894ff462dbe1403c (diff) | |
download | qpid-python-e223147aaeb402d5e38eeeca23045d6ddb9ec51d.tar.gz |
Allow any remaining events in PollableQueue to be processed after Poller threads return from poll loop.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@733881 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/sys/PollableQueue.h | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/sys/PollableQueue.h b/qpid/cpp/src/qpid/sys/PollableQueue.h index 7f11cc35a9..5dd2483256 100644 --- a/qpid/cpp/src/qpid/sys/PollableQueue.h +++ b/qpid/cpp/src/qpid/sys/PollableQueue.h @@ -26,6 +26,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> @@ -71,12 +72,20 @@ class PollableQueue { size_t size() { ScopedLock l(lock); return queue.size(); } bool empty() { ScopedLock l(lock); return queue.empty(); } + + /** + * Allow any queued events to be processed; intended for calling + * after all dispatch threads exit the Poller loop in order to + * ensure clean shutdown with no events left on the queue. + */ + void shutdown(); private: typedef sys::Monitor::ScopedLock ScopedLock; typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(sys::DispatchHandle&); + void process(); mutable sys::Monitor lock; Callback callback; @@ -119,6 +128,14 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { ScopedLock l(lock); assert(dispatcher.id() == 0); dispatcher = Thread::current(); + process(); + dispatcher = Thread(); + if (queue.empty()) condition.clear(); + if (stopped) lock.notifyAll(); + else h.rewatch(); +} + +template <class T> void PollableQueue<T>::process() { while (!stopped && !queue.empty()) { assert(batch.empty()); batch.swap(queue); @@ -131,10 +148,11 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.clear(); } } - dispatcher = Thread(); - if (queue.empty()) condition.clear(); - if (stopped) lock.notifyAll(); - else h.rewatch(); +} + +template <class T> void PollableQueue<T>::shutdown() { + ScopedLock l(lock); + process(); } template <class T> void PollableQueue<T>::stop() { |