diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 98 |
1 files changed, 69 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 6c33002b5c..0070118102 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -132,20 +132,68 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { ReplicatingSubscription::~ReplicatingSubscription() {} + +// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg + +// Mark a message completed. May be called by acknowledge or dequeued +void ReplicatingSubscription::complete( + const QueuedMessage& qm, const sys::Mutex::ScopedLock&) +{ + // Handle completions for the subscribed queue, not the internal event queue. + if (qm.queue && qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + Delayed::iterator i= delayed.find(qm.position); + // The same message can be completed twice, by acknowledged and + // dequeued, remove it from the set so it only gets completed + // once. + if (i != delayed.end()) { + assert(i->second.payload == qm.payload); + qm.payload->getIngressCompletion().finishCompleter(); + delayed.erase(i); + } + } +} + +// Called before we get notified of the message being available and +// under the message lock in the queue. Called in arbitrary connection thread. +void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { + sys::Mutex::ScopedLock l(lock); + // Delay completion + QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + qm.payload->getIngressCompletion().startCompleter(); + assert(delayed.find(qm.position) == delayed.end()); + delayed[qm.position] = qm; +} + + +// Function to complete a delayed message, called by cancel() +void ReplicatingSubscription::cancelComplete( + const Delayed::value_type& v, const sys::Mutex::ScopedLock&) +{ + QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + v.second.payload->getIngressCompletion().finishCompleter(); +} + // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); getQueue()->removeObserver( boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + for_each(delayed.begin(), delayed.end(), + boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); + delayed.clear(); + } ConsumerImpl::cancel(); } -// Called before we get notified of the message being available and -// under the message lock in the queue. Called in arbitrary connection thread. -void ReplicatingSubscription::enqueued(const QueuedMessage& m) { - //delay completion - m.payload->getIngressCompletion().startCompleter(); +// Called on primary in the backups IO thread. +void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { + sys::Mutex::ScopedLock l(lock); + // Finish completion of message, it has been acknowledged by the backup. + complete(msg, l); } // Called with lock held. Called in subscription's connection thread. @@ -160,6 +208,21 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); } +// Called after the message has been removed from the deque and under +// the messageLock in the queue. Called in arbitrary connection threads. +void ReplicatingSubscription::dequeued(const QueuedMessage& qm) +{ + { + sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + dequeues.add(qm.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + if (qm.position > position) complete(qm, l); + } + notify(); // Ensure a call to doDispatch +} + // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) @@ -205,28 +268,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& events->dispatch(consumer); } -// Called after the message has been removed from the deque and under -// the messageLock in the queue. Called in arbitrary connection threads. -void ReplicatingSubscription::dequeued(const QueuedMessage& m) -{ - { - sys::Mutex::ScopedLock l(lock); - dequeues.add(m.position); - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. - - // FIXME aconway 2012-01-05: suspect use of position in - // foreign connection thread. Race with deliver() which is - // not under the message lock? - if (m.position > position) { - m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early"); - } - else - QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); - } - notify(); // Ensure a call to doDispatch -} // Called in subscription's connection thread. bool ReplicatingSubscription::doDispatch() @@ -244,7 +285,6 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } -void ReplicatingSubscription::DelegatingConsumer::cancel() {} OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } }} // namespace qpid::ha |