summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-10-09 19:52:24 +0000
committerAlan Conway <aconway@apache.org>2012-10-09 19:52:24 +0000
commita769439c2e8bc10e33292e87008204a0d6c20059 (patch)
treef09fc24ca604c87b9343dd084f098cff00c65d8a
parenta12252bbafc72607e11aad13a8cb1e86ffc5d30f (diff)
downloadqpid-python-a769439c2e8bc10e33292e87008204a0d6c20059.tar.gz
QPID-4360: Fix test bug: Non-ready HA broker can be incorrectly promoted to primary.
Test test_delete_missing_response was failing with "cluster active, cannot promote". - Fixed test bug: "fake" primary triggered "cannot promote". - Backup: always create QueueReplicator if not already existing. - Terminology change: "initial" queues -> "catch-up" queues. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1396244 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp33
-rw-r--r--cpp/src/qpid/ha/Primary.cpp4
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp22
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.h13
-rwxr-xr-xcpp/src/tests/ha_test.py2
-rwxr-xr-xcpp/src/tests/ha_tests.py28
6 files changed, 52 insertions, 50 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 73ab5327fc..14e6e1a5d1 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -302,12 +302,12 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == HA_BROKER) doResponseHaBroker(values);
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
- QPID_LOG(debug, logPrefix << "Initial exchange configuration complete.");
+ QPID_LOG(debug, logPrefix << "All exchange responses received.")
cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
- QPID_LOG(debug, logPrefix << "Initial queue configuration complete.");
+ QPID_LOG(debug, logPrefix << "All queue responses received.")
cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary
}
}
@@ -338,16 +338,9 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
deleteQueue(name);
}
settings.populate(args, settings.storeSettings);
- CreateQueueResult result =
- broker.createQueue(
- name,
- settings,
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values[ALTEX].asString(),
- userId,
- remoteHost);
- assert(result.second);
- startQueueReplicator(result.first);
+ CreateQueueResult result = createQueue(
+ name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
+ assert(result.second); // Should be created since we destroed the previous queue above.
}
}
@@ -494,7 +487,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
CreateQueueResult result =
createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- if (result.second) startQueueReplicator(result.first);
+ // It is normal for the queue to already exist if we are failing over.
+ if (!result.second)
+ QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -596,8 +591,8 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
}
}
-void BrokerReplicator::deactivateQueue(const std::string& name) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+void BrokerReplicator::deactivateQueue(const std::string& queueName) {
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
if (qr) {
qr->deactivate();
// QueueReplicator's bridge is now queued for destruction but may not
@@ -646,10 +641,12 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
-
- if (!alternateExchange.empty()) {
+ boost::shared_ptr<Queue> queue = result.first;
+ if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
+ if (result.second && !alternateExchange.empty()) {
alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ alternateExchange,
+ boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
return result;
}
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 4057f4dcde..bdb1a66a83 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
new RemoteBackup(*i, haBroker.getReplicationTest(), false));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
- backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards
+ backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -207,7 +207,7 @@ void Primary::opened(broker::Connection& connection) {
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
- backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
+ backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
}
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index 3421380940..ce4e545c80 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -37,10 +37,10 @@ RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con)
brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}
-void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
+void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
{
- QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : ""));
- queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards));
+ QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : ""));
+ queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
}
RemoteBackup::~RemoteBackup() { cancel(); }
@@ -52,12 +52,14 @@ void RemoteBackup::cancel() {
}
bool RemoteBackup::isReady() {
- return connected && initialQueues.empty();
+ return connected && catchupQueues.empty();
}
-void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) {
+void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
if (replicationTest.isReplicated(ALL, *q)) {
- initialQueues.insert(q);
+ QPID_LOG(debug, logPrefix << "Catch-up queue"
+ << (createGuard ? " and guard" : "") << ": " << q->getName());
+ catchupQueues.insert(q);
if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
}
}
@@ -88,13 +90,13 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
}
void RemoteBackup::ready(const QueuePtr& q) {
- initialQueues.erase(q);
+ catchupQueues.erase(q);
QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
- << QueueSetPrinter(", waiting for: ", initialQueues));
+ << QueueSetPrinter(", waiting for: ", catchupQueues));
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
-// Called via ConfigurationObserver::queueCreate and from initialQueue
+// Called via ConfigurationObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
@@ -102,7 +104,7 @@ void RemoteBackup::queueCreate(const QueuePtr& q) {
// Called via ConfigurationObserver
void RemoteBackup::queueDestroy(const QueuePtr& q) {
- initialQueues.erase(q);
+ catchupQueues.erase(q);
GuardMap::iterator i = guards.find(q);
if (i != guards.end()) {
i->second->cancel();
diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h
index 8ee776e90b..e48ceff3ae 100644
--- a/cpp/src/qpid/ha/RemoteBackup.h
+++ b/cpp/src/qpid/ha/RemoteBackup.h
@@ -57,10 +57,10 @@ class RemoteBackup
RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
~RemoteBackup();
- /** Set the initial queues for all queues in the registry.
- *@createGuards if true create guards also, if false guards will be created on demand.
+ /** Set all queues in the registry as catch-up queues.
+ *@createGuards if true create guards also, if false guards are created on demand.
*/
- void setInitialQueues(broker::QueueRegistry&, bool createGuards);
+ void setCatchupQueues(broker::QueueRegistry&, bool createGuards);
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
@@ -80,7 +80,7 @@ class RemoteBackup
/** Called via ConfigurationObserver. Note: may set isReady() */
void queueDestroy(const QueuePtr&);
- /**@return true when all initial queues for this backup are ready. */
+ /**@return true when all catch-up queues for this backup are ready. */
bool isReady();
/**@return true if isReady() and this is the first call to reportReady */
@@ -94,14 +94,13 @@ class RemoteBackup
typedef std::map<QueuePtr, GuardPtr> GuardMap;
typedef std::set<QueuePtr> QueueSet;
- /** Add queue to guard as an initial queue */
- void initialQueue(const QueuePtr&, bool createGuard);
+ void catchupQueue(const QueuePtr&, bool createGuard);
std::string logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
- QueueSet initialQueues;
+ QueueSet catchupQueues;
bool connected;
bool reportedReady;
};
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
index 79db67e3c8..462c90bfb3 100755
--- a/cpp/src/tests/ha_test.py
+++ b/cpp/src/tests/ha_test.py
@@ -199,7 +199,7 @@ class HaCluster(object):
HaCluster._cluster_count += 1
for i in xrange(n): self.start(False)
self.update_urls()
- self[0].promote()
+ if promote: self[0].promote()
def next_name(self):
name="cluster%s-%s"%(self.id, self.broker_id)
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index a8d16a77c9..7ce0d1701a 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -627,25 +627,29 @@ acl deny all all
def test_delete_missing_response(self):
"""Check that a backup correctly deletes leftover queues and exchanges that are
missing from the initial reponse set."""
- cluster = HaCluster(self,2)
- s = cluster[0].connect().session()
+ # This test is a bit contrived, we set up the situation on backup brokers
+ # and then promote one.
+ cluster = HaCluster(self, 2, promote=False)
+
+ # cluster[0] Will be the primary
+ s = cluster[0].connect_admin().session()
s.sender("q1;{create:always}")
- s.sender("q2;{create:always}")
s.sender("e1;{create:always, node:{type:topic}}")
- s.sender("e2;{create:always, node:{type:topic}}")
- cluster.bounce(0, promote_next=False)
- # Fake a primary that has deleted some queues and exchanges.
- s = cluster[0].connect_admin().session()
+
+ # cluster[1] will be the backup, has extra queues/exchanges
+ s = cluster[1].connect_admin().session()
+ s.sender("q1;{create:always}")
s.sender("q2;{create:always}")
+ s.sender("e1;{create:always, node:{type:topic}}")
s.sender("e2;{create:always, node:{type:topic}}")
- s.sender("x;{create:always}") # A new queue so we can wait for the update.
+ for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
+
cluster[0].promote()
- # Verify the backup has deleted the missing queues and exchanges
+ # Verify the backup deletes the surpluis queue and exchange
cluster[1].wait_status("ready")
s = cluster[1].connect_admin().session()
- cluster[1].wait_backup("x");
- self.assertRaises(NotFound, s.receiver, ("q1"));
- self.assertRaises(NotFound, s.receiver, ("e1"));
+ self.assertRaises(NotFound, s.receiver, ("q2"));
+ self.assertRaises(NotFound, s.receiver, ("e2"));
def test_auto_delete_qpid_4285(self):