summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/PollableQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/PollableQueue.h')
-rw-r--r--cpp/src/qpid/sys/PollableQueue.h56
1 files changed, 41 insertions, 15 deletions
diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h
index 2e5d3a0d3d..153ae31135 100644
--- a/cpp/src/qpid/sys/PollableQueue.h
+++ b/cpp/src/qpid/sys/PollableQueue.h
@@ -24,7 +24,7 @@
#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
@@ -54,58 +54,84 @@ class PollableQueue {
/** Callback to process a range of items. */
typedef boost::function<void (const iterator&, const iterator&)> Callback;
- /** Functor tempalate to create a Callback from a functor that handles a single item. */
+ /** @see forEach() */
template <class F> struct ForEach {
F handleOne;
ForEach(const F& f) : handleOne(f) {}
void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
};
- /** Function to create ForEach instances */
+
+ /** Create a range callback from a functor that processes a single item. */
template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
/** When the queue is selected by the poller, values are passed to callback cb. */
explicit PollableQueue(const Callback& cb);
/** Push a value onto the queue. Thread safe */
- void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+ void push(const T& t);
/** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+ void start(const boost::shared_ptr<sys::Poller>& poller);
- /** Stop polling. */
- void stop() { handle.stopWatch(); }
+ /** Stop polling and wait for the current callback, if any, to complete. */
+ void stop();
private:
- typedef sys::Mutex::ScopedLock ScopedLock;
- typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+ typedef sys::Monitor::ScopedLock ScopedLock;
+ typedef sys::Monitor::ScopedUnlock ScopedUnlock;
void dispatch(sys::DispatchHandle&);
- sys::Mutex lock;
+ sys::Monitor lock;
Callback callback;
PollableCondition condition;
sys::DispatchHandle handle;
Queue queue;
Queue batch;
+ bool dispatching, stopped;
};
template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
: callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
+ dispatching(false), stopped(true)
{}
+template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+ ScopedLock l(lock);
+ stopped = false;
+ handle.startWatch(poller);
+}
+
+template <class T> void PollableQueue<T>::push(const T& t) {
+ ScopedLock l(lock);
+ queue.push_back(t);
+ condition.set();
+}
+
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Lock for concurrent push()
- batch.clear();
- batch.swap(queue);
+ ScopedLock l(lock);
+ if (stopped) return;
+ dispatching = true;
condition.clear();
+ batch.clear();
+ batch.swap(queue); // Snapshot of current queue contents.
{
// Process outside the lock to allow concurrent push.
ScopedUnlock u(lock);
callback(batch.begin(), batch.end());
- h.rewatch();
}
batch.clear();
+ dispatching = false;
+ if (stopped) lock.notifyAll();
+ else h.rewatch();
+}
+
+template <class T> void PollableQueue<T>::stop() {
+ ScopedLock l(lock);
+ handle.stopWatch();
+ stopped = true;
+ while (dispatching) lock.wait();
}
}} // namespace qpid::sys