diff options
author | Alan Conway <aconway@apache.org> | 2012-06-18 20:43:19 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-18 20:43:19 +0000 |
commit | c45ee73853cb7c84bb2a7dd0c7f9fdecd7aa9286 (patch) | |
tree | f31c2dc75fdcb8ec16ea9732a84a4907c5306850 | |
parent | bd64faa59c1a90d41e389115a8445f9917a804df (diff) | |
download | qpid-python-c45ee73853cb7c84bb2a7dd0c7f9fdecd7aa9286.tar.gz |
QPID-4072: HA use backup messages in failover.
ReplicatingSubscription syncs the primary and backup queues, and does not
re-send messages that are already on the backup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351481 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/src/ha.mk | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueRange.h | 71 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 134 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 5 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 5 |
8 files changed, 152 insertions, 114 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 9791c391fe..cb037573a1 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -648,6 +648,7 @@ if (BUILD_HA) qpid/ha/Primary.cpp qpid/ha/Primary.h qpid/ha/PrimaryConnectionMonitor.h + qpid/ha/QueueRange.h qpid/ha/QueueReplicator.cpp qpid/ha/QueueReplicator.h qpid/ha/ReplicatingSubscription.cpp diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index da8e35c90d..bf1bd10256 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -43,6 +43,7 @@ ha_la_SOURCES = \ qpid/ha/PrimaryConnectionMonitor.h \ qpid/ha/QueueGuard.cpp \ qpid/ha/QueueGuard.h \ + qpid/ha/QueueRange.h \ qpid/ha/QueueReplicator.cpp \ qpid/ha/QueueReplicator.h \ qpid/ha/ReplicatingSubscription.cpp \ diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 6293f640e1..1f4ff4e48b 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0) + : queue(q), subscription(0), range(q) { + // NOTE: The QueueGuard is created before the queue becomes active: either + // when a backup is promoted, or when a new queue is created on the primary. std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); - // Set after addObserver to ensure we dont miss an enqueue. - firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard. } QueueGuard::~QueueGuard() { cancel(); } @@ -99,24 +99,24 @@ void QueueGuard::cancel() { queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } +void QueueGuard::attach(ReplicatingSubscription& rs) { + Mutex::ScopedLock l(lock); + subscription = &rs; +} + namespace { void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) { if (qm.position <= position) guard->complete(qm); } } -void QueueGuard::attach(ReplicatingSubscription& rs) { - // NOTE: attach is called before the ReplicatingSubscription is active so - // it's position is not changing. - assert(firstSafe >= rs.getPosition()); - // Complete any messages before or at the ReplicatingSubscription position. - if (!delayed.empty() && delayed.front() <= rs.getPosition()) { +bool QueueGuard::subscriptionStart(SequenceNumber position) { + // Complete any messages before or at the ReplicatingSubscription start position. + if (!delayed.empty() && delayed.front() <= position) { // FIXME aconway 2012-06-15: queue iteration, only messages in delayed - queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1)); + queue.eachMessage(boost::bind(&completeBefore, this, position, _1)); } - Mutex::ScopedLock l(lock); - // FIXME aconway 2012-06-15: complete messages before rs.getPosition - subscription = &rs; + return position >= range.back; } void QueueGuard::complete(const QueuedMessage& qm) { @@ -135,11 +135,6 @@ void QueueGuard::complete(const QueuedMessage& qm) { qm.payload->getIngressCompletion().finishCompleter(); } -framing::SequenceNumber QueueGuard::getFirstSafe() { - // No lock, firstSafe is immutable. - return firstSafe; -} - // FIXME aconway 2012-06-04: TODO support for timeout. }} // namespaces qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 9a68222467..9c6fb55015 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -23,6 +23,7 @@ */ #include "types.h" +#include "QueueRange.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/SequenceSet.h" #include "qpid/types/Uuid.h" @@ -78,10 +79,19 @@ class QueueGuard { void attach(ReplicatingSubscription&); /** - * The first sequence number to be protected by this guard. All messages at - * or after this position are protected. + * Return the queue range at the time the QueueGuard was created. The + * QueueGuard is created before the queue becomes active: either when a + * backup is promoted, or when a new queue is created on the primary. + * + * NOTE: The first position protected by the guard is getRange().getBack()+1 */ - framing::SequenceNumber getFirstSafe(); + const QueueRange& getRange() const { return range; } // range is immutable, no lock needed. + + /** Inform the guard of the stating position for the attached subscription. + * Complete messages that will not be seen by the subscriptino. + *@return true if the subscription has already advanced to a guarded position. + */ + bool subscriptionStart(framing::SequenceNumber position); private: class QueueObserver; @@ -92,7 +102,7 @@ class QueueGuard { framing::SequenceSet delayed; ReplicatingSubscription* subscription; boost::shared_ptr<QueueObserver> observer; - framing::SequenceNumber firstSafe; // Immutable + const QueueRange range; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h new file mode 100644 index 0000000000..3ca034e411 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueRange.h @@ -0,0 +1,71 @@ +#ifndef QPID_HA_QUEUERANGE_H +#define QPID_HA_QUEUERANGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicatingSubscription.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/SequenceNumber.h" +#include <iostream> + +namespace qpid { +namespace ha { + +/** + * Get the front/back range of a queue or from a ReplicatingSubscription arguments table. + */ +struct QueueRange { + public: + framing::SequenceNumber front, back; + + QueueRange() { } + + QueueRange(broker::Queue& q) { + back = q.getPosition(); + front = back+1; // assume empty + ReplicatingSubscription::getFront(q, front); + assert(front <= back + 1); + } + + QueueRange(const framing::FieldTable& args) { + back = args.getAsInt(ReplicatingSubscription::QPID_BACK); + front = back+1; + if (args.isSet(ReplicatingSubscription::QPID_FRONT)) + front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); + if (back+1 < front) + throw Exception(QPID_MSG("Invalid range [" << front << "," << back <<"]")); + } + + bool empty() const { return front == back+1; } +}; + + +inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) { + if (qr.front > qr.back) return o << "[-" << qr.back << "]"; + else return o << "[" << qr.front << "," << qr.back << "]"; +} + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_QUEUERANGE_H*/ diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 0d6cbb7ddc..629014b215 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "QueueGuard.h" +#include "QueueRange.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Primary.h" @@ -56,31 +57,31 @@ class DequeueScanner public: DequeueScanner( ReplicatingSubscription& rs, - const SequenceNumber& first_, - const SequenceNumber& last_ // Inclusive - ) : subscription(rs), first(first_), last(last_) + SequenceNumber front_, + SequenceNumber back_ // Inclusive + ) : subscription(rs), front(front_), back(back_) { - assert(first <= last); - // INVARIANT no deques are needed for positions <= at. - at = first - 1; + assert(front <= back); + // INVARIANT deques have been added for positions <= at. + at = front - 1; } void operator()(const QueuedMessage& qm) { - if (qm.position >= first && qm.position <= last) { - if (qm.position > at+1) - subscription.dequeued(at+1, qm.position-1); + if (qm.position >= front && qm.position <= back) { + if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1); at = qm.position; } } + // Must call after scanning the queue. void finish() { - if (at < last) subscription.dequeued(at+1, last); + if (at < back) subscription.dequeued(at+1, back); } private: ReplicatingSubscription& subscription; - SequenceNumber first; - SequenceNumber last; + SequenceNumber front; + SequenceNumber back; SequenceNumber at; }; @@ -146,38 +147,6 @@ ReplicatingSubscription::Factory::create( return rs; } -struct QueueRange { - bool empty; - SequenceNumber front; - SequenceNumber back; - - QueueRange() { } - - QueueRange(broker::Queue& q) { - back = q.getPosition(); - front = back+1; // Assume empty - empty = !ReplicatingSubscription::getFront(q, front); - assert(empty || front <= back); - } - - QueueRange(const framing::FieldTable args) { - back = args.getAsInt(ReplicatingSubscription::QPID_BACK); - front = back+1; // Assume empty - empty = !args.isSet(ReplicatingSubscription::QPID_FRONT); - if (!empty) { - front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); - if (back < front) - throw InvalidArgumentException( - QPID_MSG("Invalid range [" << front << "," << back <<"]")); - } - } -}; - -ostream& operator<<(ostream& o, const QueueRange& qr) { - if (qr.front > qr.back) return o << "[-" << qr.back << "]"; - else return o << "[" << qr.front << "," << qr.back << "]"; -} - ReplicatingSubscription::ReplicatingSubscription( SemanticState* parent, const string& name, @@ -205,16 +174,9 @@ ReplicatingSubscription::ReplicatingSubscription( os << "Primary " << queue->getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); - // FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing? - - QueueRange primary(*queue); // The local primary queue. - QueueRange backup(arguments); // The remote backup state. - backupPosition = backup.back; - // NOTE: Once the guard is attached we can have concurrent - // calles to dequeued so we need to lock use of this->deques. + // calls to dequeued so we need to lock use of this->deques. // - // However we must attach the guard _before_ we scan for // initial dequeues to be sure we don't miss any dequeues // between the scan and attaching the guard. @@ -222,43 +184,44 @@ ReplicatingSubscription::ReplicatingSubscription( if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); guard->attach(*this); - // We can re-use some backup messages if backup and primary queues - // overlap and the backup is not missing messages at the front of the queue. + QueueRange backup(arguments); // The remote backup state. + QueueRange primary(guard->getRange()); // The local state at the time the guard was set. + backupPosition = backup.back; + + // Sync backup and primary queues, don't send messages already on the backup - /* if (!primary.empty && // Primary not empty - !backup.empty && // Backup not empty - primary.front >= backup.front && // Not missing messages at the front - primary.front <= backup.back // Overlap - ) + if (backup.back < primary.front || backup.front > primary.back + || primary.empty() || backup.empty()) { - // Scan primary queue for gaps that should be dequeued on the backup. - // NOTE: this runs concurrently with the guard calling dequeued() - // FIXME aconway 2012-05-22: optimize queue iteration + // No overlap - erase backup and start from the beginning + if (!backup.empty()) dequeued(backup.front, backup.back); + position = primary.front-1; + } + else { // backup and primary do overlap. + // Remove messages from backup that are not in primary. + if (primary.back < backup.back) { + dequeued(primary.back+1, backup.back); // Trim excess messages at back + backup.back = primary.back; + } + if (backup.front < primary.front) { + dequeued(backup.front, primary.front-1); // Trim excess messages at front + backup.front = primary.front; + } DequeueScanner scan(*this, backup.front, backup.back); - queue->eachMessage(scan); + // FIXME aconway 2012-06-15: Optimize queue traversal, only in range. + queue->eachMessage(boost::ref(scan)); // Remove missing messages in between. scan.finish(); - // If the backup was ahead it has been pruned back to the primary. - position = std::min(guard->getFirstSafe(), backup.back); + position = backup.back; } - else */ { - // Clear the backup queue and reset to start browsing at the - // front of the primary queue. - if (!backup.empty) dequeued(backup.front, backup.back); - position = primary.front - 1; // Start consuming from front. - } - QPID_LOG(debug, logPrefix << "Subscribed: " + + QPID_LOG(debug, logPrefix << "Subscribed:" << " backup:" << backup << " primary:" << primary - << " position:" << position - << " safe position: " << guard->getFirstSafe() - ); - - // Are we ready yet? - if (position+1 >= guard->getFirstSafe()) // Next message will be safe. - setReady(); - else - QPID_LOG(debug, logPrefix << "Catching up from " - << position << " to " << guard->getFirstSafe()); + << " catch-up: " << position << "-" << primary.back + << "(" << primary.back-position << ")"); + + // Check if we are ready yet. + if (guard->subscriptionStart(position)) setReady(); } catch (const std::exception& e) { throw InvalidArgumentException(QPID_MSG(logPrefix << e.what() @@ -307,11 +270,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Backup will automatically advance by 1 on delivery of message. backupPosition = qm.position; } - // Deliver the message - return ConsumerImpl::deliver(qm); } - else - return ConsumerImpl::deliver(qm); // Message is for internal event queue. + return ConsumerImpl::deliver(qm); } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Error replicating " << qm << ": " << e.what()); @@ -344,7 +304,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { QPID_LOG(trace, logPrefix << "Acknowledged " << qm); guard->complete(qm); // If next message is protected by the guard then we are ready - if (qm.position+1 >= guard->getFirstSafe()) setReady(); + if (qm.position >= guard->getRange().back) setReady(); } ConsumerImpl::acknowledged(qm); } diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index b8b642c504..aea4460e5a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -572,7 +572,7 @@ class NumberedSender(Thread): """ Thread.__init__(self) cmd = ["qpid-send", - "--broker", url or broker.host_port(), + "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--content-stdin" @@ -647,6 +647,7 @@ class NumberedReceiver(Thread): self.error = None self.sender = sender self.received = 0 + self.queue = queue def read_message(self): n = int(self.receiver.stdout.readline()) @@ -657,7 +658,7 @@ class NumberedReceiver(Thread): m = self.read_message() while m != -1: self.receiver.assert_running() - assert m <= self.received, "%s missing message %s>%s"%(queue, m, self.received) + assert m <= self.received, "%s missing message %s>%s"%(self.queue, m, self.received) if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index e5a204d03c..7338136bfd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -729,7 +729,7 @@ class LongTests(BrokerTest): brokers = HaCluster(self, 3) # Start sender and receiver threads - n = 10; + n = 10 senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False, queue="test%s"%(i)) for i in xrange(n)] receivers = [NumberedReceiver(brokers[0], sender=senders[i], @@ -760,8 +760,7 @@ class LongTests(BrokerTest): def enough(): # Verify we're still running receivers[0].check() # Verify no exceptions return receivers[0].received > n + 100 - # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec. - assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n) + assert retry(enough), "Stalled: %s < %s+100"%(receivers[0].received, n) except: traceback.print_exc() raise |