From c167f61280bdf7349cb822d5326ec0de2d3ea067 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 19 Jan 2012 23:07:38 +0000 Subject: QPID-3603: Code cleanup to make ReplicatingSubscription more readable. Clarified deliver() and dequeued() logic and locking. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233675 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 52 +++++++++++------------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index a77154c595..3dae5fd0a7 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -103,14 +103,6 @@ ReplicatingSubscription::ReplicatingSubscription( QPID_LOG(debug, logPrefix << "Created subscription " << name); - // Note that broker::Queue::getPosition() returns the sequence - // number that will be assigned to the next message *minus 1*. - - // this->backupPosition tracks the position of the remote backup - // queue, i.e. the sequence number for the next delivered message - // *minus one* - backupPosition = 0; - // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. // This is incorrect if the sequence number wraps around, but @@ -121,22 +113,20 @@ ReplicatingSubscription::ReplicatingSubscription( bool ReplicatingSubscription::deliver(QueuedMessage& m) { // Add position events for the subscribed queue, not for the internal event queue. if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); assert(position == m.position); - { - sys::Mutex::ScopedLock l(lock); - // this->position is the new position after enqueueing m locally. - // this->backupPosition is the backup position before enqueueing m. - assert(position > backupPosition); - if (position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(position); - --send; // Send the position before m was enqueued. - sendPositionEvent(send, l); - QPID_LOG(trace, logPrefix << "Sending position " << send - << ", was " << backupPosition); - } - backupPosition = position; + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + assert(m.position > backupPosition); + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + QPID_LOG(trace, logPrefix << "Sending position " << send + << ", was " << backupPosition); } + backupPosition = m.position; QPID_LOG(trace, logPrefix << "Replicating message " << m.position); } return ConsumerImpl::deliver(m); @@ -215,21 +205,25 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& } // Called after the message has been removed from the deque and under -// the message lock in the queue. Called in arbitrary connection threads. +// the messageLock in the queue. Called in arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { QPID_LOG(trace, logPrefix << "Dequeued message " << m.position); { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + + // FIXME aconway 2012-01-05: suspect use of position in + // foreign connection thread. Race with deliver() which is + // not under the message lock? + if (m.position > position) { + m.payload->getIngressCompletion().finishCompleter(); + QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early"); + } } notify(); // Ensure a call to doDispatch - // FIXME aconway 2011-12-20: not thread safe to access position here, - // we're not in the dispatch thread. - if (m.position > position) { - m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early"); - } } // Called in subscription's connection thread. -- cgit v1.2.1