diff options
author | Alan Conway <aconway@apache.org> | 2012-10-02 18:41:41 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-10-02 18:41:41 +0000 |
commit | b45b6daf6121e3f069de938439e1e37ff0cf7cc1 (patch) | |
tree | 90e8934452fa96d1b473dc73fdfca36f0d155490 /qpid/cpp | |
parent | 91dfa1fd49a50a3dae9d12ae5f6b08c711428683 (diff) | |
download | qpid-python-b45b6daf6121e3f069de938439e1e37ff0cf7cc1.tar.gz |
QPID-4285: HA backups continuously disconnect / re-sync after attempting to replicate a deleted queue. (Based on patch by Jason Dillama)
This does not directly tackle the origin of the problem but extends Jasons's patch since
it addresses something we had to fix anyway: "leaking" queues and exchanges. It does 2 things.
1. enabled hideDeletedError on all subscription objects used by HA
This suppress the troublesome exception with a harmless no-op
2. Delete queues/exchanges missing from responses (based on Jasons patch)
Fix the "leak" of queues and exchanges possible when an object replicated
to a backup is deleted from the newn primary before the backup connects.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1393089 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 108 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 24 |
7 files changed, 156 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 276e17a8b5..95cba7bd8e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -370,7 +370,7 @@ bool Queue::acquire(const QueueCursor& position, const std::string& consumer) bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { - checkNotDeleted(c); + if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; while (true) { //TODO: reduce lock scope @@ -1443,11 +1443,11 @@ QueueListeners& Queue::getListeners() { return listeners; } Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } -void Queue::checkNotDeleted(const Consumer::shared_ptr& c) +bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) { - if (deleted && !c->hideDeletedError()) { + if (deleted && !c->hideDeletedError()) throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); - } + return !deleted; } void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index d52afec6b9..b628f17c08 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -188,7 +188,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, int getEventMode(); void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>); void abandoned(const Message& message); - void checkNotDeleted(const Consumer::shared_ptr&); + bool checkNotDeleted(const Consumer::shared_ptr&); void notifyDeleted(); uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType); virtual bool checkDepth(const QueueDepth& increment, const Message&); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 2f0d304686..7572c7e516 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -58,8 +59,9 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; +using qpid::broker::amqp_0_10::MessageTransfer; using namespace framing; -using std::string; +using namespace std; using std::ostream; using types::Variant; using namespace broker; @@ -177,8 +179,11 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), initialized(false), - alternates(hb.getBroker().getExchanges()) -{} + alternates(hb.getBroker().getExchanges()), + cleaner(*this) +{ + getArgs().setString(QPID_REPLICATE, printable(NONE).str()); +} void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. @@ -223,6 +228,10 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH << " status:" << printable(haBroker.getStatus())); initialized = true; + // Scan for existing replicated queues and exchanges. Any that have not been seen by + // the time all reponses are received will be cleaned up. + cleaner.start(); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -254,7 +263,7 @@ void BrokerReplicator::route(Deliverable& msg) { } Variant::List list; try { - if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage())) + if (!MessageTransfer::isQMFv2(msg.getMessage())) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getContent(); @@ -287,13 +296,18 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } - if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - // We have received all of the exchange response. + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { + QPID_LOG(debug, logPrefix << "Initial exchange configuration complete."); + cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { + QPID_LOG(debug, logPrefix << "Initial queue configuration complete."); + cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary + } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "Configuration replication failed: " << e.what() << ": while handling: " << list); haBroker.shutdown(); throw; @@ -308,14 +322,15 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); + cleaner.forgetQueue(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.deleteQueue(name, userId, remoteHost); - stopQueueReplicator(name); + deleteQueue(name); } settings.populate(args, settings.storeSettings); CreateQueueResult result = @@ -346,8 +361,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - stopQueueReplicator(name); - broker.deleteQueue(name, userId, remoteHost); + cleaner.forgetQueue(name); + deleteQueue(name); } } @@ -357,13 +372,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); + cleaner.forgetExchange(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { - broker.deleteExchange(name, userId, remoteHost); - QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + deleteExchange(name); + QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name); } CreateExchangeResult result = createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, @@ -377,12 +393,13 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); } else if (!replicationTest.replicateLevel(exchange->getArgs())) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange(name, userId, remoteHost); + cleaner.forgetExchange(name); + deleteExchange(name); replicatedExchanges.erase(name); } } @@ -457,6 +474,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { values[EXCLUSIVE].asBool())) return; string name(values[NAME].asString()); + cleaner.forgetQueue(name); QPID_LOG(debug, logPrefix << "Queue response: " << name); if (broker.getQueues().find(name)) { // Already exists if (findQueueReplicator(name)) @@ -478,6 +496,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.replicateLevel(argsMap)) return; string name = values[NAME].asString(); + cleaner.forgetExchange(name); QPID_LOG(debug, logPrefix << "Exchange response: " << name); if (broker.getExchanges().find(name)) { if (replicatedExchanges.find(name) != replicatedExchanges.end()) @@ -572,7 +591,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::stopQueueReplicator(const std::string& name) { +void BrokerReplicator::deleteQueue(const std::string& name) { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); @@ -580,6 +599,22 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { // actually be destroyed. broker.getExchanges().destroy(qr->getName()); } + qr.reset(); + try { + broker.deleteQueue(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Queue deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name); + } +} + +void BrokerReplicator::deleteExchange(const std::string& name) { + try { + broker.deleteExchange(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name); + } } BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( @@ -639,4 +674,45 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } +// BrokerReplicator::Cleaner + +BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : brokerReplicator(br) {} + +void BrokerReplicator::Cleaner::start() { + queues.clear(); + exchanges.clear(); + brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange, this, _1)); + brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, this, _1)); +} + +void BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) { + if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex)) + exchanges.insert(ex->getName()); +} + +void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) { + if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q)) + queues.insert(q->getName()); +} + +void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) { + exchanges.erase(name); +} + +void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) { + queues.erase(name); +} + +void BrokerReplicator::Cleaner::cleanExchanges() { + for_each(exchanges.begin(), exchanges.end(), + boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, _1)); + exchanges.clear(); +} + +void BrokerReplicator::Cleaner::cleanQueues() { + for_each(queues.begin(), queues.end(), + boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1)); + queues.clear(); +} + }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 5c8a983d45..11c828d50e 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -31,6 +31,7 @@ #include "qpid/management/ManagementObject.h" #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> +#include <set> namespace qpid { @@ -81,6 +82,33 @@ class BrokerReplicator : public broker::Exchange, typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; + /** Keep track of queues and exchanges that need to be cleaned up. */ + class Cleaner { + public: + Cleaner(BrokerReplicator&); + + /** Scan for existing queues and exchanges. */ + void start(); + + // Forget a queue/exchange that does not need cleaning + void forgetExchange(const std::string& name); + void forgetQueue(const std::string& name); + + void cleanExchanges(); + void cleanQueues(); + + private: + typedef std::set<std::string> Names; + + // add a queue/exchange that may need cleaning. + void addExchange(boost::shared_ptr<broker::Exchange>); + void addQueue(boost::shared_ptr<broker::Queue>); + + BrokerReplicator& brokerReplicator; + Names queues, exchanges; + }; + friend class Cleaner; + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); @@ -98,7 +126,6 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - void stopQueueReplicator(const std::string& name); CreateQueueResult createQueue( const std::string& name, @@ -114,6 +141,9 @@ class BrokerReplicator : public broker::Exchange, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + void deleteQueue(const std::string& name); + void deleteExchange(const std::string& name); + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; @@ -125,6 +155,7 @@ class BrokerReplicator : public broker::Exchange, qpid::Address primary; typedef std::set<std::string> StringSet; StringSet replicatedExchanges; // exchanges that have been replicated. + Cleaner cleaner; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 23ece4c2d3..c872e408c5 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -70,6 +70,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + getArgs().setString(QPID_REPLICATE, printable(NONE).str()); } // This must be separate from the constructor so we can call shared_from_this. @@ -123,8 +124,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa SequenceNumber front, back; queue->getRange(front, back, broker::REPLICATOR); if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - QPID_LOG(debug, logPrefix << " subscribe with settings " << settings); - peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 2f9d9a1211..79db67e3c8 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -209,6 +209,7 @@ class HaCluster(object): def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + b.ready() self._brokers.append(b) if update_urls: self.update_urls() return b @@ -235,6 +236,7 @@ class HaCluster(object): self._brokers[i] = HaBroker( self.test, name=b.name, port=b.port(), brokers_url=self.url, **self.kwargs) + self._brokers[i].ready() def bounce(self, i, promote_next=True): """Stop and restart a broker in a cluster.""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3c43c6a914..86f33d8030 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -624,6 +624,30 @@ acl deny all all actual = [m.content for m in primary.get_messages("pq", 100)] self.assertEqual(expect, actual) + def test_delete_missing_response(self): + """Check that a backup correctly deletes leftover queues and exchanges that are + missing from the initial reponse set.""" + cluster = HaCluster(self,2) + s = cluster[0].connect().session() + s.sender("q1;{create:always}") + s.sender("q2;{create:always}") + s.sender("e1;{create:always, node:{type:topic}}") + s.sender("e2;{create:always, node:{type:topic}}") + cluster.bounce(0, promote_next=False) + # Fake a primary that has deleted some queues and exchanges. + s = cluster[0].connect_admin().session() + s.sender("q2;{create:always}") + s.sender("e2;{create:always, node:{type:topic}}") + s.sender("x;{create:always}") # A new queue so we can wait for the update. + cluster[0].promote() + # Verify the backup has deleted the missing queues and exchanges + cluster[1].wait_status("ready") + s = cluster[1].connect_admin().session() + cluster[1].wait_backup("x"); + self.assertRaises(NotFound, s.receiver, ("q1")); + self.assertRaises(NotFound, s.receiver, ("e1")); + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |