diff options
Diffstat (limited to 'cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 57 |
1 files changed, 23 insertions, 34 deletions
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 6f7519cd1f..933716e8fa 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -91,25 +91,6 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } -namespace { -bool getSequence(const Message& message, SequenceNumber& result) -{ - result = message.getSequence(); - return true; -} -} -bool ReplicatingSubscription::getNext( - broker::Queue& q, SequenceNumber from, SequenceNumber& result) -{ - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from); -} - -bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front))); -} - /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -157,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubscription( // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "Primary " << queue->getName() << "@" << info.getLogId() << ": "; + os << "Primary " << queue->getName() << "@" << info << ": "; logPrefix = os.str(); // NOTE: Once the guard is attached we can have concurrent @@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription( guard->attach(*this); QueueRange backup(arguments); // Remote backup range. + QueueRange backupOriginal(backup); QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; @@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription( // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backup + << " backup:" << backupOriginal << " adjusted backup:" << backup << " primary:" << primary << " catch-up: " << position << "-" << primary.back << "(" << primary.back-position << ")"); @@ -222,9 +204,7 @@ ReplicatingSubscription::ReplicatingSubscription( } } -ReplicatingSubscription::~ReplicatingSubscription() { - QPID_LOG(debug, logPrefix << "Detroyed replicating subscription"); -} +ReplicatingSubscription::~ReplicatingSubscription() {} // Called in subscription's connection thread when the subscription is created. // Called separate from ctor because sending events requires @@ -248,19 +228,20 @@ void ReplicatingSubscription::initialize() { } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { - position = m.getSequence(); +bool ReplicatingSubscription::deliver( + const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) +{ try { - QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); { Mutex::ScopedLock l(lock); - //FIXME GRS: position is no longer set//assert(position == m.getSequence()); + position = m.getSequence(); - // m.getSequence() is the position of the newly enqueued message on local queue. + // m.getSequence() is the position of the new message on local queue. // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( - QPID_MSG("Expected position > " << backupPosition + QPID_MSG(logPrefix << "Expected position > " << backupPosition << " but got " << m.getSequence())); if (m.getSequence() - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. @@ -272,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const } return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]" + QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() << ": " << e.what()); throw; } @@ -292,6 +273,7 @@ void ReplicatingSubscription::setReady() { // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { + QPID_LOG(debug, logPrefix << "Cancelled"); guard->cancel(); ConsumerImpl::cancel(); } @@ -299,7 +281,7 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. - QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]"); + QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); guard->complete(r.getMessageId()); // If next message is protected by the guard then we are ready if (r.getMessageId() >= guard->getRange().back) setReady(); @@ -328,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const Message& m) { - QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); { Mutex::ScopedLock l(lock); dequeues.add(m.getSequence()); @@ -396,7 +378,14 @@ bool ReplicatingSubscription::doDispatch() Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } - return ConsumerImpl::doDispatch(); + try { + return ConsumerImpl::doDispatch(); + } + catch (const std::exception& e) { + // FIXME aconway 2012-10-05: detect queue deletion, no warning. + QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); + return false; + } } }} // namespace qpid::ha |