summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/HaBroker.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-02-28 16:14:30 +0000
commit9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch)
tree2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/HaBroker.cpp
parent172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff)
downloadqpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp252
1 files changed, 54 insertions, 198 deletions
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index ffbcb684bc..590db7efa5 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -26,6 +26,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StandAlone.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -41,7 +42,6 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -54,134 +54,100 @@ using namespace std;
using types::Variant;
using types::Uuid;
using sys::Mutex;
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix("Broker: "),
- broker(b),
- systemId(broker.getSystem()->getSystemId().data()),
+ : systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ broker(b),
observer(new ConnectionObserver(*this, systemId)),
- mgmtObject(0),
- status(STANDALONE),
- membership(systemId),
- replicationTest(s.replicateDefault.get())
+ role(new StandAlone),
+ membership(BrokerInfo(systemId, STANDALONE), *this)
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, logPrefix << "Rejecting client connections.");
- observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
- new BackupConnectionExcluder));
+ QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
+ observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
}
}
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
// Called in Plugin::initialize
void HaBroker::initialize() {
-
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT),
- systemId);
-
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+ membership.add(
+ BrokerInfo(
+ membership.getSelf(),
+ settings.cluster ? JOINING : membership.getStatus(),
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT)
+ )
+ );
+ QPID_LOG(notice, "Initializing: " << membership.getInfo());
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
+ membership.setMgmtObject(mgmtObject);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
+ shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
- status = JOINING;
- backup.reset(new Backup(*this, settings));
+ assert(membership.getStatus() == JOINING);
+ role.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
-
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
-
- // NOTE: lock is not needed in a constructor, but create one
- // to pass to functions that have a ScopedLock parameter.
- Mutex::ScopedLock l(lock);
- statusChanged(l);
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+ QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover() {
- auto_ptr<Backup> b;
- {
- Mutex::ScopedLock l(lock);
- // No longer replicating, close link. Note: link must be closed before we
- // setStatus(RECOVERING) as that will remove our broker info from the
- // outgoing link properties so we won't recognize self-connects.
- b = backup;
- }
- b.reset(); // Call destructor outside of lock.
- BrokerInfo::Set backups;
- {
- Mutex::ScopedLock l(lock);
- setStatus(RECOVERING, l);
- backups = membership.otherBackups();
- membership.reset(brokerInfo);
- // Drop the lock, new Primary may call back on activate.
- }
- // Outside of lock, may call back on activate()
- primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- switch (getStatus()) {
- case JOINING: recover(); break;
- case CATCHUP:
- QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
- throw Exception("Still catching up, cannot be promoted.");
- break;
- case READY: recover(); break;
- case RECOVERING: break;
- case ACTIVE: break;
- case STANDALONE: break;
- }
- break;
+ Role* r = role->promote();
+ if (r) role.reset(r);
+ break;
}
case _qmf::HaBroker::METHOD_SETBROKERSURL: {
setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_SETPUBLICURL: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
- boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
Uuid uuid(true);
@@ -191,10 +157,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
- boost::shared_ptr<broker::Link> link = result.first;
+ shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(
+ shared_ptr<QueueReplicator> qr(
new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
@@ -207,31 +173,23 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
- clientUrl = url;
- updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
- Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- if (url.empty()) throw Url::Invalid("HA client URL is empty");
+ publicUrl = url;
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+ QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
- Mutex::ScopedLock l(lock);
- if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- brokerUrl = url;
- mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- if (backup.get()) backup->setBrokerUrl(brokerUrl);
- // Updating broker URL also updates defaulted client URL:
- if (clientUrl.empty()) updateClientUrl(l);
+ {
+ Mutex::ScopedLock l(lock);
+ brokerUrl = url;
+ mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ }
+ role->setBrokerUrl(url); // Oustside lock
}
std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -239,116 +197,14 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
-void HaBroker::shutdown() {
- QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, message);
broker.shutdown();
+ throw Exception(message);
}
BrokerStatus HaBroker::getStatus() const {
- Mutex::ScopedLock l(lock);
- return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
- Mutex::ScopedLock l(lock);
- setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
- // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
- static const BrokerStatus TRANSITIONS[][2] = {
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
- { READY, RECOVERING }, // Chosen as new primary
- { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
- { RECOVERING, ACTIVE } // All expected backups are ready
- };
- static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
- for (size_t i = 0; i < N; ++i) {
- if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
- return true;
- }
- return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix << "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
- assert(legal);
- if (!legal) {
- QPID_LOG(critical, logPrefix << "Illegal state transition: "
- << printable(status) << " -> " << printable(newStatus));
- shutdown();
- }
- status = newStatus;
- statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status).str());
- brokerInfo.setStatus(status);
- setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix << "Membership changed: " << membership);
- Variant::List brokers = membership.asList();
- mgmtObject->set_members(brokers);
- broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
- Mutex::ScopedLock l(lock);
- membership.assign(brokers);
- QPID_LOG(info, logPrefix << "Membership update: " << membership);
- BrokerInfo info;
- // Update my status to what the primary says it is. The primary can toggle
- // status between READY and CATCHUP based on the state of our subscriptions.
- if (membership.get(systemId, info) && status != info.getStatus()) {
- setStatus(info.getStatus(), l);
- if (backup.get()) backup->setStatus(status);
- }
- membershipUpdated(l);
-}
-
-void HaBroker::resetMembership(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
- membershipUpdated(l);
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b);
- membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
- Mutex::ScopedLock l(lock);
- membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id);
- membershipUpdated(l);
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
- framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status)) {
- // If this is a backup then any outgoing links are backup
- // links and need to be tagged.
- linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
- }
- else {
- // If this is a primary then any outgoing links are federation links
- // and should not be tagged.
- linkProperties.erase(ConnectionObserver::BACKUP_TAG);
- }
- broker.setLinkClientProperties(linkProperties);
+ return membership.getStatus();
}
}} // namespace qpid::ha