summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-23 15:51:33 +0000
committerAlan Conway <aconway@apache.org>2012-04-23 15:51:33 +0000
commita7c7e18f5d000b1e6e211a97e725b8bdca2900cc (patch)
tree912863a229b580fad5d115c9d048d6037ccd52ce
parentdda3a026db3037ac23570cfd29ee680d33ed4f7b (diff)
downloadqpid-python-a7c7e18f5d000b1e6e211a97e725b8bdca2900cc.tar.gz
QPID-3603: Install HA connection excluder at the beginning of initialization.
Previosly excluder was being installed late allowing connections to a backup and diverting backups from the real primary. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1329300 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp84
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py4
3 files changed, 53 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 53550cfcd1..3f3fa87a01 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -47,6 +47,8 @@ using std::string;
Backup::Backup(HaBroker& hb, const Settings& s) :
haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
{
+ // Exclude client connections before starting the link to avoid self-connection.
+ broker.getConnectionObservers().add(excluder);
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
@@ -62,12 +64,18 @@ void Backup::initialize(const Url& url) {
settings.mechanism, settings.username, settings.password);
link = result.first;
link->setUrl(url);
-
replicator.reset(new BrokerReplicator(haBroker, link));
broker.getExchanges().registerExchange(replicator);
- broker.getConnectionObservers().add(excluder);
}
+Backup::~Backup() {
+ if (link) link->close();
+ if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ replicator.reset();
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+
void Backup::setBrokerUrl(const Url& url) {
// Ignore empty URLs seen during start-up for some tests.
if (url.empty()) return;
@@ -81,10 +89,4 @@ void Backup::setBrokerUrl(const Url& url) {
}
}
-Backup::~Backup() {
- if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- broker.getConnectionObservers().remove(excluder); // This allows client connections.
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 70afd2bfa7..d0c99cbdb6 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -190,8 +190,6 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
: Exchange(QPID_CONFIGURATION_REPLICATOR),
haBroker(hb), broker(hb.getBroker()), link(l)
{
- QPID_LOG(info, "HA: Backup replicating from " <<
- link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -230,7 +228,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
+ QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName);
}
void BrokerReplicator::route(Deliverable& msg) {
@@ -246,6 +244,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "HA: Backup received event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -257,8 +256,10 @@ void BrokerReplicator::route(Deliverable& msg) {
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
- string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
- Variant::Map& values = i->asMap()[VALUES].asMap();
+ Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "HA: Backup received event: " << map);
+ string type = map[SCHEMA_ID].asMap()[CLASS_NAME];
+ Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
if (type == QUEUE) doResponseQueue(values);
@@ -268,14 +269,13 @@ void BrokerReplicator::route(Deliverable& msg) {
}
}
} catch (const std::exception& e) {
- QPID_LOG(critical, "HA: Backup configuration replication failed: " << e.what()
+ QPID_LOG(critical, "HA: Backup configuration failed: " << e.what()
<< ": while handling: " << list);
throw;
}
}
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup queue declare event " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
@@ -292,26 +292,27 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
if (result.second) {
- // FIXME aconway 2011-11-22: should delete old queue and
- // re-create from event.
- // Events are always up to date, whereas responses may be
- // out of date.
+ QPID_LOG(debug, "HA: Backup queue declare event: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-12-02: what's the right way to handle this?
- QPID_LOG(warning, "HA: Backup queue already exists: " << name);
+ // Should we delete the old & re-create form the event? Responses
+ // may be old but events are always up-to-date.
+ QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name);
}
}
}
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup queue delete event " << values);
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "HA: Backup deleting queue: " << name);
+ if (!queue) {
+ QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name);
+ } else if (!replicateLevel(queue->getSettings())) {
+ QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name);
+ } else {
string rname = QueueReplicator::replicatorName(name);
boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
@@ -320,11 +321,11 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// actually be destroyed, deleting the exhange
broker.getExchanges().destroy(rname);
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
+ QPID_LOG(debug, "HA: Backup queue delete event: " << name);
}
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup exchange declare event " << values);
Variant::Map argsMap(asMapVoid(values[ARGS]));
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
@@ -339,32 +340,32 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString()).second)
{
- QPID_LOG(debug, "HA: Backup created exchange: " << name);
+ QPID_LOG(debug, "HA: Backup exchange declare event: " << name);
} else {
- // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
+ // FIXME aconway 2011-11-22: should delete pre-existing exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "HA: Backup exchange already exists: " << name);
+ QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name);
}
}
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup exchange delete event " << values);
string name = values[EXNAME].asString();
- try {
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
- if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "HA: Backup deleting exchange:" << name);
- broker.deleteExchange(
- name,
- values[USER].asString(),
- values[RHOST].asString());
- }
- } catch (const framing::NotFoundException&) {}
+ boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ if (!exchange) {
+ QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name);
+ } else if (!replicateLevel(exchange->getArgs())) {
+ QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name);
+ } else {
+ QPID_LOG(debug, "HA: Backup exchange delete event:" << name);
+ broker.deleteExchange(
+ name,
+ values[USER].asString(),
+ values[RHOST].asString());
+ }
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup bind event " << values);
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
@@ -377,15 +378,14 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Backup replicated binding exchange=" << exchange->getName()
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
}
}
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup unbind event " << values);
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
@@ -398,15 +398,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << exchange->getName()
+ exchange->unbind(queue, key, &args);
+ QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
}
}
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup queue response " << values);
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -423,16 +422,16 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
+ QPID_LOG(debug, "HA: Backup queue response: " << name);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << name);
+ QPID_LOG(warning, "HA: Backup queue response, already exists: " << name);
}
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup exchange response " << values);
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -446,9 +445,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second)
{
- QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME].asString());
+ QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString());
} else {
- QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " <<
+ QPID_LOG(warning, "HA: Backup exchange query, already exists: " <<
values[QNAME].asString());
}
}
@@ -475,7 +474,6 @@ const std::string QUEUE_REF("queueRef");
} // namespace
void BrokerReplicator::doResponseBind(Variant::Map& values) {
- QPID_LOG(debug, "HA: Backup bind response " << values);
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
@@ -489,7 +487,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << exchange->getName()
+ QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f26f236efa..61b42ccc07 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -34,7 +34,7 @@ class HaBroker(Broker):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=trace+:ha::", # FIXME aconway 2012-04-18: trace
+ "--log-enable=info+", "--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -325,7 +325,7 @@ class ReplicationTests(BrokerTest):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
brokers[0].connect().session().sender("q;{create:always}").send("a")
- for b in brokers[1:]: b.assert_browse_backup("q", ["a"])
+ for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
brokers[1].connect().session().sender("q").send("b")