/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "Backup.h"
#include "HaBroker.h"
#include "Primary.h"
#include "ReplicationTest.h"
#include "IdSetter.h"
#include "ReplicatingSubscription.h"
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include "qpid/types/Variant.h"
#include "qpid/sys/Timer.h"
// NOTE(review): the targets of the angle-bracket includes that followed were
// lost when this file was extracted. They are restored here to the headers the
// code below visibly needs (boost::bind, boost::shared_ptr, std::for_each,
// std::ostringstream) -- confirm against the original file.
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <algorithm>
#include <sstream>

namespace qpid {
namespace ha {

using sys::Mutex;
using boost::shared_ptr;
using boost::intrusive_ptr;
using namespace std;
using namespace framing;

namespace {

// Keys looked up in a client connection's properties when logging it.
const std::string CLIENT_PROCESS_NAME("qpid.client_process");
const std::string CLIENT_PID("qpid.client_pid");
const std::string CLIENT_PPID("qpid.client_ppid");

/** Forwards connection opened/closed notifications to the owning Primary. */
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
  public:
    explicit PrimaryConnectionObserver(Primary& p) : primary(p) {}
    void opened(broker::Connection& c) { primary.opened(c); }
    void closed(broker::Connection& c) { primary.closed(c); }
  private:
    Primary& primary;
};

/** Forwards queue and exchange create/destroy notifications to the owning Primary. */
class PrimaryBrokerObserver : public broker::BrokerObserver
{
  public:
    explicit PrimaryBrokerObserver(Primary& p) : primary(p) {}
    void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
    void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
    void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
    void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
  private:
    Primary& primary;
};

/** Timer task that calls Primary::timeoutExpectedBackups() when it fires. */
class ExpectedBackupTimerTask : public sys::TimerTask {
  public:
    ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline)
        : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {}
    void fire() { primary.timeoutExpectedBackups(); }
  private:
    Primary& primary;
};

/**
 * Error listener that logs session/connection errors at debug level only.
 * Holds a reference to the LogPrefix it was constructed with, so that
 * LogPrefix must outlive the listener.
 */
class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
  public:
    explicit PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}

    void connectionException(framing::connection::CloseCode code, const std::string& msg) {
        QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
    }
    void channelException(framing::session::DetachCode code, const std::string& msg) {
        QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
    }
    void executionException(framing::execution::ErrorCode code, const std::string& msg) {
        QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
    }
    void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
        QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
    }
    void detach() {}

  private:
    const LogPrefix& logPrefix;
};

/**
 * Installs the shared PrimaryErrorListener on every new session handler whose
 * connection carries backup broker info.
 */
class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver
{
  public:
    explicit PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
        : errorListener(new PrimaryErrorListener(logPrefix)) {}

    void newSessionHandler(broker::SessionHandler& sh) {
        BrokerInfo info;
        // Suppress error logging for backup connections
        // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
        if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
            sh.setErrorListener(errorListener);
        }
    }
  private:
    // One listener instance shared by all backup session handlers.
    boost::shared_ptr<broker::SessionHandler::ErrorListener> errorListener;
};

} // namespace

/**
 * Promote this broker to primary.
 *
 * Sets membership status to RECOVERING, promotes all QueueReplicators, creates
 * a RemoteBackup (with guards) for every broker in `expect`, arms the
 * expected-backup timeout, installs the broker/session-handler observers and
 * finally installs the connection observer.
 *
 * @param hb     the owning HaBroker.
 * @param expect backups expected to reconnect; if empty no timer is started.
 */
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
    haBroker(hb), membership(hb.getMembership()),
    logPrefix(hb.logPrefix), active(false),
    replicationTest(hb.getSettings().replicateDefault.get()),
    sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
    queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
{
    // Note that at this point, we are still rejecting client connections.
    // So we are safe from client interference while we set up the primary.

    hb.getMembership().setStatus(RECOVERING);
    QPID_LOG(notice, logPrefix << "Promoted to primary");

    // Process all QueueReplicators, handles auto-delete queues.
    QueueReplicator::Vector qrs;
    QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
    std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));

    if (!expect.empty()) {
        // NOTE: RemoteBackups must be created before we set the BrokerObserver
        // or ConnectionObserver so that there is no client activity while
        // the QueueGuards are created.
        QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
        for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
            // Expected backups start with no connection (second argument is 0).
            boost::shared_ptr<RemoteBackup> backup(
                new RemoteBackup(*i, 0, haBroker.logPrefix));
            backups[i->getSystemId()] = backup;
            if (!backup->isReady()) expectedBackups.insert(backup);
            setCatchupQueues(backup, true); // Create guards
        }
        // Set timeout for expected brokers to connect and become ready.
        sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
        timerTask = new ExpectedBackupTimerTask(*this, deadline);
        hb.getBroker().getTimer().add(timerTask);
    }
    brokerObserver.reset(new PrimaryBrokerObserver(*this));
    haBroker.getBroker().getBrokerObservers().add(brokerObserver);
    haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
    checkReady();               // Outside lock

    // Allow client connections
    connectionObserver.reset(new PrimaryConnectionObserver(*this));
    haBroker.getObserver()->setObserver(connectionObserver);
}

/** Cancels the timeout task (if any) and unregisters all observers. */
Primary::~Primary() {
    if (timerTask) timerTask->cancel();
    haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
    haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
    haBroker.getObserver()->reset();
}

/**
 * Become ACTIVE once there are no more expected backups outstanding.
 * The `active` flag is flipped under the lock so the status change happens
 * at most once; the membership update itself is done outside the lock.
 */
void Primary::checkReady() {
    bool activate = false;
    {
        Mutex::ScopedLock l(lock);
        if (!active && expectedBackups.empty())
            activate = active = true;
    }
    if (activate) {
        membership.setStatus(ACTIVE); // Outside of lock.
        QPID_LOG(notice, logPrefix << "All backups recovered.");
    }
}

/**
 * If `backup` reports ready, record it as READY in the membership and, when it
 * was one of the expected backups, re-evaluate whether the primary can activate.
 */
void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
    bool ready = false;
    {
        Mutex::ScopedLock l(lock);
        if (backup->reportReady()) {
            BrokerInfo info = backup->getBrokerInfo();
            info.setStatus(READY);
            membership.add(info);
            if (expectedBackups.erase(backup)) {
                QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
                ready = true;
            }
            else
                QPID_LOG(info, logPrefix << "New backup is ready: " << info);
        }
    }
    if (ready) checkReady(); // Outside lock
}

/**
 * Timer callback: give up on expected backups that have not connected yet.
 * Does nothing if the primary is already active. Any std::exception is logged
 * and swallowed; checkReady() is always called afterwards.
 */
void Primary::timeoutExpectedBackups() {
    try {
        sys::Mutex::ScopedLock l(lock);
        if (active) return;     // Already activated
        // Remove records for any expectedBackups that are not yet connected
        // Allow backups that are connected to continue becoming ready.
        for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
        {
            // This loop erases elements of backups in backupDisconnect, so
            // save and increment the iterator.
            BackupSet::iterator j = i++;
            boost::shared_ptr<RemoteBackup> backup = *j;
            if (!backup->getConnection()) {
                BrokerInfo info = backup->getBrokerInfo();
                QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
                backupDisconnect(backup, l); // Calls erase(j)
                // Keep broker in membership but downgrade status to CATCHUP.
                // The broker will get this status change when it eventually connects.
                info.setStatus(CATCHUP);
                membership.add(info);
            }
        }
    }
    catch(const std::exception& e) {
        QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
        // Nowhere for this exception to go.
    }
    checkReady();
}

/**
 * Mark the subscription's queue as ready on the backup that owns `rs`, then
 * check (outside the lock) whether that backup is now fully ready.
 * Does nothing if the backup is not known.
 */
void Primary::readyReplica(const ReplicatingSubscription& rs) {
    shared_ptr<RemoteBackup> backup;
    {
        sys::Mutex::ScopedLock l(lock);
        BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
        if (i != backups.end()) {
            backup = i->second;
            backup->ready(rs.getQueue());
        }
    }
    if (backup) checkReady(backup);
}

/** Record `rs` in the replica map, keyed by (backup system id, queue). */
void Primary::addReplica(ReplicatingSubscription& rs) {
    // Note this is called before the ReplicatingSubscription has been activated
    // on the queue.
    sys::Mutex::ScopedLock l(lock);
    replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}

// NOTE: Called with queue registry lock held.
/**
 * Tag a newly created queue with its replication level and, if it is
 * replicated, give it a unique id and announce it to every backup.
 */
void Primary::queueCreate(const QueuePtr& q) {
    // Set replication argument.
    ReplicateLevel level = replicationTest.useLevel(*q);
    q->addArgument(QPID_REPLICATE, printable(level).str());
    if (level) {
        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
                 << " replication: " << printable(level));
        // Give each queue a unique id. Used by backups to avoid confusion of
        // same-named queues.
        q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
        {
            Mutex::ScopedLock l(lock);
            queueLimits.addQueue(q); // Throws if limit exceeded
            for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
                i->second->queueCreate(q);
        }
        checkReady();           // Outside lock
    }
}

// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) { if (replicationTest.useLevel(*q)) { QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); { Mutex::ScopedLock l(lock); queueLimits.removeQueue(q); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); } checkReady(); // Outside lock } } // NOTE: Called with exchange registry lock held. void Primary::exchangeCreate(const ExchangePtr& ex) { ReplicateLevel level = replicationTest.useLevel(*ex); FieldTable args = ex->getArgs(); args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. if (level) { QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() << " replication: " << printable(level)); // Give each exchange a unique id to avoid confusion of same-named exchanges. args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data()))); } ex->setArgs(args); } // NOTE: Called with exchange registry lock held. void Primary::exchangeDestroy(const ExchangePtr& ex) { if (replicationTest.useLevel(*ex)) { QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); // Do nothing } } // New backup connected shared_ptr Primary::backupConnect( const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&) { shared_ptr backup(new RemoteBackup(info, &connection, haBroker.logPrefix)); queueLimits.addBackup(backup); backups[info.getSystemId()] = backup; return backup; } // Remove a backup. Caller should not release the shared pointer returend till // outside the lock. 
void Primary::backupDisconnect(shared_ptr backup, Mutex::ScopedLock&) { queueLimits.addBackup(backup); types::Uuid id = backup->getBrokerInfo().getSystemId(); backup->cancel(); expectedBackups.erase(backup); backups.erase(id); membership.remove(id); } void Primary::opened(broker::Connection& connection) { BrokerInfo info; shared_ptr backup; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); if (info.getStatus() == JOINING) { info.setStatus(CATCHUP); membership.add(info); } if (i == backups.end()) { if (info.getStatus() == JOINING) { info.setStatus(CATCHUP); membership.add(info); } QPID_LOG(info, logPrefix << "New backup connection: " << info); backup = backupConnect(info, connection, l); } else if (i->second->getConnection()) { // The backup is failing over before we recieved the closed() call // for its previous connection. Remove the old entry and create a new one. QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info); backupDisconnect(i->second, l); backup = backupConnect(info, connection, l); } else { QPID_LOG(info, logPrefix << "Known backup reconnection: " << info); i->second->setConnection(&connection); backup = i->second; } } else { const types::Variant::Map& properties = connection.getClientProperties(); std::ostringstream pinfo; types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME); // FIXME aconway 2014-08-13: Conditional on logging. 
if (i != properties.end()) { pinfo << " " << i->second; i = properties.find(CLIENT_PID); if (i != properties.end()) pinfo << "(" << i->second << ")"; } QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str()); } // Outside lock if (backup) { setCatchupQueues(backup, false); checkReady(backup); } checkReady(); } void Primary::closed(broker::Connection& connection) { BrokerInfo info; shared_ptr backup; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); // NOTE: It is possible for a backup connection to be rejected while we // are a backup, but closed() is called after we have become primary. // Checking isConnected() lets us ignore such spurious closes. if (i == backups.end()) { QPID_LOG(info, logPrefix << "Disconnect from unknown backup " << info); } else if (i->second->getConnection() != &connection) { QPID_LOG(info, logPrefix << "Late disconnect from backup " << info); } else { QPID_LOG(info, logPrefix << "Disconnect from " << (i->second->getConnection() ? "" : "disconnected ") << "backup " << info); // Assign to shared_ptr so it will be deleted after we release the lock. backup = i->second; backupDisconnect(backup, l); } } checkReady(); } boost::shared_ptr Primary::getGuard(const QueuePtr& q, const BrokerInfo& info) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); return i == backups.end() ? boost::shared_ptr() : i->second->guard(q); } Role* Primary::promote() { QPID_LOG(info, logPrefix << "Ignoring promotion, already primary"); return 0; } void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) { // Do queue iteration outside the lock to avoid deadlocks with QueueRegistry. haBroker.getBroker().getQueues().eachQueue( boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards)); backup->startCatchup(); } }} // namespace qpid::ha