/**
 *    Copyright (C) 2008 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/rs_initialsync.h" #include #include "mongo/bson/timestamp.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/client.h" #include "mongo/db/cloner.h" #include "mongo/db/commands/list_collections_filter.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/initial_sync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_external_state.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/socket_exception.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace repl { namespace { using std::list; using std::string; /** * Truncates the oplog (removes any documents) and resets internal variables that were * originally initialized or affected by using values from the oplog at startup time. These * include the last applied optime, the last fetched optime, and the sync source blacklist. * Also resets the bgsync thread so that it reconnects its sync source after the oplog has been * truncated. 
*/ void truncateAndResetOplog(OperationContext* txn, ReplicationCoordinator* replCoord, BackgroundSync* bgsync) { // Add field to minvalid document to tell us to restart initial sync if we crash StorageInterface::get(txn)->setInitialSyncFlag(txn); AutoGetDb autoDb(txn, "local", MODE_X); massert(28585, "no local database found", autoDb.getDb()); invariant(txn->lockState()->isCollectionLockedForMode(rsOplogName, MODE_X)); // Note: the following order is important. // The bgsync thread uses an empty optime as a sentinel to know to wait // for initial sync; thus, we must // ensure the lastAppliedOpTime is empty before restarting the bgsync thread // via stop(). // We must clear the sync source blacklist after calling stop() // because the bgsync thread, while running, may update the blacklist. replCoord->resetMyLastOpTimes(); bgsync->stop(); bgsync->clearBuffer(txn); replCoord->clearSyncSourceBlacklist(); // Truncate the oplog in case there was a prior initial sync that failed. Collection* collection = autoDb.getDb()->getCollection(rsOplogName); fassert(28565, collection); MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); Status status = collection->truncate(txn); fassert(28564, status); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns()); } bool _initialSyncClone(OperationContext* txn, Cloner& cloner, const std::string& host, const std::string& db, std::vector& collections, bool dataPass) { if (db == "local") return true; if (dataPass) log() << "initial sync cloning db: " << db; else log() << "initial sync cloning indexes for : " << db; CloneOptions options; options.fromDB = db; options.slaveOk = true; options.useReplAuth = true; options.snapshot = false; options.syncData = dataPass; options.syncIndexes = !dataPass; options.createCollections = false; // Make database stable ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbWrite(txn->lockState(), db, MODE_X); Status status = cloner.copyDb(txn, db, 
host, options, nullptr, collections); if (!status.isOK()) { log() << "initial sync: error while " << (dataPass ? "cloning " : "indexing ") << db << ". " << status.toString(); return false; } if (dataPass && (db == "admin")) { fassertNoTrace(28619, checkAdminDatabase(txn, dbHolder().get(txn, db))); } return true; } /** * Replays the sync target's oplog from lastOp to the latest op on the sync target. * * @param syncer used to apply the oplog (from the reader). * @param r the oplog reader. * @return if applying the oplog succeeded. */ bool _initialSyncApplyOplog(OperationContext* txn, repl::InitialSync* syncer, OplogReader* r, BackgroundSync* bgsync) { const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastAppliedOpTime(); BSONObj lastOp; // If the fail point is set, exit failing. if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) { log() << "adding fake oplog entry to buffer."; bgsync->pushTestOpToBuffer( txn, BSON("ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1 << "op" << "n")); return false; } try { // It may have been a long time since we last used this connection to // query the oplog, depending on the size of the databases we needed to clone. // A common problem is that TCP keepalives are set too infrequent, and thus // our connection here is terminated by a firewall due to inactivity. // Solution is to increase the TCP keepalive frequency. 
lastOp = r->getLastOp(rsOplogName); } catch (SocketException&) { HostAndPort host = r->getHost(); log() << "connection lost to " << host.toString() << "; is your tcp keepalive interval set appropriately?"; if (!r->connect(host)) { error() << "initial sync couldn't connect to " << host.toString(); throw; } // retry lastOp = r->getLastOp(rsOplogName); } if (lastOp.isEmpty()) { error() << "initial sync lastOp is empty"; sleepsecs(1); return false; } OpTime stopOpTime = fassertStatusOK(28777, OpTime::parseFromOplogEntry(lastOp)); // If we already have what we need then return. if (stopOpTime == startOpTime) return true; verify(!stopOpTime.isNull()); verify(stopOpTime.getTimestamp() > startOpTime.getTimestamp()); // apply till stopOpTime try { LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime; syncer->oplogApplication(txn, stopOpTime); if (inShutdown()) { return false; } } catch (const DBException&) { warning() << "initial sync failed during oplog application phase, and will retry"; sleepsecs(5); return false; } return true; } /** * Do the initial sync for this member. There are several steps to this process: * * 0. Add _initialSyncFlag to minValid collection to tell us to restart initial sync if we * crash in the middle of this procedure * 1. Record start time. * 2. Clone. * 3. Set minValid1 to sync target's latest op time. * 4. Apply ops from start to minValid1, fetching missing docs as needed. * 5. Set minValid2 to sync target's latest op time. * 6. Apply ops from minValid1 to minValid2. * 7. Build indexes. * 8. Set minValid3 to sync target's latest op time. * 9. Apply ops from minValid2 to minValid3. 10. Cleanup minValid collection: remove _initialSyncFlag field, set ts to minValid3 OpTime * * At that point, initial sync is finished. Note that the oplog from the sync target is applied * three times: step 4, 6, and 8. 4 may involve refetching, 6 should not. By the end of 6, * this member should have consistent data. 
8 is "cosmetic," it is only to get this member
 * closer to the latest op time before it can transition out of startup state
 *
 * Returns a Status with ErrorCode::ShutdownInProgress if the node enters shutdown,
 * ErrorCode::InitialSyncOplogSourceMissing if the node fails to find a sync source, Status::OK
 * if everything worked, and ErrorCode::InitialSyncFailure for all other error cases.
 */
Status _initialSync(BackgroundSync* bgsync) {
    log() << "initial sync pending";

    const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
    OperationContext& txn = *txnPtr;
    // Initial sync writes must not be replicated, and document validation is bypassed
    // because the data is copied verbatim from the sync source.
    txn.setReplicatedWrites(false);
    DisableDocumentValidation validationDisabler(&txn);
    ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator());

    // reset state for initial sync
    truncateAndResetOplog(&txn, replCoord, bgsync);

    OplogReader r;

    std::size_t currentRetry = 0;
    while (r.getHost().empty()) {
        // We must prime the sync source selector so that it considers all candidates regardless
        // of oplog position, by passing in null OpTime as the last op fetched time.
        r.connectToSyncSource(&txn, OpTime(), replCoord);

        if (r.getHost().empty()) {
            std::string msg =
                "No valid sync source found in current replica set to do an initial sync.";
            if (++currentRetry >= kInitialSyncMaxConnectRetries) {
                return Status(ErrorCodes::InitialSyncOplogSourceMissing, msg);
            }
            LOG(1) << msg << ", retry " << currentRetry << " of " << kInitialSyncMaxConnectRetries;
            sleepsecs(1);
        }

        if (inShutdown()) {
            return Status(ErrorCodes::ShutdownInProgress, "shutting down");
        }
    }

    InitialSync init(bgsync, multiInitialSyncApply);
    init.setHostname(r.getHost().toString());

    BSONObj lastOp = r.getLastOp(rsOplogName);
    if (lastOp.isEmpty()) {
        std::string msg = "initial sync couldn't read remote oplog";
        sleepsecs(15);
        return Status(ErrorCodes::InitialSyncFailure, msg);
    }

    log() << "initial sync drop all databases";
    dropAllDatabasesExceptLocal(&txn);

    if (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) {
        log() << "initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled. "
                 "Blocking until fail point is disabled.";
        while (MONGO_FAIL_POINT(initialSyncHangBeforeCopyingDatabases)) {
            mongo::sleepsecs(1);
        }
    }

    log() << "initial sync clone all databases";

    list<string> dbs = r.conn()->getDatabaseNames();
    {
        // Clone admin database first, to catch schema errors.
        list<string>::iterator admin = std::find(dbs.begin(), dbs.end(), "admin");
        if (admin != dbs.end()) {
            dbs.splice(dbs.begin(), dbs, admin);
        }
        // Ignore local db
        dbs.erase(std::remove(dbs.begin(), dbs.end(), "local"), dbs.end());
    }

    Cloner cloner;
    std::map<std::string, std::vector<NamespaceString>> collectionsPerDb;
    for (auto&& db : dbs) {
        CloneOptions options;
        options.fromDB = db;
        log() << "fetching and creating collections for " << db;
        std::list<BSONObj> initialCollections = r.conn()->getCollectionInfos(
            options.fromDB, ListCollectionsFilter::makeTypeCollectionFilter());  // may uassert
        auto fetchStatus = cloner.filterCollectionsForClone(options, initialCollections);
        if (!fetchStatus.isOK()) {
            return fetchStatus.getStatus();
        }
        auto collections = fetchStatus.getValue();

        ScopedTransaction transaction(&txn, MODE_IX);
        Lock::DBLock dbWrite(txn.lockState(), db, MODE_X);

        auto createStatus = cloner.createCollectionsForDb(&txn, collections, db);
        if (!createStatus.isOK()) {
            return createStatus;
        }
        collectionsPerDb.emplace(db, std::move(collections));
    }
    // Data pass: copy the documents of every collection we created above.
    for (auto&& dbCollsPair : collectionsPerDb) {
        if (!_initialSyncClone(&txn,
                               cloner,
                               r.conn()->getServerAddress(),
                               dbCollsPair.first,
                               dbCollsPair.second,
                               true)) {
            return Status(ErrorCodes::InitialSyncFailure, "initial sync failed data cloning");
        }
    }

    log() << "initial sync data copy, starting syncup";

    // prime oplog, but don't need to actually apply the op as the cloned data already reflects it.
    fassertStatusOK(
        40142,
        StorageInterface::get(&txn)->insertDocument(&txn, NamespaceString(rsOplogName), lastOp));
    OpTime lastOptime = OplogEntry(lastOp).getOpTime();
    ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime);
    replCoord->setMyLastAppliedOpTime(lastOptime);
    setNewTimestamp(lastOptime.getTimestamp());

    std::string msg = "oplog sync 1 of 3";
    log() << msg;
    if (!_initialSyncApplyOplog(&txn, &init, &r, bgsync)) {
        return Status(ErrorCodes::InitialSyncFailure,
                      str::stream() << "initial sync failed: " << msg);
    }

    // Now we sync to the latest op on the sync target _again_, as we may have recloned ops that
    // were "from the future" from the data clone. During this second application, nothing should
    // need to be recloned.
    // TODO: replace with "tail" instance below, since we don't need to retry/reclone missing docs.
    msg = "oplog sync 2 of 3";
    log() << msg;
    if (!_initialSyncApplyOplog(&txn, &init, &r, bgsync)) {
        return Status(ErrorCodes::InitialSyncFailure,
                      str::stream() << "initial sync failed: " << msg);
    }

    // data should now be consistent

    msg = "initial sync building indexes";
    log() << msg;
    // Index pass: same clone helper, dataPass == false builds indexes only.
    for (auto&& dbCollsPair : collectionsPerDb) {
        if (!_initialSyncClone(&txn,
                               cloner,
                               r.conn()->getServerAddress(),
                               dbCollsPair.first,
                               dbCollsPair.second,
                               false)) {
            return Status(ErrorCodes::InitialSyncFailure,
                          str::stream() << "initial sync failed: " << msg);
        }
    }

    // WARNING: If the 3rd oplog sync step is removed we must reset minValid to the last entry on
    // the source server so that we don't come out of recovering until we get there (since the
    // previous steps could have fetched newer document than the oplog entry we were applying from).
