diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:06:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:06:24 +0000 |
commit | f46f5438624f6960644aadc7f3d451ba1690dc80 (patch) | |
tree | 2da8fd75731b8ceafc95e1eb2da7c6d2620134ea | |
parent | 7be847b9351c7afffa21b3d168e3bd2c9d5b16b7 (diff) | |
download | qpid-python-f46f5438624f6960644aadc7f3d451ba1690dc80.tar.gz |
QPID-3603: Fix race condition in setting initial position of ReplicatingSubscription.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233665 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 |
3 files changed, 8 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 12886eff0b..b95c27982f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1471,15 +1471,6 @@ class FindLowest }; } -bool Queue::getOldest(qpid::framing::SequenceNumber& oldest) -{ - //Horribly inefficient, but saves modifying Messages interface and - //all its implementations at present: - FindLowest f; - eachMessage(boost::bind(&FindLowest::process, &f, _1)); - return f.getLowest(oldest); -} - Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index a53916ffbc..6049de6191 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -407,7 +407,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } void setDequeueSincePurge(uint32_t value); - bool getOldest(framing::SequenceNumber& result); }; } } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 00be587fe4..8df24f2dfc 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -98,24 +98,21 @@ ReplicatingSubscription::ReplicatingSubscription( // Note that broker::Queue::getPosition() returns the sequence // number that will be assigned to the next message *minus 1*. - // this->position is inherited from ConsumerImpl. It tracks the - // position of the last message browsed on the local (primary) - // queue, or more exactly the next sequence number to browse - // *minus 1* - qpid::framing::SequenceNumber oldest; - position = queue->getOldest(oldest) ? --oldest : queue->getPosition(); - // this->backupPosition tracks the position of the remote backup // queue, i.e. the sequence number for the next delivered message // *minus one* backupPosition = 0; + + // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 + // so we will start consuming from the lowest numbered message. + // This is incorrect if the sequence number wraps around, but + // this is what all consumers currently do. } // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver(QueuedMessage& m) { // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue->getName() == getQueue()->getName()) { - QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m)); + if (m.queue && m.queue == getQueue().get()) { assert(position == m.position); { sys::Mutex::ScopedLock l(lock); @@ -130,6 +127,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& m) { } backupPosition = position; } + QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m)); } return ConsumerImpl::deliver(m); } @@ -213,7 +211,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) { sys::Mutex::ScopedLock l(lock); dequeues.add(m.position); - QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName()); + QPID_LOG(trace, "HA: Dequeued " << QueuePos(m) << " on " << getName()); } notify(); // Ensure a call to doDispatch if (m.position > position) { |