summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-18 18:08:19 +0000
committerAlan Conway <aconway@apache.org>2012-06-18 18:08:19 +0000
commita6d034417e59a06f783109ec4ca810737aa30428 (patch)
treef04e34c00a89726efe00d5b3a460bb73c6719bd7
parent81c4f3d48f66fddfb6b0b74f1f768cd7ee245ef7 (diff)
downloadqpid-python-a6d034417e59a06f783109ec4ca810737aa30428.tar.gz
QPID-3603: Fix & clean up in HA code.
- Fix fencepost error in getFirstSafe() - QueueGuard::attach completes messages before the ReplicatingSubscription postion - Fix minor test issues in brokertest.py and ha_test.py. - ReplicatingSubscription check for ready in acknowledge not dispatch. - HA test fix: retry wait_status retry on ConnectErrors, broker may not be up. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351435 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp24
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h14
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h15
-rw-r--r--qpid/cpp/src/tests/brokertest.py14
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py15
6 files changed, 58 insertions, 35 deletions
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index b391a5257b..6293f640e1 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
#include <sstream>
namespace qpid {
@@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
- // NOTE: There is no activity on the queue while QueueGuard constructor is
- // running It is called either from Primary before client connections are
- // allowed or from ConfigurationObserver::queueCreate before the queue is
- // visible.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
- firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error
+ // Set after addObserver to ensure we dont miss an enqueue.
+ firstSafe = queue.getPosition() + 1; // Next message will be protected by the guard.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -101,9 +99,23 @@ void QueueGuard::cancel() {
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
+namespace {
+void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
+ if (qm.position <= position) guard->complete(qm);
+}
+}
+
void QueueGuard::attach(ReplicatingSubscription& rs) {
- Mutex::ScopedLock l(lock);
+ // NOTE: attach is called before the ReplicatingSubscription is active so
+ // it's position is not changing.
assert(firstSafe >= rs.getPosition());
+ // Complete any messages before or at the ReplicatingSubscription position.
+ if (!delayed.empty() && delayed.front() <= rs.getPosition()) {
+ // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
+ queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), _1));
+ }
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2012-06-15: complete messages before rs.getPosition
subscription = &rs;
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 1f8c117d0b..9a68222467 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -43,12 +43,11 @@ class BrokerInfo;
class ReplicatingSubscription;
/**
- * A queue guard is a QueueObserver that delays completion of new
- * messages arriving on a queue. It works as part of a
- * ReplicatingSubscription to ensure messages are not acknowledged
- * till they have been replicated.
+ * A queue guard is a QueueObserver that delays completion of new messages
+ * arriving on a queue. It works as part of a ReplicatingSubscription to ensure
+ * messages are not acknowledged till they have been replicated.
*
- * The guard is created before the ReplicatingSubscription to protect
+ * The guard can be created before the ReplicatingSubscription to protect
* messages arriving before the creation of the subscription.
*
* THREAD SAFE: Concurrent calls:
@@ -78,8 +77,9 @@ class QueueGuard {
void attach(ReplicatingSubscription&);
- /** The first sequence number protected by this guard.
- * All messages at or after this position are protected.
+ /**
+ * The first sequence number to be protected by this guard. All messages at
+ * or after this position are protected.
*/
framing::SequenceNumber getFirstSafe();
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 40ede938a4..0d6cbb7ddc 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubscription(
}
QPID_LOG(debug, logPrefix << "Subscribed: "
<< " backup:" << backup
- << " backup position:" << backupPosition
<< " primary:" << primary
<< " position:" << position
+ << " safe position: " << guard->getFirstSafe()
);
// Are we ready yet?
@@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
backupPosition = qm.position;
}
// Deliver the message
- bool delivered = ConsumerImpl::deliver(qm);
- // If we have advanced past the initial position, the backup is ready.
- if (qm.position >= guard->getFirstSafe()) setReady();
- return delivered;
+ return ConsumerImpl::deliver(qm);
}
else
return ConsumerImpl::deliver(qm); // Message is for internal event queue.
@@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady() {
ready = true;
}
// Notify Primary that a subscription is ready.
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ QPID_LOG(debug, logPrefix << "Caught up");
if (Primary::get()) Primary::get()->readyReplica(*this);
}
@@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
// Finish completion of message, it has been acknowledged by the backup.
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
+ // If next message is protected by the guard then we are ready
+ if (qm.position+1 >= guard->getFirstSafe()) setReady();
}
ConsumerImpl::acknowledged(qm);
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 7e46abf2ae..afa503d8cc 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -48,12 +48,17 @@ class QueueGuard;
/**
* A susbcription that replicates to a remote backup.
*
- * Runs on the primary. In conjunction with a QueueGuard, delays
- * completion of messages till the backup has acknowledged, informs
- * backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays completion of
+ * messages till the backup has acknowledged, informs backup of locally dequeued
+ * messages.
*
- * THREAD SAFE: Called in subscription's connection thread but also
- * in arbitrary connection threads via dequeued.
+ * A ReplicatingSubscription is "ready" when all the messages on the queue have
+ * either been acknowledged by the backup, or are protected by the queue guard.
+ * On a primary broker the ReplicatingSubscription calls Primary::readyReplica
+ * when it is ready.
+ *
+ * THREAD SAFE: Called in subscription's connection thread but also in arbitrary
+ * connection threads via dequeued.
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 93b3868907..b8b642c504 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -76,20 +76,20 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=10, delay=.01, max_delay=1):
"""Call function until it returns a true value or timeout expires.
- Double the delay for each retry. Returns what function returns if
- true, None if timeout expires."""
+ Double the delay for each retry up to max_delay.
+ Returns what function returns if true, None if timeout expires."""
deadline = time.time() + timeout
ret = None
- while not ret:
+ while True:
ret = function()
+ if ret: return ret
remaining = deadline - time.time()
if remaining <= 0: return False
delay = min(delay, remaining)
time.sleep(delay)
- delay *= 2
- return ret
+ delay = min(delay*2, max_delay)
class AtomicCounter:
def __init__(self):
@@ -657,7 +657,7 @@ class NumberedReceiver(Thread):
m = self.read_message()
while m != -1:
self.receiver.assert_running()
- assert m <= self.received, "Missing message %s>%s"%(m, self.received)
+ assert m <= self.received, "%s missing message %s>%s"%(queue, m, self.received)
if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 0f9efdf80a..e5a204d03c 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -77,10 +77,17 @@ class HaBroker(Broker):
if not self._agent: self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self): return self.agent().getHaBroker().status
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
def wait_status(self, status):
- assert retry(lambda: self.ha_status() == status), "%s, %r != %r"%(self, self.ha_status(), status)
+ def try_get_status():
+ # Ignore ConnectionError, the broker may not be up yet.
+ try: return self.ha_status() == status;
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -755,12 +762,12 @@ class LongTests(BrokerTest):
return receivers[0].received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
- for s in senders: s.stop()
- for r in receivers: r.stop()
except:
traceback.print_exc()
raise
finally:
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
dead = []
for i in xrange(3):
if not brokers[i].is_running(): dead.append(i)