diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:07 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:07 +0000 |
| commit | ed0d321d0ad56b9ea3d370d8b6b34fce2c8ef892 (patch) | |
| tree | 1e225776914dc5b14e0e38c8e2236d5ce144ac3b /cpp/src/qpid/ha/QueueGuard.cpp | |
| parent | c9fc98ae80a5d8c4f58541f9738aa975723ff3d6 (diff) | |
| download | qpid-python-ed0d321d0ad56b9ea3d370d8b6b34fce2c8ef892.tar.gz | |
QPID-3603: Introduced RemoteBackup to track backup status.
The primary creates RemoteBackup object for each connected or expected
backup. On first being promoted, the new primary has a RemoteBackup
for each of the known backups at the time of the failure.
The RemoteBackup manages queue guards for its backup and
tracks it's readiness.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1349540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 36 |
1 files changed, 30 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index 55dc6b0d50..5ea4c8e6f8 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -22,6 +22,7 @@ #include "ReplicatingSubscription.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/log/Statement.h" #include <sstream> @@ -31,19 +32,33 @@ namespace ha { using namespace broker; using sys::Mutex; +class QueueGuard::QueueObserver : public broker::QueueObserver +{ + public: + QueueObserver(QueueGuard& g) : guard(g) {} + void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); } + void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); } + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + private: + QueueGuard& guard; +}; + + + QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { // Set a log prefix message that identifies the remote broker. std::ostringstream os; - os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": "; + os << "HA guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); + observer.reset(new QueueObserver(*this)); + queue.addObserver(observer); + readyPosition = queue.getPosition(); // Must set after addObserver() } -void QueueGuard::initialize() { - Mutex::ScopedLock l(lock); - queue.addObserver(shared_from_this()); -} +QueueGuard::~QueueGuard() { cancel(); } void QueueGuard::enqueued(const QueuedMessage& qm) { // Delay completion @@ -69,7 +84,11 @@ void QueueGuard::dequeued(const QueuedMessage& qm) { } void QueueGuard::cancel() { - queue.removeObserver(shared_from_this()); + queue.removeObserver(observer); + { + sys::Mutex::ScopedLock l(lock); + if (delayed.empty()) return; // No need if no delayed messages. + } queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } @@ -94,6 +113,11 @@ void QueueGuard::complete(const QueuedMessage& qm) { complete(qm, l); } +framing::SequenceNumber QueueGuard::getReadyPosition() { + // No lock, readyPosition is immutable. + return readyPosition; +} + // FIXME aconway 2012-06-04: TODO support for timeout. }} // namespaces qpid::ha |
