diff options
author | Alan Conway <aconway@apache.org> | 2012-10-09 19:52:24 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-09 19:52:24 +0000 |
commit | a769439c2e8bc10e33292e87008204a0d6c20059 (patch) | |
tree | f09fc24ca604c87b9343dd084f098cff00c65d8a | |
parent | a12252bbafc72607e11aad13a8cb1e86ffc5d30f (diff) | |
download | qpid-python-a769439c2e8bc10e33292e87008204a0d6c20059.tar.gz |
QPID-4360: Fix test bug: Non-ready HA broker can be incorrectly promoted to primary.
Test test_delete_missing_response was failing with "cluster active, cannot promote".
- Fixed test bug: "fake" primary triggered "cannot promote".
- Backup: always create QueueReplicator if not already existing.
- Terminology change: "initial" queues -> "catch-up" queues.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1396244 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/ha/RemoteBackup.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/ha/RemoteBackup.h | 13 | ||||
-rwxr-xr-x | cpp/src/tests/ha_test.py | 2 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 28 |
6 files changed, 52 insertions, 50 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 73ab5327fc..14e6e1a5d1 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -302,12 +302,12 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == HA_BROKER) doResponseHaBroker(values); } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - QPID_LOG(debug, logPrefix << "Initial exchange configuration complete."); + QPID_LOG(debug, logPrefix << "All exchange responses received.") cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { - QPID_LOG(debug, logPrefix << "Initial queue configuration complete."); + QPID_LOG(debug, logPrefix << "All queue responses received.") cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary } } @@ -338,16 +338,9 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { deleteQueue(name); } settings.populate(args, settings.storeSettings); - CreateQueueResult result = - broker.createQueue( - name, - settings, - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - userId, - remoteHost); - assert(result.second); - startQueueReplicator(result.first); + CreateQueueResult result = createQueue( + name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); + assert(result.second); // Should be created since we destroed the previous queue above. } } @@ -494,7 +487,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { CreateQueueResult result = createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - if (result.second) startQueueReplicator(result.first); + // It is normal for the queue to already exist if we are failing over. + if (!result.second) + QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -596,8 +591,8 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::deactivateQueue(const std::string& name) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); +void BrokerReplicator::deactivateQueue(const std::string& queueName) { + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName); if (qr) { qr->deactivate(); // QueueReplicator's bridge is now queued for destruction but may not @@ -646,10 +641,12 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - - if (!alternateExchange.empty()) { + boost::shared_ptr<Queue> queue = result.first; + if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue); + if (result.second && !alternateExchange.empty()) { alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + alternateExchange, + boost::bind(&Queue::setAlternateExchange, result.first, _1)); } return result; } diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index 4057f4dcde..bdb1a66a83 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : new RemoteBackup(*i, haBroker.getReplicationTest(), false)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); - backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards + backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards } // Set timeout for expected brokers to connect and become ready. sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); @@ -207,7 +207,7 @@ void Primary::opened(broker::Connection& connection) { { // Avoid deadlock with queue registry lock. Mutex::ScopedUnlock u(lock); - backup->setInitialQueues(haBroker.getBroker().getQueues(), false); + backup->setCatchupQueues(haBroker.getBroker().getQueues(), false); } backups[info.getSystemId()] = backup; } diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index 3421380940..ce4e545c80 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -37,10 +37,10 @@ RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) {} -void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards) +void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards) { - QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : "")); - queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards)); + QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : "")); + queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards)); } RemoteBackup::~RemoteBackup() { cancel(); } @@ -52,12 +52,14 @@ void RemoteBackup::cancel() { } bool RemoteBackup::isReady() { - return connected && initialQueues.empty(); + return connected && catchupQueues.empty(); } -void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) { +void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { if (replicationTest.isReplicated(ALL, *q)) { - initialQueues.insert(q); + QPID_LOG(debug, logPrefix << "Catch-up queue" + << (createGuard ? " and guard" : "") << ": " << q->getName()); + catchupQueues.insert(q); if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo)); } } @@ -88,13 +90,13 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { } void RemoteBackup::ready(const QueuePtr& q) { - initialQueues.erase(q); + catchupQueues.erase(q); QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << QueueSetPrinter(", waiting for: ", initialQueues)); + << QueueSetPrinter(", waiting for: ", catchupQueues)); if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } -// Called via ConfigurationObserver::queueCreate and from initialQueue +// Called via ConfigurationObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { if (replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); @@ -102,7 +104,7 @@ void RemoteBackup::queueCreate(const QueuePtr& q) { // Called via ConfigurationObserver void RemoteBackup::queueDestroy(const QueuePtr& q) { - initialQueues.erase(q); + catchupQueues.erase(q); GuardMap::iterator i = guards.find(q); if (i != guards.end()) { i->second->cancel(); diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h index 8ee776e90b..e48ceff3ae 100644 --- a/cpp/src/qpid/ha/RemoteBackup.h +++ b/cpp/src/qpid/ha/RemoteBackup.h @@ -57,10 +57,10 @@ class RemoteBackup RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected); ~RemoteBackup(); - /** Set the initial queues for all queues in the registry. - *@createGuards if true create guards also, if false guards will be created on demand. + /** Set all queues in the registry as catch-up queues. + *@createGuards if true create guards also, if false guards are created on demand. */ - void setInitialQueues(broker::QueueRegistry&, bool createGuards); + void setCatchupQueues(broker::QueueRegistry&, bool createGuards); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); @@ -80,7 +80,7 @@ class RemoteBackup /** Called via ConfigurationObserver. Note: may set isReady() */ void queueDestroy(const QueuePtr&); - /**@return true when all initial queues for this backup are ready. */ + /**@return true when all catch-up queues for this backup are ready. */ bool isReady(); /**@return true if isReady() and this is the first call to reportReady */ @@ -94,14 +94,13 @@ class RemoteBackup typedef std::map<QueuePtr, GuardPtr> GuardMap; typedef std::set<QueuePtr> QueueSet; - /** Add queue to guard as an initial queue */ - void initialQueue(const QueuePtr&, bool createGuard); + void catchupQueue(const QueuePtr&, bool createGuard); std::string logPrefix; BrokerInfo brokerInfo; ReplicationTest replicationTest; GuardMap guards; - QueueSet initialQueues; + QueueSet catchupQueues; bool connected; bool reportedReady; }; diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index 79db67e3c8..462c90bfb3 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -199,7 +199,7 @@ class HaCluster(object): HaCluster._cluster_count += 1 for i in xrange(n): self.start(False) self.update_urls() - self[0].promote() + if promote: self[0].promote() def next_name(self): name="cluster%s-%s"%(self.id, self.broker_id) diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index a8d16a77c9..7ce0d1701a 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -627,25 +627,29 @@ acl deny all all def test_delete_missing_response(self): """Check that a backup correctly deletes leftover queues and exchanges that are missing from the initial reponse set.""" - cluster = HaCluster(self,2) - s = cluster[0].connect().session() + # This test is a bit contrived, we set up the situation on backup brokers + # and then promote one. + cluster = HaCluster(self, 2, promote=False) + + # cluster[0] Will be the primary + s = cluster[0].connect_admin().session() s.sender("q1;{create:always}") - s.sender("q2;{create:always}") s.sender("e1;{create:always, node:{type:topic}}") - s.sender("e2;{create:always, node:{type:topic}}") - cluster.bounce(0, promote_next=False) - # Fake a primary that has deleted some queues and exchanges. - s = cluster[0].connect_admin().session() + + # cluster[1] will be the backup, has extra queues/exchanges + s = cluster[1].connect_admin().session() + s.sender("q1;{create:always}") s.sender("q2;{create:always}") + s.sender("e1;{create:always, node:{type:topic}}") s.sender("e2;{create:always, node:{type:topic}}") - s.sender("x;{create:always}") # A new queue so we can wait for the update. + for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a) + cluster[0].promote() - # Verify the backup has deleted the missing queues and exchanges + # Verify the backup deletes the surpluis queue and exchange cluster[1].wait_status("ready") s = cluster[1].connect_admin().session() - cluster[1].wait_backup("x"); - self.assertRaises(NotFound, s.receiver, ("q1")); - self.assertRaises(NotFound, s.receiver, ("e1")); + self.assertRaises(NotFound, s.receiver, ("q2")); + self.assertRaises(NotFound, s.receiver, ("e2")); def test_auto_delete_qpid_4285(self): |