diff options
author | Alan Conway <aconway@apache.org> | 2012-04-23 15:51:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-04-23 15:51:33 +0000 |
commit | a7c7e18f5d000b1e6e211a97e725b8bdca2900cc (patch) | |
tree | 912863a229b580fad5d115c9d048d6037ccd52ce | |
parent | dda3a026db3037ac23570cfd29ee680d33ed4f7b (diff) | |
download | qpid-python-a7c7e18f5d000b1e6e211a97e725b8bdca2900cc.tar.gz |
QPID-3603: Install HA connection excluder at the beginning of initialization.
Previosly excluder was being installed late allowing connections to a
backup and diverting backups from the real primary.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1329300 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 84 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 4 |
3 files changed, 53 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 53550cfcd1..3f3fa87a01 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -47,6 +47,8 @@ using std::string; Backup::Backup(HaBroker& hb, const Settings& s) : haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) { + // Exclude client connections before starting the link to avoid self-connection. + broker.getConnectionObservers().add(excluder); // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } @@ -62,12 +64,18 @@ void Backup::initialize(const Url& url) { settings.mechanism, settings.username, settings.password); link = result.first; link->setUrl(url); - replicator.reset(new BrokerReplicator(haBroker, link)); broker.getExchanges().registerExchange(replicator); - broker.getConnectionObservers().add(excluder); } +Backup::~Backup() { + if (link) link->close(); + if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + replicator.reset(); + broker.getConnectionObservers().remove(excluder); // This allows client connections. +} + + void Backup::setBrokerUrl(const Url& url) { // Ignore empty URLs seen during start-up for some tests. if (url.empty()) return; @@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) { } } -Backup::~Backup() { - if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); - broker.getConnectionObservers().remove(excluder); // This allows client connections. -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 70afd2bfa7..d0c99cbdb6 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -190,8 +190,6 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& : Exchange(QPID_CONFIGURATION_REPLICATOR), haBroker(hb), broker(hb.getBroker()), link(l) { - QPID_LOG(info, "HA: Backup replicating from " << - link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -230,7 +228,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); + QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName); } void BrokerReplicator::route(Deliverable& msg) { @@ -246,6 +244,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -257,8 +256,10 @@ void BrokerReplicator::route(Deliverable& msg) { } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; - Variant::Map& values = i->asMap()[VALUES].asMap(); + Variant::Map& map = i->asMap(); + QPID_LOG(trace, "HA: Backup received event: " << map); + string type = map[SCHEMA_ID].asMap()[CLASS_NAME]; + Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); if (type == QUEUE) doResponseQueue(values); @@ -268,14 +269,13 @@ void BrokerReplicator::route(Deliverable& msg) { } } } catch (const std::exception& e) { - QPID_LOG(critical, "HA: Backup configuration replication failed: " << e.what() + QPID_LOG(critical, "HA: Backup configuration failed: " << e.what() << ": while handling: " << list); throw; } } void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue declare event " << values); string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); if (values[DISP] == CREATED && replicateLevel(argsMap)) { @@ -292,26 +292,27 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); if (result.second) { - // FIXME aconway 2011-11-22: should delete old queue and - // re-create from event. - // Events are always up to date, whereas responses may be - // out of date. + QPID_LOG(debug, "HA: Backup queue declare event: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-12-02: what's the right way to handle this? - QPID_LOG(warning, "HA: Backup queue already exists: " << name); + // Should we delete the old & re-create form the event? Responses + // may be old but events are always up-to-date. + QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name); } } } void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue delete event " << values); // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Backup deleting queue: " << name); + if (!queue) { + QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name); + } else if (!replicateLevel(queue->getSettings())) { + QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name); + } else { string rname = QueueReplicator::replicatorName(name); boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); @@ -320,11 +321,11 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // actually be destroyed, deleting the exhange broker.getExchanges().destroy(rname); broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); + QPID_LOG(debug, "HA: Backup queue delete event: " << name); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange declare event " << values); Variant::Map argsMap(asMapVoid(values[ARGS])); if (values[DISP] == CREATED && replicateLevel(argsMap)) { string name = values[EXNAME].asString(); @@ -339,32 +340,32 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()).second) { - QPID_LOG(debug, "HA: Backup created exchange: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event: " << name); } else { - // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // FIXME aconway 2011-11-22: should delete pre-existing exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Backup exchange already exists: " << name); + QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name); } } } void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange delete event " << values); string name = values[EXNAME].asString(); - try { - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); - if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Backup deleting exchange:" << name); - broker.deleteExchange( - name, - values[USER].asString(), - values[RHOST].asString()); - } - } catch (const framing::NotFoundException&) {} + boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + if (!exchange) { + QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name); + } else if (!replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name); + } else { + QPID_LOG(debug, "HA: Backup exchange delete event:" << name); + broker.deleteExchange( + name, + values[USER].asString(), + values[RHOST].asString()); + } } void BrokerReplicator::doEventBind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup bind event " << values); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = @@ -377,15 +378,14 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName() + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); } } void BrokerReplicator::doEventUnbind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup unbind event " << values); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = @@ -398,15 +398,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName() + exchange->unbind(queue, key, &args); + QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); } } void BrokerReplicator::doResponseQueue(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup queue response " << values); Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -423,16 +422,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { + QPID_LOG(debug, "HA: Backup queue response: " << name); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name); + QPID_LOG(warning, "HA: Backup queue response, already exists: " << name); } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup exchange response " << values); Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicateLevel(argsMap)) return; framing::FieldTable args; @@ -446,9 +445,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME].asString()); + QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << + QPID_LOG(warning, "HA: Backup exchange query, already exists: " << values[QNAME].asString()); } } @@ -475,7 +474,6 @@ const std::string QUEUE_REF("queueRef"); } // namespace void BrokerReplicator::doResponseBind(Variant::Map& values) { - QPID_LOG(debug, "HA: Backup bind response " << values); std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); @@ -489,7 +487,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName() + QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f26f236efa..61b42ccc07 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -34,7 +34,7 @@ class HaBroker(Broker): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=trace+:ha::", # FIXME aconway 2012-04-18: trace + "--log-enable=info+", "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -325,7 +325,7 @@ class ReplicationTests(BrokerTest): """Verify that a backup broker fails over and recovers queue state""" brokers = HaCluster(self, 3) brokers[0].connect().session().sender("q;{create:always}").send("a") - for b in brokers[1:]: b.assert_browse_backup("q", ["a"]) + for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b) brokers[0].expect = EXPECT_EXIT_FAIL brokers.kill(0) brokers[1].connect().session().sender("q").send("b") |