diff options
author | Alan Conway <aconway@apache.org> | 2012-10-02 21:46:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-02 21:46:50 +0000 |
commit | 8ad1a1f59ad2ddd802c5efba06cf445d1fae1c64 (patch) | |
tree | 1fc850ad0238a2b98400ce1f93a94ae46dafe9a3 /cpp/src | |
parent | 316664b4c66e4fc53eaa42d4ef8f5c1cad50bc72 (diff) | |
download | qpid-python-8ad1a1f59ad2ddd802c5efba06cf445d1fae1c64.tar.gz |
QPID-4285: HA backups continuously disconnect / re-sync after attempting to replicate a deleted queue
Fixes queues getting into a partially deleted state: previously when a broker was
promoted, it did not clean up its QueueReplicators. The QueueReplicators keep a
shared_ptr to the Queue so this kept Queues in memory after they were destroyed. It also
kept them in QMF, since the management object is unregistered in the destructor.
This patch cleans up properly on promotion.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1393201 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/ha/AlternateExchangeSetter.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 20 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 20 |
6 files changed, 58 insertions, 14 deletions
diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h index 1878939aad..2386a01084 100644 --- a/cpp/src/qpid/ha/AlternateExchangeSetter.h +++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -45,7 +45,7 @@ class AlternateExchangeSetter /** If altEx is already known, call setter(altEx) now else save for later */ void setAlternate(const std::string& altEx, const SetFunction& setter) { - broker::Exchange::shared_ptr ex = exchanges.find(altEx); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx); if (ex) setter(ex); // Set immediately. else setters.insert(Setters::value_type(altEx, setter)); // Save for later. } diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 6852a58b0c..3024656daa 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -75,8 +75,13 @@ void Backup::initialize(const Url& brokers) { } Backup::~Backup() { + QPID_LOG(debug, logPrefix << "Backup shutting down."); if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + if (replicator.get()) { + broker.getExchanges().destroy(replicator->getName()); + replicator->shutdown(); + replicator.reset(); + } } // Called via management. 
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 7572c7e516..73ab5327fc 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -68,7 +68,7 @@ using namespace broker; namespace { -const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); @@ -208,7 +208,12 @@ void BrokerReplicator::initialize() { ); } -BrokerReplicator::~BrokerReplicator() { } +BrokerReplicator::~BrokerReplicator() { shutdown(); } + +void BrokerReplicator::shutdown() { + QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down."); + broker.getQueues().eachQueue(boost::bind(&BrokerReplicator::deactivate, this, _1)); +} // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { @@ -591,7 +596,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::deleteQueue(const std::string& name) { +void BrokerReplicator::deactivateQueue(const std::string& name) { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); @@ -599,7 +604,14 @@ void BrokerReplicator::deleteQueue(const std::string& name) { // actually be destroyed. 
broker.getExchanges().destroy(qr->getName()); } - qr.reset(); +} + +void BrokerReplicator::deactivate(boost::shared_ptr<broker::Queue> q) { + deactivateQueue(q->getName()); +} + +void BrokerReplicator::deleteQueue(const std::string& name) { + deactivateQueue(name); try { broker.deleteQueue(name, userId, remoteHost); QPID_LOG(debug, logPrefix << "Queue deleted: " << name); diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 11c828d50e..bbdf3e2c0e 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -76,6 +76,7 @@ class BrokerReplicator : public broker::Exchange, bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + void shutdown(); private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; @@ -141,6 +142,8 @@ class BrokerReplicator : public broker::Exchange, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + void deactivateQueue(const std::string& name); + void deactivate(boost::shared_ptr<broker::Queue> q); void deleteQueue(const std::string& name); void deleteExchange(const std::string& name); diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index c872e408c5..c8341ccef3 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -44,6 +44,7 @@ namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace std; const std::string QPID_HA_EVENT_PREFIX("qpid.ha-"); const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); @@ -124,13 +125,18 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa SequenceNumber front, back; queue->getRange(front, back, broker::REPLICATOR); if (front <= back) 
settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - peer.getMessage().subscribe( - args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, - false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window? - peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); - peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - + try { + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, settings); + // FIXME aconway 2012-05-22: use a finite credit window? + peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); + peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); + } + catch(const exception& e) { + QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); + throw; + } qpid::Address primary; link->getRemoteAddress(primary); QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 86f33d8030..f1620cf55d 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -648,6 +648,24 @@ acl deny all all self.assertRaises(NotFound, s.receiver, ("e1")); + def test_auto_delete_qpid_4285(self): + """Regression test for QPID-4285: an auto delete queue gets stuck in + a partially deleted state and causes replication errors.""" + cluster = HaCluster(self,2) + cluster[1].wait_status("ready") + s = cluster[0].connect().session() + s.receiver("q;{create:always}") + cluster[1].wait_backup("q") + cluster.kill(0) # Make the backup take over. 
+ s = cluster[1].connect().session() + s.receiver("q;{delete:always}").close() # Delete q on new primary + try: + s.receiver("q") + self.fail("Expected NotFound exception") # Should not be avaliable + except NotFound: pass + assert not cluster[1].agent().getQueue("q") # Should not be in QMF + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -660,7 +678,7 @@ def fairshare(msgs, limit, levels): msgs = postponed count = 0 last_priority = None - postponed = [] + postponed = [ ] msg = msgs.pop(0) if last_priority and priority_level(msg.priority, levels) == last_priority: count += 1 |