summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp32
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp22
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp2
10 files changed, 41 insertions, 54 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index bac6fd23c8..e099554df6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -51,35 +51,15 @@ Backup::Backup(HaBroker& hb, const Settings& s) :
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
-bool Backup::isSelf(const Address& a) const {
- return sys::SystemInfo::isLocalHost(a.host) &&
- a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
- Url url;
- for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- if (!isSelf(*i)) url.push_back(*i);
- if (url.empty())
- throw Url::Invalid(logPrefix+"Failover URL is empty");
- QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
- return url;
-}
-
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- Url url = removeSelf(brokers);
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- url[0].host, url[0].port, protocol,
+ brokers[0].host, brokers[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
@@ -90,7 +70,7 @@ void Backup::initialize(const Url& brokers) {
replicator->initialize();
broker.getExchanges().registerExchange(replicator);
}
- link->setUrl(url); // Outside the lock, once set link doesn't change.
+ link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
Backup::~Backup() {
@@ -107,10 +87,8 @@ void Backup::setBrokerUrl(const Url& url) {
sys::Mutex::ScopedLock l(lock);
linkSet = link;
}
- if (linkSet) {
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
- }
+ if (linkSet)
+ link->setUrl(url); // Outside lock, once set link doesn't change
else
initialize(url); // Deferred initialization
}
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 1233a473ec..4f2d5babde 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -53,8 +53,6 @@ class Backup
void setStatus(BrokerStatus);
private:
- bool isSelf(const Address& a) const;
- Url removeSelf(const Url&) const;
void initialize(const Url&);
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index ef537ab90a..5a67cde922 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+ QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
connection.abort();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index c8c4a42d72..cf413f35d0 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -474,7 +474,8 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+ // It is normal for the exchange to already exist if we are failing over.
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
}
namespace {
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 3f7a1710d9..81ba3e4301 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -32,7 +32,7 @@ namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
: haBroker(hb), logPrefix("Connections: "), self(uuid) {}
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
info = BrokerInfo(ft);
@@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
return observer;
}
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+ BrokerInfo info;
+ return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
void ConnectionObserver::opened(broker::Connection& connection) {
try {
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
- connection.abort();
- }
-
+ if (isSelf(connection)) { // Reject self connections
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ return;
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
@@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::Connection& connection) {
}
void ConnectionObserver::closed(broker::Connection& connection) {
+ if (isSelf(connection)) return; // Ignore closing of self connections.
try {
- BrokerInfo info;
ObserverPtr o(getObserver());
if (o) o->closed(connection);
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 5c1dabe8f8..e3a6d1154a 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker::ConnectionObserver
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
- static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker::ConnectionObserver
void closed(broker::Connection& connection);
private:
+ bool isSelf(const broker::Connection&);
+
sys::Mutex lock;
HaBroker& haBroker;
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index d126639813..ffbcb684bc 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT),
+ systemId);
+
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ QPID_LOG(info, logPrefix << "Membership changed: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const BrokerInfo& b) {
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b);
membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
Mutex::ScopedLock l(lock);
membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id);
membershipUpdated(l);
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 0ffc152097..7dabe6e35b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -97,6 +97,8 @@ class HaBroker : public management::Manageable
void addBroker(const BrokerInfo& b); // Add a broker to the membership.
void removeBroker(const types::Uuid& id); // Remove a broker from membership.
+ types::Uuid getSystemId() const { return systemId; }
+
private:
void setClientUrl(const Url&);
void setBrokerUrl(const Url&);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 69c94bfc7d..45a0e246f3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -225,6 +225,12 @@ void Primary::opened(broker::Connection& connection) {
}
void Primary::closed(broker::Connection& connection) {
+ // NOTE: It is possible for a backup connection to be rejected while we are
+ // a backup, but closed() is called after we have become primary.
+ //
+ // For this reason we do not remove from the backups map here, the backups
+ // map holds all the backups we know about whether connected or not.
+ //
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -233,12 +239,6 @@ void Primary::closed(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not remove from the backups map here, the backups map holds
- // all the backups we know about whether connected or not.
- //
- // It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen after we have become primary. Removing the entry
- // from backups in this case would be incorrect.
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index a5693fd14e..9c7bf1e694 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -32,7 +32,7 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary remote backup "+info.getLogId()+": "),
+ logPrefix("Primary: Remote backup "+info.getLogId()+": "),
brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}