msg = "oplog sync 3 of 3"; log() << msg; InitialSync tail(bgsync, multiSyncApply); // Use the non-initial sync apply code if (!_initialSyncApplyOplog(&txn, &tail, &r, bgsync)) { return Status(ErrorCodes::InitialSyncFailure, str::stream() << "initial sync failed: " << msg); } // --------- Status status = getGlobalAuthorizationManager()->initialize(&txn); if (!status.isOK()) { warning() << "Failed to reinitialize auth data after initial sync. " << status; return status; } log() << "initial sync finishing up"; // Initial sync is now complete. // Clear the initial sync flag -- cannot be done under a db lock, or recursive. StorageInterface::get(&txn)->clearInitialSyncFlag(&txn); // Clear maint. mode. while (replCoord->getMaintenanceMode()) { replCoord->setMaintenanceMode(false); } log() << "initial sync done"; return Status::OK(); } stdx::mutex _initialSyncMutex; const auto kMaxFailedAttempts = 10; const auto kInitialSyncRetrySleepDuration = Seconds{5}; } // namespace Status checkAdminDatabase(OperationContext* txn, Database* adminDb) { // Assumes txn holds MODE_X or MODE_S lock on "admin" database. if (!adminDb) { return Status::OK(); } Collection* const usersCollection = adminDb->getCollection(AuthorizationManager::usersCollectionNamespace); const bool hasUsers = usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull(); Collection* const adminVersionCollection = adminDb->getCollection(AuthorizationManager::versionCollectionNamespace); BSONObj authSchemaVersionDocument; if (!adminVersionCollection || !Helpers::findOne(txn, adminVersionCollection, AuthorizationManager::versionDocumentQuery, authSchemaVersionDocument)) { if (!hasUsers) { // It's OK to have no auth version document if there are no user documents. 
return Status::OK(); } std::string msg = str::stream() << "During initial sync, found documents in " << AuthorizationManager::usersCollectionNamespace.ns() << " but could not find an auth schema version document in " << AuthorizationManager::versionCollectionNamespace.ns() << ". " << "This indicates that the primary of this replica set was not successfully " "upgraded to schema version " << AuthorizationManager::schemaVersion26Final << ", which is the minimum supported schema version in this version of MongoDB"; return {ErrorCodes::AuthSchemaIncompatible, msg}; } long long foundSchemaVersion; Status status = bsonExtractIntegerField(authSchemaVersionDocument, AuthorizationManager::schemaVersionFieldName, &foundSchemaVersion); if (!status.isOK()) { std::string msg = str::stream() << "During initial sync, found malformed auth schema version document: " << status.toString() << "; document: " << authSchemaVersionDocument; return {ErrorCodes::AuthSchemaIncompatible, msg}; } if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) && (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) { std::string msg = str::stream() << "During initial sync, found auth schema version " << foundSchemaVersion << ", but this version of MongoDB only supports schema versions " << AuthorizationManager::schemaVersion26Final << " and " << AuthorizationManager::schemaVersion28SCRAM; return {ErrorCodes::AuthSchemaIncompatible, msg}; } return Status::OK(); } void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) { stdx::unique_lock lk(_initialSyncMutex, stdx::defer_lock); if (!lk.try_lock()) { uasserted(34474, "Initial Sync Already Active."); } std::unique_ptr bgsync; { log() << "Starting replication fetcher thread for initial sync"; auto txn = cc().makeOperationContext(); bgsync = stdx::make_unique( replicationCoordinatorExternalState, replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn.get())); 
bgsync->startup(txn.get()); createOplog(txn.get()); } ON_BLOCK_EXIT([&bgsync]() { log() << "Stopping replication fetcher thread for initial sync"; auto txn = cc().makeOperationContext(); bgsync->shutdown(txn.get()); bgsync->join(txn.get()); }); int failedAttempts = 0; while (failedAttempts < kMaxFailedAttempts) { try { // leave loop when successful Status status = _initialSync(bgsync.get()); if (status.isOK()) { break; } else { error() << status; } } catch (const DBException& e) { error() << e; // Return if in shutdown if (inShutdown()) { return; } } if (inShutdown()) { return; } error() << "initial sync attempt failed, " << (kMaxFailedAttempts - ++failedAttempts) << " attempts remaining"; sleepmillis(durationCount(kInitialSyncRetrySleepDuration)); } // No need to print a stack if (failedAttempts >= kMaxFailedAttempts) { severe() << "The maximum number of retries have been exhausted for initial sync."; fassertFailedNoTrace(16233); } } } // namespace repl } // namespace mongo