summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/PollableQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/PollableQueue.h')
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h95
1 files changed, 35 insertions, 60 deletions
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
index 0bba2ba790..65f615d8b6 100644
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ b/cpp/src/qpid/cluster/PollableQueue.h
@@ -22,78 +22,53 @@
*
*/
-#include "qpid/cluster/PollableCondition.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/function.hpp>
-#include <boost/bind.hpp>
-#include <deque>
+#include "qpid/sys/PollableQueue.h"
+#include <qpid/log/Statement.h>
namespace qpid {
-
-namespace sys { class Poller; }
-
namespace cluster {
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to common lib.
-
/**
- * A queue that can be polled by sys::Poller. Any thread can push to
- * the queue, on wakeup the poller thread processes all items on the
- * queue by passing them to a callback in a batch.
+ * More convenient version of PollableQueue that handles iterating
+ * over the batch and error handling.
*/
-template <class T>
-class PollableQueue {
- typedef std::deque<T> Queue;
-
+template <class T> class PollableQueue : public sys::PollableQueue<T> {
public:
- typedef typename Queue::iterator iterator;
-
- /** Callback to process a range of items. */
- typedef boost::function<void (const iterator&, const iterator&)> Callback;
-
- /** When the queue is selected by the poller, values are passed to callback cb. */
- explicit PollableQueue(const Callback& cb);
-
- /** Push a value onto the queue. Thread safe */
- void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+ typedef boost::function<void (const T&)> Callback;
+ typedef boost::function<void()> ErrorCallback;
+
+ PollableQueue(Callback f, ErrorCallback err, const std::string& msg,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
+ poller),
+ callback(f), error(err), message(msg)
+ {}
+
+ typename sys::PollableQueue<T>::Batch::const_iterator
+ handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
+ try {
+ typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin();
+ while (i != values.end() && !this->isStopped()) {
+ callback(*i);
+ ++i;
+ }
+ return i;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, message << ": " << e.what());
+ this->stop();
+ error();
+ return values.end();
+ }
+ }
- /** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
-
- /** Stop polling. */
- void stop() { handle.stopWatch(); }
-
private:
- typedef sys::Mutex::ScopedLock ScopedLock;
- typedef sys::Mutex::ScopedUnlock ScopedUnlock;
-
- void dispatch(sys::DispatchHandle&);
-
- sys::Mutex lock;
Callback callback;
- PollableCondition condition;
- sys::DispatchHandle handle;
- Queue queue;
- Queue batch;
+ ErrorCallback error;
+ std::string message;
};
-template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
- : callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
-{}
-
-template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Lock for concurrent push()
- batch.clear();
- batch.swap(queue);
- condition.clear();
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end()); // Process the batch outside the lock.
- h.rewatch();
-}
-
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/