diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:27 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:27 +0000 |
| commit | 9bc4901f70f8f77b78cb80c33d126b95cd709ccc (patch) | |
| tree | 228a9abcf3c949bc3bcf25cb16e42887376ac0a9 /cpp/src/qpid/ha/ReplicatingSubscription.cpp | |
| parent | 68ec17cbe42066fae4f4ebdec810a922794d2701 (diff) | |
| download | qpid-python-9bc4901f70f8f77b78cb80c33d126b95cd709ccc.tar.gz | |
QPID-3603: Guard should not complete messages from the internal event queue.
ReplicatingSubscription::acknowledge was calling Guard::complete for messages on the
internal event queue as well as the replicated queue. Corrected to only complete
messages from the replicated queue.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1349542 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 7438c95bc2..ebc2365664 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -233,7 +233,10 @@ ReplicatingSubscription::ReplicatingSubscription( // Set the guard if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); - if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + if (!guard) { + QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one."); + guard.reset(new QueueGuard(*queue, getBrokerInfo())); + } guard->attach(*this); // Guard is active, dequeued can be called concurrently. @@ -332,9 +335,12 @@ void ReplicatingSubscription::cancel() } // Consumer override, called on primary in the backup's IO thread. -void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { - // Finish completion of message, it has been acknowledged by the backup. - guard->complete(msg); +void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { + if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue + // Finish completion of message, it has been acknowledged by the backup. + QPID_LOG(trace, logPrefix << "Acknowledged " << qm); + guard->complete(qm); + } } // Hide the "queue deleted" error for a ReplicatingSubscription when a @@ -363,15 +369,16 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { + assert (qm.queue == getQueue().get()); bool doComplete = false; - QPID_LOG(trace, logPrefix << "Dequeued " << qm); { Mutex::ScopedLock l(lock); + assert(!dequeues.contains(qm.position)); dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. if (qm.position > position) doComplete = true; } - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. if (doComplete) guard->complete(qm); notify(); // Ensure a call to doDispatch } |
