summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp24
1 files changed, 3 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 5fcb316ce6..ae6e7181d1 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -91,25 +91,6 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result)
-{
- result = message.getSequence();
- return true;
-}
-}
-bool ReplicatingSubscription::getNext(
- broker::Queue& q, SequenceNumber position, SequenceNumber& result)
-{
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
-}
-
-bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription(
guard->attach(*this);
QueueRange backup(arguments); // Remote backup range.
+ QueueRange backupOriginal(backup);
QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
@@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backup
+ << " backup:" << backupOriginal << " adjusted backup:" << backup
<< " primary:" << primary
<< " catch-up: " << position << "-" << primary.back
<< "(" << primary.back-position << ")");
@@ -258,7 +240,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
- QPID_MSG("Expected position > " << backupPosition
+ QPID_MSG(logPrefix << "Expected position > " << backupPosition
<< " but got " << m.getSequence()));
if (m.getSequence() - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.