summary | refs | log | tree | commit | diff
path: root/cpp/src/qpid/ha/HaBroker.cpp
diff options
context:
space:
mode:
author: Kim van der Riet <kpvdr@apache.org> 2012-08-03 12:13:32 +0000
committer: Kim van der Riet <kpvdr@apache.org> 2012-08-03 12:13:32 +0000
commit: d43d1912b376322e27fdcda551a73f9ff5487972 (patch)
tree: ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/ha/HaBroker.cpp
parent: 04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
download: qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r-- cpp/src/qpid/ha/HaBroker.cpp | 300
1 files changed, 235 insertions, 65 deletions
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 7d82fb63bd..d126639813 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -19,22 +19,31 @@
*
*/
#include "Backup.h"
-#include "ConnectionExcluder.h"
+#include "BackupConnectionExcluder.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
-#include "Settings.h"
+#include "Primary.h"
+#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/Uuid.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
@@ -42,91 +51,149 @@ namespace ha {
namespace _qmf = ::qmf::org::apache::qpid::ha;
using namespace management;
using namespace std;
+using types::Variant;
+using types::Uuid;
+using sys::Mutex;
-namespace {
-
-const std::string STANDALONE="standalone";
-const std::string CATCH_UP="catch-up";
-const std::string BACKUP="backup";
-const std::string PRIMARY="primary";
-
-} // namespace
-
-
+// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : broker(b),
+ : logPrefix("Broker: "),
+ broker(b),
+ systemId(broker.getSystem()->getSystemId().data()),
settings(s),
- mgmtObject(0)
+ observer(new ConnectionObserver(*this, systemId)),
+ mgmtObject(0),
+ status(STANDALONE),
+ membership(systemId),
+ replicationTest(s.replicateDefault.get())
{
- // Register a factory for replicating subscriptions.
- broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
- new ReplicatingSubscription::Factory()));
+ // If we are joining a cluster we must start excluding clients now,
+ // otherwise there's a window for a client to connect before we get to
+ // initialize()
+ if (settings.cluster) {
+ QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+ observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
+ new BackupConnectionExcluder));
+ broker.getConnectionObservers().add(observer);
+ }
+}
- broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+// Called in Plugin::initialize
+void HaBroker::initialize() {
+ // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
+ brokerInfo = BrokerInfo(
+ broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+
+ // Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
- if (!ma)
+ if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE);
- mgmtObject->set_replicateDefault(str(settings.replicateDefault));
+ mgmtObject->set_replicateDefault(settings.replicateDefault.str());
+ mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
- // NOTE: lock is not needed in a constructor but we created it just to pass
- // to the set functions.
- sys::Mutex::ScopedLock l(lock);
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory()));
+
+ // If we are in a cluster, start as backup in joining state.
+ if (settings.cluster) {
+ status = JOINING;
+ backup.reset(new Backup(*this, settings));
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ }
+
+ if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
+ if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
+
+
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
- // If we are in a cluster, we start in backup mode.
- if (settings.cluster) backup.reset(new Backup(*this, s));
+ // NOTE: lock is not needed in a constructor, but create one
+ // to pass to functions that have a ScopedLock parameter.
+ Mutex::ScopedLock l(lock);
+ statusChanged(l);
}
-HaBroker::~HaBroker() {}
+HaBroker::~HaBroker() {
+ QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+ broker.getConnectionObservers().remove(observer);
+}
+
+void HaBroker::recover() {
+ auto_ptr<Backup> b;
+ {
+ Mutex::ScopedLock l(lock);
+ // No longer replicating, close link. Note: link must be closed before we
+ // setStatus(RECOVERING) as that will remove our broker info from the
+ // outgoing link properties so we won't recognize self-connects.
+ b = backup;
+ }
+ b.reset(); // Call destructor outside of lock.
+ BrokerInfo::Set backups;
+ {
+ Mutex::ScopedLock l(lock);
+ setStatus(RECOVERING, l);
+ backups = membership.otherBackups();
+ membership.reset(brokerInfo);
+ // Drop the lock, new Primary may call back on activate.
+ }
+ // Outside of lock, may call back on activate()
+ primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
+}
+
+// Called back from Primary active check.
+void HaBroker::activate() { setStatus(ACTIVE); }
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
- sys::Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- if (backup.get()) { // I am a backup
- // NOTE: resetting backup allows client connections, so any
- // primary state should be set up here before backup.reset()
- backup.reset();
- QPID_LOG(notice, "HA: Promoted to primary");
- mgmtObject->set_status(PRIMARY);
+ switch (getStatus()) {
+ case JOINING: recover(); break;
+ case CATCHUP:
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ throw Exception("Still catching up, cannot be promoted.");
+ break;
+ case READY: recover(); break;
+ case RECOVERING: break;
+ case ACTIVE: break;
+ case STANDALONE: break;
}
break;
}
- case _qmf::HaBroker::METHOD_SETBROKERS: {
- setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokers&>(args).i_url), l);
+ case _qmf::HaBroker::METHOD_SETBROKERSURL: {
+ setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
break;
}
- case _qmf::HaBroker::METHOD_SETPUBLICBROKERS: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicBrokers&>(args).i_url), l);
+ case _qmf::HaBroker::METHOD_SETPUBLICURL: {
+ setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
- case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
- setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
- break;
- }
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ << bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
false, // durable
- settings.mechanism, settings.username, settings.password);
+ settings.mechanism, settings.username, settings.password,
+ false); // no amq.failover - don't want to use client URL.
boost::shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ boost::shared_ptr<QueueReplicator> qr(
+ new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
@@ -138,43 +205,146 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setClientUrl(const Url& url) {
+ Mutex::ScopedLock l(lock);
if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
clientUrl = url;
updateClientUrl(l);
}
-void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
if (url.empty()) throw Url::Invalid("HA client URL is empty");
- mgmtObject->set_publicBrokers(url.str());
+ mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, "HA: Setting client URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
}
-void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setBrokerUrl(const Url& url) {
+ Mutex::ScopedLock l(lock);
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(debug, "HA: Setting broker URL to: " << url);
brokerUrl = url;
- mgmtObject->set_brokers(brokerUrl.str());
+ mgmtObject->set_brokersUrl(brokerUrl.str());
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
}
-void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) {
- expectedBackups = n;
- mgmtObject->set_expectedBackups(n);
-}
-
std::vector<Url> HaBroker::getKnownBrokers() const {
+ Mutex::ScopedLock l(lock);
return knownBrokers;
}
-void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, "Shutting down: " << message);
+void HaBroker::shutdown() {
+ QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
broker.shutdown();
}
+BrokerStatus HaBroker::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return status;
+}
+
+void HaBroker::setStatus(BrokerStatus newStatus) {
+ Mutex::ScopedLock l(lock);
+ setStatus(newStatus, l);
+}
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
+ QPID_LOG(info, logPrefix << "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ assert(legal);
+ if (!legal) {
+ QPID_LOG(critical, logPrefix << "Illegal state transition: "
+ << printable(status) << " -> " << printable(newStatus));
+ shutdown();
+ }
+ status = newStatus;
+ statusChanged(l);
+}
+
+void HaBroker::statusChanged(Mutex::ScopedLock& l) {
+ mgmtObject->set_status(printable(status).str());
+ brokerInfo.setStatus(status);
+ setLinkProperties(l);
+}
+
+void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ Variant::List brokers = membership.asList();
+ mgmtObject->set_members(brokers);
+ broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
+}
+
+void HaBroker::setMembership(const Variant::List& brokers) {
+ Mutex::ScopedLock l(lock);
+ membership.assign(brokers);
+ QPID_LOG(info, logPrefix << "Membership update: " << membership);
+ BrokerInfo info;
+ // Update my status to what the primary says it is. The primary can toggle
+ // status between READY and CATCHUP based on the state of our subscriptions.
+ if (membership.get(systemId, info) && status != info.getStatus()) {
+ setStatus(info.getStatus(), l);
+ if (backup.get()) backup->setStatus(status);
+ }
+ membershipUpdated(l);
+}
+
+void HaBroker::resetMembership(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
+ membership.reset(b);
+ QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
+ membershipUpdated(l);
+}
+
+void HaBroker::addBroker(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
+ membership.add(b);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ membershipUpdated(l);
+}
+
+void HaBroker::removeBroker(const Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ membership.remove(id);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ membershipUpdated(l);
+}
+
+void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
+ framing::FieldTable linkProperties = broker.getLinkClientProperties();
+ if (isBackup(status)) {
+ // If this is a backup then any outgoing links are backup
+ // links and need to be tagged.
+ linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
+ }
+ else {
+ // If this is a primary then any outgoing links are federation links
+ // and should not be tagged.
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ }
+ broker.setLinkClientProperties(linkProperties);
+}
+
}} // namespace qpid::ha