From b70908c4898c4b02d8c70df3f796fa3e8cfe5bdd Mon Sep 17 00:00:00 2001 From: Matthew Russotto Date: Fri, 27 Mar 2020 14:50:35 -0400 Subject: SERVER-46085 Fail initial sync attempt if sync source is in initial sync --- .../initial_sync_fails_after_source_resyncs.js | 93 ++++++++++++++++++ .../initial_sync_fails_when_source_resyncs.js | 93 ++++++++++++++++++ src/mongo/client/dbclient_mockcursor.cpp | 4 +- src/mongo/db/repl/SConscript | 12 ++- src/mongo/db/repl/all_database_cloner.cpp | 58 ++++++++++- src/mongo/db/repl/all_database_cloner.h | 8 +- src/mongo/db/repl/all_database_cloner_test.cpp | 107 +++++++++++++++++++++ src/mongo/db/repl/base_cloner.cpp | 59 +++++++++++- src/mongo/db/repl/base_cloner.h | 23 ++++- src/mongo/db/repl/cloner_test_fixture.cpp | 12 +++ src/mongo/db/repl/cloner_test_fixture.h | 3 + src/mongo/db/repl/collection_cloner_test.cpp | 103 ++++++++++---------- src/mongo/db/repl/database_cloner_test.cpp | 1 + src/mongo/db/repl/initial_sync_shared_data.h | 36 +++++++ src/mongo/db/repl/initial_syncer.cpp | 2 + .../db/repl/replication_consistency_markers.h | 26 +++++ .../db/repl/replication_consistency_markers.idl | 10 ++ .../repl/replication_consistency_markers_impl.cpp | 49 +++++++++- .../db/repl/replication_consistency_markers_impl.h | 11 ++- .../replication_consistency_markers_impl_test.cpp | 47 ++++++++- .../repl/replication_consistency_markers_mock.cpp | 8 ++ .../db/repl/replication_consistency_markers_mock.h | 4 + src/mongo/db/repl/replication_coordinator.h | 7 -- src/mongo/db/repl/replication_coordinator_impl.cpp | 21 +--- src/mongo/db/repl/replication_coordinator_impl.h | 2 - .../db/repl/replication_coordinator_impl_test.cpp | 28 ------ src/mongo/db/repl/replication_coordinator_mock.cpp | 4 - src/mongo/db/repl/replication_coordinator_mock.h | 2 - src/mongo/db/repl/replication_coordinator_noop.cpp | 4 - src/mongo/db/repl/replication_coordinator_noop.h | 2 - .../embedded/replication_coordinator_embedded.cpp | 4 - .../embedded/replication_coordinator_embedded.h | 2 - src/mongo/util/uuid.h | 2 + 33 files changed, 699 insertions(+), 148 deletions(-) create mode 100644 jstests/replsets/initial_sync_fails_after_source_resyncs.js create mode 100644 jstests/replsets/initial_sync_fails_when_source_resyncs.js diff --git a/jstests/replsets/initial_sync_fails_after_source_resyncs.js b/jstests/replsets/initial_sync_fails_after_source_resyncs.js new file mode 100644 index 00000000000..f89137531fb --- /dev/null +++ b/jstests/replsets/initial_sync_fails_after_source_resyncs.js @@ -0,0 +1,93 @@ +/** + * Tests that initial sync will abort an attempt if the sync source enters and completes initial + * sync during cloning (i.e. the source is resynced during an outage). + * @tags: [requires_fcv_44] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const testName = "initial_sync_fails_after_source_resyncs"; +const rst = new ReplSetTest({ + name: testName, + nodes: [{}, {rsConfig: {priority: 0, votes: 0}}], + allowChaining: true, + useBridge: true +}); +const nodes = rst.startSet(); +rst.initiateWithHighElectionTimeout(); + +const primary = rst.getPrimary(); +const primaryDb = primary.getDB("test"); +const initialSyncSource = rst.getSecondary(); + +// Add some data to be cloned. +assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}])); +rst.awaitReplication(); + +jsTest.log("Adding the initial sync destination node to the replica set"); +const initialSyncNode = rst.add({ + rsConfig: {priority: 0, votes: 0}, + setParameter: { + 'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'}), + // This test is specifically testing that the cloners detect the source going into initial + // sync mode, so we turn off the oplog fetcher to ensure that we don't inadvertently test + // that instead. + 'failpoint.hangBeforeStartingOplogFetcher': tojson({mode: 'alwaysOn'}), + 'numInitialSyncAttempts': 1, + 'failpoint.forceSyncSourceCandidate': + tojson({mode: 'alwaysOn', data: {hostAndPort: initialSyncSource.host}}) + } +}); +rst.reInitiate(); +rst.waitForState(initialSyncNode, ReplSetTest.State.STARTUP_2); + +// The code handling this case is common to all cloners, so run it only for the stage most likely +// to see an error. +const cloner = 'CollectionCloner'; +const stage = 'query'; + +// Set us up to hang before finish so we can check status. +const beforeFinishFailPoint = configureFailPoint(initialSyncNode, "initialSyncHangBeforeFinish"); +const initialSyncNodeDb = initialSyncNode.getDB("test"); +const failPointData = { + cloner: cloner, + stage: stage, + nss: 'test.test' +}; +// Set us up to stop right before the given stage. +const beforeStageFailPoint = + configureFailPoint(initialSyncNodeDb, "hangBeforeClonerStage", failPointData); +// Release the initial failpoint. +assert.commandWorked(initialSyncNodeDb.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"})); +beforeStageFailPoint.wait(); + +jsTestLog("Testing resyncing sync source in cloner " + cloner + " stage " + stage); +initialSyncSource.disconnect(initialSyncNode); +rst.restart(initialSyncSource, {startClean: true}); +// Wait for the source to reach SECONDARY +rst.awaitSecondaryNodes(undefined, [initialSyncSource]); + +jsTestLog("Resuming the initial sync."); +initialSyncSource.reconnect(initialSyncNode); +beforeStageFailPoint.off(); +beforeFinishFailPoint.wait(); +const res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// The initial sync should have failed. +assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 1); +beforeFinishFailPoint.off(); + +// Release the initial sync source and sync node oplog fetcher so the test completes. +assert.commandWorked(initialSyncNodeDb.adminCommand( + {configureFailPoint: "hangBeforeStartingOplogFetcher", mode: "off"})); +assert.commandWorked(initialSyncSource.getDB("admin").adminCommand( + {configureFailPoint: "initialSyncHangBeforeFinish", mode: "off"})); + +// We skip validation and dbhashes because the initial sync failed so the initial sync node is +// invalid and unreachable. +TestData.skipCheckDBHashes = true; +rst.stopSet(null, null, {skipValidation: true}); +})(); diff --git a/jstests/replsets/initial_sync_fails_when_source_resyncs.js b/jstests/replsets/initial_sync_fails_when_source_resyncs.js new file mode 100644 index 00000000000..862fd6083fd --- /dev/null +++ b/jstests/replsets/initial_sync_fails_when_source_resyncs.js @@ -0,0 +1,93 @@ +/** + * Tests that initial sync will abort an attempt if the sync source enters initial sync during + * cloning. This test will timeout if the attempt is not aborted. + * + * @tags: [requires_fcv_44] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const testName = "initial_sync_fails_when_source_resyncs"; +const rst = new ReplSetTest( + {name: testName, nodes: [{}, {rsConfig: {priority: 0, votes: 0}}], allowChaining: true}); +const nodes = rst.startSet(); +rst.initiateWithHighElectionTimeout(); + +const primary = rst.getPrimary(); +const primaryDb = primary.getDB("test"); +let initialSyncSource = rst.getSecondary(); + +// Add some data to be cloned. +assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}])); +rst.awaitReplication(); + +jsTest.log("Adding the initial sync destination node to the replica set"); +const initialSyncNode = rst.add({ + rsConfig: {priority: 0, votes: 0}, + setParameter: { + 'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'}), + // This test is specifically testing that the cloners detect the source going into initial + // sync mode, so we turn off the oplog fetcher to ensure that we don't inadvertently test + // that instead. + 'failpoint.hangBeforeStartingOplogFetcher': tojson({mode: 'alwaysOn'}), + 'numInitialSyncAttempts': 1, + 'failpoint.forceSyncSourceCandidate': + tojson({mode: 'alwaysOn', data: {hostAndPort: initialSyncSource.host}}) + } +}); +rst.reInitiate(); +rst.waitForState(initialSyncNode, ReplSetTest.State.STARTUP_2); + +// The code handling this case is common to all cloners, so run it only for the stage most likely +// to see an error. +const cloner = 'CollectionCloner'; +const stage = 'query'; + +// Set us up to hang before finish so we can check status. +const beforeFinishFailPoint = configureFailPoint(initialSyncNode, "initialSyncHangBeforeFinish"); +const initialSyncNodeDb = initialSyncNode.getDB("test"); +const failPointData = { + cloner: cloner, + stage: stage, + nss: 'test.test' +}; +// Set us up to stop right before the given stage. +const beforeStageFailPoint = + configureFailPoint(initialSyncNodeDb, "hangBeforeClonerStage", failPointData); +// Release the initial failpoint. +assert.commandWorked(initialSyncNodeDb.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"})); +beforeStageFailPoint.wait(); + +jsTestLog("Testing resyncing sync source in cloner " + cloner + " stage " + stage); + +// We hold the source in initial sync mode. +initialSyncSource = rst.restart(initialSyncSource, { + startClean: true, + setParameter: {"failpoint.initialSyncHangBeforeFinish": tojson({mode: "alwaysOn"})} +}); +// Wait for the source to go into initial sync. +rst.waitForState(initialSyncSource, ReplSetTest.State.STARTUP_2); + +jsTestLog("Resuming the initial sync."); +beforeStageFailPoint.off(); +beforeFinishFailPoint.wait(); +const res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1})); + +// The initial sync should have failed. +assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 1); +beforeFinishFailPoint.off(); + +// Release the initial sync source and sync node oplog fetcher so the test completes. +assert.commandWorked(initialSyncNodeDb.adminCommand( + {configureFailPoint: "hangBeforeStartingOplogFetcher", mode: "off"})); +assert.commandWorked(initialSyncSource.getDB("admin").adminCommand( + {configureFailPoint: "initialSyncHangBeforeFinish", mode: "off"})); + +// We skip validation and dbhashes because the initial sync failed so the initial sync node is +// invalid and unreachable. +TestData.skipCheckDBHashes = true; +rst.stopSet(null, null, {skipValidation: true}); +})(); diff --git a/src/mongo/client/dbclient_mockcursor.cpp b/src/mongo/client/dbclient_mockcursor.cpp index d70fe3b3d06..40b505c7abe 100644 --- a/src/mongo/client/dbclient_mockcursor.cpp +++ b/src/mongo/client/dbclient_mockcursor.cpp @@ -74,7 +74,7 @@ void DBClientMockCursor::_fillNextBatch() { int leftInBatch = _batchSize; batch.objs.clear(); while (_iter.more() && (!_batchSize || leftInBatch--)) { - batch.objs.emplace_back(_iter.next().Obj()); + batch.objs.emplace_back(_iter.next().Obj().getOwned()); } batch.pos = 0; @@ -92,4 +92,4 @@ boost::optional DBClientMockCursor::getPostBatchResumeToken() const { return _postBatchResumeToken; } -} // namespace mongo \ No newline at end of file +} // namespace mongo diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index dbe257c01b8..b1aa28819d9 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -629,10 +629,17 @@ env.Library( ], ) +env.Library('member_data', + [ + 'member_data.cpp', + ], + LIBDEPS=[ + 'replica_set_messages', + ]) + env.Library('topology_coordinator', [ 'heartbeat_response_action.cpp', - 'member_data.cpp', 'topology_coordinator.cpp', env.Idlc('topology_coordinator.idl')[0], ], @@ -641,6 +648,7 @@ env.Library('topology_coordinator', '$BUILD_DIR/mongo/rpc/metadata', '$BUILD_DIR/mongo/util/fail_point', 'isself', + 'member_data', 'replica_set_messages', 'repl_settings', 'rslog', @@ -939,6 +947,8 @@ env.Library( LIBDEPS = [ 'task_runner', 'initial_sync_shared_data', + 'member_data', + 'replication_consistency_markers_impl', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/util/concurrency/thread_pool', diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index c1ed1686fc9..53fbbe297b3 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -35,6 +35,8 @@ #include "mongo/base/string_data.h" #include "mongo/db/repl/all_database_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -47,10 +49,11 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData, ThreadPool* dbPool) : BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _connectStage("connect", this, &AllDatabaseCloner::connectStage), + _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage), _listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {} BaseCloner::ClonerStages AllDatabaseCloner::getStages() { - return {&_connectStage, &_listDatabasesStage}; + return {&_connectStage, &_getInitialSyncIdStage, &_listDatabasesStage}; } Status AllDatabaseCloner::ensurePrimaryOrSecondary( @@ -65,9 +68,12 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( // There is a window during startup where a node has an invalid configuration and will have // an isMaster response the same as a removed node. So we must check to see if the node is // removed by checking local configuration. - auto otherNodes = - ReplicationCoordinator::get(getGlobalServiceContext())->getOtherNodesInReplSet(); - if (std::find(otherNodes.begin(), otherNodes.end(), getSource()) == otherNodes.end()) { + auto memberData = ReplicationCoordinator::get(getGlobalServiceContext())->getMemberData(); + auto syncSourceIter = std::find_if( + memberData.begin(), memberData.end(), [source = getSource()](const MemberData& member) { + return member.getHostAndPort() == source; + }); + if (syncSourceIter == memberData.end()) { Status status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Sync source " << getSource() << " has been removed from the replication configuration."); @@ -76,6 +82,20 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary( getSharedData()->setInitialSyncStatusIfOK(lk, status); return status; } + + // We also check if the sync source has gone into initial sync itself. If so, we'll never be + // able to sync from it and we should abort the attempt. Because there is a window during + // startup where a node will report being in STARTUP2 even if it is not in initial sync, + // we also check to see if it has a sync source. A node in STARTUP2 will not have a sync + // source unless it is in initial sync. + if (syncSourceIter->getState().startup2() && !syncSourceIter->getSyncSource().empty()) { + Status status(ErrorCodes::NotMasterOrSecondary, + str::stream() << "Sync source " << getSource() << " has been resynced."); + stdx::lock_guard lk(*getSharedData()); + // Setting the status in the shared data will cancel the initial sync. + getSharedData()->setInitialSyncStatusIfOK(lk, status); + return status; + } return Status(ErrorCodes::NotMasterOrSecondary, str::stream() << "Cannot connect because sync source " << getSource() << " is neither primary nor secondary."); @@ -99,6 +119,36 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() { return kContinueNormally; } +BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() { + auto wireVersion = static_cast(getClient()->getMaxWireVersion()); + { + stdx::lock_guard lk(*getSharedData()); + getSharedData()->setSyncSourceWireVersion(lk, wireVersion); + } + + // Wire versions prior to resumable initial sync don't have a sync source id. + if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC) + return kContinueNormally; + auto initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + // TODO(SERVER-47150): Remove this "if" and allow the uassert to fail. + if (initialSyncId.isEmpty()) { + stdx::lock_guard lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, UUID::gen()); + return kContinueNormally; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + { + stdx::lock_guard lk(*getSharedData()); + getSharedData()->setInitialSyncSourceId(lk, initialSyncIdDoc.get_id()); + } + return kContinueNormally; +} + BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() { BSONObj res; auto databasesArray = getClient()->getDatabaseInfos(BSONObj(), true /* nameOnly */); diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h index 5436b6172ce..8c6a264e906 100644 --- a/src/mongo/db/repl/all_database_cloner.h +++ b/src/mongo/db/repl/all_database_cloner.h @@ -69,7 +69,7 @@ private: public: ConnectStage(std::string name, AllDatabaseCloner* cloner, ClonerRunFn stageFunc) : ClonerStage(name, cloner, stageFunc){}; - virtual bool checkRollBackIdOnRetry() { + bool checkSyncSourceValidityOnRetry() final { return false; } }; @@ -102,6 +102,11 @@ private: */ AfterStageBehavior connectStage(); + /** + * Stage function that gets the wire version and initial sync ID. + */ + AfterStageBehavior getInitialSyncIdStage(); + /** * Stage function that retrieves database information from the sync source. */ @@ -128,6 +133,7 @@ private: // (MX) Write access with mutex from main flow of control, read access with mutex from other // threads, read access allowed from main flow without mutex. ConnectStage _connectStage; // (R) + ConnectStage _getInitialSyncIdStage; // (R) ClonerStage _listDatabasesStage; // (R) std::vector _databases; // (X) std::unique_ptr _currentDatabaseCloner; // (MX) diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp index 10507e1bb85..2da6f3831cb 100644 --- a/src/mongo/db/repl/all_database_cloner_test.cpp +++ b/src/mongo/db/repl/all_database_cloner_test.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/all_database_cloner.h" #include "mongo/db/repl/cloner_test_fixture.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_test_fixture.h" @@ -293,6 +294,112 @@ TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) { ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); } +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButSourceNodeIsDowngraded) { + _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, + WireVersion::RESUMABLE_INITIAL_SYNC); + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + _clock.advance(Minutes(60)); + + // Bring the server up, but change the wire version to an older one. + unittest::log() << "Bringing mock server back up."; + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + _mockServer->reboot(); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); +} + +TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButInitialSyncIdChanges) { + // Initial Sync Ids are not checked before wire version RESUMABLE_INITIAL_SYNC. + _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, + WireVersion::RESUMABLE_INITIAL_SYNC); + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); + + // Stop at the listDatabases stage. + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + + auto cloner = makeAllDatabaseCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the listDatabases stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + auto beforeRBIDFailPoint = + globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); + auto timesEnteredRBID = beforeRBIDFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); + _clock.advance(Minutes(60)); + + // Bring the server up. + unittest::log() << "Bringing mock server back up."; + _mockServer->reboot(); + + // Clear and change the initial sync ID + _mockServer->remove( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + Query(), + 0 /* ignored flags */); + _mockServer->insert( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + BSON("_id" << UUID::gen())); + + // Allow the cloner to finish. + beforeRBIDFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Total retries and outage time should be available. + ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock())); + ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock())); + ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock())); +} + TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButTimesOut) { auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 1e1fdc45a19..02f8b1af48a 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -32,6 +32,8 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/scopeguard.h" @@ -183,6 +185,54 @@ void BaseCloner::clearRetryingState() { _retryableOp = boost::none; } +Status BaseCloner::checkSyncSourceIsStillValid() { + WireVersion wireVersion; + { + stdx::lock_guard lk(*_sharedData); + auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk); + // The wire version should always have been set by the time this is called. + invariant(wireVersionOpt); + wireVersion = *wireVersionOpt; + } + if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { + auto status = checkInitialSyncIdIsUnchanged(); + if (!status.isOK()) + return status; + } + return checkRollBackIdIsUnchanged(); +} + +Status BaseCloner::checkInitialSyncIdIsUnchanged() { + uassert(ErrorCodes::InitialSyncFailure, + "Sync source was downgraded and no longer supports resumable initial sync", + getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); + BSONObj initialSyncId; + try { + initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + } catch (DBException& e) { + if (ErrorCodes::isRetriableError(e)) { + auto status = e.toStatus().withContext( + ": failed while attempting to retrieve initial sync ID after re-connect"); + LOGV2_DEBUG( + 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status); + return status; + } + throw; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + + stdx::lock_guard lk(*_sharedData); + uassert(ErrorCodes::InitialSyncFailure, + "Sync source has been resynced since we started syncing from it", + _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id()); + return Status::OK(); +} + Status BaseCloner::checkRollBackIdIsUnchanged() { BSONObj info; try { @@ -267,12 +317,13 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* } }, isThisStageFailPoint); - if (stage->checkRollBackIdOnRetry()) { - // If checkRollBackIdIsUnchanged fails without throwing, it means a network + if (stage->checkSyncSourceValidityOnRetry()) { + // If checkSyncSourceIsStillValid fails without throwing, it means a network // error occurred and it's safe to continue (which will cause another retry). - if (!checkRollBackIdIsUnchanged().isOK()) + if (!checkSyncSourceIsStillValid().isOK()) continue; - // After successfully checking the rollback ID, the client should always be OK. + // After successfully checking the sync source validity, the client should + // always be OK. invariant(!getClient()->isFailed()); } } diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index 38079eeebac..d34597b332f 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -112,11 +112,13 @@ protected: } /** - * Returns true if the rollback ID should be checked before retrying. - * This is provided because the "connect" stage must complete successfully - * before checking rollback ID. + * Returns true if the sync source validity should be checked before retrying. + * This includes checking the sync source member state, checking the rollback ID, + * and checking the sync source initial sync ID. + * This method is provided because early stages which connect and collect + * the initial sync ID must complete successfully before checking sync source validity. */ - virtual bool checkRollBackIdOnRetry() { + virtual bool checkSyncSourceValidityOnRetry() { return true; } @@ -225,12 +227,25 @@ private: AfterStageBehavior runStageWithRetries(BaseClonerStage* stage); + /** + * Make sure the initial sync ID on the sync source has not changed. Throws an exception + * if it has. Returns a not-OK status if a network error occurs. + */ + Status checkInitialSyncIdIsUnchanged(); + /** * Make sure the rollback ID has not changed. Throws an exception if it has. Returns * a not-OK status if a network error occurs. */ Status checkRollBackIdIsUnchanged(); + /** + * Does validity checks on the sync source. If the sync source is now no longer usable, + * throws an exception. Returns a not-OK status if a network error occurs or if the sync + * source is temporarily unusable (e.g. restarting). + */ + Status checkSyncSourceIsStillValid(); + /** * Supports pausing at certain stages for the initial sync fuzzer test framework. */ diff --git a/src/mongo/db/repl/cloner_test_fixture.cpp b/src/mongo/db/repl/cloner_test_fixture.cpp index 0a6191fc171..e588748e7cc 100644 --- a/src/mongo/db/repl/cloner_test_fixture.cpp +++ b/src/mongo/db/repl/cloner_test_fixture.cpp @@ -31,6 +31,7 @@ #include "mongo/db/clientcursor.h" #include "mongo/db/repl/cloner_test_fixture.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context_test_fixture.h" @@ -74,6 +75,11 @@ void ClonerTestFixture::setUp() { // Required by CollectionCloner::listIndexesStage() and IndexBuildsCoordinator. getServiceContext()->setStorageEngine(std::make_unique()); + + // Set the initial sync ID on the mock server. + _mockServer->insert( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + BSON("_id" << _initialSyncId)); } void ClonerTestFixture::tearDown() { @@ -84,5 +90,11 @@ void ClonerTestFixture::tearDown() { unittest::Test::tearDown(); } +void ClonerTestFixture::setInitialSyncId() { + stdx::lock_guard lk(*_sharedData); + _sharedData->setSyncSourceWireVersion(lk, WireVersion::RESUMABLE_INITIAL_SYNC); + _sharedData->setInitialSyncSourceId(lk, _initialSyncId); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/cloner_test_fixture.h b/src/mongo/db/repl/cloner_test_fixture.h index 726c107c273..6dddb4162da 100644 --- a/src/mongo/db/repl/cloner_test_fixture.h +++ b/src/mongo/db/repl/cloner_test_fixture.h @@ -56,6 +56,8 @@ protected: void tearDown() override; + void setInitialSyncId(); + StorageInterfaceMock _storageInterface; HostAndPort _source; std::unique_ptr _dbWorkThreadPool; @@ -63,6 +65,7 @@ protected: std::unique_ptr _mockClient; std::unique_ptr _sharedData; ClockSourceMock _clock; + UUID _initialSyncId = UUID::gen(); private: static constexpr int kInitialRollbackId = 1; diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 5fba71c2557..0527c205a39 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -59,7 +59,7 @@ public: CollectionClonerTest() {} protected: - void setUp() final { + void setUp() override { ClonerTestFixture::setUp(); _collectionStats = std::make_shared(); _standardCreateCollectionFn = [this](const NamespaceString& nss, @@ -125,7 +125,25 @@ protected: << "b_1")}; }; -TEST_F(CollectionClonerTest, CountStage) { +class CollectionClonerTestResumable : public CollectionClonerTest { + void setUp() final { + CollectionClonerTest::setUp(); + setInitialSyncId(); + } +}; + +class CollectionClonerTestNonResumable : public CollectionClonerTest { + void setUp() final { + CollectionClonerTest::setUp(); + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + stdx::lock_guard lk(*_sharedData); + _sharedData->setSyncSourceWireVersion(lk, WireVersion::SHARDED_TRANSACTIONS); + } +}; + +TEST_F(CollectionClonerTestResumable, CountStage) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", createCountResponse(100)); @@ -134,7 +152,7 @@ TEST_F(CollectionClonerTest, CountStage) { } // On a negative count, the CollectionCloner should use a zero count. -TEST_F(CollectionClonerTest, CountStageNegativeCount) { +TEST_F(CollectionClonerTestResumable, CountStageNegativeCount) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", createCountResponse(-100)); @@ -143,19 +161,21 @@ TEST_F(CollectionClonerTest, CountStageNegativeCount) { } // On NamespaceNotFound, the CollectionCloner should exit without doing anything. -TEST_F(CollectionClonerTest, CountStageNamespaceNotFound) { +TEST_F(CollectionClonerTestResumable, CountStageNamespaceNotFound) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::NamespaceNotFound, "NoSuchUuid")); ASSERT_OK(cloner->run()); } -TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, "")); ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); } -TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, "")); _mockServer->setCommandReply("count", @@ -167,7 +187,8 @@ TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFrom ASSERT_STRING_CONTAINS(status.reason(), "TEST error"); } -TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { +TEST_F(CollectionClonerTestResumable, + CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); _mockServer->setCommandReply("count", BSON("ok" << 1)); @@ -175,7 +196,7 @@ TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCo ASSERT_EQUALS(ErrorCodes::NoSuchKey, status); } -TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { +TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNoIndexes) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -187,7 +208,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { } // NamespaceNotFound is treated the same as no index. -TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) { +TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNamespaceNotFound) { auto cloner = makeCollectionCloner(); _mockServer->setCommandReply("count", createCountResponse(1)); _mockServer->setCommandReply("listIndexes", @@ -199,7 +220,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) { ASSERT_EQ(0, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, ListIndexesHasResults) { +TEST_F(CollectionClonerTestResumable, ListIndexesHasResults) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -216,7 +237,7 @@ TEST_F(CollectionClonerTest, ListIndexesHasResults) { ASSERT_EQ(3, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriableError) { +TEST_F(CollectionClonerTestResumable, CollectionClonerResendsListIndexesCommandOnRetriableError) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("listIndexes"); _mockServer->setCommandReply("count", createCountResponse(1)); @@ -233,7 +254,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriabl ASSERT_EQ(2, cloner->getStats().indexes); } -TEST_F(CollectionClonerTest, BeginCollection) { +TEST_F(CollectionClonerTestResumable, BeginCollection) { NamespaceString collNss; CollectionOptions collOptions; BSONObj collIdIndexSpec; @@ -270,7 +291,7 @@ TEST_F(CollectionClonerTest, BeginCollection) { } } -TEST_F(CollectionClonerTest, BeginCollectionFailed) { +TEST_F(CollectionClonerTestResumable, BeginCollectionFailed) { _storageInterface.createCollectionForBulkFn = [&](const NamespaceString& theNss, const CollectionOptions& theOptions, const BSONObj idIndexSpec, @@ -285,7 +306,7 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) { ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); } -TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsSingleBatch) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -305,7 +326,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { ASSERT_EQUALS(1u, stats.receivedBatches); } -TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsMultipleBatches) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -327,7 +348,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { ASSERT_EQUALS(2u, stats.receivedBatches); } -TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsScheduleDBWorkFailed) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -364,7 +385,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) { clonerThread.join(); } -TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsCallbackCanceled) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -407,7 +428,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { clonerThread.join(); } -TEST_F(CollectionClonerTest, InsertDocumentsFailed) { +TEST_F(CollectionClonerTestResumable, InsertDocumentsFailed) { // Set up data for preliminary stages _mockServer->setCommandReply("count", createCountResponse(2)); _mockServer->setCommandReply("listIndexes", @@ -447,7 +468,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { clonerThread.join(); } -TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { +TEST_F(CollectionClonerTestResumable, DoNotCreateIDIndexIfAutoIndexIdUsed) { NamespaceString collNss; CollectionOptions collOptions; // We initialize collIndexSpecs with fake information to ensure it is overwritten by an empty @@ -482,11 +503,8 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_EQ(collNss, _nss); } -TEST_F(CollectionClonerTest, NonResumableQuerySuccess) { +TEST_F(CollectionClonerTestNonResumable, NonResumableQuerySuccess) { // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -509,11 +527,7 @@ TEST_F(CollectionClonerTest, NonResumableQuerySuccess) { ASSERT_EQUALS(3u, stats.documentsCopied); } -TEST_F(CollectionClonerTest, NonResumableQueryFailure) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableQueryFailure) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -553,7 +567,7 @@ TEST_F(CollectionClonerTest, NonResumableQueryFailure) { } // We will retry our query without having yet obtained a resume token. -TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { +TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -615,7 +629,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetryS } // We will resume our query using the resume token we stored after receiving the first batch. -TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) { +TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -671,7 +685,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySu ASSERT_EQUALS(5u, stats.documentsCopied); } -TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonRetriableError) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -715,7 +729,8 @@ TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) { clonerThread.join(); } -TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) { +TEST_F(CollectionClonerTestResumable, + ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -760,7 +775,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCa } // We retry the query after a transient error and we immediately encounter a non-retriable one. -TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAtRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -821,7 +836,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) { // We retry the query after a transient error, we make a bit more progress and then we encounter // a non-retriable one. -TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) { +TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAfterPastRetry) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); // Set up data for preliminary stages @@ -889,7 +904,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) { // We resume a query, receive some more data, then get a transient error again. The goal of this // test is to make sure we don't forget to request the _next_ resume token when resuming a query. -TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) { +TEST_F(CollectionClonerTestResumable, ResumableQueryTwoResumes) { /** * Test runs like so: @@ -989,11 +1004,7 @@ TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) { // We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the // collection no longer exists in the database. -TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorDropOK) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -1042,11 +1053,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) { // We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound, // which means the collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenOtherError) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); @@ -1097,11 +1104,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) { // We receive a CursorNotFound error, but the next query succeeds, indicating that the // collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTest, NonResumableCursorErrorThenSuccessEqualsFailure) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - +TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenSuccessEqualsFailure) { // Set up data for preliminary stages auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" << "_id_"); diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index d380af31aad..9301ee648a6 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -71,6 +71,7 @@ protected: return std::move(localLoader); }; + setInitialSyncId(); } std::unique_ptr makeDatabaseCloner() { return std::make_unique(_dbName, diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h index 8ab165180cb..75b8ac7e5f9 100644 --- a/src/mongo/db/repl/initial_sync_shared_data.h +++ b/src/mongo/db/repl/initial_sync_shared_data.h @@ -34,8 +34,10 @@ #include "mongo/base/status.h" #include "mongo/base/string_data.h" #include "mongo/db/server_options.h" +#include "mongo/db/wire_version.h" #include "mongo/platform/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/uuid.h" namespace mongo { namespace repl { @@ -85,6 +87,34 @@ public: return _totalRetries; } + /** + * Sets the wire version of the sync source. + */ + void setSyncSourceWireVersion(WithLock, WireVersion wireVersion) { + _syncSourceWireVersion = wireVersion; + } + + /** + * Returns the wire version of the sync source, if previously set. + */ + boost::optional getSyncSourceWireVersion(WithLock) { + return _syncSourceWireVersion; + } + + /** + * Sets the initial sync ID of the sync source. + */ + void setInitialSyncSourceId(WithLock, boost::optional syncSourceId) { + _initialSyncSourceId = syncSourceId; + } + + /** + * Gets the previously-set initial sync ID of the sync source. + */ + boost::optional getInitialSyncSourceId(WithLock) { + return _initialSyncSourceId; + } + /** * Returns the total time the sync source has been unreachable, including any current outage. */ @@ -221,6 +251,12 @@ private: // The total time across all outages in this initial sync attempt, but excluding any current // outage, that we were retrying because we were unable to reach the sync source. Milliseconds _totalTimeUnreachable; + + // The sync source wire version at the start of data cloning. + boost::optional _syncSourceWireVersion; + + // The initial sync ID on the source at the start of data cloning. + boost::optional _initialSyncSourceId; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index f45de57cc67..3a1470c6e41 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -511,6 +511,7 @@ void InitialSyncer::waitForCloner_forTest() { void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) { // 'opCtx' is passed through from startup(). _replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx); + _replicationProcess->getConsistencyMarkers()->clearInitialSyncId(opCtx); auto serviceCtx = opCtx->getServiceContext(); _storage->setInitialDataTimestamp(serviceCtx, Timestamp::kAllowUnstableCheckpointsSentinel); @@ -550,6 +551,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx); // All updates that represent initial sync must be completed before setting the initial data diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h index e873d4977c9..57cd96262c1 100644 --- a/src/mongo/db/repl/replication_consistency_markers.h +++ b/src/mongo/db/repl/replication_consistency_markers.h @@ -72,6 +72,16 @@ class StorageInterface; * } * * See below for explanations of each field. + * + * The initialSyncId document, in 'local.replset.initialSyncId', is used to detect when nodes have + * been resynced. It is set at the end of initial sync, or whenever replication is started when the + * document does not exist. + * + * Example of all fields: + * { + * _id: , + * wallTime: + * } */ class ReplicationConsistencyMarkers { ReplicationConsistencyMarkers(const ReplicationConsistencyMarkers&) = delete; @@ -254,6 +264,22 @@ public: * or `oplogTruncateAfterPoint`. */ virtual Status createInternalCollections(OperationContext* opCtx) = 0; + + /** + * Set the initial sync ID and wall time if it is not already set. This will create the + * collection if necessary. + */ + virtual void setInitialSyncIdIfNotSet(OperationContext* opCtx) = 0; + + /** + * Clears the initial sync ID by dropping the collection. + */ + virtual void clearInitialSyncId(OperationContext* opCtx) = 0; + + /** + * Returns the initial sync id object, or an empty object if none. + */ + virtual BSONObj getInitialSyncId(OperationContext* opCtx) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_consistency_markers.idl b/src/mongo/db/repl/replication_consistency_markers.idl index df977970a30..7610f82e1fa 100644 --- a/src/mongo/db/repl/replication_consistency_markers.idl +++ b/src/mongo/db/repl/replication_consistency_markers.idl @@ -71,3 +71,13 @@ structs: type: string description: "Always set to 'oplogTruncateAfterPoint' to easily retrieve it." + InitialSyncIdDocument: + description: A document in which the server stores data related to the initial sync of the server. + fields: + _id: + type: uuid + description: "An arbitrary unique identifier associated with the initial sync of the server." + wallTime: + type: date + optional: true + description: "The walltime at which the initial sync document was written." diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index c06c8126f1a..fe73097941c 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -47,6 +47,7 @@ namespace repl { constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace; +constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace; namespace { const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true)); @@ -60,15 +61,18 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( storageInterface, NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace), NamespaceString( - ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {} + ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace), + NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace)) {} ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterPointNss) + NamespaceString oplogTruncateAfterPointNss, + NamespaceString initialSyncIdNss) : _storageInterface(storageInterface), _minValidNss(minValidNss), - _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {} + _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss), + _initialSyncIdNss(initialSyncIdNss) {} boost::optional ReplicationConsistencyMarkersImpl::_getMinValidDocument( OperationContext* opCtx) const { @@ -526,5 +530,44 @@ Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationCon return Status::OK(); } +void ReplicationConsistencyMarkersImpl::setInitialSyncIdIfNotSet(OperationContext* opCtx) { + auto status = + _storageInterface->createCollection(opCtx, _initialSyncIdNss, CollectionOptions()); + if (!status.isOK() && status.code() != ErrorCodes::NamespaceExists) { + LOGV2_FATAL( + 4608500, "Failed to create collection", "namespace"_attr = _initialSyncIdNss.ns()); + fassertFailedWithStatus(4608502, status); + } + + auto prevId = _storageInterface->findSingleton(opCtx, _initialSyncIdNss); + if (prevId.getStatus() == ErrorCodes::CollectionIsEmpty) { + auto doc = BSON("_id" << UUID::gen() << "wallTime" + << opCtx->getServiceContext()->getPreciseClockSource()->now()); + fassert(4608503, + _storageInterface->insertDocument(opCtx, + _initialSyncIdNss, + TimestampedBSONObj{doc, Timestamp()}, + OpTime::kUninitializedTerm)); + } else if (!prevId.isOK()) { + fassertFailedWithStatus(4608504, prevId.getStatus()); + } +} + +void ReplicationConsistencyMarkersImpl::clearInitialSyncId(OperationContext* opCtx) { + fassert(4608501, _storageInterface->dropCollection(opCtx, _initialSyncIdNss)); +} + +BSONObj ReplicationConsistencyMarkersImpl::getInitialSyncId(OperationContext* opCtx) { + auto idStatus = _storageInterface->findSingleton(opCtx, _initialSyncIdNss); + if (idStatus.isOK()) { + return idStatus.getValue(); + } + if (idStatus.getStatus() != ErrorCodes::CollectionIsEmpty && + idStatus.getStatus() != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(idStatus); + } + return BSONObj(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h index 552d03a372e..40fa2768529 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.h +++ b/src/mongo/db/repl/replication_consistency_markers_impl.h @@ -53,11 +53,13 @@ public: static constexpr StringData kDefaultMinValidNamespace = "local.replset.minvalid"_sd; static constexpr StringData kDefaultOplogTruncateAfterPointNamespace = "local.replset.oplogTruncateAfterPoint"_sd; + static constexpr StringData kDefaultInitialSyncIdNamespace = "local.replset.initialSyncId"_sd; explicit ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface); ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterNss); + NamespaceString oplogTruncateAfterNss, + NamespaceString initialSyncIdNss); void initializeMinValidDocument(OperationContext* opCtx) override; @@ -89,7 +91,11 @@ public: void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; - Status createInternalCollections(OperationContext* opCtx); + Status createInternalCollections(OperationContext* opCtx) override; + + void setInitialSyncIdIfNotSet(OperationContext* opCtx) override; + void clearInitialSyncId(OperationContext* opCtx) override; + BSONObj getInitialSyncId(OperationContext* opCtx) override; private: /** @@ -124,6 +130,7 @@ private: StorageInterface* _storageInterface; const NamespaceString _minValidNss; const NamespaceString _oplogTruncateAfterPointNss; + const NamespaceString _initialSyncIdNss; // Protects modifying and reading _isPrimary below. mutable Mutex _truncatePointIsPrimaryMutex = diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index 8d1e7c07bc5..8767fec7057 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -56,6 +56,7 @@ using namespace mongo::repl; NamespaceString kMinValidNss("local", "replset.minvalid"); NamespaceString kOplogTruncateAfterPointNss("local", "replset.oplogTruncateAfterPoint"); +NamespaceString kInitialSyncIdNss("local", "replset.initialSyncId"); /** * Returns min valid document. @@ -121,7 +122,7 @@ bool RecoveryUnitWithDurabilityTracking::waitUntilDurable(OperationContext* opCt TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -145,7 +146,7 @@ TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlagWorks) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -164,7 +165,7 @@ TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlag TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncateAfterPoint) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -187,7 +188,7 @@ TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncat TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss); + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); auto opCtx = getOperationContext(); ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -242,5 +243,43 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); } +TEST_F(ReplicationConsistencyMarkersTest, InitialSyncId) { + ReplicationConsistencyMarkersImpl consistencyMarkers( + getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss); + auto opCtx = getOperationContext(); + + // Initially, initialSyncId should be unset. + auto initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + // Clearing an already-clear initialSyncId should be OK. + consistencyMarkers.clearInitialSyncId(opCtx); + initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + auto firstInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT_FALSE(firstInitialSyncIdBson.isEmpty()); + InitialSyncIdDocument firstInitialSyncIdDoc = InitialSyncIdDocument::parse( + IDLParserErrorContext("initialSyncId"), firstInitialSyncIdBson); + + // Setting it twice should change nothing. + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + ASSERT_BSONOBJ_EQ(firstInitialSyncIdBson, consistencyMarkers.getInitialSyncId(opCtx)); + + // Clear it; should return to empty. + consistencyMarkers.clearInitialSyncId(opCtx); + initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset; + + // Set it; it should have a different UUID. + consistencyMarkers.setInitialSyncIdIfNotSet(opCtx); + auto secondInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx); + ASSERT_FALSE(secondInitialSyncIdBson.isEmpty()); + InitialSyncIdDocument secondInitialSyncIdDoc = InitialSyncIdDocument::parse( + IDLParserErrorContext("initialSyncId"), secondInitialSyncIdBson); + ASSERT_NE(firstInitialSyncIdDoc.get_id(), secondInitialSyncIdDoc.get_id()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index 28710b460eb..875c47f9e3d 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -135,5 +135,13 @@ Status ReplicationConsistencyMarkersMock::createInternalCollections(OperationCon return Status::OK(); } +void ReplicationConsistencyMarkersMock::setInitialSyncIdIfNotSet(OperationContext* opCtx) {} + +void ReplicationConsistencyMarkersMock::clearInitialSyncId(OperationContext* opCtx) {} + +BSONObj ReplicationConsistencyMarkersMock::getInitialSyncId(OperationContext* opCtx) { + return BSONObj(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index 7afe39a6bd7..a925e014d93 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -83,6 +83,10 @@ public: Status createInternalCollections(OperationContext* opCtx) override; + void setInitialSyncIdIfNotSet(OperationContext* opCtx) override; + void clearInitialSyncId(OperationContext* opCtx) override; + BSONObj getInitialSyncId(OperationContext* opCtx) override; + private: mutable Mutex _initialSyncFlagMutex = MONGO_MAKE_LATCH("ReplicationConsistencyMarkersMock::_initialSyncFlagMutex"); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 881be41aa6d..dde8da44abb 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -744,13 +744,6 @@ public: */ virtual std::vector getHostsWrittenTo(const OpTime& op, bool durablyWritten) = 0; - /** - * Returns a vector of the members other than ourself in the replica set, as specified in - * the replica set config. Invalid to call if we are not in replica set mode. Returns - * an empty vector if we do not have a valid config. - */ - virtual std::vector getOtherNodesInReplSet() const = 0; - /** * Returns a BSONObj containing a representation of the current default write concern. */ diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 16ba689edab..2635494cbd4 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -738,6 +738,9 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, auto memberState = getMemberState(); invariant(memberState.startup2() || memberState.removed()); invariant(setFollowerMode(MemberState::RS_RECOVERING)); + // Set an initial sync ID, in case we were upgraded or restored from backup without doing + // an initial sync. + _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); _externalState->startSteadyStateReplication(opCtx, this); return; } @@ -4273,24 +4276,6 @@ std::vector ReplicationCoordinatorImpl::getHostsWrittenTo(const OpT return _topCoord->getHostsWrittenTo(op, durablyWritten); } -std::vector ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { - stdx::lock_guard lk(_mutex); - invariant(_settings.usingReplSets()); - - std::vector nodes; - if (_selfIndex == -1) { - return nodes; - } - - for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { - if (i == _selfIndex) - continue; - - nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort()); - } - return nodes; -} - Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { stdx::lock_guard lock(_mutex); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index fc9e4a39888..b08d49f4429 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -261,8 +261,6 @@ public: virtual std::vector getHostsWrittenTo(const OpTime& op, bool durablyWritten) override; - virtual std::vector getOtherNodesInReplSet() const override; - virtual WriteConcernOptions getGetLastErrorDefault() override; virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index aff27217a52..7d1498d80b2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -3013,34 +3013,6 @@ TEST_F(ReplCoordTest, } } -TEST_F(ReplCoordTest, NodeReturnsNoNodesWhenGetOtherNodesInReplSetIsRunBeforeHavingAConfig) { - start(); - ASSERT_EQUALS(0U, getReplCoord()->getOtherNodesInReplSet().size()); -} - -TEST_F(ReplCoordTest, NodeReturnsListOfNodesOtherThanItselfInResponseToGetOtherNodesInReplSet) { - assertStartSuccess(BSON("_id" - << "mySet" - << "version" << 2 << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "h1") - << BSON("_id" << 1 << "host" - << "h2") - << BSON("_id" << 2 << "host" - << "h3" - << "priority" << 0 << "hidden" << true))), - HostAndPort("h1")); - - std::vector otherNodes = getReplCoord()->getOtherNodesInReplSet(); - ASSERT_EQUALS(2U, otherNodes.size()); - if (otherNodes[0] == HostAndPort("h2")) { - ASSERT_EQUALS(HostAndPort("h3"), otherNodes[1]); - } else { - ASSERT_EQUALS(HostAndPort("h3"), otherNodes[0]); - ASSERT_EQUALS(HostAndPort("h2"), otherNodes[1]); - } -} - TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsCurrentTopologyVersionOnTimeOut) { init(); assertStartSuccess(BSON("_id" diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 6ddd4d4e84f..3f27644b3c6 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -407,10 +407,6 @@ std::vector ReplicationCoordinatorMock::getHostsWrittenTo(const OpT return std::vector(); } -std::vector ReplicationCoordinatorMock::getOtherNodesInReplSet() const { - return std::vector(); -} - Status ReplicationCoordinatorMock::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions& writeConcern) const { return Status::OK(); diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 7a4b8db79a0..970645e8154 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -227,8 +227,6 @@ public: virtual std::vector getHostsWrittenTo(const OpTime& op, bool durablyWritten); - virtual std::vector getOtherNodesInReplSet() const; - virtual WriteConcernOptions getGetLastErrorDefault(); virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index d0e52db43a2..d54c2acc1a2 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -350,10 +350,6 @@ std::vector ReplicationCoordinatorNoOp::getHostsWrittenTo(const OpT MONGO_UNREACHABLE; } -std::vector ReplicationCoordinatorNoOp::getOtherNodesInReplSet() const { - MONGO_UNREACHABLE; -} - Status ReplicationCoordinatorNoOp::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions&) const { MONGO_UNREACHABLE; diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 382073c21fb..b897576dcf9 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -196,8 +196,6 @@ public: std::vector getHostsWrittenTo(const OpTime&, bool) final; - std::vector getOtherNodesInReplSet() const final; - Status checkReplEnabledForCommand(BSONObjBuilder*) final; HostAndPort chooseNewSyncSource(const OpTime&) final; diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp index 5e8d2c68a95..a9014d89808 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.cpp +++ b/src/mongo/embedded/replication_coordinator_embedded.cpp @@ -375,10 +375,6 @@ std::vector ReplicationCoordinatorEmbedded::getHostsWrittenTo(const UASSERT_NOT_IMPLEMENTED; } -std::vector ReplicationCoordinatorEmbedded::getOtherNodesInReplSet() const { - UASSERT_NOT_IMPLEMENTED; -} - Status ReplicationCoordinatorEmbedded::checkIfWriteConcernCanBeSatisfied( const WriteConcernOptions&) const { UASSERT_NOT_IMPLEMENTED; diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h index 06716e97a4e..75cb9b53922 100644 --- a/src/mongo/embedded/replication_coordinator_embedded.h +++ b/src/mongo/embedded/replication_coordinator_embedded.h @@ -203,8 +203,6 @@ public: std::vector getHostsWrittenTo(const repl::OpTime&, bool) override; - std::vector getOtherNodesInReplSet() const override; - Status checkReplEnabledForCommand(BSONObjBuilder*) override; HostAndPort chooseNewSyncSource(const repl::OpTime&) override; diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h index b90b4d29d30..c142da4abea 100644 --- a/src/mongo/util/uuid.h +++ b/src/mongo/util/uuid.h @@ -46,6 +46,7 @@ namespace repl { class CollectionInfo; class OplogEntryBase; class DurableReplOperation; +class InitialSyncIdDocument; } // namespace repl namespace idl { @@ -83,6 +84,7 @@ class UUID { friend class repl::CollectionInfo; friend class repl::OplogEntryBase; friend class repl::DurableReplOperation; + friend class repl::InitialSyncIdDocument; friend class ResumeTokenInternal; friend class ShardCollectionTypeBase; friend class VoteCommitIndexBuild; -- cgit v1.2.1