diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 71 |
1 files changed, 44 insertions, 27 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index af6180305d..91a4538bc4 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + // Separate the remote part from a "local-remote" address. + string address = parent->getSession().getConnection().getUrl(); + size_t i = address.find('-'); + if (i != string::npos) address = address.substr(i+1); + logPrefix = "HA: Primary "; stringstream ss; - ss << "HA: Primary: " << getQueue()->getName() << " at " - << parent->getSession().getConnection().getUrl() << ": "; - logPrefix = ss.str(); + logSuffix = " (" + address + ")"; // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup @@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, logPrefix << "Created backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. @@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription( // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver(QueuedMessage& m) { - // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { - sys::Mutex::ScopedLock l(lock); - assert(position == m.position); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - assert(m.position > backupPosition); - if (m.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. - sendPositionEvent(send, l); + try { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); + if (position != m.position) + throw Exception( + QPID_MSG("Expected position " << position + << " but got " << m.position)); + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + if (m.position <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << m.position)); + + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + } + backupPosition = m.position; + QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "Replicating message " << m.position); + return ConsumerImpl::deliver(m); + } catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() + << logSuffix << ": " << e.what()); + throw; } - return ConsumerImpl::deliver(m); } ReplicatingSubscription::~ReplicatingSubscription() {} @@ -139,7 +155,7 @@ void ReplicatingSubscription::complete( { // Handle completions for the subscribed queue, not the internal event queue. if (qm.queue && qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so it only gets completed @@ -157,7 +173,7 @@ void ReplicatingSubscription::complete( void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { sys::Mutex::ScopedLock l(lock); // Delay completion - QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); qm.payload->getIngressCompletion().startCompleter(); assert(delayed.find(qm.position) == delayed.end()); delayed[qm.position] = qm; @@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); v.second.payload->getIngressCompletion().finishCompleter(); } @@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); + QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues + << " from " << getQueue()->getName() << logSuffix); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then // complete it now as it will never be accepted. @@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { - QPID_LOG(trace, logPrefix << "Sending position " << position - << ", was " << backupPosition); + QPID_LOG(trace, logPrefix << "sending position " << position + << ", was " << backupPosition << logSuffix); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); |