summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
committerRobert Gemmell <robbie@apache.org>2016-07-05 21:55:35 +0000
commitf160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch)
tree809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/qpid/ha/Primary.cpp
parentebb276cca41582b73223b55eff9f2d4386f4f746 (diff)
downloadqpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp439
1 files changed, 0 insertions, 439 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
deleted file mode 100644
index 3a9c9a86d9..0000000000
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "Backup.h"
-#include "HaBroker.h"
-#include "Primary.h"
-#include "ReplicationTest.h"
-#include "IdSetter.h"
-#include "ReplicatingSubscription.h"
-#include "RemoteBackup.h"
-#include "ConnectionObserver.h"
-#include "QueueReplicator.h"
-#include "qpid/assert.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/BrokerObserver.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/broker/amqp_0_10/Connection.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/SessionHandlerObserver.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/FieldValue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/types/Uuid.h"
-#include "qpid/types/Variant.h"
-#include "qpid/sys/Timer.h"
-#include <boost/bind.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace ha {
-
-using sys::Mutex;
-using boost::shared_ptr;
-using boost::intrusive_ptr;
-using namespace std;
-using namespace framing;
-
-namespace {
-
-const std::string CLIENT_PROCESS_NAME("qpid.client_process");
-const std::string CLIENT_PID("qpid.client_pid");
-const std::string CLIENT_PPID("qpid.client_ppid");
-
-class PrimaryConnectionObserver : public broker::ConnectionObserver
-{
- public:
- PrimaryConnectionObserver(Primary& p) : primary(p) {}
- void opened(broker::Connection& c) { primary.opened(c); }
- void closed(broker::Connection& c) { primary.closed(c); }
- private:
- Primary& primary;
-};
-
-class PrimaryBrokerObserver : public broker::BrokerObserver
-{
- public:
- PrimaryBrokerObserver(Primary& p) : primary(p) {}
- void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
- void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
- void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
- void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
-
- private:
- Primary& primary;
-};
-
-class ExpectedBackupTimerTask : public sys::TimerTask {
- public:
- ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline)
- : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {}
- void fire() { primary.timeoutExpectedBackups(); }
- private:
- Primary& primary;
-};
-
-class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
- public:
- PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
-
- void connectionException(framing::connection::CloseCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
- }
- void channelException(framing::session::DetachCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
- }
- void executionException(framing::execution::ErrorCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
- }
- void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
- QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
- }
- void detach() {}
-
- private:
- const LogPrefix& logPrefix;
-};
-
-class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
- public:
- PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
- : errorListener(new PrimaryErrorListener(logPrefix)) {}
- void newSessionHandler(broker::SessionHandler& sh) {
- BrokerInfo info;
- // Suppress error logging for backup connections
- // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
- if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
- sh.setErrorListener(errorListener);
- }
- }
- private:
- boost::shared_ptr<PrimaryErrorListener> errorListener;
-};
-
-
-} // namespace
-
-Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), membership(hb.getMembership()),
- logPrefix(hb.logPrefix), active(false),
- replicationTest(hb.getSettings().replicateDefault.get()),
- sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
- queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
-{
- // Note that at this point, we are still rejecting client connections.
- // So we are safe from client interference while we set up the primary.
-
- hb.getMembership().setStatus(RECOVERING);
- QPID_LOG(notice, logPrefix << "Promoted to primary");
-
- // Process all QueueReplicators, handles auto-delete queues.
- QueueReplicator::Vector qrs;
- QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
- std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
-
- if (!expect.empty()) {
- // NOTE: RemoteBackups must be created before we set the BrokerObserver
- // or ConnectionObserver so that there is no client activity while
- // the QueueGuards are created.
- QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
- for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, haBroker.logPrefix));
- backups[i->getSystemId()] = backup;
- if (!backup->isReady()) expectedBackups.insert(backup);
- setCatchupQueues(backup, true); // Create guards
- }
- // Set timeout for expected brokers to connect and become ready.
- sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
- timerTask = new ExpectedBackupTimerTask(*this, deadline);
- hb.getBroker().getTimer().add(timerTask);
- }
- brokerObserver.reset(new PrimaryBrokerObserver(*this));
- haBroker.getBroker().getBrokerObservers().add(brokerObserver);
- haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
-
- checkReady(); // Outside lock
-
- // Allow client connections
- connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver);
-}
-
-Primary::~Primary() {
- if (timerTask) timerTask->cancel();
- haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
- haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
- haBroker.getObserver()->reset();
-}
-
-void Primary::checkReady() {
- bool activate = false;
- {
- Mutex::ScopedLock l(lock);
- if (!active && expectedBackups.empty())
- activate = active = true;
- }
- if (activate) {
- membership.setStatus(ACTIVE); // Outside of lock.
- QPID_LOG(notice, logPrefix << "All backups recovered.");
- }
-}
-
-void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
- bool ready = false;
- {
- Mutex::ScopedLock l(lock);
- if (backup->reportReady()) {
- BrokerInfo info = backup->getBrokerInfo();
- info.setStatus(READY);
- membership.add(info);
- if (expectedBackups.erase(backup)) {
- QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
- ready = true;
- }
- else
- QPID_LOG(info, logPrefix << "New backup is ready: " << info);
- }
- }
- if (ready) checkReady(); // Outside lock
-}
-
-void Primary::timeoutExpectedBackups() {
- try {
- sys::Mutex::ScopedLock l(lock);
- if (active) return; // Already activated
- // Remove records for any expectedBackups that are not yet connected
- // Allow backups that are connected to continue becoming ready.
- for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
- {
- // This loop erases elements of backups in backupDisconnect, so
- // save and increment the iterator.
- BackupSet::iterator j = i++;
- boost::shared_ptr<RemoteBackup> backup = *j;
- if (!backup->getConnection()) {
- BrokerInfo info = backup->getBrokerInfo();
- QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
- backupDisconnect(backup, l); // Calls erase(j)
- // Keep broker in membership but downgrade status to CATCHUP.
- // The broker will get this status change when it eventually connects.
- info.setStatus(CATCHUP);
- membership.add(info);
- }
- }
- }
- catch(const std::exception& e) {
- QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
- // No-where for this exception to go.
- }
- checkReady();
-}
-
-void Primary::readyReplica(const ReplicatingSubscription& rs) {
- shared_ptr<RemoteBackup> backup;
- {
- sys::Mutex::ScopedLock l(lock);
- BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
- if (i != backups.end()) {
- backup = i->second;
- backup->ready(rs.getQueue());
- }
- }
- if (backup) checkReady(backup);
-}
-
-// NOTE: Called with queue registry lock held.
-void Primary::queueCreate(const QueuePtr& q) {
- // Set replication argument.
- ReplicateLevel level = replicationTest.useLevel(*q);
- q->addArgument(QPID_REPLICATE, printable(level).str());
- if (level) {
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
- // Give each queue a unique id. Used by backups to avoid confusion of
- // same-named queues.
- q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
- {
- Mutex::ScopedLock l(lock);
- queueLimits.addQueue(q); // Throws if limit exceeded
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
- i->second->queueCreate(q);
- }
- checkReady(); // Outside lock
- }
-}
-
-// NOTE: Called with queue registry lock held.
-void Primary::queueDestroy(const QueuePtr& q) {
- if (replicationTest.useLevel(*q)) {
- QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
- {
- Mutex::ScopedLock l(lock);
- queueLimits.removeQueue(q);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
- i->second->queueDestroy(q);
- }
- checkReady(); // Outside lock
- }
-}
-
-// NOTE: Called with exchange registry lock held.
-void Primary::exchangeCreate(const ExchangePtr& ex) {
- ReplicateLevel level = replicationTest.useLevel(*ex);
- FieldTable args = ex->getArgs();
- args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
- if (level) {
- QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
- << " replication: " << printable(level));
- // Give each exchange a unique id to avoid confusion of same-named exchanges.
- args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data())));
- }
- ex->setArgs(args);
-}
-
-// NOTE: Called with exchange registry lock held.
-void Primary::exchangeDestroy(const ExchangePtr& ex) {
- if (replicationTest.useLevel(*ex)) {
- QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
- // Do nothing
- }
- }
-
-// New backup connected
-shared_ptr<RemoteBackup> Primary::backupConnect(
- const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
-{
- shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, haBroker.logPrefix));
- queueLimits.addBackup(backup);
- backups[info.getSystemId()] = backup;
- return backup;
-}
-
-// Remove a backup. Caller should not release the shared pointer returend till
-// outside the lock.
-void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
- queueLimits.addBackup(backup);
- types::Uuid id = backup->getBrokerInfo().getSystemId();
- backup->cancel();
- expectedBackups.erase(backup);
- backups.erase(id);
- membership.remove(id);
-}
-
-
-void Primary::opened(broker::Connection& connection) {
- BrokerInfo info;
- shared_ptr<RemoteBackup> backup;
- if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- Mutex::ScopedLock l(lock);
- BackupMap::iterator i = backups.find(info.getSystemId());
- if (info.getStatus() == JOINING) {
- info.setStatus(CATCHUP);
- membership.add(info);
- }
- if (i == backups.end()) {
- if (info.getStatus() == JOINING) {
- info.setStatus(CATCHUP);
- membership.add(info);
- }
- QPID_LOG(info, logPrefix << "New backup connection: " << info);
- backup = backupConnect(info, connection, l);
- }
- else if (i->second->getConnection()) {
- // The backup is failing over before we recieved the closed() call
- // for its previous connection. Remove the old entry and create a new one.
- QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info);
- backupDisconnect(i->second, l);
- backup = backupConnect(info, connection, l);
- } else {
- QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
- i->second->setConnection(&connection);
- backup = i->second;
- }
- }
- else {
- const types::Variant::Map& properties = connection.getClientProperties();
- std::ostringstream pinfo;
- types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
- // FIXME aconway 2014-08-13: Conditional on logging.
- if (i != properties.end()) {
- pinfo << " " << i->second;
- i = properties.find(CLIENT_PID);
- if (i != properties.end())
- pinfo << "(" << i->second << ")";
- }
- QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
- }
-
- // Outside lock
- if (backup) {
- setCatchupQueues(backup, false);
- checkReady(backup);
- }
- checkReady();
-}
-
-void Primary::closed(broker::Connection& connection) {
- BrokerInfo info;
- shared_ptr<RemoteBackup> backup;
- if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- Mutex::ScopedLock l(lock);
- BackupMap::iterator i = backups.find(info.getSystemId());
- // NOTE: It is possible for a backup connection to be rejected while we
- // are a backup, but closed() is called after we have become primary.
- // Checking isConnected() lets us ignore such spurious closes.
- if (i == backups.end()) {
- QPID_LOG(info, logPrefix << "Disconnect from unknown backup " << info);
- }
- else if (i->second->getConnection() != &connection) {
- QPID_LOG(info, logPrefix << "Late disconnect from backup " << info);
- }
- else {
- QPID_LOG(info, logPrefix << "Disconnect from "
- << (i->second->getConnection() ? "" : "disconnected ")
- << "backup " << info);
- // Assign to shared_ptr so it will be deleted after we release the lock.
- backup = i->second;
- backupDisconnect(backup, l);
- }
- }
- checkReady();
-}
-
-boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
-{
- Mutex::ScopedLock l(lock);
- BackupMap::iterator i = backups.find(info.getSystemId());
- return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
-}
-
-Role* Primary::promote() {
- QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
- return 0;
-}
-
-void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) {
- // Do queue iteration outside the lock to avoid deadlocks with QueueRegistry.
- haBroker.getBroker().getQueues().eachQueue(
- boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards));
- backup->startCatchup();
-}
-
-}} // namespace qpid::ha