summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/cluster/PollableQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/PollableQueue.h')
-rw-r--r--qpid/cpp/src/qpid/cluster/PollableQueue.h20
1 files changed, 19 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h
index 2aed6de5b9..59d0bcd36a 100644
--- a/qpid/cpp/src/qpid/cluster/PollableQueue.h
+++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h
@@ -31,6 +31,13 @@ namespace cluster {
/**
* More convenient version of PollableQueue that handles iterating
* over the batch and error handling.
+ *
+ * Constructed in "bypass" mode where items are processed directly
+ * rather than put on the queue. This is important for the
+ * PRE_INIT stage when Cluster is pumping CPG dispatch directly
+ * before the poller has started.
+ *
+ * Calling start() starts the pollable queue and disabled bypass mode.
*/
template <class T> class PollableQueue : public sys::PollableQueue<T> {
public:
@@ -41,7 +48,7 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg)
+ callback(f), error(err), message(msg), bypass(true)
{}
typename sys::PollableQueue<T>::Batch::const_iterator
@@ -62,10 +69,21 @@ template <class T> class PollableQueue : public sys::PollableQueue<T> {
}
}
+ void push(const T& t) {
+ if (bypass) callback(t);
+ else sys::PollableQueue<T>::push(t);
+ }
+
+ void start() {
+ bypass = false;
+ sys::PollableQueue<T>::start();
+ }
+
private:
Callback callback;
ErrorCallback error;
std::string message;
+ bool bypass;
};