summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-08-07 15:19:36 +0000
committerAlan Conway <aconway@apache.org>2012-08-07 15:19:36 +0000
commit1ae28e3697cf24c6b8a0f9df224f8ed10ef029c4 (patch)
tree0f7015e2735ac145a4260180f1796306ccc7645f
parente5ba61caeb60d04b3ab8accc2dd3d39927893ee6 (diff)
downloadqpid-python-1ae28e3697cf24c6b8a0f9df224f8ed10ef029c4.tar.gz
QPID-4191: HA removing self address breaks if a VIP is used.
Pre this patch the HA broker removed its own address from the set of cluster addresses to form the set of failover addresses. The goal was avoid useless self-connection attempts. However this was broken with a Virtual IP address where a single address is used for the entire cluster. The remove-self is not essential, self-connection attempts are prevented elsewhere. Backup brokers will be prevented from connecting to self by the same connection-observer as normal clients, and this patch addes self-connection checks ins This patch - removes the code to remove self-addresses - adds self-connection checks in ConnectionObserver - adds & reorders some log statements & comments for greater clarity. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1370308 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp48
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py11
11 files changed, 68 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index bac6fd23c8..e099554df6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) :
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
-bool Backup::isSelf(const Address& a) const {
- return sys::SystemInfo::isLocalHost(a.host) &&
- a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
- Url url;
- for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- if (!isSelf(*i)) url.push_back(*i);
- if (url.empty())
- throw Url::Invalid(logPrefix+"Failover URL is empty");
- QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
- return url;
-}
-
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- Url url = removeSelf(brokers);
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- url[0].host, url[0].port, protocol,
+ brokers[0].host, brokers[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
@@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) {
replicator->initialize();
broker.getExchanges().registerExchange(replicator);
}
- link->setUrl(url); // Outside the lock, once set link doesn't change.
+ link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
Backup::~Backup() {
@@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) {
sys::Mutex::ScopedLock l(lock);
linkSet = link;
}
- if (linkSet) {
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
- }
+ if (linkSet)
+ link->setUrl(url); // Outside lock, once set link doesn't change
else
initialize(url); // Deferred initialization
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 1233a473ec..4f2d5babde 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -53,8 +53,6 @@ class Backup
void setStatus(BrokerStatus);
private:
- bool isSelf(const Address& a) const;
- Url removeSelf(const Url&) const;
void initialize(const Url&);
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index ef537ab90a..5a67cde922 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+ QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
connection.abort();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 91a4bf242b..b7d36c641b 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -472,7 +472,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+ // It is normal for the exchange to already exist if we are failing over.
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 553f50d802..81ba3e4301 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -32,7 +32,7 @@ namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
: haBroker(hb), logPrefix("Connections: "), self(uuid) {}
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
info = BrokerInfo(ft);
@@ -51,29 +51,43 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
return observer;
}
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+ BrokerInfo info;
+ return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
void ConnectionObserver::opened(broker::Connection& connection) {
- if (connection.isLink()) return; // Allow outgoing links.
- if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
- << connection.getMgmtId());
- return; // No need to call observer, always allow admins.
- }
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
+ try {
+ if (connection.isLink()) return; // Allow outgoing links.
+ if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ QPID_LOG(debug, logPrefix << "Accepted admin connection: "
+ << connection.getMgmtId());
+ return; // No need to call observer, always allow admins.
+ }
+ if (isSelf(connection)) { // Reject self connections
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
connection.abort();
+ return;
}
-
+ ObserverPtr o(getObserver());
+ if (o) o->opened(connection);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Open error: " << e.what());
+ throw;
}
- ObserverPtr o(getObserver());
- if (o) o->opened(connection);
}
void ConnectionObserver::closed(broker::Connection& connection) {
- BrokerInfo info;
- ObserverPtr o(getObserver());
- if (o) o->closed(connection);
+ if (isSelf(connection)) return; // Ignore closing of self connections.
+ try {
+ ObserverPtr o(getObserver());
+ if (o) o->closed(connection);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Close error: " << e.what());
+ throw;
+ }
}
const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 5c1dabe8f8..e3a6d1154a 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
- static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver
void closed(broker::Connection& connection);
private:
+ bool isSelf(const broker::Connection&);
+
sys::Mutex lock;
HaBroker& haBroker;
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index fc74ce633a..3c984a023a 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT),
+ systemId);
+
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ QPID_LOG(info, logPrefix << "Membership changed: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) {
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b);
membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
Mutex::ScopedLock l(lock);
membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id);
membershipUpdated(l);
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 0ffc152097..7dabe6e35b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -97,6 +97,8 @@ class HaBroker : public management::Manageable
void addBroker(const BrokerInfo& b); // Add a broker to the membership.
void removeBroker(const types::Uuid& id); // Remove a broker from membership.
+ types::Uuid getSystemId() const { return systemId; }
+
private:
void setClientUrl(const Url&);
void setBrokerUrl(const Url&);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index e7aa4858be..c3261533e6 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -190,11 +190,13 @@ void Primary::queueDestroy(const QueuePtr& q) {
}
void Primary::opened(broker::Connection& connection) {
+ QPID_LOG(critical, "FIXME opened " << connection.getMgmtId());
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
+ QPID_LOG(debug, logPrefix << "New backup connected: " << info);
boost::shared_ptr<RemoteBackup> backup(
new RemoteBackup(info, haBroker.getReplicationTest(), true));
{
@@ -203,7 +205,6 @@ void Primary::opened(broker::Connection& connection) {
backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
- QPID_LOG(debug, logPrefix << "New backup connected: " << info);
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
@@ -219,6 +220,12 @@ void Primary::opened(broker::Connection& connection) {
}
void Primary::closed(broker::Connection& connection) {
+ // NOTE: It is possible for a backup connection to be rejected while we are
+ // a backup, but closed() is called after we have become primary.
+ //
+ // For this reason we do not remove from the backups map here, the backups
+ // map holds all the backups we know about whether connected or not.
+ //
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -227,12 +234,6 @@ void Primary::closed(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not remove from the backups map here, the backups map holds
- // all the backups we know about whether connected or not.
- //
- // It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen after we have become primary. Removing the entry
- // from backups in this case would be incorrect.
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index a5693fd14e..9c7bf1e694 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -32,7 +32,7 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary remote backup "+info.getLogId()+": "),
+ logPrefix("Primary: Remote backup "+info.getLogId()+": "),
brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 414ede7cca..246b0ed423 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -63,7 +63,6 @@ class HaBroker(Broker):
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
"--log-enable=debug+:ha::",
- "--log-enable=trace+:ha::", # FIXME aconway 2012-07-12:
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -112,9 +111,11 @@ class HaBroker(Broker):
def wait_status(self, status):
def try_get_status():
# Ignore ConnectionError, the broker may not be up yet.
- try: return self.ha_status() == status;
+ try:
+ self._status = self.ha_status()
+ return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+ assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -761,7 +762,7 @@ acl deny all all
s1.sender("ex").send("foo");
self.assertEqual(s1.receiver("q").fetch().content, "foo")
- def test_alterante_exchange(self):
+ def test_alternate_exchange(self):
"""Verify that alternate-exchange on exchanges and queues is propagated
to new members of a cluster. """
cluster = HaCluster(self, 2)
@@ -964,7 +965,7 @@ class RecoveryTests(BrokerTest):
"""
cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
cluster[0].wait_status("active") # Primary ready
- for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+ for b in cluster[1:3]: b.wait_status("ready") # Backups ready
for i in [0,1]: cluster.kill(i, False)
cluster[2].promote() # New primary, backups will be 1 and 2
cluster[2].wait_status("recovering")