summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-17 14:16:11 +0000
committerAlan Conway <aconway@apache.org>2012-02-17 14:16:11 +0000
commit91cd22db895e41296bf57f0aea288d2068ab5c04 (patch)
tree7472171275c76075fa5c53caabde601781084a9c
parentfd205ac89a9146ef22b4dd057d83fa84fb12ece0 (diff)
downloadqpid-python-91cd22db895e41296bf57f0aea288d2068ab5c04.tar.gz
QPID-3603: HA brokers set known-hosts to the HA broker-url.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-7@1245547 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp57
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h13
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h8
5 files changed, 66 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 32f87dc722..5acbfb9d5f 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -46,10 +46,12 @@ using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) :
broker(b), settings(s), excluder(new ConnectionExcluder())
{
+ // Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
void Backup::initialize(const Url& url) {
+ assert(!url.empty());
QPID_LOG(notice, "Ha: Backup started: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
@@ -66,20 +68,23 @@ void Backup::initialize(const Url& url) {
broker.getConnectionObservers().add(excluder);
}
-void Backup::setUrl(const Url& url) {
+void Backup::setBrokerUrl(const Url& url) {
+ // Ignore empty URLs seen during start-up for some tests.
+ if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
- if (!replicator.get())
- initialize(url);
- else {
- QPID_LOG(info, "HA: Backup URL set to " << url);
+ if (link) { // URL changed after we initialized.
+ QPID_LOG(info, "HA: Backup failover URL set to " << url);
link->setUrl(url);
}
+ else {
+ initialize(url); // Deferred initialization
+ }
}
Backup::~Backup() {
if (link) link->close();
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- broker.getConnectionObservers().remove(excluder); // Allows client connections.
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 3ebb6d018d..526b238b82 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -49,7 +49,7 @@ class Backup
public:
Backup(broker::Broker&, const Settings&);
~Backup();
- void setUrl(const Url&);
+ void setBrokerUrl(const Url&);
private:
void initialize(const Url&);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 349b85431a..0d3bd51439 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -39,21 +39,13 @@ using namespace management;
using namespace std;
namespace {
-Url url(const std::string& s, const std::string& id) {
- try {
- // Allow the URL to be empty, used in tests that set the URL
- // after starting broker
- return s.empty() ? Url() : Url(s);
- } catch (const std::exception& e) {
- throw Exception(Msg() << "Invalid URL for " << id << ": '" << s << "'");
- }
-}
const std::string PRIMARY="primary";
const std::string BACKUP="backup";
} // namespace
+
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
@@ -65,6 +57,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+
ManagementAgent* ma = broker.getManagementAgent();
if (!ma)
throw Exception("Cannot start HA: management is disabled");
@@ -74,6 +68,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
mgmtObject->set_status(BACKUP);
ma->addObject(mgmtObject);
}
+ sys::Mutex::ScopedLock l(lock);
+ if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
+ if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
}
HaBroker::~HaBroker() {}
@@ -92,17 +89,15 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
break;
}
case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- string url = dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args)
- .i_clientAddresses;
- mgmtObject->set_clientAddresses(url);
- // FIXME aconway 2012-01-30: upate status for new URL
+ setClientUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
+ i_clientAddresses), l);
break;
}
case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
- string url = dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
- .i_brokerAddresses;
- mgmtObject->set_brokerAddresses(url);
- if (backup.get()) backup->setUrl(Url(url));
+ setBrokerUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+ .i_brokerAddresses), l);
break;
}
default:
@@ -111,4 +106,32 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
+void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
+ clientUrl = url;
+ updateClientUrl(l);
+}
+
+void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+ Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ assert(!url.empty());
+ mgmtObject->set_clientAddresses(url.str());
+ knownBrokers.clear();
+ knownBrokers.push_back(url);
+ QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+}
+
+void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+ brokerUrl = url;
+ mgmtObject->set_brokerAddresses(brokerUrl.str());
+ if (backup.get()) backup->setBrokerUrl(brokerUrl);
+ // Updating broker URL also updates defaulted client URL:
+ if (clientUrl.empty()) updateClientUrl(l);
+}
+
+std::vector<Url> HaBroker::getKnownBrokers() const {
+ return knownBrokers;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index e3dc46946b..4d7bf80c90 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -54,11 +54,20 @@ class HaBroker : public management::Manageable
uint32_t methodId, management::Args& args, std::string& text);
private:
- sys::Mutex lock;
+ void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void updateClientUrl(const sys::Mutex::ScopedLock&);
+ bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ std::vector<Url> getKnownBrokers() const;
+
broker::Broker& broker;
- Settings settings;
+ const Settings settings;
+
+ sys::Mutex lock;
std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+ Url clientUrl, brokerUrl;
+ std::vector<Url> knownBrokers;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index a2d2e89d82..049c873b9f 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -27,8 +27,6 @@
namespace qpid {
namespace ha {
-using std::string;
-
/**
* Configurable settings for HA.
*/
@@ -37,9 +35,9 @@ class Settings
public:
Settings() : enabled(false) {}
bool enabled;
- string clientUrl;
- string brokerUrl;
- string username, password, mechanism;
+ std::string clientUrl;
+ std::string brokerUrl;
+ std::string username, password, mechanism;
private:
};
}} // namespace qpid::ha