diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 67e1e77681..e08a34529e 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - + logPrefix(hb.logPrefix), position(0), wasStopped(false), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) @@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize() { FieldTable ft; if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) throw InvalidArgumentException( - logPrefix+"Can't subscribe, no broker info: "+getTag()); + logPrefix.get()+"Can't subscribe, no broker info: "+getTag()); info.assign(ft); // Set a log prefix message that identifies the remote broker. @@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize() { // If there's already a guard (we are in failover) use it, else create one. if (primary) guard = primary->getGuard(queue, info); - if (!guard) guard.reset(new QueueGuard(*queue, info)); + if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix)); // NOTE: Once the observer is attached we can have concurrent // calls to dequeued so we need to lock use of this->dequeues. @@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize() { if (!snapshot) { queue->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); - throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); + throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted"); } ReplicationIdSet primaryIds = snapshot->getSnapshot(); std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET); @@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize() { // position >= front so if front is safe then position must be. position = front; - QPID_LOG(debug, logPrefix << "Subscribed: front " << front - << ", back " << back + QPID_LOG(debug, logPrefix << "Subscribed: primary [" + << front << "," << back << "]=" << primaryIds << ", guarded " << guard->getFirst() - << ", on backup " << skipEnqueue); + << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")"); checkReady(l); } @@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { ready = true; sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. - QPID_LOG(debug, logPrefix << "Caught up"); + if (position+1 >= guard->getFirst()) { + QPID_LOG(debug, logPrefix << "Caught up at " << position); + } else { + QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst()); + } + if (primary) primary->readyReplica(*this); } } |