diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 24 |
1 files changed, 3 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 5fcb316ce6..ae6e7181d1 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -91,25 +91,6 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } -namespace { -bool getSequence(const Message& message, SequenceNumber& result) -{ - result = message.getSequence(); - return true; -} -} -bool ReplicatingSubscription::getNext( - broker::Queue& q, SequenceNumber position, SequenceNumber& result) -{ - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); -} - -bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front))); -} - /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription( guard->attach(*this); QueueRange backup(arguments); // Remote backup range. + QueueRange backupOriginal(backup); QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; @@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription( // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backup + << " backup:" << backupOriginal << " adjusted backup:" << backup << " primary:" << primary << " catch-up: " << position << "-" << primary.back << "(" << primary.back-position << ")"); @@ -258,7 +240,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( - QPID_MSG("Expected position > " << backupPosition + QPID_MSG(logPrefix << "Expected position > " << backupPosition << " but got " << m.getSequence())); if (m.getSequence() - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. |