diff options
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 36 |
1 files changed, 30 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index 55dc6b0d50..5ea4c8e6f8 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -22,6 +22,7 @@ #include "ReplicatingSubscription.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/log/Statement.h" #include <sstream> @@ -31,19 +32,33 @@ namespace ha { using namespace broker; using sys::Mutex; +class QueueGuard::QueueObserver : public broker::QueueObserver +{ + public: + QueueObserver(QueueGuard& g) : guard(g) {} + void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); } + void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); } + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + private: + QueueGuard& guard; +}; + + + QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { // Set a log prefix message that identifies the remote broker. std::ostringstream os; - os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": "; + os << "HA guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); + observer.reset(new QueueObserver(*this)); + queue.addObserver(observer); + readyPosition = queue.getPosition(); // Must set after addObserver() } -void QueueGuard::initialize() { - Mutex::ScopedLock l(lock); - queue.addObserver(shared_from_this()); -} +QueueGuard::~QueueGuard() { cancel(); } void QueueGuard::enqueued(const QueuedMessage& qm) { // Delay completion @@ -69,7 +84,11 @@ void QueueGuard::dequeued(const QueuedMessage& qm) { } void QueueGuard::cancel() { - queue.removeObserver(shared_from_this()); + queue.removeObserver(observer); + { + sys::Mutex::ScopedLock l(lock); + if (delayed.empty()) return; // No need if no delayed messages. + } queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } @@ -94,6 +113,11 @@ void QueueGuard::complete(const QueuedMessage& qm) { complete(qm, l); } +framing::SequenceNumber QueueGuard::getReadyPosition() { + // No lock, readyPosition is immutable. + return readyPosition; +} + // FIXME aconway 2012-06-04: TODO support for timeout. }} // namespaces qpid::ha |
