summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp83
1 files changed, 58 insertions, 25 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 91a4538bc4..9067063fcf 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicatingSubscription.h"
+#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
@@ -64,14 +65,25 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->addObserver(rs);
+ // NOTE: readyPosition must be set _after_ addObserver, so
+ // messages can't be enqueued after setting readyPosition
+ // but before registering the observer.
+ rs->readyPosition = queue->getPosition();
+ QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to "
+ << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition)
+ << rs->logSuffix);
+
+
}
return rs;
}
ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& haBroker,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -84,15 +96,15 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
+ logPrefix(haBroker, queue->getName()),
events(new Queue(mask(name))),
- consumer(new DelegatingConsumer(*this))
+ consumer(new DelegatingConsumer(*this)),
+ sentReady(false)
{
- // Separate the remote part from a "local-remote" address.
+ // Separate the remote part from a "local-remote" address for logging.
string address = parent->getSession().getConnection().getUrl();
size_t i = address.find('-');
if (i != string::npos) address = address.substr(i+1);
- logPrefix = "HA: Primary ";
- stringstream ss;
logSuffix = " (" + address + ")";
// FIXME aconway 2011-12-09: Failover optimization removed.
@@ -102,8 +114,6 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
-
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
// This is incorrect if the sequence number wraps around, but
@@ -111,34 +121,50 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
try {
// Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
+ if (qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
sys::Mutex::ScopedLock l(lock);
- if (position != m.position)
+ if (position != qm.position)
throw Exception(
QPID_MSG("Expected position " << position
- << " but got " << m.position));
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- if (m.position <= backupPosition)
+ << " but got " << qm.position));
+ // qm.position is the position of the newly enqueued qm on the local queue.
+ // backupPosition is latest position on backup queue before enqueueing qm.
+ if (qm.position <= backupPosition)
throw Exception(
QPID_MSG("Expected position > " << backupPosition
- << " but got " << m.position));
+ << " but got " << qm.position));
- if (m.position - backupPosition > 1) {
+ if (qm.position - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
+ SequenceNumber send(qm.position);
+ --send; // Send the position before qm was enqueued.
sendPositionEvent(send, l);
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
+ backupPosition = qm.position;
+ // Deliver the message
+ bool delivered = ConsumerImpl::deliver(qm);
+
+ // We have advanced to the initial position, backup is ready.
+ if (!sentReady && qm.position >= readyPosition) {
+ sendReadyEvent(l);
+ sentReady = true;
+ QPID_LOG(info, logPrefix << "Caught up at " << qm
+ << logSuffix);
+ // If we are in a primary broker, notify that a subscription is ready.
+ // FIXME aconway 2012-04-30: rename addReplica->readyReplica
+ if (Primary::get())
+ Primary::get()->addReplica(qm.queue->getName());
+ }
+ return delivered;
}
- return ConsumerImpl::deliver(m);
+ else
+ return ConsumerImpl::deliver(qm); // Message is for internal event queue.
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+ QPID_LOG(critical, logPrefix << "error replicating " << qm
<< logSuffix << ": " << e.what());
throw;
}
@@ -154,7 +180,7 @@ void ReplicatingSubscription::complete(
const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
{
// Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue && qm.queue == getQueue().get()) {
+ if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
@@ -179,7 +205,6 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
delayed[qm.position] = qm;
}
-
// Function to complete a delayed message, called by cancel()
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
@@ -195,7 +220,8 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
+ QPID_LOG(debug, logPrefix << "cancel backup subscription to "
+ << getQueue()->getName() << logSuffix);
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -255,6 +281,13 @@ void ReplicatingSubscription::sendPositionEvent(
sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
}
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l )
+{
+ framing::Buffer buffer;
+ sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l);
+}
+
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
const sys::Mutex::ScopedLock&)
{
@@ -300,7 +333,7 @@ bool ReplicatingSubscription::doDispatch()
ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); }
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }