diff options
author | Alan Conway <aconway@apache.org> | 2012-06-18 18:08:19 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-18 18:08:19 +0000 |
commit | a6d034417e59a06f783109ec4ca810737aa30428 (patch) | |
tree | f04e34c00a89726efe00d5b3a460bb73c6719bd7 | |
parent | 81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7 (diff) | |
download | qpid-python-a6d034417e59a06f783109ec4ca810737aa30428.tar.gz |
QPID-3603: Fix & clean up in HA code.
- Fix fencepost error in getFirstSafe()
- QueueGuard::attach completes messages before the ReplicatingSubscription position
- Fix minor test issues in brokertest.py and ha_test.py.
- ReplicatingSubscription check for ready in acknowledge not dispatch.
- HA test fix: wait_status retries on ConnectionError, since the broker may not be up yet.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351435 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 14 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 15 |
6 files changed, 58 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index b391a5257b..6293f640e1 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -24,6 +24,7 @@ #include "qpid/broker/QueuedMessage.h" #include "qpid/broker/QueueObserver.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> #include <sstream> namespace qpid { @@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { - // NOTE: There is no activity on the queue while QueueGuard constructor is - // running It is called either from Primary before client connections are - // allowed or from ConfigurationObserver::queueCreate before the queue is - // visible. std::ostringstream os; os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); - firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error + // Set after addObserver to ensure we dont miss an enqueue. + firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard. } QueueGuard::~QueueGuard() { cancel(); } @@ -101,9 +99,23 @@ void QueueGuard::cancel() { queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } +namespace { +void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) { + if (qm.position <= position) guard->complete(qm); +} +} + void QueueGuard::attach(ReplicatingSubscription& rs) { - Mutex::ScopedLock l(lock); + // NOTE: attach is called before the ReplicatingSubscription is active so + // it's position is not changing. assert(firstSafe >= rs.getPosition()); + // Complete any messages before or at the ReplicatingSubscription position. 
+ if (!delayed.empty() && delayed.front() <= rs.getPosition()) { + // FIXME aconway 2012-06-15: queue iteration, only messages in delayed + queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1)); + } + Mutex::ScopedLock l(lock); + // FIXME aconway 2012-06-15: complete messages before rs.getPosition subscription = &rs; } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 1f8c117d0b..9a68222467 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -43,12 +43,11 @@ class BrokerInfo; class ReplicatingSubscription; /** - * A queue guard is a QueueObserver that delays completion of new - * messages arriving on a queue. It works as part of a - * ReplicatingSubscription to ensure messages are not acknowledged - * till they have been replicated. + * A queue guard is a QueueObserver that delays completion of new messages + * arriving on a queue. It works as part of a ReplicatingSubscription to ensure + * messages are not acknowledged till they have been replicated. * - * The guard is created before the ReplicatingSubscription to protect + * The guard can be created before the ReplicatingSubscription to protect * messages arriving before the creation of the subscription. * * THREAD SAFE: Concurrent calls: @@ -78,8 +77,9 @@ class QueueGuard { void attach(ReplicatingSubscription&); - /** The first sequence number protected by this guard. - * All messages at or after this position are protected. + /** + * The first sequence number to be protected by this guard. All messages at + * or after this position are protected. 
*/ framing::SequenceNumber getFirstSafe(); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 40ede938a4..0d6cbb7ddc 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubscription( } QPID_LOG(debug, logPrefix << "Subscribed: " << " backup:" << backup - << " backup position:" << backupPosition << " primary:" << primary << " position:" << position + << " safe position: " << guard->getFirstSafe() ); // Are we ready yet? @@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { backupPosition = qm.position; } // Deliver the message - bool delivered = ConsumerImpl::deliver(qm); - // If we have advanced past the initial position, the backup is ready. - if (qm.position >= guard->getFirstSafe()) setReady(); - return delivered; + return ConsumerImpl::deliver(qm); } else return ConsumerImpl::deliver(qm); // Message is for internal event queue. @@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady() { ready = true; } // Notify Primary that a subscription is ready. - QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); + QPID_LOG(debug, logPrefix << "Caught up"); if (Primary::get()) Primary::get()->readyReplica(*this); } @@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { // Finish completion of message, it has been acknowledged by the backup. 
QPID_LOG(trace, logPrefix << "Acknowledged " << qm); guard->complete(qm); + // If next message is protected by the guard then we are ready + if (qm.position+1 >= guard->getFirstSafe()) setReady(); } ConsumerImpl::acknowledged(qm); } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 7e46abf2ae..afa503d8cc 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -48,12 +48,17 @@ class QueueGuard; /** * A susbcription that replicates to a remote backup. * - * Runs on the primary. In conjunction with a QueueGuard, delays - * completion of messages till the backup has acknowledged, informs - * backup of locally dequeued messages. + * Runs on the primary. In conjunction with a QueueGuard, delays completion of + * messages till the backup has acknowledged, informs backup of locally dequeued + * messages. * - * THREAD SAFE: Called in subscription's connection thread but also - * in arbitrary connection threads via dequeued. + * A ReplicatingSubscription is "ready" when all the messages on the queue have + * either been acknowledged by the backup, or are protected by the queue guard. + * On a primary broker the ReplicatingSubscription calls Primary::readyReplica + * when it is ready. + * + * THREAD SAFE: Called in subscription's connection thread but also in arbitrary + * connection threads via dequeued. * * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. * diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 93b3868907..b8b642c504 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -76,20 +76,20 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01): +def retry(function, timeout=10, delay=.01, max_delay=1): """Call function until it returns a true value or timeout expires. 
- Double the delay for each retry. Returns what function returns if - true, None if timeout expires.""" + Double the delay for each retry up to max_delay. + Returns what function returns if true, None if timeout expires.""" deadline = time.time() + timeout ret = None - while not ret: + while True: ret = function() + if ret: return ret remaining = deadline - time.time() if remaining <= 0: return False delay = min(delay, remaining) time.sleep(delay) - delay *= 2 - return ret + delay = min(delay*2, max_delay) class AtomicCounter: def __init__(self): @@ -657,7 +657,7 @@ class NumberedReceiver(Thread): m = self.read_message() while m != -1: self.receiver.assert_running() - assert m <= self.received, "Missing message %s>%s"%(m, self.received) + assert m <= self.received, "%s missing message %s>%s"%(queue, m, self.received) if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 0f9efdf80a..e5a204d03c 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -77,10 +77,17 @@ class HaBroker(Broker): if not self._agent: self._agent = QmfAgent(self.host_port()) return self._agent - def ha_status(self): return self.agent().getHaBroker().status + def ha_status(self): + hb = self.agent().getHaBroker() + hb.update() + return hb.status def wait_status(self, status): - assert retry(lambda: self.ha_status() == status), "%s, %r != %r"%(self, self.ha_status(), status) + def try_get_status(): + # Ignore ConnectionError, the broker may not be up yet. + try: return self.ha_status() == status; + except ConnectionError: return False + assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, self.ha_status(), status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. 
def qpid_config(self, args): @@ -755,12 +762,12 @@ class LongTests(BrokerTest): return receivers[0].received > n + 100 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec. assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n) - for s in senders: s.stop() - for r in receivers: r.stop() except: traceback.print_exc() raise finally: + for s in senders: s.stop() + for r in receivers: r.stop() dead = [] for i in xrange(3): if not brokers[i].is_running(): dead.append(i) |