diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 83 |
1 files changed, 58 insertions, 25 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 91a4538bc4..9067063fcf 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "ReplicatingSubscription.h" +#include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" @@ -64,14 +65,25 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); + // NOTE: readyPosition must be set _after_ addObserver, so + // messages can't be enqueued after setting readyPosition + // but before registering the observer. + rs->readyPosition = queue->getPosition(); + QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to " + << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition) + << rs->logSuffix); + + } return rs; } ReplicatingSubscription::ReplicatingSubscription( + HaBroker& haBroker, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -84,15 +96,15 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), + logPrefix(haBroker, queue->getName()), events(new Queue(mask(name))), - consumer(new DelegatingConsumer(*this)) + consumer(new DelegatingConsumer(*this)), + sentReady(false) { - // Separate the remote part from a "local-remote" address. + // Separate the remote part from a "local-remote" address for logging. string address = parent->getSession().getConnection().getUrl(); size_t i = address.find('-'); if (i != string::npos) address = address.substr(i+1); - logPrefix = "HA: Primary "; - stringstream ss; logSuffix = " (" + address + ")"; // FIXME aconway 2011-12-09: Failover optimization removed. @@ -102,8 +114,6 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); - // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. // This is incorrect if the sequence number wraps around, but @@ -111,34 +121,50 @@ ReplicatingSubscription::ReplicatingSubscription( } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(QueuedMessage& m) { +bool ReplicatingSubscription::deliver(QueuedMessage& qm) { try { // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { + if (qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); sys::Mutex::ScopedLock l(lock); - if (position != m.position) + if (position != qm.position) throw Exception( QPID_MSG("Expected position " << position - << " but got " << m.position)); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - if (m.position <= backupPosition) + << " but got " << qm.position)); + // qm.position is the position of the newly enqueued qm on the local queue. + // backupPosition is latest position on backup queue before enqueueing qm. + if (qm.position <= backupPosition) throw Exception( QPID_MSG("Expected position > " << backupPosition - << " but got " << m.position)); + << " but got " << qm.position)); - if (m.position - backupPosition > 1) { + if (qm.position - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. + SequenceNumber send(qm.position); + --send; // Send the position before qm was enqueued. sendPositionEvent(send, l); } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); + backupPosition = qm.position; + // Deliver the message + bool delivered = ConsumerImpl::deliver(qm); + + // We have advanced to the initial position, backup is ready. + if (!sentReady && qm.position >= readyPosition) { + sendReadyEvent(l); + sentReady = true; + QPID_LOG(info, logPrefix << "Caught up at " << qm + << logSuffix); + // If we are in a primary broker, notify that a subscription is ready. + // FIXME aconway 2012-04-30: rename addReplica->readyReplica + if (Primary::get()) + Primary::get()->addReplica(qm.queue->getName()); + } + return delivered; } - return ConsumerImpl::deliver(m); + else + return ConsumerImpl::deliver(qm); // Message is for internal event queue. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() + QPID_LOG(critical, logPrefix << "error replicating " << qm << logSuffix << ": " << e.what()); throw; } @@ -154,7 +180,7 @@ void ReplicatingSubscription::complete( const QueuedMessage& qm, const sys::Mutex::ScopedLock&) { // Handle completions for the subscribed queue, not the internal event queue. - if (qm.queue && qm.queue == getQueue().get()) { + if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and @@ -179,7 +205,6 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { delayed[qm.position] = qm; } - // Function to complete a delayed message, called by cancel() void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) @@ -195,7 +220,8 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); + QPID_LOG(debug, logPrefix << "cancel backup subscription to " + << getQueue()->getName() << logSuffix); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -255,6 +281,13 @@ void ReplicatingSubscription::sendPositionEvent( sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); } +// Called with lock held. Called in subscription's connection thread. +void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l ) +{ + framing::Buffer buffer; + sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l); +} + void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, const sys::Mutex::ScopedLock&) { @@ -300,7 +333,7 @@ bool ReplicatingSubscription::doDispatch() ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } |
