diff options
author | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
commit | f160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch) | |
tree | 809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | ebb276cca41582b73223b55eff9f2d4386f4f746 (diff) | |
download | qpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz |
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 439 |
1 files changed, 0 insertions, 439 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp deleted file mode 100644 index 3a9c9a86d9..0000000000 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ /dev/null @@ -1,439 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "Backup.h" -#include "HaBroker.h" -#include "Primary.h" -#include "ReplicationTest.h" -#include "IdSetter.h" -#include "ReplicatingSubscription.h" -#include "RemoteBackup.h" -#include "ConnectionObserver.h" -#include "QueueReplicator.h" -#include "qpid/assert.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/BrokerObserver.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/amqp_0_10/Connection.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/SessionHandlerObserver.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/log/Statement.h" -#include "qpid/types/Uuid.h" -#include "qpid/types/Variant.h" -#include "qpid/sys/Timer.h" -#include <boost/bind.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/intrusive_ptr.hpp> - -namespace qpid { -namespace ha { - -using sys::Mutex; -using boost::shared_ptr; -using boost::intrusive_ptr; -using namespace std; -using namespace framing; - -namespace { - -const std::string CLIENT_PROCESS_NAME("qpid.client_process"); -const std::string CLIENT_PID("qpid.client_pid"); -const std::string CLIENT_PPID("qpid.client_ppid"); - -class PrimaryConnectionObserver : public broker::ConnectionObserver -{ - public: - PrimaryConnectionObserver(Primary& p) : primary(p) {} - void opened(broker::Connection& c) { primary.opened(c); } - void closed(broker::Connection& c) { primary.closed(c); } - private: - Primary& primary; -}; - -class PrimaryBrokerObserver : public broker::BrokerObserver -{ - public: - PrimaryBrokerObserver(Primary& p) : primary(p) {} - void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } - void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } - void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } - void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } - - private: - Primary& primary; -}; - -class ExpectedBackupTimerTask : public sys::TimerTask { - public: - ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline) - : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {} - void fire() { primary.timeoutExpectedBackups(); } - private: - Primary& primary; -}; - -class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { - public: - PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {} - - void connectionException(framing::connection::CloseCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); - } - void channelException(framing::session::DetachCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); - } - void executionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); - } - void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); - } - void detach() {} - - private: - const LogPrefix& logPrefix; -}; - -class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { - public: - PrimarySessionHandlerObserver(const LogPrefix& logPrefix) - : errorListener(new PrimaryErrorListener(logPrefix)) {} - void newSessionHandler(broker::SessionHandler& sh) { - BrokerInfo info; - // Suppress error logging for backup connections - // TODO aconway 2014-01-31: Be more selective, suppress only expected errors? - if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) { - sh.setErrorListener(errorListener); - } - } - private: - boost::shared_ptr<PrimaryErrorListener> errorListener; -}; - - -} // namespace - -Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), membership(hb.getMembership()), - logPrefix(hb.logPrefix), active(false), - replicationTest(hb.getSettings().replicateDefault.get()), - sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), - queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest) -{ - // Note that at this point, we are still rejecting client connections. - // So we are safe from client interference while we set up the primary. - - hb.getMembership().setStatus(RECOVERING); - QPID_LOG(notice, logPrefix << "Promoted to primary"); - - // Process all QueueReplicators, handles auto-delete queues. - QueueReplicator::Vector qrs; - QueueReplicator::copy(hb.getBroker().getExchanges(), qrs); - std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1)); - - if (!expect.empty()) { - // NOTE: RemoteBackups must be created before we set the BrokerObserver - // or ConnectionObserver so that there is no client activity while - // the QueueGuards are created. - QPID_LOG(notice, logPrefix << "Recovering backups: " << expect); - for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, haBroker.logPrefix)); - backups[i->getSystemId()] = backup; - if (!backup->isReady()) expectedBackups.insert(backup); - setCatchupQueues(backup, true); // Create guards - } - // Set timeout for expected brokers to connect and become ready. - sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout); - timerTask = new ExpectedBackupTimerTask(*this, deadline); - hb.getBroker().getTimer().add(timerTask); - } - brokerObserver.reset(new PrimaryBrokerObserver(*this)); - haBroker.getBroker().getBrokerObservers().add(brokerObserver); - haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver); - - checkReady(); // Outside lock - - // Allow client connections - connectionObserver.reset(new PrimaryConnectionObserver(*this)); - haBroker.getObserver()->setObserver(connectionObserver); -} - -Primary::~Primary() { - if (timerTask) timerTask->cancel(); - haBroker.getBroker().getBrokerObservers().remove(brokerObserver); - haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver); - haBroker.getObserver()->reset(); -} - -void Primary::checkReady() { - bool activate = false; - { - Mutex::ScopedLock l(lock); - if (!active && expectedBackups.empty()) - activate = active = true; - } - if (activate) { - membership.setStatus(ACTIVE); // Outside of lock. - QPID_LOG(notice, logPrefix << "All backups recovered."); - } -} - -void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) { - bool ready = false; - { - Mutex::ScopedLock l(lock); - if (backup->reportReady()) { - BrokerInfo info = backup->getBrokerInfo(); - info.setStatus(READY); - membership.add(info); - if (expectedBackups.erase(backup)) { - QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info); - ready = true; - } - else - QPID_LOG(info, logPrefix << "New backup is ready: " << info); - } - } - if (ready) checkReady(); // Outside lock -} - -void Primary::timeoutExpectedBackups() { - try { - sys::Mutex::ScopedLock l(lock); - if (active) return; // Already activated - // Remove records for any expectedBackups that are not yet connected - // Allow backups that are connected to continue becoming ready. - for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();) - { - // This loop erases elements of backups in backupDisconnect, so - // save and increment the iterator. - BackupSet::iterator j = i++; - boost::shared_ptr<RemoteBackup> backup = *j; - if (!backup->getConnection()) { - BrokerInfo info = backup->getBrokerInfo(); - QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info); - backupDisconnect(backup, l); // Calls erase(j) - // Keep broker in membership but downgrade status to CATCHUP. - // The broker will get this status change when it eventually connects. - info.setStatus(CATCHUP); - membership.add(info); - } - } - } - catch(const std::exception& e) { - QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what()); - // No-where for this exception to go. - } - checkReady(); -} - -void Primary::readyReplica(const ReplicatingSubscription& rs) { - shared_ptr<RemoteBackup> backup; - { - sys::Mutex::ScopedLock l(lock); - BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId()); - if (i != backups.end()) { - backup = i->second; - backup->ready(rs.getQueue()); - } - } - if (backup) checkReady(backup); -} - -// NOTE: Called with queue registry lock held. -void Primary::queueCreate(const QueuePtr& q) { - // Set replication argument. - ReplicateLevel level = replicationTest.useLevel(*q); - q->addArgument(QPID_REPLICATE, printable(level).str()); - if (level) { - QPID_LOG(debug, logPrefix << "Created queue " << q->getName() - << " replication: " << printable(level)); - // Give each queue a unique id. Used by backups to avoid confusion of - // same-named queues. - q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); - { - Mutex::ScopedLock l(lock); - queueLimits.addQueue(q); // Throws if limit exceeded - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) - i->second->queueCreate(q); - } - checkReady(); // Outside lock - } -} - -// NOTE: Called with queue registry lock held. -void Primary::queueDestroy(const QueuePtr& q) { - if (replicationTest.useLevel(*q)) { - QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); - { - Mutex::ScopedLock l(lock); - queueLimits.removeQueue(q); - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) - i->second->queueDestroy(q); - } - checkReady(); // Outside lock - } -} - -// NOTE: Called with exchange registry lock held. -void Primary::exchangeCreate(const ExchangePtr& ex) { - ReplicateLevel level = replicationTest.useLevel(*ex); - FieldTable args = ex->getArgs(); - args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. - if (level) { - QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() - << " replication: " << printable(level)); - // Give each exchange a unique id to avoid confusion of same-named exchanges. - args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data()))); - } - ex->setArgs(args); -} - -// NOTE: Called with exchange registry lock held. -void Primary::exchangeDestroy(const ExchangePtr& ex) { - if (replicationTest.useLevel(*ex)) { - QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); - // Do nothing - } - } - -// New backup connected -shared_ptr<RemoteBackup> Primary::backupConnect( - const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&) -{ - shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, haBroker.logPrefix)); - queueLimits.addBackup(backup); - backups[info.getSystemId()] = backup; - return backup; -} - -// Remove a backup. Caller should not release the shared pointer returend till -// outside the lock. -void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) { - queueLimits.addBackup(backup); - types::Uuid id = backup->getBrokerInfo().getSystemId(); - backup->cancel(); - expectedBackups.erase(backup); - backups.erase(id); - membership.remove(id); -} - - -void Primary::opened(broker::Connection& connection) { - BrokerInfo info; - shared_ptr<RemoteBackup> backup; - if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - Mutex::ScopedLock l(lock); - BackupMap::iterator i = backups.find(info.getSystemId()); - if (info.getStatus() == JOINING) { - info.setStatus(CATCHUP); - membership.add(info); - } - if (i == backups.end()) { - if (info.getStatus() == JOINING) { - info.setStatus(CATCHUP); - membership.add(info); - } - QPID_LOG(info, logPrefix << "New backup connection: " << info); - backup = backupConnect(info, connection, l); - } - else if (i->second->getConnection()) { - // The backup is failing over before we recieved the closed() call - // for its previous connection. Remove the old entry and create a new one. - QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info); - backupDisconnect(i->second, l); - backup = backupConnect(info, connection, l); - } else { - QPID_LOG(info, logPrefix << "Known backup reconnection: " << info); - i->second->setConnection(&connection); - backup = i->second; - } - } - else { - const types::Variant::Map& properties = connection.getClientProperties(); - std::ostringstream pinfo; - types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME); - // FIXME aconway 2014-08-13: Conditional on logging. - if (i != properties.end()) { - pinfo << " " << i->second; - i = properties.find(CLIENT_PID); - if (i != properties.end()) - pinfo << "(" << i->second << ")"; - } - QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str()); - } - - // Outside lock - if (backup) { - setCatchupQueues(backup, false); - checkReady(backup); - } - checkReady(); -} - -void Primary::closed(broker::Connection& connection) { - BrokerInfo info; - shared_ptr<RemoteBackup> backup; - if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - Mutex::ScopedLock l(lock); - BackupMap::iterator i = backups.find(info.getSystemId()); - // NOTE: It is possible for a backup connection to be rejected while we - // are a backup, but closed() is called after we have become primary. - // Checking isConnected() lets us ignore such spurious closes. - if (i == backups.end()) { - QPID_LOG(info, logPrefix << "Disconnect from unknown backup " << info); - } - else if (i->second->getConnection() != &connection) { - QPID_LOG(info, logPrefix << "Late disconnect from backup " << info); - } - else { - QPID_LOG(info, logPrefix << "Disconnect from " - << (i->second->getConnection() ? "" : "disconnected ") - << "backup " << info); - // Assign to shared_ptr so it will be deleted after we release the lock. - backup = i->second; - backupDisconnect(backup, l); - } - } - checkReady(); -} - -boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info) -{ - Mutex::ScopedLock l(lock); - BackupMap::iterator i = backups.find(info.getSystemId()); - return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); -} - -Role* Primary::promote() { - QPID_LOG(info, logPrefix << "Ignoring promotion, already primary"); - return 0; -} - -void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) { - // Do queue iteration outside the lock to avoid deadlocks with QueueRegistry. - haBroker.getBroker().getQueues().eachQueue( - boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards)); - backup->startCatchup(); -} - -}} // namespace qpid::ha |