/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/replication_coordinator_external_state_impl.h" #include #include #include #include "mongo/base/status_with.h" #include "mongo/bson/oid.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/service_context.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/sock.h" namespace mongo { namespace repl { namespace { const char configCollectionName[] = "local.system.replset"; const char configDatabaseName[] = "local"; const char meCollectionName[] = "local.me"; const char meDatabaseName[] = "local"; const char tsFieldName[] = "ts"; } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() : _startedThreads(false) , _nextThreadId(0) {} ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} void ReplicationCoordinatorExternalStateImpl::startThreads() { boost::lock_guard lk(_threadMutex); if (_startedThreads) { return; } log() << "Starting replication applier threads"; _applierThread.reset(new boost::thread(runSyncThread)); BackgroundSync* bgsync = BackgroundSync::get(); _producerThread.reset(new boost::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); _syncSourceFeedbackThread.reset(new boost::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); _startedThreads = true; } void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { repl::startMasterSlave(txn); } void ReplicationCoordinatorExternalStateImpl::shutdown() { boost::lock_guard lk(_threadMutex); if (_startedThreads) { log() << "Stopping replication applier threads"; _syncSourceFeedback.shutdown(); _syncSourceFeedbackThread->join(); _applierThread->join(); BackgroundSync* bgsync = BackgroundSync::get(); bgsync->shutdown(); _producerThread->join(); } } void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) { createOplog(txn); ScopedTransaction scopedXact(txn, MODE_X); Lock::GlobalWrite globalWrite(txn->lockState()); WriteUnitOfWork wuow(txn); getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSON("msg" << "initiating set")); wuow.commit(); } void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { _syncSourceFeedback.forwardSlaveProgress(); } OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { std::string myname = getHostName(); OID myRID; { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X); BSONObj me; // local.me is an identifier for a server for getLastError w:2+ if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") || me["host"].String() != myname) { myRID = OID::gen(); // clean out local.me Helpers::emptyCollection(txn, meCollectionName); // repopulate BSONObjBuilder b; b.append("_id", myRID); b.append("host", myname); Helpers::putSingleton(txn, meCollectionName, b.done()); } else { myRID = me["_id"].OID(); } } return myRID; } StatusWith ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( OperationContext* txn) { try { BSONObj config; if (!Helpers::getSingleton(txn, configCollectionName, config)) { return StatusWith( ErrorCodes::NoMatchingDocument, str::stream() << "Did not find replica set configuration document in " << configCollectionName); } return StatusWith(config); } catch (const DBException& ex) { return StatusWith(ex.toStatus()); } } Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument( OperationContext* txn, const BSONObj& config) { try { ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); Helpers::putSingleton(txn, configCollectionName, config); return Status::OK(); } catch (const DBException& ex) { return ex.toStatus(); } } void ReplicationCoordinatorExternalStateImpl::setGlobalOpTime(const OpTime& newTime) { setNewOptime(newTime); } StatusWith ReplicationCoordinatorExternalStateImpl::loadLastOpTime( OperationContext* txn) { try { BSONObj oplogEntry; if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { return StatusWith( ErrorCodes::NoMatchingDocument, str::stream() << "Did not find any entries in " << rsOplogName); } BSONElement tsElement = oplogEntry[tsFieldName]; if (tsElement.eoo()) { return StatusWith( ErrorCodes::NoSuchKey, str::stream() << "Most recent entry in " << rsOplogName << " missing \"" << tsFieldName << "\" field"); } if (tsElement.type() != Timestamp) { return StatusWith( ErrorCodes::TypeMismatch, str::stream() << "Expected type of \"" << tsFieldName << "\" in most recent " << rsOplogName << " entry to have type Timestamp, but found " << typeName(tsElement.type())); } return StatusWith(tsElement._opTime()); } catch (const DBException& ex) { return StatusWith(ex.toStatus()); } } bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) { return repl::isSelf(host); } HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( const OperationContext* txn) { return HostAndPort(txn->getClient()->clientAddress(true)); } void ReplicationCoordinatorExternalStateImpl::closeConnections() { MessagingPort::closeAllSockets( ReplicationExecutor::NetworkInterface::kMessagingPortKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { ServiceContext* environment = getGlobalServiceContext(); environment->killAllUserOperations(txn); } void ReplicationCoordinatorExternalStateImpl::clearShardingState() { shardingState.clearCollectionMetadata(); } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { BackgroundSync::get()->clearSyncTarget(); } OperationContext* ReplicationCoordinatorExternalStateImpl::createOperationContext( const std::string& threadName) { Client::initThreadIfNotAlready(threadName.c_str()); return new OperationContextImpl; } void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) { std::vector dbNames; StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); storageEngine->listDatabases(&dbNames); for (std::vector::iterator it = dbNames.begin(); it != dbNames.end(); ++it) { // The local db is special because it isn't replicated. It is cleared at startup even on // replica set members. if (*it == "local") continue; LOG(2) << "Removing temporary collections from " << *it; Database* db = dbHolder().get(txn, *it); // Since we must be holding the global lock during this function, if listDatabases // returned this dbname, we should be able to get a reference to it - it can't have // been dropped. invariant(db); db->clearTmpCollections(txn); } } } // namespace repl } // namespace mongo