summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp10
1 files changed, 5 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4ccdfdfb74..1ede47ed60 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -220,12 +220,12 @@ bool ReplicatingSubscription::deliver(
QPID_LOG(trace, logPrefix << "On backup, skip " <<
LogMessageId(*getQueue(), m));
guard->complete(id); // This will never be acknowledged.
- result = false;
+ notify();
+ result = true;
}
else {
QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
- // Only consider unguarded messages for ready status.
- if (!ready && !isGuarded(l)) unacked += id;
+ if (!ready && !isGuarded(l)) unready += id;
sendIdEvent(id, l);
result = ConsumerImpl::deliver(c, m);
}
@@ -242,7 +242,7 @@ bool ReplicatingSubscription::deliver(
*@param position: must be <= last position seen by subscription.
*/
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
- if (!ready && isGuarded(l) && unacked.empty()) {
+ if (!ready && isGuarded(l) && unready.empty()) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
@@ -274,7 +274,7 @@ void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
guard->complete(id);
{
Mutex::ScopedLock l(lock);
- unacked -= id;
+ unready -= id;
checkReady(l);
}
ConsumerImpl::acknowledged(r);