summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/HaBroker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/HaBroker.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp240
1 files changed, 240 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
new file mode 100644
index 0000000000..7699b0e1d2
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Backup.h"
+#include "BackupConnectionExcluder.h"
+#include "ConnectionObserver.h"
+#include "HaBroker.h"
+#include "IdSetter.h"
+#include "Primary.h"
+#include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
+#include "StandAlone.h"
+#include "QueueSnapshot.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/assert.h"
+#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/BrokerObserver.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/Uuid.h"
+#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace ha {
+
+namespace _qmf = ::qmf::org::apache::qpid::ha;
+using namespace management;
+using namespace std;
+using types::Variant;
+using types::Uuid;
+using sys::Mutex;
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+
+// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue
+// because we don't know in advance which queues might be used for stand-alone
+// replication.
+//
+// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated?
+//
+class HaBroker::BrokerObserver : public broker::BrokerObserver {
+ public:
+ BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {}
+
+ void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+ q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
+ q->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName())));
+ }
+
+ private:
+ const LogPrefix& logPrefix;
+};
+
+// Called in Plugin::earlyInitialize
+HaBroker::HaBroker(broker::Broker& b, const Settings& s)
+ : systemId(b.getSystem()->getSystemId().data()),
+ settings(s),
+ userId(s.username+"@"+b.getRealm()),
+ broker(b),
+ observer(new ConnectionObserver(*this, systemId)),
+ role(new StandAlone),
+ membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
+{
+ // If we are joining a cluster we must start excluding clients now,
+ // otherwise there's a window for a client to connect before we get to
+ // initialize()
+ if (settings.cluster) {
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix));
+ observer->setObserver(excluder);
+ broker.getConnectionObservers().add(observer);
+ broker.getExchanges().registerExchange(failoverExchange);
+ }
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix)));
+}
+
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
+// Called in Plugin::initialize
+void HaBroker::initialize() {
+ if (settings.cluster) {
+ QPID_LOG(info, logPrefix << "Starting HA broker");
+ membership.setStatus(JOINING);
+ }
+
+ // Set up the management object.
+ ManagementAgent* ma = broker.getManagementAgent();
+ if (settings.cluster && !ma)
+ throw Exception("Cannot start HA: management is disabled");
+ _qmf::Package packageInit(ma);
+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
+ mgmtObject->set_replicateDefault(settings.replicateDefault.str());
+ mgmtObject->set_systemId(systemId);
+ ma->addObject(mgmtObject);
+ membership.setMgmtObject(mgmtObject);
+
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory(*this)));
+
+ // If we are in a cluster, start as backup in joining state.
+ if (settings.cluster) {
+ assert(membership.getStatus() == JOINING);
+ role.reset(new Backup(*this, settings));
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
+ }
+}
+
+HaBroker::~HaBroker() {
+ broker.getConnectionObservers().remove(observer);
+}
+
+Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
+ switch (methodId) {
+ case _qmf::HaBroker::METHOD_PROMOTE: {
+ Role* r = role->promote();
+ if (r) role.reset(r);
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETBROKERSURL: {
+ setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
+ break;
+ }
+ case _qmf::HaBroker::METHOD_SETPUBLICURL: {
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ break;
+ }
+ case _qmf::HaBroker::METHOD_REPLICATE: {
+ _qmf::ArgsHaBrokerReplicate& bq_args =
+ dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ << bq_args.i_queue << " from " << bq_args.i_broker);
+
+ shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ Url url(bq_args.i_broker);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ Uuid uuid(true);
+ std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password,
+ false); // no amq.failover - don't want to use client URL.
+ shared_ptr<broker::Link> link = result.first;
+ link->setUrl(url);
+ // Create a queue replicator
+ shared_ptr<QueueReplicator> qr(QueueReplicator::create(*this, queue, link));
+ broker.getExchanges().registerExchange(qr);
+ break;
+ }
+
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
+ }
+ return Manageable::STATUS_OK;
+}
+
+void HaBroker::setPublicUrl(const Url& url) {
+ Mutex::ScopedLock l(lock);
+ publicUrl = url;
+ mgmtObject->set_publicUrl(url.str());
+ knownBrokers.clear();
+ knownBrokers.push_back(url);
+ vector<Url> urls(1, url);
+ failoverExchange->updateUrls(urls);
+ QPID_LOG(debug, logPrefix << "Public URL set to: " << url);
+}
+
+void HaBroker::setBrokerUrl(const Url& url) {
+ {
+ Mutex::ScopedLock l(lock);
+ brokerUrl = url;
+ mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
+ }
+ role->setBrokerUrl(url); // Oustside lock
+}
+
+std::vector<Url> HaBroker::getKnownBrokers() const {
+ Mutex::ScopedLock l(lock);
+ return knownBrokers;
+}
+
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, logPrefix << "Shutting down: " << message);
+ broker.shutdown();
+ throw Exception(message);
+}
+
+BrokerStatus HaBroker::getStatus() const {
+ return membership.getStatus();
+}
+
+void HaBroker::setAddress(const Address& a) {
+ QPID_LOG(info, logPrefix << "Set self address to: " << a);
+ membership.setSelfAddress(a);
+}
+
+boost::shared_ptr<QueueReplicator> HaBroker::findQueueReplicator(const std::string& queueName) {
+ return boost::dynamic_pointer_cast<QueueReplicator>(
+ broker.getExchanges().find(QueueReplicator::replicatorName(queueName)));
+}
+
+}} // namespace qpid::ha