diff options
Diffstat (limited to 'cpp/src/qpid/ha/StatusCheck.cpp')
-rw-r--r-- | cpp/src/qpid/ha/StatusCheck.cpp | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp new file mode 100644 index 0000000000..17613ce3dd --- /dev/null +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "StatusCheck.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace ha { + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; +using namespace sys; + +const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"; + +class StatusCheckThread : public sys::Runnable { + public: + StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self) + : url(addr), statusCheck(sc), brokerInfo(self) {} + void run(); + private: + Url url; + StatusCheck& statusCheck; + uint16_t linkHeartbeatInterval; + BrokerInfo brokerInfo; +}; + +void StatusCheckThread::run() { + QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url); + Variant::Map options, clientProperties; + clientProperties = brokerInfo.asMap(); // Detect self connections. + clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. + + options["client-properties"] = clientProperties; + options["heartbeat"] = statusCheck.linkHeartbeatInterval; + Connection c(url.str(), options); + + try { + c.open(); + Session session = c.createSession(); + messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}"); + Receiver r = session.createReceiver(responses); + Sender s = session.createSender("qmf.default.direct/broker"); + Message request; + request.setReplyTo(responses); + request.setContentType("amqp/map"); + request.setProperty("x-amqp-0-10.app-id", "qmf2"); + request.setProperty("qmf.opcode", "_query_request"); + Variant::Map oid; + oid["_object_name"] = HA_BROKER; + Variant::Map content; + content["_what"] = "OBJECT"; + content["_object_id"] = oid; + encode(content, request); + s.send(request); + Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND); + session.acknowledge(); + Variant::List contentIn; + decode(response, contentIn); + if (contentIn.size() == 1) { + Variant::Map details = contentIn.front().asMap()["_values"].asMap(); + string status = details["status"].getString(); + if (status != "joining") { + statusCheck.setPromote(false); + QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is " + << status << ", this broker will refuse promotion."); + } + QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); + } + } catch(const exception& error) { + QPID_LOG(info, "Checking status of " << url << ": " << error.what()); + } + delete this; +} + +StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self) + : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self) +{} + +StatusCheck::~StatusCheck() { + // Join any leftovers + for (size_t i = 0; i < threads.size(); ++i) threads[i].join(); +} + +void StatusCheck::setUrl(const Url& url) { + Mutex::ScopedLock l(lock); + for (size_t i = 0; i < url.size(); ++i) + threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo))); +} + +bool StatusCheck::canPromote() { + Mutex::ScopedLock l(lock); + while (!threads.empty()) { + Thread t = threads.back(); + threads.pop_back(); + Mutex::ScopedUnlock u(lock); + t.join(); + } + return promote; +} + +void StatusCheck::setPromote(bool p) { + Mutex::ScopedLock l(lock); + promote = p; +} + +}} // namespace qpid::ha |