diff options
Diffstat (limited to 'src/mongo/db/repl/all_database_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/all_database_cloner.cpp | 58 |
1 file changed, 54 insertions, 4 deletions
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index c1ed1686fc9..53fbbe297b3 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -35,6 +35,8 @@ #include "mongo/base/string_data.h" #include "mongo/db/repl/all_database_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -47,10 +49,11 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData, ThreadPool* dbPool) : BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _connectStage("connect", this, &AllDatabaseCloner::connectStage), + _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage), _listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {} BaseCloner::ClonerStages AllDatabaseCloner::getStages() { - return {&_connectStage, &_listDatabasesStage}; + return {&_connectStage, &_getInitialSyncIdStage, &_listDatabasesStage}; } Status AllDatabaseCloner::ensurePrimaryOrSecondary( @@ -65,9 +68,12 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( // There is a window during startup where a node has an invalid configuration and will have // an isMaster response the same as a removed node. So we must check to see if the node is // removed by checking local configuration. 
- auto otherNodes = - ReplicationCoordinator::get(getGlobalServiceContext())->getOtherNodesInReplSet(); - if (std::find(otherNodes.begin(), otherNodes.end(), getSource()) == otherNodes.end()) { + auto memberData = ReplicationCoordinator::get(getGlobalServiceContext())->getMemberData(); + auto syncSourceIter = std::find_if( + memberData.begin(), memberData.end(), [source = getSource()](const MemberData& member) { + return member.getHostAndPort() == source; + }); + if (syncSourceIter == memberData.end()) { Status status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Sync source " << getSource() << " has been removed from the replication configuration."); @@ -76,6 +82,20 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( getSharedData()->setInitialSyncStatusIfOK(lk, status); return status; } + + // We also check if the sync source has gone into initial sync itself. If so, we'll never be + // able to sync from it and we should abort the attempt. Because there is a window during + // startup where a node will report being in STARTUP2 even if it is not in initial sync, + // we also check to see if it has a sync source. A node in STARTUP2 will not have a sync + // source unless it is in initial sync. + if (syncSourceIter->getState().startup2() && !syncSourceIter->getSyncSource().empty()) { + Status status(ErrorCodes::NotMasterOrSecondary, + str::stream() << "Sync source " << getSource() << " has been resynced."); + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + // Setting the status in the shared data will cancel the initial sync. 
+ getSharedData()->setInitialSyncStatusIfOK(lk, status); + return status; + } return Status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Cannot connect because sync source " << getSource() << " is neither primary nor secondary."); @@ -99,6 +119,36 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() { return kContinueNormally; } +BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() { + auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion()); + { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setSyncSourceWireVersion(lk, wireVersion); + } + + // Wire versions prior to resumable initial sync don't have a sync source id. + if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC) + return kContinueNormally; + auto initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + // TODO(SERVER-47150): Remove this "if" and allow the uassert to fail. + if (initialSyncId.isEmpty()) { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, UUID::gen()); + return kContinueNormally; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, initialSyncIdDoc.get_id()); + } + return kContinueNormally; +} + BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() { BSONObj res; auto databasesArray = getClient()->getDatabaseInfos(BSONObj(), true /* nameOnly */); |