summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp21
1 files changed, 14 insertions, 7 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 7438c95bc2..ebc2365664 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -233,7 +233,10 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set the guard
if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
- if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ if (!guard) {
+ QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one.");
+ guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ }
guard->attach(*this);
// Guard is active, dequeued can be called concurrently.
@@ -332,9 +335,12 @@ void ReplicatingSubscription::cancel()
}
// Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- // Finish completion of message, it has been acknowledged by the backup.
- guard->complete(msg);
+void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
+ if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
+ guard->complete(qm);
+ }
}
// Hide the "queue deleted" error for a ReplicatingSubscription when a
@@ -363,15 +369,16 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ assert (qm.queue == getQueue().get());
bool doComplete = false;
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
Mutex::ScopedLock l(lock);
+ assert(!dequeues.contains(qm.position));
dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
if (qm.position > position) doComplete = true;
}
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}