/** * Copyright (C) 2018-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplicationInitialSync #include "mongo/platform/basic.h" #include "mongo/db/repl/databases_cloner.h" #include #include #include #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/client.h" #include "mongo/db/repl/databases_cloner_gen.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace repl { namespace { using Request = executor::RemoteCommandRequest; using Response = executor::RemoteCommandResponse; using LockGuard = stdx::lock_guard; using UniqueLock = stdx::unique_lock; } // namespace DatabasesCloner::DatabasesCloner(StorageInterface* si, executor::TaskExecutor* exec, ThreadPool* dbWorkThreadPool, HostAndPort source, IncludeDbFilterFn includeDbPred, OnFinishFn finishFn) : _status(ErrorCodes::NotYetInitialized, ""), _exec(exec), _dbWorkThreadPool(dbWorkThreadPool), _source(source), _includeDbFn(includeDbPred), _finishFn(finishFn), _storage(si) { uassert(ErrorCodes::InvalidOptions, "storage interface must be provided.", si); uassert(ErrorCodes::InvalidOptions, "executor must be provided.", exec); uassert( ErrorCodes::InvalidOptions, "db worker thread pool must be provided.", dbWorkThreadPool); uassert(ErrorCodes::InvalidOptions, "source must be provided.", !source.empty()); uassert(ErrorCodes::InvalidOptions, "finishFn must be provided.", finishFn); uassert(ErrorCodes::InvalidOptions, "includeDbPred must be provided.", includeDbPred); }; DatabasesCloner::~DatabasesCloner() { DESTRUCTOR_GUARD(shutdown(); join();); } std::string DatabasesCloner::toString() const { LockGuard lk(_mutex); return str::stream() << "initial sync --" << " active:" << _isActive_inlock() << " status:" << _status.toString() << " source:" << _source.toString() << " db cloners completed:" << _stats.databasesCloned << " db count:" << _databaseCloners.size(); } void DatabasesCloner::join() { if (auto listDatabaseScheduler = _getListDatabasesScheduler()) { listDatabaseScheduler->join(); } auto databaseCloners = _getDatabaseCloners(); for (auto&& cloner : databaseCloners) { cloner->join(); } } void DatabasesCloner::shutdown() { { LockGuard lock(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. _state = State::kComplete; return; case State::kRunning: _state = State::kShuttingDown; break; case State::kShuttingDown: case State::kComplete: // Nothing to do if we are already in ShuttingDown or Complete state. return; } } if (auto listDatabaseScheduler = _getListDatabasesScheduler()) { listDatabaseScheduler->shutdown(); } auto databaseCloners = _getDatabaseCloners(); for (auto&& cloner : databaseCloners) { cloner->shutdown(); } } bool DatabasesCloner::isActive() { LockGuard lk(_mutex); return _isActive_inlock(); } bool DatabasesCloner::_isActive_inlock() const { return State::kRunning == _state || State::kShuttingDown == _state; } Status DatabasesCloner::getStatus() { LockGuard lk(_mutex); return _status; } DatabasesCloner::Stats DatabasesCloner::getStats() const { LockGuard lk(_mutex); DatabasesCloner::Stats stats = _stats; for (auto&& databaseCloner : _databaseCloners) { stats.databaseStats.emplace_back(databaseCloner->getStats()); } return stats; } std::string DatabasesCloner::Stats::toString() const { return toBSON().toString(); } BSONObj DatabasesCloner::Stats::toBSON() const { BSONObjBuilder bob; append(&bob); return bob.obj(); } void DatabasesCloner::Stats::append(BSONObjBuilder* builder) const { builder->appendNumber("databasesCloned", databasesCloned); for (auto&& db : databaseStats) { BSONObjBuilder dbBuilder(builder->subobjStart(db.dbname)); db.append(&dbBuilder); dbBuilder.doneFast(); } } Status DatabasesCloner::startup() noexcept { LockGuard lk(_mutex); switch (_state) { case State::kPreStart: _state = State::kRunning; break; case State::kRunning: return Status(ErrorCodes::InternalError, "databases cloner already started"); case State::kShuttingDown: return Status(ErrorCodes::ShutdownInProgress, "databases cloner shutting down"); case State::kComplete: return Status(ErrorCodes::ShutdownInProgress, "databases cloner completed"); } if (!_status.isOK() && _status.code() != ErrorCodes::NotYetInitialized) { return _status; } // Schedule listDatabase command which will kick off the database cloner per result db. We only // retrieve database names since computing & fetching all database stats can be costly on the // remote node when there are a large number of collections. Request listDBsReq(_source, "admin", BSON("listDatabases" << true << "nameOnly" << true), ReadPreferenceSetting::secondaryPreferredMetadata(), nullptr); _listDBsScheduler = stdx::make_unique( _exec, listDBsReq, [this](const auto& x) { this->_onListDatabaseFinish(x); }, RemoteCommandRetryScheduler::makeRetryPolicy( numInitialSyncListDatabasesAttempts.load(), executor::RemoteCommandRequest::kNoTimeout, RemoteCommandRetryScheduler::kAllRetriableErrors)); _status = _listDBsScheduler->startup(); if (!_status.isOK()) { _state = State::kComplete; return _status; } return Status::OK(); } void DatabasesCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& work) { LockGuard lk(_mutex); _scheduleDbWorkFn = work; } void DatabasesCloner::setStartCollectionClonerFn( const StartCollectionClonerFn& startCollectionCloner) { LockGuard lk(_mutex); _startCollectionClonerFn = startCollectionCloner; } StatusWith> DatabasesCloner::parseListDatabasesResponse_forTest( BSONObj dbResponse) { return _parseListDatabasesResponse(dbResponse); } void DatabasesCloner::setAdminAsFirst_forTest(std::vector& dbsArray) { _setAdminAsFirst(dbsArray); } StatusWith> DatabasesCloner::_parseListDatabasesResponse( BSONObj dbResponse) { if (!dbResponse.hasField("databases")) { return Status(ErrorCodes::BadValue, "The 'listDatabases' response does not contain a 'databases' field."); } BSONElement response = dbResponse["databases"]; try { return response.Array(); } catch (const AssertionException&) { return Status(ErrorCodes::BadValue, "The 'listDatabases' response is unable to be transformed into an array."); } } void DatabasesCloner::_setAdminAsFirst(std::vector& dbsArray) { auto adminIter = std::find_if(dbsArray.begin(), dbsArray.end(), [](BSONElement elem) { if (!elem.isABSONObj()) { return false; } auto bsonObj = elem.Obj(); std::string databaseName = bsonObj.getStringField("name"); return (databaseName == "admin"); }); if (adminIter != dbsArray.end()) { std::iter_swap(adminIter, dbsArray.begin()); } } void DatabasesCloner::_onListDatabaseFinish( const executor::TaskExecutor::RemoteCommandCallbackArgs& cbd) { Status respStatus = cbd.response.status; if (respStatus.isOK()) { respStatus = getStatusFromCommandResult(cbd.response.data); } UniqueLock lk(_mutex); if (!respStatus.isOK()) { LOG(1) << "'listDatabases' failed: " << respStatus; _fail_inlock(&lk, respStatus); return; } // There should not be any cloners yet. invariant(_databaseCloners.size() == 0); const auto respBSON = cbd.response.data; auto databasesArray = _parseListDatabasesResponse(respBSON); if (!databasesArray.isOK()) { LOG(1) << "'listDatabases' returned a malformed response: " << databasesArray.getStatus().toString(); _fail_inlock(&lk, databasesArray.getStatus()); return; } auto dbsArray = databasesArray.getValue(); // Ensure that the 'admin' database is the first element in the array of databases so that it // will be the first to be cloned. This allows users to authenticate against a database while // initial sync is occurring. _setAdminAsFirst(dbsArray); for (BSONElement arrayElement : dbsArray) { const BSONObj dbBSON = arrayElement.Obj(); // Check to see if we want to exclude this db from the clone. if (!_includeDbFn(dbBSON)) { LOG(1) << "Excluding database from the 'listDatabases' response: " << dbBSON; continue; } if (!dbBSON.hasField("name")) { LOG(1) << "Excluding database due to the 'listDatabases' response not containing a " "'name' field for this entry: " << dbBSON; } const std::string dbName = dbBSON["name"].str(); std::shared_ptr dbCloner{nullptr}; // filters for DatabasesCloner. const auto collectionFilterPred = [dbName](const BSONObj& collInfo) { const auto collName = collInfo["name"].str(); const NamespaceString ns(dbName, collName); if (ns.isSystem() && !ns.isLegalClientSystemNS()) { LOG(1) << "Skipping 'system' collection: " << ns.ns(); return false; } if (!ns.isNormal()) { LOG(1) << "Skipping non-normal collection: " << ns.ns(); return false; } LOG(2) << "Allowing cloning of collectionInfo: " << collInfo; return true; }; const auto onCollectionFinish = [](const Status& status, const NamespaceString& srcNss) { if (status.isOK()) { LOG(1) << "collection clone finished: " << srcNss; } else { error() << "collection clone for '" << srcNss << "' failed due to " << status.toString(); } }; const auto onDbFinish = [this, dbName](const Status& status) { _onEachDBCloneFinish(status, dbName); }; Status startStatus = Status::OK(); try { dbCloner.reset(new DatabaseCloner( _exec, _dbWorkThreadPool, _source, dbName, BSONObj(), // do not filter collections out during listCollections call. collectionFilterPred, _storage, // use storage provided. onCollectionFinish, onDbFinish)); if (_scheduleDbWorkFn) { dbCloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); } if (_startCollectionClonerFn) { dbCloner->setStartCollectionClonerFn(_startCollectionClonerFn); } // Start first database cloner. if (_databaseCloners.empty()) { startStatus = dbCloner->startup(); } } catch (...) { startStatus = exceptionToStatus(); } if (!startStatus.isOK()) { std::string err = str::stream() << "could not create cloner for database: " << dbName << " due to: " << startStatus.toString(); _setStatus_inlock({ErrorCodes::InitialSyncFailure, err}); error() << err; break; // exit for_each loop } // add cloner to list. _databaseCloners.push_back(dbCloner); } if (_databaseCloners.size() == 0) { if (_status.isOK()) { _succeed_inlock(&lk); } else { _fail_inlock(&lk, _status); } } } std::vector> DatabasesCloner::_getDatabaseCloners() const { LockGuard lock(_mutex); return _databaseCloners; } RemoteCommandRetryScheduler* DatabasesCloner::_getListDatabasesScheduler() const { LockGuard lock(_mutex); return _listDBsScheduler.get(); } void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::string& name) { UniqueLock lk(_mutex); if (!status.isOK()) { warning() << "database '" << name << "' (" << (_stats.databasesCloned + 1) << " of " << _databaseCloners.size() << ") clone failed due to " << status.toString(); _fail_inlock(&lk, status); return; } if (StringData(name).equalCaseInsensitive("admin")) { LOG(1) << "Finished the 'admin' db, now calling isAdminDbValid."; // Do special checks for the admin database because of auth. collections. auto adminStatus = Status(ErrorCodes::NotYetInitialized, ""); { // TODO: Move isAdminDbValid() out of the collection/database cloner code paths. OperationContext* opCtx = cc().getOperationContext(); ServiceContext::UniqueOperationContext opCtxPtr; if (!opCtx) { opCtxPtr = cc().makeOperationContext(); opCtx = opCtxPtr.get(); } adminStatus = _storage->isAdminDbValid(opCtx); } if (!adminStatus.isOK()) { LOG(1) << "Validation failed on 'admin' db due to " << adminStatus; _fail_inlock(&lk, adminStatus); return; } } _stats.databasesCloned++; if (_stats.databasesCloned == _databaseCloners.size()) { _succeed_inlock(&lk); return; } // Start next database cloner. auto&& dbCloner = _databaseCloners[_stats.databasesCloned]; auto startStatus = dbCloner->startup(); if (!startStatus.isOK()) { warning() << "failed to schedule database '" << name << "' (" << (_stats.databasesCloned + 1) << " of " << _databaseCloners.size() << ") due to " << startStatus.toString(); _fail_inlock(&lk, startStatus); return; } } void DatabasesCloner::_fail_inlock(UniqueLock* lk, Status status) { LOG(3) << "DatabasesCloner::_fail_inlock called"; if (!_isActive_inlock()) { return; } _setStatus_inlock(status); // TODO: shutdown outstanding work, like any cloners active invariant(_finishFn); auto finish = _finishFn; _finishFn = {}; lk->unlock(); LOG(3) << "DatabasesCloner - calling _finishFn with status: " << _status; finish(status); // Release any resources that might be held by the '_finishFn' (moved to 'finish') function // object. finish = OnFinishFn(); lk->lock(); invariant(_state != State::kComplete); _state = State::kComplete; } void DatabasesCloner::_succeed_inlock(UniqueLock* lk) { LOG(3) << "DatabasesCloner::_succeed_inlock called"; const auto status = Status::OK(); _setStatus_inlock(status); invariant(_finishFn); auto finish = _finishFn; _finishFn = OnFinishFn(); lk->unlock(); LOG(3) << "DatabasesCloner - calling _finishFn with status OK"; finish(status); // Release any resources that might be held by the '_finishFn' (moved to 'finish') function // object. finish = OnFinishFn(); lk->lock(); invariant(_state != State::kComplete); _state = State::kComplete; } void DatabasesCloner::_setStatus_inlock(Status s) { // Only set the first time called, all subsequent failures are not recorded --only first. if (!s.isOK() && (_status.isOK() || _status == ErrorCodes::NotYetInitialized)) { LOG(1) << "setting DatabasesCloner status to " << s; _status = s; } } } // namespace repl } // namespace mongo