diff options
Diffstat (limited to 'cpp/src/qpid/cluster/PollableQueue.h')
-rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 95 |
1 files changed, 35 insertions, 60 deletions
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h index 0bba2ba790..65f615d8b6 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/cluster/PollableQueue.h @@ -22,78 +22,53 @@ * */ -#include "qpid/cluster/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" -#include <boost/function.hpp> -#include <boost/bind.hpp> -#include <deque> +#include "qpid/sys/PollableQueue.h" +#include <qpid/log/Statement.h> namespace qpid { - -namespace sys { class Poller; } - namespace cluster { -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. - /** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. + * More convenient version of PollableQueue that handles iterating + * over the batch and error handling. */ -template <class T> -class PollableQueue { - typedef std::deque<T> Queue; - +template <class T> class PollableQueue : public sys::PollableQueue<T> { public: - typedef typename Queue::iterator iterator; - - /** Callback to process a range of items. */ - typedef boost::function<void (const iterator&, const iterator&)> Callback; - - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); - - /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + typedef boost::function<void (const T&)> Callback; + typedef boost::function<void()> ErrorCallback; + + PollableQueue(Callback f, ErrorCallback err, const std::string& msg, + const boost::shared_ptr<sys::Poller>& poller) + : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), + poller), + callback(f), error(err), message(msg) + {} + + typename sys::PollableQueue<T>::Batch::const_iterator + handleBatch(const typename sys::PollableQueue<T>::Batch& values) { + try { + typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin(); + while (i != values.end() && !this->isStopped()) { + callback(*i); + ++i; + } + return i; + } + catch (const std::exception& e) { + QPID_LOG(error, message << ": " << e.what()); + this->stop(); + error(); + return values.end(); + } + } - /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); } - - /** Stop polling. */ - void stop() { handle.stopWatch(); } - private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; - - void dispatch(sys::DispatchHandle&); - - sys::Mutex lock; Callback callback; - PollableCondition condition; - sys::DispatchHandle handle; - Queue queue; - Queue batch; + ErrorCallback error; + std::string message; }; -template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) -{} - -template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); - condition.clear(); - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); // Process the batch outside the lock. - h.rewatch(); -} - + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ |