summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp98
1 files changed, 69 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 6c33002b5c..0070118102 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -132,20 +132,68 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) {
ReplicatingSubscription::~ReplicatingSubscription() {}
+
+// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+
+// Mark a message completed. May be called by acknowledge or dequeued
+void ReplicatingSubscription::complete(
+ const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
+{
+ // Handle completions for the subscribed queue, not the internal event queue.
+ if (qm.queue && qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ Delayed::iterator i= delayed.find(qm.position);
+ // The same message can be completed twice, by acknowledged and
+ // dequeued, remove it from the set so it only gets completed
+ // once.
+ if (i != delayed.end()) {
+ assert(i->second.payload == qm.payload);
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed.erase(i);
+ }
+ }
+}
+
+// Called before we get notified of the message being available and
+// under the message lock in the queue. Called in arbitrary connection thread.
+void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
+ sys::Mutex::ScopedLock l(lock);
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ qm.payload->getIngressCompletion().startCompleter();
+ assert(delayed.find(qm.position) == delayed.end());
+ delayed[qm.position] = qm;
+}
+
+
+// Function to complete a delayed message, called by cancel()
+void ReplicatingSubscription::cancelComplete(
+ const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
+{
+ QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ v.second.payload->getIngressCompletion().finishCompleter();
+}
+
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
getQueue()->removeObserver(
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ for_each(delayed.begin(), delayed.end(),
+ boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
+ delayed.clear();
+ }
ConsumerImpl::cancel();
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
- //delay completion
- m.payload->getIngressCompletion().startCompleter();
+// Called on primary in the backups IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
+ sys::Mutex::ScopedLock l(lock);
+ // Finish completion of message, it has been acknowledged by the backup.
+ complete(msg, l);
}
// Called with lock held. Called in subscription's connection thread.
@@ -160,6 +208,21 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
}
+// Called after the message has been removed from the deque and under
+// the messageLock in the queue. Called in arbitrary connection threads.
+void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+{
+ {
+ sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ dequeues.add(qm.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (qm.position > position) complete(qm, l);
+ }
+ notify(); // Ensure a call to doDispatch
+}
+
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
@@ -205,28 +268,6 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
events->dispatch(consumer);
}
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& m)
-{
- {
- sys::Mutex::ScopedLock l(lock);
- dequeues.add(m.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
-
- // FIXME aconway 2012-01-05: suspect use of position in
- // foreign connection thread. Race with deliver() which is
- // not under the message lock?
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Dequeued and completed message " << m.position << " early");
- }
- else
- QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
- }
- notify(); // Ensure a call to doDispatch
-}
// Called in subscription's connection thread.
bool ReplicatingSubscription::doDispatch()
@@ -244,7 +285,6 @@ bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { re
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-void ReplicatingSubscription::DelegatingConsumer::cancel() {}
OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
}} // namespace qpid::ha