diff options
author | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
commit | ebb59131b198b693c5774dd51656fec520ddd770 (patch) | |
tree | 688cbbfe3aae8874156cd2ac2bf9430ef3af19df /cpp/src/qpid/sys/PollableQueue.h | |
parent | ad8e400b786c5868d4e0d2aa880625240e44e311 (diff) | |
download | qpid-python-ebb59131b198b693c5774dd51656fec520ddd770.tar.gz |
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694758 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
-rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 56 |
1 files changed, 41 insertions, 15 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 2e5d3a0d3d..153ae31135 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -24,7 +24,7 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> @@ -54,58 +54,84 @@ class PollableQueue { /** Callback to process a range of items. */ typedef boost::function<void (const iterator&, const iterator&)> Callback; - /** Functor tempalate to create a Callback from a functor that handles a single item. */ + /** @see forEach() */ template <class F> struct ForEach { F handleOne; ForEach(const F& f) : handleOne(f) {} void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } }; - /** Function to create ForEach instances */ + + /** Create a range callback from a functor that processes a single item. */ template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } /** When the queue is selected by the poller, values are passed to callback cb. */ explicit PollableQueue(const Callback& cb); /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); } + void start(const boost::shared_ptr<sys::Poller>& poller); - /** Stop polling. */ - void stop() { handle.stopWatch(); } + /** Stop polling and wait for the current callback, if any, to complete. */ + void stop(); private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; + typedef sys::Monitor::ScopedLock ScopedLock; + typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(sys::DispatchHandle&); - sys::Mutex lock; + sys::Monitor lock; Callback callback; PollableCondition condition; sys::DispatchHandle handle; Queue queue; Queue batch; + bool dispatching, stopped; }; template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), + dispatching(false), stopped(true) {} +template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { + ScopedLock l(lock); + stopped = false; + handle.startWatch(poller); +} + +template <class T> void PollableQueue<T>::push(const T& t) { + ScopedLock l(lock); + queue.push_back(t); + condition.set(); +} + template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); + ScopedLock l(lock); + if (stopped) return; + dispatching = true; condition.clear(); + batch.clear(); + batch.swap(queue); // Snapshot of current queue contents. { // Process outside the lock to allow concurrent push. ScopedUnlock u(lock); callback(batch.begin(), batch.end()); - h.rewatch(); } batch.clear(); + dispatching = false; + if (stopped) lock.notifyAll(); + else h.rewatch(); +} + +template <class T> void PollableQueue<T>::stop() { + ScopedLock l(lock); + handle.stopWatch(); + stopped = true; + while (dispatching) lock.wait(); } }} // namespace qpid::sys |