diff options
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 108 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 3 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 24 |
7 files changed, 156 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 276e17a8b5..95cba7bd8e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -370,7 +370,7 @@ bool Queue::acquire(const QueueCursor& position, const std::string& consumer) bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) { - checkNotDeleted(c); + if (!checkNotDeleted(c)) return false; QueueListeners::NotificationSet set; while (true) { //TODO: reduce lock scope @@ -1443,11 +1443,11 @@ QueueListeners& Queue::getListeners() { return listeners; } Messages& Queue::getMessages() { return *messages; } const Messages& Queue::getMessages() const { return *messages; } -void Queue::checkNotDeleted(const Consumer::shared_ptr& c) +bool Queue::checkNotDeleted(const Consumer::shared_ptr& c) { - if (deleted && !c->hideDeletedError()) { + if (deleted && !c->hideDeletedError()) throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted.")); - } + return !deleted; } void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index d52afec6b9..b628f17c08 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -188,7 +188,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, int getEventMode(); void dequeueFromStore(boost::intrusive_ptr<PersistableMessage>); void abandoned(const Message& message); - void checkNotDeleted(const Consumer::shared_ptr&); + bool checkNotDeleted(const Consumer::shared_ptr&); void notifyDeleted(); uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType); virtual bool checkDepth(const QueueDepth& increment, const Message&); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 2f0d304686..7572c7e516 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -58,8 +59,9 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; +using qpid::broker::amqp_0_10::MessageTransfer; using namespace framing; -using std::string; +using namespace std; using std::ostream; using types::Variant; using namespace broker; @@ -177,8 +179,11 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l), initialized(false), - alternates(hb.getBroker().getExchanges()) -{} + alternates(hb.getBroker().getExchanges()), + cleaner(*this) +{ + getArgs().setString(QPID_REPLICATE, printable(NONE).str()); +} void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. @@ -223,6 +228,10 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH << " status:" << printable(haBroker.getStatus())); initialized = true; + // Scan for existing replicated queues and exchanges. Any that have not been seen by + // the time all reponses are received will be cleaned up. + cleaner.start(); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -254,7 +263,7 @@ void BrokerReplicator::route(Deliverable& msg) { } Variant::List list; try { - if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage())) + if (!MessageTransfer::isQMFv2(msg.getMessage())) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getContent(); @@ -287,13 +296,18 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } - if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - // We have received all of the exchange response. + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { + QPID_LOG(debug, logPrefix << "Initial exchange configuration complete."); + cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { + QPID_LOG(debug, logPrefix << "Initial queue configuration complete."); + cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary + } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "Configuration replication failed: " << e.what() << ": while handling: " << list); haBroker.shutdown(); throw; @@ -308,14 +322,15 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); + cleaner.forgetQueue(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.deleteQueue(name, userId, remoteHost); - stopQueueReplicator(name); + deleteQueue(name); } settings.populate(args, settings.storeSettings); CreateQueueResult result = @@ -346,8 +361,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - stopQueueReplicator(name); - broker.deleteQueue(name, userId, remoteHost); + cleaner.forgetQueue(name); + deleteQueue(name); } } @@ -357,13 +372,14 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); + cleaner.forgetExchange(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { - broker.deleteExchange(name, userId, remoteHost); - QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + deleteExchange(name); + QPID_LOG(warning, logPrefix << "Replaced existing exchange: " << name); } CreateExchangeResult result = createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, @@ -377,12 +393,13 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); } else if (!replicationTest.replicateLevel(exchange->getArgs())) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange(name, userId, remoteHost); + cleaner.forgetExchange(name); + deleteExchange(name); replicatedExchanges.erase(name); } } @@ -457,6 +474,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { values[EXCLUSIVE].asBool())) return; string name(values[NAME].asString()); + cleaner.forgetQueue(name); QPID_LOG(debug, logPrefix << "Queue response: " << name); if (broker.getQueues().find(name)) { // Already exists if (findQueueReplicator(name)) @@ -478,6 +496,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.replicateLevel(argsMap)) return; string name = values[NAME].asString(); + cleaner.forgetExchange(name); QPID_LOG(debug, logPrefix << "Exchange response: " << name); if (broker.getExchanges().find(name)) { if (replicatedExchanges.find(name) != replicatedExchanges.end()) @@ -572,7 +591,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::stopQueueReplicator(const std::string& name) { +void BrokerReplicator::deleteQueue(const std::string& name) { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); @@ -580,6 +599,22 @@ void BrokerReplicator::stopQueueReplicator(const std::string& name) { // actually be destroyed. broker.getExchanges().destroy(qr->getName()); } + qr.reset(); + try { + broker.deleteQueue(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Queue deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Queue not found for deletion: " << name); + } +} + +void BrokerReplicator::deleteExchange(const std::string& name) { + try { + broker.deleteExchange(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name); + } } BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( @@ -639,4 +674,45 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } +// BrokerReplicator::Cleaner + +BrokerReplicator::Cleaner::Cleaner(BrokerReplicator& br) : brokerReplicator(br) {} + +void BrokerReplicator::Cleaner::start() { + queues.clear(); + exchanges.clear(); + brokerReplicator.broker.getExchanges().eachExchange(boost::bind(&Cleaner::addExchange, this, _1)); + brokerReplicator.broker.getQueues().eachQueue(boost::bind(&Cleaner::addQueue, this, _1)); +} + +void BrokerReplicator::Cleaner::addExchange(boost::shared_ptr<broker::Exchange> ex) { + if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *ex)) + exchanges.insert(ex->getName()); +} + +void BrokerReplicator::Cleaner::addQueue(boost::shared_ptr<broker::Queue> q) { + if (brokerReplicator.replicationTest.isReplicated(CONFIGURATION, *q)) + queues.insert(q->getName()); +} + +void BrokerReplicator::Cleaner::forgetExchange(const std::string& name) { + exchanges.erase(name); +} + +void BrokerReplicator::Cleaner::forgetQueue(const std::string& name) { + queues.erase(name); +} + +void BrokerReplicator::Cleaner::cleanExchanges() { + for_each(exchanges.begin(), exchanges.end(), + boost::bind(&BrokerReplicator::deleteExchange, &brokerReplicator, _1)); + exchanges.clear(); +} + +void BrokerReplicator::Cleaner::cleanQueues() { + for_each(queues.begin(), queues.end(), + boost::bind(&BrokerReplicator::deleteQueue, &brokerReplicator, _1)); + queues.clear(); +} + }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 5c8a983d45..11c828d50e 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -31,6 +31,7 @@ #include "qpid/management/ManagementObject.h" #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> +#include <set> namespace qpid { @@ -81,6 +82,33 @@ class BrokerReplicator : public broker::Exchange, typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; + /** Keep track of queues and exchanges that need to be cleaned up. */ + class Cleaner { + public: + Cleaner(BrokerReplicator&); + + /** Scan for existing queues and exchanges. */ + void start(); + + // Forget a queue/exchange that does not need cleaning + void forgetExchange(const std::string& name); + void forgetQueue(const std::string& name); + + void cleanExchanges(); + void cleanQueues(); + + private: + typedef std::set<std::string> Names; + + // add a queue/exchange that may need cleaning. + void addExchange(boost::shared_ptr<broker::Exchange>); + void addQueue(boost::shared_ptr<broker::Queue>); + + BrokerReplicator& brokerReplicator; + Names queues, exchanges; + }; + friend class Cleaner; + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); @@ -98,7 +126,6 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - void stopQueueReplicator(const std::string& name); CreateQueueResult createQueue( const std::string& name, @@ -114,6 +141,9 @@ class BrokerReplicator : public broker::Exchange, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + void deleteQueue(const std::string& name); + void deleteExchange(const std::string& name); + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; @@ -125,6 +155,7 @@ class BrokerReplicator : public broker::Exchange, qpid::Address primary; typedef std::set<std::string> StringSet; StringSet replicatedExchanges; // exchanges that have been replicated. + Cleaner cleaner; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 23ece4c2d3..c872e408c5 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -70,6 +70,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + getArgs().setString(QPID_REPLICATE, printable(NONE).str()); } // This must be separate from the constructor so we can call shared_from_this. @@ -123,8 +124,6 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa SequenceNumber front, back; queue->getRange(front, back, broker::REPLICATOR); if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - QPID_LOG(debug, logPrefix << " subscribe with settings " << settings); - peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 2f9d9a1211..79db67e3c8 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -209,6 +209,7 @@ class HaCluster(object): def start(self, update_urls=True, args=[]): """Start a new broker in the cluster""" b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + b.ready() self._brokers.append(b) if update_urls: self.update_urls() return b @@ -235,6 +236,7 @@ class HaCluster(object): self._brokers[i] = HaBroker( self.test, name=b.name, port=b.port(), brokers_url=self.url, **self.kwargs) + self._brokers[i].ready() def bounce(self, i, promote_next=True): """Stop and restart a broker in a cluster.""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3c43c6a914..86f33d8030 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -624,6 +624,30 @@ acl deny all all actual = [m.content for m in primary.get_messages("pq", 100)] self.assertEqual(expect, actual) + def test_delete_missing_response(self): + """Check that a backup correctly deletes leftover queues and exchanges that are + missing from the initial reponse set.""" + cluster = HaCluster(self,2) + s = cluster[0].connect().session() + s.sender("q1;{create:always}") + s.sender("q2;{create:always}") + s.sender("e1;{create:always, node:{type:topic}}") + s.sender("e2;{create:always, node:{type:topic}}") + cluster.bounce(0, promote_next=False) + # Fake a primary that has deleted some queues and exchanges. + s = cluster[0].connect_admin().session() + s.sender("q2;{create:always}") + s.sender("e2;{create:always, node:{type:topic}}") + s.sender("x;{create:always}") # A new queue so we can wait for the update. + cluster[0].promote() + # Verify the backup has deleted the missing queues and exchanges + cluster[1].wait_status("ready") + s = cluster[1].connect_admin().session() + cluster[1].wait_backup("x"); + self.assertRaises(NotFound, s.receiver, ("q1")); + self.assertRaises(NotFound, s.receiver, ("e1")); + + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |