summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/QueueGuard.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/QueueGuard.cpp')
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp36
1 files changed, 30 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index 55dc6b0d50..5ea4c8e6f8 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -22,6 +22,7 @@
#include "ReplicatingSubscription.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/log/Statement.h"
#include <sstream>
@@ -31,19 +32,33 @@ namespace ha {
using namespace broker;
using sys::Mutex;
+class QueueGuard::QueueObserver : public broker::QueueObserver
+{
+ public:
+ QueueObserver(QueueGuard& g) : guard(g) {}
+ void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
+ void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+ private:
+ QueueGuard& guard;
+};
+
+
+
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
// Set a log prefix message that identifies the remote broker.
std::ostringstream os;
- os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": ";
+ os << "HA guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
+ observer.reset(new QueueObserver(*this));
+ queue.addObserver(observer);
+ readyPosition = queue.getPosition(); // Must set after addObserver()
}
-void QueueGuard::initialize() {
- Mutex::ScopedLock l(lock);
- queue.addObserver(shared_from_this());
-}
+QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const QueuedMessage& qm) {
// Delay completion
@@ -69,7 +84,11 @@ void QueueGuard::dequeued(const QueuedMessage& qm) {
}
void QueueGuard::cancel() {
- queue.removeObserver(shared_from_this());
+ queue.removeObserver(observer);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (delayed.empty()) return; // No need if no delayed messages.
+ }
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
@@ -94,6 +113,11 @@ void QueueGuard::complete(const QueuedMessage& qm) {
complete(qm, l);
}
+framing::SequenceNumber QueueGuard::getReadyPosition() {
+ // No lock, readyPosition is immutable.
+ return readyPosition;
+}
+
// FIXME aconway 2012-06-04: TODO support for timeout.
}} // namespaces qpid::ha