summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp21
1 files changed, 13 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 67e1e77681..e08a34529e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
-
+ logPrefix(hb.logPrefix),
position(0), wasStopped(false), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
@@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize() {
FieldTable ft;
if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
throw InvalidArgumentException(
- logPrefix+"Can't subscribe, no broker info: "+getTag());
+ logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
@@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize() {
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
- if (!guard) guard.reset(new QueueGuard(*queue, info));
+ if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
// NOTE: Once the observer is attached we can have concurrent
// calls to dequeued so we need to lock use of this->dequeues.
@@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize() {
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+ throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
}
ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
@@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize() {
// position >= front so if front is safe then position must be.
position = front;
- QPID_LOG(debug, logPrefix << "Subscribed: front " << front
- << ", back " << back
+ QPID_LOG(debug, logPrefix << "Subscribed: primary ["
+ << front << "," << back << "]=" << primaryIds
<< ", guarded " << guard->getFirst()
- << ", on backup " << skipEnqueue);
+ << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
checkReady(l);
}
@@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
+ if (position+1 >= guard->getFirst()) {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position);
+ } else {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst());
+ }
+
if (primary) primary->readyReplica(*this);
}
}