diff options
author | Alan Conway <aconway@apache.org> | 2012-09-19 21:42:21 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-09-19 21:42:21 +0000 |
commit | 653f560b36eb7070303eabc0aad0e0ab5fe47f8f (patch) | |
tree | 66c570fe1663016cda22ccb40da00a6ac28c8cb0 /cpp | |
parent | a770b917bad25a996b2177f83ca9c38df7161edf (diff) | |
download | qpid-python-653f560b36eb7070303eabc0aad0e0ab5fe47f8f.tar.gz |
QPID-4322: HA sporadic failure in ha_tests
Added Queue::getRange to get range atomically, fixes races around getting
the front and backup of the range as two separate operations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1387785 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueRange.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 24 | ||||
-rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.h | 11 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 2 |
8 files changed, 48 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6479e47799..afa5623ecd 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -230,7 +230,7 @@ void Link::startConnectionLH () setStateLH(STATE_CONNECTING); broker->connect (host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); - QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); + QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << port); } catch(const std::exception& e) { QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " << e.what()); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9be3a1acac..276e17a8b5 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -1390,6 +1390,16 @@ SequenceNumber Queue::getPosition() { return sequence; } +void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back, + SubscriptionType type) +{ + Mutex::ScopedLock locker(messageLock); + QueueCursor cursor(type); + back = sequence; + Message* message = messages->next(cursor); + front = message ? message->getSequence() : back+1; +} + int Queue::getEventMode() { return eventMode; } void Queue::recoveryComplete(ExchangeRegistry& exchanges) diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 671a24d53e..d52afec6b9 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -375,9 +375,24 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** *@return sequence number for the back of the queue. The next message pushed - * will be at getPosition+1 + * will be at getPosition()+1 */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); + + /** + * Set front and back. + * If the queue is empty then front = back+1 (the first message to + * consume will be the next message pushed.) + * + *@param front = Position of first message to consume. + *@param back = getPosition(), next message pushed will be getPosition()+1 + *@param type Subscription type to use to determine the front. + */ + QPID_BROKER_EXTERN void getRange( + framing::SequenceNumber& front, framing::SequenceNumber& back, + SubscriptionType type=CONSUMER + ); + QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); diff --git a/cpp/src/qpid/ha/QueueRange.h b/cpp/src/qpid/ha/QueueRange.h index d734326910..f67ac146e6 100644 --- a/cpp/src/qpid/ha/QueueRange.h +++ b/cpp/src/qpid/ha/QueueRange.h @@ -24,6 +24,7 @@ #include "ReplicatingSubscription.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/SequenceNumber.h" #include <iostream> @@ -51,15 +52,7 @@ struct QueueRange { QueueRange() : front(1), back(0) { } // Empty range. - QueueRange(broker::Queue& q) { - if (ReplicatingSubscription::getFront(q, front)) - back = q.getPosition(); - else { - back = q.getPosition(); - front = back+1; // empty - } - assert(front <= back + 1); - } + QueueRange(broker::Queue& q) { q.getRange(front, back, broker::REPLICATOR); } QueueRange(const framing::FieldTable& args) { back = args.getAsInt(ReplicatingSubscription::QPID_BACK); diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 8baecb0e0f..82daad9d9c 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -119,9 +119,9 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa queue->getPosition()); settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); - SequenceNumber front; - if (ReplicatingSubscription::getFront(*queue, front)) - settings.setInt(ReplicatingSubscription::QPID_FRONT, front); + SequenceNumber front, back; + queue->getRange(front, back, broker::REPLICATOR); + if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front); QPID_LOG(debug, logPrefix << " subscribe with settings " << settings); peer.getMessage().subscribe( @@ -152,6 +152,17 @@ void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { queue->dequeueMessageAt(n); } +namespace { +bool getSequence(const Message& message, SequenceNumber& result) { + result = message.getSequence(); + return true; +} +bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) { + QueueCursor cursor(REPLICATOR); + return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); +} +} // namespace + // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { @@ -176,7 +187,7 @@ void QueueReplicator::route(Deliverable& msg) << " to " << position); // Verify that there are no messages after the new position in the queue. SequenceNumber next; - if (ReplicatingSubscription::getNext(*queue, position, next)) + if (getNext(*queue, position, next)) throw Exception(QPID_MSG(logPrefix << "Invalid position " << position << " preceeds message at " << next)); queue->setPosition(position); diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 5fcb316ce6..ae6e7181d1 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -91,25 +91,6 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } -namespace { -bool getSequence(const Message& message, SequenceNumber& result) -{ - result = message.getSequence(); - return true; -} -} -bool ReplicatingSubscription::getNext( - broker::Queue& q, SequenceNumber position, SequenceNumber& result) -{ - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); -} - -bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front))); -} - /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription( guard->attach(*this); QueueRange backup(arguments); // Remote backup range. + QueueRange backupOriginal(backup); QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; @@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription( // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backup + << " backup:" << backupOriginal << " adjusted backup:" << backup << " primary:" << primary << " catch-up: " << position << "-" << primary.back << "(" << primary.back-position << ")"); @@ -258,7 +240,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( - QPID_MSG("Expected position > " << backupPosition + QPID_MSG(logPrefix << "Expected position > " << backupPosition << " but got " << m.getSequence())); if (m.getSequence() - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h index 4f54ffce96..f714e8e01a 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -80,17 +80,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl static const std::string QPID_FRONT; static const std::string QPID_BROKER_INFO; - // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription - /** Get position of front message on queue. - *@return false if queue is empty. - */ - static bool getFront(broker::Queue&, framing::SequenceNumber& result); - /** Get next message after position in queue. - *@return false if none found. - */ - static bool getNext(broker::Queue&, framing::SequenceNumber position, - framing::SequenceNumber& result); - ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index e32077f219..64d87fbc86 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -886,7 +886,7 @@ class LongTests(BrokerTest): """Wait for receiver r to pass n""" def check(): r.check() # Verify no exceptions - return r.received > n + return r.received > n + 100 assert retry(check), "Stalled %s at %s"%(r.queue, n) for r in receivers: wait_passed(r, 0) |