diff options
Diffstat (limited to 'src/mongo/db/repl')
27 files changed, 509 insertions, 140 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index dbe257c01b8..b1aa28819d9 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -629,10 +629,17 @@ env.Library( ], ) +env.Library('member_data', + [ + 'member_data.cpp', + ], + LIBDEPS=[ + 'replica_set_messages', + ]) + env.Library('topology_coordinator', [ 'heartbeat_response_action.cpp', - 'member_data.cpp', 'topology_coordinator.cpp', env.Idlc('topology_coordinator.idl')[0], ], @@ -641,6 +648,7 @@ env.Library('topology_coordinator', '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/util/fail_point', 'isself', + 'member_data', 'replica_set_messages', 'repl_settings', 'rslog', @@ -939,6 +947,8 @@ env.Library( LIBDEPS = [ 'task_runner', 'initial_sync_shared_data', + 'member_data', + 'replication_consistency_markers_impl', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/util/concurrency/thread_pool', diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index c1ed1686fc9..53fbbe297b3 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -35,6 +35,8 @@ #include "mongo/base/string_data.h" #include "mongo/db/repl/all_database_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -47,10 +49,11 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData, ThreadPool* dbPool) : BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _connectStage("connect", this, &AllDatabaseCloner::connectStage), + _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage), _listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {} BaseCloner::ClonerStages AllDatabaseCloner::getStages() { - return {&_connectStage, &_listDatabasesStage}; + return {&_connectStage, &_getInitialSyncIdStage, &_listDatabasesStage}; } Status AllDatabaseCloner::ensurePrimaryOrSecondary( @@ -65,9 +68,12 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( // There is a window during startup where a node has an invalid configuration and will have // an isMaster response the same as a removed node. So we must check to see if the node is // removed by checking local configuration. - auto otherNodes = - ReplicationCoordinator::get(getGlobalServiceContext())->getOtherNodesInReplSet(); - if (std::find(otherNodes.begin(), otherNodes.end(), getSource()) == otherNodes.end()) { + auto memberData = ReplicationCoordinator::get(getGlobalServiceContext())->getMemberData(); + auto syncSourceIter = std::find_if( + memberData.begin(), memberData.end(), [source = getSource()](const MemberData& member) { + return member.getHostAndPort() == source; + }); + if (syncSourceIter == memberData.end()) { Status status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Sync source " << getSource() << " has been removed from the replication configuration."); @@ -76,6 +82,20 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( getSharedData()->setInitialSyncStatusIfOK(lk, status); return status; } + + // We also check if the sync source has gone into initial sync itself. If so, we'll never be + // able to sync from it and we should abort the attempt. Because there is a window during + // startup where a node will report being in STARTUP2 even if it is not in initial sync, + // we also check to see if it has a sync source. A node in STARTUP2 will not have a sync + // source unless it is in initial sync. + if (syncSourceIter->getState().startup2() && !syncSourceIter->getSyncSource().empty()) { + Status status(ErrorCodes::NotMasterOrSecondary, + str::stream() << "Sync source " << getSource() << " has been resynced."); + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + // Setting the status in the shared data will cancel the initial sync. + getSharedData()->setInitialSyncStatusIfOK(lk, status); + return status; + } return Status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Cannot connect because sync source " << getSource() << " is neither primary nor secondary."); @@ -99,6 +119,36 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() { return kContinueNormally; } +BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() { + auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion()); + { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setSyncSourceWireVersion(lk, wireVersion); + } + + // Wire versions prior to resumable initial sync don't have a sync source id. + if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC) + return kContinueNormally; + auto initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + // TODO(SERVER-47150): Remove this "if" and allow the uassert to fail. + if (initialSyncId.isEmpty()) { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, UUID::gen()); + return kContinueNormally; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, initialSyncIdDoc.get_id()); + } + return kContinueNormally; +} + BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() { BSONObj res; auto databasesArray = getClient()->getDatabaseInfos(BSONObj(), true /* nameOnly */); diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h index 5436b6172ce..8c6a264e906 100644 --- a/src/mongo/db/repl/all_database_cloner.h +++ b/src/mongo/db/repl/all_database_cloner.h @@ -69,7 +69,7 @@ private: public: ConnectStage(std::string name, AllDatabaseCloner* cloner, ClonerRunFn stageFunc) : ClonerStage<AllDatabaseCloner>(name, cloner, stageFunc){}; - virtual bool checkRollBackIdOnRetry() { + bool checkSyncSourceValidityOnRetry() final { return false; } }; @@ -103,6 +103,11 @@ private: AfterStageBehavior connectStage(); /** + * Stage function that gets the wire version and initial sync ID. + */ + AfterStageBehavior getInitialSyncIdStage(); + + /** * Stage function that retrieves database information from the sync source. */ AfterStageBehavior listDatabasesStage(); @@ -128,6 +133,7 @@ private: // (MX) Write access with mutex from main flow of control, read access with mutex from other // threads, read access allowed from main flow without mutex. ConnectStage _connectStage; // (R) + ConnectStage _getInitialSyncIdStage; // (R) ClonerStage<AllDatabaseCloner> _listDatabasesStage; // (R) std::vector<std::string> _databases; // (X) std::unique_ptr<DatabaseCloner> _currentDatabaseCloner; // (MX) diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp index 10507e1bb85..2da6f3831cb 100644 --- a/src/mongo/db/repl/all_database_cloner_test.cpp +++ b/src/mongo/db/repl/all_database_cloner_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/all_database_cloner.h" #include "mongo/db/repl/cloner_test_fixture.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_test_fixture.h" @@ -293,6 +294,112 @@ TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) { ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); } +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButSourceNodeIsDowngraded) { + _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, + WireVersion::RESUMABLE_INITIAL_SYNC); + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + _clock.advance(Minutes(60)); + + // Bring the server up, but change the wire version to an older one. + unittest::log() << "Bringing mock server back up."; + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + _mockServer->reboot(); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); +} + +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButInitialSyncIdChanges) { + // Initial Sync Ids are not checked before wire version RESUMABLE_INITIAL_SYNC. + _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, + WireVersion::RESUMABLE_INITIAL_SYNC); + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + _clock.advance(Minutes(60)); + + // Bring the server up. + unittest::log() << "Bringing mock server back up."; + _mockServer->reboot(); + + // Clear and change the initial sync ID + _mockServer->remove( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + Query(), + 0 /* ignored flags */); + _mockServer->insert( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + BSON("_id" << UUID::gen())); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); +} + TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButTimesOut) { auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 1e1fdc45a19..02f8b1af48a 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -32,6 +32,8 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/scopeguard.h" @@ -183,6 +185,54 @@ void BaseCloner::clearRetryingState() { _retryableOp = boost::none; } +Status BaseCloner::checkSyncSourceIsStillValid() { + WireVersion wireVersion; + { + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk); + // The wire version should always have been set by the time this is called. + invariant(wireVersionOpt); + wireVersion = *wireVersionOpt; + } + if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { + auto status = checkInitialSyncIdIsUnchanged(); + if (!status.isOK()) + return status; + } + return checkRollBackIdIsUnchanged(); +} + +Status BaseCloner::checkInitialSyncIdIsUnchanged() { + uassert(ErrorCodes::InitialSyncFailure, + "Sync source was downgraded and no longer supports resumable initial sync", + getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); + BSONObj initialSyncId; + try { + initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + } catch (DBException& e) { + if (ErrorCodes::isRetriableError(e)) { + auto status = e.toStatus().withContext( + ": failed while attempting to retrieve initial sync ID after re-connect"); + LOGV2_DEBUG( + 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status); + return status; + } + throw; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + uassert(ErrorCodes::InitialSyncFailure, + "Sync source has been resynced since we started syncing from it", + _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id()); + return Status::OK(); +} + Status BaseCloner::checkRollBackIdIsUnchanged() { BSONObj info; try { @@ -267,12 +317,13 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* } }, isThisStageFailPoint); - if (stage->checkRollBackIdOnRetry()) { - // If checkRollBackIdIsUnchanged fails without throwing, it means a network + if (stage->checkSyncSourceValidityOnRetry()) { + // If checkSyncSourceIsStillValid fails without throwing, it means a network // error occurred and it's safe to continue (which will cause another retry). - if (!checkRollBackIdIsUnchanged().isOK()) + if (!checkSyncSourceIsStillValid().isOK()) continue; - // After successfully checking the rollback ID, the client should always be OK. + // After successfully checking the sync source validity, the client should + // always be OK. invariant(!getClient()->isFailed()); } } diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index 38079eeebac..d34597b332f 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -112,11 +112,13 @@ protected: } /** - * Returns true if the rollback ID should be checked before retrying. - * This is provided because the "connect" stage must complete successfully - * before checking rollback ID. + * Returns true if the sync source validity should be checked before retrying. + * This includes checking the sync source member state, checking the rollback ID, + * and checking the sync source initial sync ID. + * This method is provided because early stages which connect and collect + * the initial sync ID must complete successfully before checking sync source validity. */ - virtual bool checkRollBackIdOnRetry() { + virtual bool checkSyncSourceValidityOnRetry() { return true; } @@ -226,12 +228,25 @@ private: AfterStageBehavior runStageWithRetries(BaseClonerStage* stage); /** + * Make sure the initial sync ID on the sync source has not changed. Throws an exception + * if it has. Returns a not-OK status if a network error occurs. + */ + Status checkInitialSyncIdIsUnchanged(); + + /** * Make sure the rollback ID has not changed. Throws an exception if it has. Returns * a not-OK status if a network error occurs. */ Status checkRollBackIdIsUnchanged(); /** + * Does validity checks on the sync source. If the sync source is now no longer usable, + * throws an exception. Returns a not-OK status if a network error occurs or if the sync + * source is temporarily unusable (e.g. restarting). + */ + Status checkSyncSourceIsStillValid(); + + /** * Supports pausing at certain stages for the initial sync fuzzer test framework. */ void pauseForFuzzer(BaseClonerStage* stage); diff --git a/src/mongo/db/repl/cloner_test_fixture.cpp b/src/mongo/db/repl/cloner_test_fixture.cpp index 0a6191fc171..e588748e7cc 100644 --- a/src/mongo/db/repl/cloner_test_fixture.cpp +++ b/src/mongo/db/repl/cloner_test_fixture.cpp @@ -31,6 +31,7 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/repl/cloner_test_fixture.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_test_fixture.h" @@ -74,6 +75,11 @@ void ClonerTestFixture::setUp() { // Required by CollectionCloner::listIndexesStage() and IndexBuildsCoordinator. getServiceContext()->setStorageEngine(std::make_unique<StorageEngineMock>()); + + // Set the initial sync ID on the mock server. + _mockServer->insert( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + BSON("_id" << _initialSyncId)); } void ClonerTestFixture::tearDown() { @@ -84,5 +90,11 @@ void ClonerTestFixture::tearDown() { unittest::Test::tearDown(); } +void ClonerTestFixture::setInitialSyncId() { + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + _sharedData->setSyncSourceWireVersion(lk, WireVersion::RESUMABLE_INITIAL_SYNC); + _sharedData->setInitialSyncSourceId(lk, _initialSyncId); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/cloner_test_fixture.h b/src/mongo/db/repl/cloner_test_fixture.h index 726c107c273..6dddb4162da 100644 --- a/src/mongo/db/repl/cloner_test_fixture.h +++ b/src/mongo/db/repl/cloner_test_fixture.h @@ -56,6 +56,8 @@ protected: void tearDown() override; + void setInitialSyncId(); + StorageInterfaceMock _storageInterface; HostAndPort _source; std::unique_ptr<ThreadPool> _dbWorkThreadPool; @@ -63,6 +65,7 @@ protected: std::unique_ptr<DBClientConnection> _mockClient; std::unique_ptr<InitialSyncSharedData> _sharedData; ClockSourceMock _clock; + UUID _initialSyncId = UUID::gen(); private: static constexpr int kInitialRollbackId = 1; diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 5fba71c2557..0527c205a39 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -59,7 +59,7 @@ public: CollectionClonerTest() {} protected: - void setUp() final { + void setUp() override { ClonerTestFixture::setUp(); _collectionStats = std::make_shared<CollectionMockStats>(); _standardCreateCollectionFn = [this](const NamespaceString& nss, @@ -125,7 +125,25 @@ protected: << "b_1")}; }; -TEST_F(CollectionClonerTest, CountStage) { +class CollectionClonerTestResumable : public CollectionClonerTest { + void setUp() final { + CollectionClonerTest::setUp(); + setInitialSyncId(); + } +}; + +class CollectionClonerTestNonResumable : public CollectionClonerTest { + void setUp() final { + CollectionClonerTest::setUp(); + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData); + _sharedData->setSyncSourceWireVersion(lk, WireVersion::SHARDED_TRANSACTIONS); + } +}; + +TEST_F(CollectionClonerTestResumable, CountStage) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", createCountResponse(100)); @@ -134,7 +152,7 @@ TEST_F(CollectionClonerTest, CountStage) { } // On a negative count, the CollectionCloner should use a zero count. -TEST_F(CollectionClonerTest, CountStageNegativeCount) { +TEST_F(CollectionClonerTestResumable, CountStageNegativeCount) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", createCountResponse(-100)); @@ -143,19 +161,21 @@ TEST_F(CollectionClonerTest, CountStageNegativeCount) { } // On NamespaceNotFound, the CollectionCloner should exit without doing anything. -TEST_F(CollectionClonerTest, CountStageNamespaceNotFound) { +TEST_F(CollectionClonerTestResumable, CountStageNamespaceNotFound) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::NamespaceNotFound, "NoSuchUuid")); ASSERT_OK(cloner->run()); } -TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, "")); ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); } -TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, "")); _mockServer->setCommandReply("count", @@ -167,7 +187,8 @@ TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFrom ASSERT_STRING_CONTAINS(status.reason(), "TEST error"); } -TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", BSON("ok" << 1)); @@ -175,7 +196,7 @@ TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCo ASSERT_EQUALS(ErrorCodes::NoSuchKey, status); } -TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { +TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNoIndexes) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -187,7 +208,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { } // NamespaceNotFound is treated the same as no index. -TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) { +TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNamespaceNotFound) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", createCountResponse(1)); _mockServer->setCommandReply("listIndexes", @@ -199,7 +220,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) { ASSERT_EQ(0, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, ListIndexesHasResults) { +TEST_F(CollectionClonerTestResumable, ListIndexesHasResults) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -216,7 +237,7 @@ TEST_F(CollectionClonerTest, ListIndexesHasResults) { ASSERT_EQ(3, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriableError) { +TEST_F(CollectionClonerTestResumable, CollectionClonerResendsListIndexesCommandOnRetriableError) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -233,7 +254,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriabl ASSERT_EQ(2, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, BeginCollection) { +TEST_F(CollectionClonerTestResumable, BeginCollection) { NamespaceString collNss; CollectionOptions collOptions; BSONObj collIdIndexSpec; @@ -270,7 +291,7 @@ TEST_F(CollectionClonerTest, BeginCollection) { } } -TEST_F(CollectionClonerTest, BeginCollectionFailed) { +TEST_F(CollectionClonerTestResumable, BeginCollectionFailed) { _storageInterface.createCollectionForBulkFn = [&](const NamespaceString& theNss, const CollectionOptions& theOptions, const BSONObj idIndexSpec, @@ -285,7 +306,7 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) { ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); } -TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsSingleBatch) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -305,7 +326,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { ASSERT_EQUALS(1u, stats.receivedBatches); } -TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsMultipleBatches) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -327,7 +348,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { ASSERT_EQUALS(2u, stats.receivedBatches); } -TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsScheduleDBWorkFailed) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -364,7 +385,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) { clonerThread.join(); } -TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsCallbackCanceled) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -407,7 +428,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { clonerThread.join(); } -TEST_F(CollectionClonerTest, InsertDocumentsFailed) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsFailed) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -447,7 +468,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { clonerThread.join(); } -TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { +TEST_F(CollectionClonerTestResumable, DoNotCreateIDIndexIfAutoIndexIdUsed) { NamespaceString collNss; CollectionOptions collOptions; // We initialize collIndexSpecs with fake information to ensure it is overwritten by an empty @@ -482,11 +503,8 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_EQ(collNss, _nss); } -TEST_F(CollectionClonerTest, NonResumableQuerySuccess) { +TEST_F(CollectionClonerTestNonResumable, NonResumableQuerySuccess) { // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -509,11 +527,7 @@ TEST_F(CollectionClonerTest, NonResumableQuerySuccess) { ASSERT_EQUALS(3u, stats.documentsCopied); } -TEST_F(CollectionClonerTest, NonResumableQueryFailure) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableQueryFailure) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -553,7 +567,7 @@ TEST_F(CollectionClonerTest, NonResumableQueryFailure) { } // We will retry our query without having yet obtained a resume token. -TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { +TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -615,7 +629,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetryS } // We will resume our query using the resume token we stored after receiving the first batch. -TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) { +TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -671,7 +685,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySu ASSERT_EQUALS(5u, stats.documentsCopied); } -TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonRetriableError) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -715,7 +729,8 @@ TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) { clonerThread.join(); } -TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) { +TEST_F(CollectionClonerTestResumable, + ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -760,7 +775,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCa } // We retry the query after a transient error and we immediately encounter a non-retriable one. -TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAtRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -821,7 +836,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) { // We retry the query after a transient error, we make a bit more progress and then we encounter // a non-retriable one. -TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAfterPastRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -889,7 +904,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) { // We resume a query, receive some more data, then get a transient error again. The goal of this // test is to make sure we don't forget to request the _next_ resume token when resuming a query. -TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) { +TEST_F(CollectionClonerTestResumable, ResumableQueryTwoResumes) { /** * Test runs like so: @@ -989,11 +1004,7 @@ TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) { // We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the // collection no longer exists in the database. -TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorDropOK) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -1042,11 +1053,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) { // We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound, // which means the collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenOtherError) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -1097,11 +1104,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) { // We receive a CursorNotFound error, but the next query succeeds, indicating that the // collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTest, NonResumableCursorErrorThenSuccessEqualsFailure) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenSuccessEqualsFailure) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index d380af31aad..9301ee648a6 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -71,6 +71,7 @@ protected: return std::move(localLoader); }; + setInitialSyncId(); } std::unique_ptr<DatabaseCloner> makeDatabaseCloner() { return std::make_unique<DatabaseCloner>(_dbName, diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h index 8ab165180cb..75b8ac7e5f9 100644 --- a/src/mongo/db/repl/initial_sync_shared_data.h +++ b/src/mongo/db/repl/initial_sync_shared_data.h @@ -34,8 +34,10 @@ #include "mongo/base/status.h" #include "mongo/base/string_data.h" #include "mongo/db/server_options.h" +#include "mongo/db/wire_version.h" #include "mongo/platform/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/uuid.h" namespace mongo { namespace repl { @@ -86,6 +88,34 @@ public: } /** + * Sets the wire version of the sync source. + */ + void setSyncSourceWireVersion(WithLock, WireVersion wireVersion) { + _syncSourceWireVersion = wireVersion; + } + + /** + * Returns the wire version of the sync source, if previously set. + */ + boost::optional<WireVersion> getSyncSourceWireVersion(WithLock) { + return _syncSourceWireVersion; + } + + /** + * Sets the initial sync ID of the sync source. + */ + void setInitialSyncSourceId(WithLock, boost::optional<UUID> syncSourceId) { + _initialSyncSourceId = syncSourceId; + } + + /** + * Gets the previously-set initial sync ID of the sync source. + */ + boost::optional<UUID> getInitialSyncSourceId(WithLock) { + return _initialSyncSourceId; + } + + /** * Returns the total time the sync source has been unreachable, including any current outage. */ Milliseconds getTotalTimeUnreachable(WithLock lk); @@ -221,6 +251,12 @@ private: // The total time across all outages in this initial sync attempt, but excluding any current // outage, that we were retrying because we were unable to reach the sync source. Milliseconds _totalTimeUnreachable; + + // The sync source wire version at the start of data cloning. + boost::optional<WireVersion> _syncSourceWireVersion; + + // The initial sync ID on the source at the start of data cloning. + boost::optional<UUID> _initialSyncSourceId; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index f45de57cc67..3a1470c6e41 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -511,6 +511,7 @@ void InitialSyncer::waitForCloner_forTest() { void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) { // 'opCtx' is passed through from startup(). _replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx); + _replicationProcess->getConsistencyMarkers()->clearInitialSyncId(opCtx); auto serviceCtx = opCtx->getServiceContext(); _storage->setInitialDataTimestamp(serviceCtx, Timestamp::kAllowUnstableCheckpointsSentinel); @@ -550,6 +551,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx); // All updates that represent initial sync must be completed before setting the initial data diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h index e873d4977c9..57cd96262c1 100644 --- a/src/mongo/db/repl/replication_consistency_markers.h +++ b/src/mongo/db/repl/replication_consistency_markers.h @@ -72,6 +72,16 @@ class StorageInterface; * } * * See below for explanations of each field. + * + * The initialSyncId document, in 'local.replset.initialSyncId', is used to detect when nodes have + * been resynced. It is set at the end of initial sync, or whenever replication is started when the + * document does not exist. + * + * Example of all fields: + * { + * _id: <UUID>, + * wallTime: <Date_t> + * } */ class ReplicationConsistencyMarkers { ReplicationConsistencyMarkers(const ReplicationConsistencyMarkers&) = delete; @@ -254,6 +264,22 @@ public: * or `oplogTruncateAfterPoint`. */ virtual Status createInternalCollections(OperationContext* opCtx) = 0; + + /** + * Set the initial sync ID and wall time if it is not already set. This will create the + * collection if necessary. + */ + virtual void setInitialSyncIdIfNotSet(OperationContext* opCtx) = 0; + + /** + * Clears the initial sync ID by dropping the collection. + */ + virtual void clearInitialSyncId(OperationContext* opCtx) = 0; + + /** + * Returns the initial sync id object, or an empty object if none. + */ + virtual BSONObj getInitialSyncId(OperationContext* opCtx) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_consistency_markers.idl b/src/mongo/db/repl/replication_consistency_markers.idl index df977970a30..7610f82e1fa 100644 --- a/src/mongo/db/repl/replication_consistency_markers.idl +++ b/src/mongo/db/repl/replication_consistency_markers.idl @@ -71,3 +71,13 @@ structs: type: string description: "Always set to 'oplogTruncateAfterPoint' to easily retrieve it." + InitialSyncIdDocument: + description: A document in which the server stores data related to the initial sync of the server. + fields: + _id: + type: uuid + description: "An arbitrary unique identifier associated with the initial sync of the server." + wallTime: + type: date + optional: true + description: "The walltime at which the initial sync document was written." diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index c06c8126f1a..fe73097941c 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -47,6 +47,7 @@ namespace repl { constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace; +constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace; namespace { const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true)); @@ -60,15 +61,18 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( storageInterface, NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace), NamespaceString( - ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {} + ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace), + NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace)) {} ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterPointNss) + NamespaceString oplogTruncateAfterPointNss, + NamespaceString initialSyncIdNss) : _storageInterface(storageInterface), _minValidNss(minValidNss), - _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {} + _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss), + _initialSyncIdNss(initialSyncIdNss) {} boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinValidDocument( OperationContext* opCtx) const { @@ -526,5 +530,44 @@ Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationCon return Status::OK(); } +void ReplicationConsistencyMarkersImpl::setInitialSyncIdIfNotSet(OperationContext* opCtx) { + auto status = + _storageInterface->createCollection(opCtx, _initialSyncIdNss, CollectionOptions()); + if (!status.isOK() && status.code() != ErrorCodes::NamespaceExists) { + LOGV2_FATAL( + 4608500, "Failed to create collection", "namespace"_attr = _initialSyncIdNss.ns()); + fassertFailedWithStatus(4608502, status); + } + + auto prevId = _storageInterface->findSingleton(opCtx, _initialSyncIdNss); + if (prevId.getStatus() == ErrorCodes::CollectionIsEmpty) { + auto doc = BSON("_id" << UUID::gen() << "wallTime" + << opCtx->getServiceContext()->getPreciseClockSource()->now()); + fassert(4608503, + _storageInterface->insertDocument(opCtx, + _initialSyncIdNss, + TimestampedBSONObj{doc, Timestamp()}, + OpTime::kUninitializedTerm)); + } else if (!prevId.isOK()) { + fassertFailedWithStatus(4608504, prevId.getStatus()); + } +} + +void ReplicationConsistencyMarkersImpl::clearInitialSyncId(OperationContext* opCtx) { + fassert(4608501, _storageInterface->dropCollection(opCtx, _initialSyncIdNss)); +} + +BSONObj ReplicationConsistencyMarkersImpl::getInitialSyncId(OperationContext* opCtx) { + auto idStatus = _storageInterface->findSingleton(opCtx, _initialSyncIdNss); + if (idStatus.isOK()) { + return idStatus.getValue(); + } + if (idStatus.getStatus() != ErrorCodes::CollectionIsEmpty && + idStatus.getStatus() != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(idStatus); + } + return BSONObj(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h index 552d03a372e..40fa2768529 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.h +++ b/src/mongo/db/repl/replication_consistency_markers_impl.h @@ -53,11 +53,13 @@ public: static constexpr StringData kDefaultMinValidNamespace = "local.replset.minvalid"_sd; static constexpr StringData kDefaultOplogTruncateAfterPointNamespace = "local.replset.oplogTruncateAfterPoint"_sd; + static constexpr StringData kDefaultInitialSyncIdNamespace = "local.replset.initialSyncId"_sd; explicit ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface); ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterNss); + NamespaceString oplogTruncateAfterNss, + NamespaceString initialSyncIdNss); void initializeMinValidDocument(OperationContext* opCtx) override; @@ -89,7 +91,11 @@ public: void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; - Status createInternalCollections(OperationContext* opCtx); + Status createInternalCollections(OperationContext* opCtx) override; + + void setInitialSyncIdIfNotSet(OperationContext* opCtx) override; + void clearInitialSyncId(OperationContext* opCtx) override; + BSONObj getInitialSyncId(OperationContext* opCtx) override; private: /** @@ -124,6 +130,7 @@ private: StorageInterface* _storageInterface; const NamespaceString _minValidNss; const NamespaceString _oplogTruncateAfterPointNss; + const NamespaceString _initialSyncIdNss; // Protects modifying and reading _isPrimary below. mutable Mutex _truncatePointIsPrimaryMutex = diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index 8d1e7c07bc5..8767fec7057 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -56,6 +56,7 @@ using namespace mongo::repl; NamespaceString kMinValidNss("local", "replset.minvalid"); NamespaceString kOplogTruncateAfterPointNss("local", "replset.oplogTruncateAfterPoint"); +NamespaceString kInitialSyncIdNss("local", "replset.initialSyncId"); /** * Returns min valid document. @@ -121,7 +122,7 @@ bool RecoveryUnitWithDurabilityTracking::waitUntilDurable(OperationContext* opCt TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -145,7 +146,7 @@ TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlagWorks) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -164,7 +165,7 @@ TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlag TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncateAfterPoint) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -187,7 +188,7 @@ TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncat TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -242,5 +243,43 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); } +TEST_F(ReplicationConsistencyMarkersTest, InitialSyncId) { + ReplicationConsistencyMarkersImpl consistencyMarkers( + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); + auto opCtx = getOperationContext(); + + // Initially, initialSyncId should be unset. + auto initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + // Clearing an already-clear initialSyncId should be OK. + consistencyMarkers.clearInitialSyncId(opCtx); + initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + auto firstInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT_FALSE(firstInitialSyncIdBson.isEmpty()); + InitialSyncIdDocument firstInitialSyncIdDoc = InitialSyncIdDocument::parse( + IDLParserErrorContext("initialSyncId"), firstInitialSyncIdBson); + + // Setting it twice should change nothing. + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + ASSERT_BSONOBJ_EQ(firstInitialSyncIdBson, consistencyMarkers.getInitialSyncId(opCtx)); + + // Clear it; should return to empty. + consistencyMarkers.clearInitialSyncId(opCtx); + initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + // Set it; it should have a different UUID. + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + auto secondInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT_FALSE(secondInitialSyncIdBson.isEmpty()); + InitialSyncIdDocument secondInitialSyncIdDoc = InitialSyncIdDocument::parse( + IDLParserErrorContext("initialSyncId"), secondInitialSyncIdBson); + ASSERT_NE(firstInitialSyncIdDoc.get_id(), secondInitialSyncIdDoc.get_id()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index 28710b460eb..875c47f9e3d 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -135,5 +135,13 @@ Status ReplicationConsistencyMarkersMock::createInternalCollections(OperationCon return Status::OK(); } +void ReplicationConsistencyMarkersMock::setInitialSyncIdIfNotSet(OperationContext* opCtx) {} + +void ReplicationConsistencyMarkersMock::clearInitialSyncId(OperationContext* opCtx) {} + +BSONObj ReplicationConsistencyMarkersMock::getInitialSyncId(OperationContext* opCtx) { + return BSONObj(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index 7afe39a6bd7..a925e014d93 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -83,6 +83,10 @@ public: Status createInternalCollections(OperationContext* opCtx) override; + void setInitialSyncIdIfNotSet(OperationContext* opCtx) override; + void clearInitialSyncId(OperationContext* opCtx) override; + BSONObj getInitialSyncId(OperationContext* opCtx) override; + private: mutable Mutex _initialSyncFlagMutex = MONGO_MAKE_LATCH("ReplicationConsistencyMarkersMock::_initialSyncFlagMutex"); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 881be41aa6d..dde8da44abb 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -745,13 +745,6 @@ public: virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten) = 0; /** - * Returns a vector of the members other than ourself in the replica set, as specified in - * the replica set config. Invalid to call if we are not in replica set mode. Returns - * an empty vector if we do not have a valid config. - */ - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const = 0; - - /** * Returns a BSONObj containing a representation of the current default write concern. */ virtual WriteConcernOptions getGetLastErrorDefault() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 16ba689edab..2635494cbd4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -738,6 +738,9 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, auto memberState = getMemberState(); invariant(memberState.startup2() || memberState.removed()); invariant(setFollowerMode(MemberState::RS_RECOVERING)); + // Set an initial sync ID, in case we were upgraded or restored from backup without doing + // an initial sync. + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); _externalState->startSteadyStateReplication(opCtx, this); return; } @@ -4273,24 +4276,6 @@ std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpT return _topCoord->getHostsWrittenTo(op, durablyWritten); } -std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { - stdx::lock_guard<Latch> lk(_mutex); - invariant(_settings.usingReplSets()); - - std::vector<HostAndPort> nodes; - if (_selfIndex == -1) { - return nodes; - } - - for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { - if (i == _selfIndex) - continue; - - nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort()); - } - return nodes; -} - Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { stdx::lock_guard<Latch> lock(_mutex); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index fc9e4a39888..b08d49f4429 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -261,8 +261,6 @@ public: virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten) override; - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override; - virtual WriteConcernOptions getGetLastErrorDefault() override; virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index aff27217a52..7d1498d80b2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -3013,34 +3013,6 @@ TEST_F(ReplCoordTest, } } -TEST_F(ReplCoordTest, NodeReturnsNoNodesWhenGetOtherNodesInReplSetIsRunBeforeHavingAConfig) { - start(); - ASSERT_EQUALS(0U, getReplCoord()->getOtherNodesInReplSet().size()); -} - -TEST_F(ReplCoordTest, NodeReturnsListOfNodesOtherThanItselfInResponseToGetOtherNodesInReplSet) { - assertStartSuccess(BSON("_id" - << "mySet" - << "version" << 2 << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "h1") - << BSON("_id" << 1 << "host" - << "h2") - << BSON("_id" << 2 << "host" - << "h3" - << "priority" << 0 << "hidden" << true))), - HostAndPort("h1")); - - std::vector<HostAndPort> otherNodes = getReplCoord()->getOtherNodesInReplSet(); - ASSERT_EQUALS(2U, otherNodes.size()); - if (otherNodes[0] == HostAndPort("h2")) { - ASSERT_EQUALS(HostAndPort("h3"), otherNodes[1]); - } else { - ASSERT_EQUALS(HostAndPort("h3"), otherNodes[0]); - ASSERT_EQUALS(HostAndPort("h2"), otherNodes[1]); - } -} - TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsCurrentTopologyVersionOnTimeOut) { init(); assertStartSuccess(BSON("_id" diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 6ddd4d4e84f..3f27644b3c6 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -407,10 +407,6 @@ std::vector<HostAndPort> ReplicationCoordinatorMock::getHostsWrittenTo(const OpT return std::vector<HostAndPort>(); } -std::vector<HostAndPort> ReplicationCoordinatorMock::getOtherNodesInReplSet() const { - return std::vector<HostAndPort>(); -} - Status ReplicationCoordinatorMock::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { return Status::OK(); diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 7a4b8db79a0..970645e8154 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -227,8 +227,6 @@ public: virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten); - virtual std::vector<HostAndPort> getOtherNodesInReplSet() const; - virtual WriteConcernOptions getGetLastErrorDefault(); virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index d0e52db43a2..d54c2acc1a2 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -350,10 +350,6 @@ std::vector<HostAndPort> ReplicationCoordinatorNoOp::getHostsWrittenTo(const OpT MONGO_UNREACHABLE; } -std::vector<HostAndPort> ReplicationCoordinatorNoOp::getOtherNodesInReplSet() const { - MONGO_UNREACHABLE; -} - Status ReplicationCoordinatorNoOp::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions&) const { MONGO_UNREACHABLE; diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 382073c21fb..b897576dcf9 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -196,8 +196,6 @@ public: std::vector<HostAndPort> getHostsWrittenTo(const OpTime&, bool) final; - std::vector<HostAndPort> getOtherNodesInReplSet() const final; - Status checkReplEnabledForCommand(BSONObjBuilder*) final; HostAndPort chooseNewSyncSource(const OpTime&) final; |