author     Matthew Russotto <matthew.russotto@10gen.com>     2020-03-27 14:50:35 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-03-27 19:19:44 +0000
commit     b70908c4898c4b02d8c70df3f796fa3e8cfe5bdd (patch)
tree       d29354ae8cf4931f7956de2eabd5bd56219027fc
parent     dc7724efa185a1b64f95fc84d0f7794dbc2fcdae (diff)
download   mongo-b70908c4898c4b02d8c70df3f796fa3e8cfe5bdd.tar.gz
SERVER-46085 Fail initial sync attempt if sync source is in initial sync
-rw-r--r--  jstests/replsets/initial_sync_fails_after_source_resyncs.js | 93
-rw-r--r--  jstests/replsets/initial_sync_fails_when_source_resyncs.js | 93
-rw-r--r--  src/mongo/client/dbclient_mockcursor.cpp | 4
-rw-r--r--  src/mongo/db/repl/SConscript | 12
-rw-r--r--  src/mongo/db/repl/all_database_cloner.cpp | 58
-rw-r--r--  src/mongo/db/repl/all_database_cloner.h | 8
-rw-r--r--  src/mongo/db/repl/all_database_cloner_test.cpp | 107
-rw-r--r--  src/mongo/db/repl/base_cloner.cpp | 59
-rw-r--r--  src/mongo/db/repl/base_cloner.h | 23
-rw-r--r--  src/mongo/db/repl/cloner_test_fixture.cpp | 12
-rw-r--r--  src/mongo/db/repl/cloner_test_fixture.h | 3
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp | 103
-rw-r--r--  src/mongo/db/repl/database_cloner_test.cpp | 1
-rw-r--r--  src/mongo/db/repl/initial_sync_shared_data.h | 36
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers.h | 26
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers.idl | 10
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp | 49
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.h | 11
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl_test.cpp | 47
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_mock.cpp | 8
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_mock.h | 4
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h | 7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 21
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp | 28
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp | 4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.cpp | 4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_noop.h | 2
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.cpp | 4
-rw-r--r--  src/mongo/embedded/replication_coordinator_embedded.h | 2
-rw-r--r--  src/mongo/util/uuid.h | 2
33 files changed, 699 insertions(+), 148 deletions(-)
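
In outline, the patch works as follows: a node writes a unique initial sync ID document to 'local.replset.initialSyncId' when it completes initial sync and drops that document when a new attempt begins; a syncing node records its source's ID at the start of cloning and re-checks it on every retry, failing the attempt if it changed. The condensed sketch below shows that retry-time comparison using the names from the diff, but omits the locking, logging, and retry bookkeeping of the real code in base_cloner.cpp:

    // Condensed sketch only; see base_cloner.cpp below for the real check.
    Status checkInitialSyncIdIsUnchanged(DBClientBase* client, const UUID& expectedId) {
        // Re-read the ID document the source wrote when it finished its own initial sync.
        BSONObj doc = client->findOne("local.replset.initialSyncId", Query());
        if (doc.isEmpty()) {
            return {ErrorCodes::InitialSyncFailure,
                    "Cannot retrieve sync source initial sync ID"};
        }
        // A different UUID means the source was resynced after cloning began.
        if (uassertStatusOK(UUID::parse(doc["_id"])) != expectedId) {
            return {ErrorCodes::InitialSyncFailure,
                    "Sync source has been resynced since we started syncing from it"};
        }
        return Status::OK();
    }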
diff --git a/jstests/replsets/initial_sync_fails_after_source_resyncs.js b/jstests/replsets/initial_sync_fails_after_source_resyncs.js
new file mode 100644
index 00000000000..f89137531fb
--- /dev/null
+++ b/jstests/replsets/initial_sync_fails_after_source_resyncs.js
@@ -0,0 +1,93 @@
+/**
+ * Tests that initial sync will abort an attempt if the sync source enters and completes initial
+ * sync during cloning (i.e. the source is resynced during an outage).
+ * @tags: [requires_fcv_44]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
+const testName = "initial_sync_fails_after_source_resyncs";
+const rst = new ReplSetTest({
+ name: testName,
+ nodes: [{}, {rsConfig: {priority: 0, votes: 0}}],
+ allowChaining: true,
+ useBridge: true
+});
+const nodes = rst.startSet();
+rst.initiateWithHighElectionTimeout();
+
+const primary = rst.getPrimary();
+const primaryDb = primary.getDB("test");
+const initialSyncSource = rst.getSecondary();
+
+// Add some data to be cloned.
+assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}]));
+rst.awaitReplication();
+
+jsTest.log("Adding the initial sync destination node to the replica set");
+const initialSyncNode = rst.add({
+ rsConfig: {priority: 0, votes: 0},
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'}),
+ // This test is specifically testing that the cloners detect the source going into initial
+ // sync mode, so we turn off the oplog fetcher to ensure that we don't inadvertently test
+ // that instead.
+ 'failpoint.hangBeforeStartingOplogFetcher': tojson({mode: 'alwaysOn'}),
+ 'numInitialSyncAttempts': 1,
+ 'failpoint.forceSyncSourceCandidate':
+ tojson({mode: 'alwaysOn', data: {hostAndPort: initialSyncSource.host}})
+ }
+});
+rst.reInitiate();
+rst.waitForState(initialSyncNode, ReplSetTest.State.STARTUP_2);
+
+// The code handling this case is common to all cloners, so run it only for the stage most likely
+// to see an error.
+const cloner = 'CollectionCloner';
+const stage = 'query';
+
+// Set us up to hang before finish so we can check status.
+const beforeFinishFailPoint = configureFailPoint(initialSyncNode, "initialSyncHangBeforeFinish");
+const initialSyncNodeDb = initialSyncNode.getDB("test");
+const failPointData = {
+ cloner: cloner,
+ stage: stage,
+ nss: 'test.test'
+};
+// Set us up to stop right before the given stage.
+const beforeStageFailPoint =
+ configureFailPoint(initialSyncNodeDb, "hangBeforeClonerStage", failPointData);
+// Release the initial failpoint.
+assert.commandWorked(initialSyncNodeDb.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}));
+beforeStageFailPoint.wait();
+
+jsTestLog("Testing resyncing sync source in cloner " + cloner + " stage " + stage);
+initialSyncSource.disconnect(initialSyncNode);
+rst.restart(initialSyncSource, {startClean: true});
+// Wait for the source to reach SECONDARY
+rst.awaitSecondaryNodes(undefined, [initialSyncSource]);
+
+jsTestLog("Resuming the initial sync.");
+initialSyncSource.reconnect(initialSyncNode);
+beforeStageFailPoint.off();
+beforeFinishFailPoint.wait();
+const res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// The initial sync should have failed.
+assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 1);
+beforeFinishFailPoint.off();
+
+// Release the initial sync source and sync node oplog fetcher so the test completes.
+assert.commandWorked(initialSyncNodeDb.adminCommand(
+ {configureFailPoint: "hangBeforeStartingOplogFetcher", mode: "off"}));
+assert.commandWorked(initialSyncSource.getDB("admin").adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeFinish", mode: "off"}));
+
+// We skip validation and dbhashes because the initial sync failed, leaving the initial sync
+// node in an invalid state and unreachable.
+TestData.skipCheckDBHashes = true;
+rst.stopSet(null, null, {skipValidation: true});
+})();
diff --git a/jstests/replsets/initial_sync_fails_when_source_resyncs.js b/jstests/replsets/initial_sync_fails_when_source_resyncs.js
new file mode 100644
index 00000000000..862fd6083fd
--- /dev/null
+++ b/jstests/replsets/initial_sync_fails_when_source_resyncs.js
@@ -0,0 +1,93 @@
+/**
+ * Tests that initial sync will abort an attempt if the sync source enters initial sync during
+ * cloning. This test will time out if the attempt is not aborted.
+ *
+ * @tags: [requires_fcv_44]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+
+const testName = "initial_sync_fails_when_source_resyncs";
+const rst = new ReplSetTest(
+ {name: testName, nodes: [{}, {rsConfig: {priority: 0, votes: 0}}], allowChaining: true});
+const nodes = rst.startSet();
+rst.initiateWithHighElectionTimeout();
+
+const primary = rst.getPrimary();
+const primaryDb = primary.getDB("test");
+let initialSyncSource = rst.getSecondary();
+
+// Add some data to be cloned.
+assert.commandWorked(primaryDb.test.insert([{a: 1}, {b: 2}, {c: 3}]));
+rst.awaitReplication();
+
+jsTest.log("Adding the initial sync destination node to the replica set");
+const initialSyncNode = rst.add({
+ rsConfig: {priority: 0, votes: 0},
+ setParameter: {
+ 'failpoint.initialSyncHangBeforeCopyingDatabases': tojson({mode: 'alwaysOn'}),
+ // This test is specifically testing that the cloners detect the source going into initial
+ // sync mode, so we turn off the oplog fetcher to ensure that we don't inadvertently test
+ // that instead.
+ 'failpoint.hangBeforeStartingOplogFetcher': tojson({mode: 'alwaysOn'}),
+ 'numInitialSyncAttempts': 1,
+ 'failpoint.forceSyncSourceCandidate':
+ tojson({mode: 'alwaysOn', data: {hostAndPort: initialSyncSource.host}})
+ }
+});
+rst.reInitiate();
+rst.waitForState(initialSyncNode, ReplSetTest.State.STARTUP_2);
+
+// The code handling this case is common to all cloners, so run it only for the stage most likely
+// to see an error.
+const cloner = 'CollectionCloner';
+const stage = 'query';
+
+// Set us up to hang before finish so we can check status.
+const beforeFinishFailPoint = configureFailPoint(initialSyncNode, "initialSyncHangBeforeFinish");
+const initialSyncNodeDb = initialSyncNode.getDB("test");
+const failPointData = {
+ cloner: cloner,
+ stage: stage,
+ nss: 'test.test'
+};
+// Set us up to stop right before the given stage.
+const beforeStageFailPoint =
+ configureFailPoint(initialSyncNodeDb, "hangBeforeClonerStage", failPointData);
+// Release the initial failpoint.
+assert.commandWorked(initialSyncNodeDb.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}));
+beforeStageFailPoint.wait();
+
+jsTestLog("Testing resyncing sync source in cloner " + cloner + " stage " + stage);
+
+// We hold the source in initial sync mode.
+initialSyncSource = rst.restart(initialSyncSource, {
+ startClean: true,
+ setParameter: {"failpoint.initialSyncHangBeforeFinish": tojson({mode: "alwaysOn"})}
+});
+// Wait for the source to go into initial sync.
+rst.waitForState(initialSyncSource, ReplSetTest.State.STARTUP_2);
+
+jsTestLog("Resuming the initial sync.");
+beforeStageFailPoint.off();
+beforeFinishFailPoint.wait();
+const res = assert.commandWorked(initialSyncNode.adminCommand({replSetGetStatus: 1}));
+
+// The initial sync should have failed.
+assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 1);
+beforeFinishFailPoint.off();
+
+// Release the initial sync source and sync node oplog fetcher so the test completes.
+assert.commandWorked(initialSyncNodeDb.adminCommand(
+ {configureFailPoint: "hangBeforeStartingOplogFetcher", mode: "off"}));
+assert.commandWorked(initialSyncSource.getDB("admin").adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeFinish", mode: "off"}));
+
+// We skip validation and dbhashes because the initial sync failed, leaving the initial sync
+// node in an invalid state and unreachable.
+TestData.skipCheckDBHashes = true;
+rst.stopSet(null, null, {skipValidation: true});
+})();
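
Both tests steer the server with data-filtered failpoints such as hangBeforeClonerStage, passing {cloner, stage, nss} as the filter document. On the C++ side, a failpoint of that shape follows roughly this pattern; this is a sketch of the general failpoint mechanism (stageName is a hypothetical variable), not the exact cloner code:

    MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage);

    // Pause only when the failpoint's data document names this cloner and stage.
    hangBeforeClonerStage.executeIf(
        [&](const BSONObj&) { hangBeforeClonerStage.pauseWhileSet(); },
        [&](const BSONObj& data) {
            return data["cloner"].str() == getClonerName() &&
                   data["stage"].str() == stageName;
        });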
diff --git a/src/mongo/client/dbclient_mockcursor.cpp b/src/mongo/client/dbclient_mockcursor.cpp
index d70fe3b3d06..40b505c7abe 100644
--- a/src/mongo/client/dbclient_mockcursor.cpp
+++ b/src/mongo/client/dbclient_mockcursor.cpp
@@ -74,7 +74,7 @@ void DBClientMockCursor::_fillNextBatch() {
int leftInBatch = _batchSize;
batch.objs.clear();
while (_iter.more() && (!_batchSize || leftInBatch--)) {
- batch.objs.emplace_back(_iter.next().Obj());
+ batch.objs.emplace_back(_iter.next().Obj().getOwned());
}
batch.pos = 0;
@@ -92,4 +92,4 @@ boost::optional<BSONObj> DBClientMockCursor::getPostBatchResumeToken() const {
return _postBatchResumeToken;
}
-} // namespace mongo
\ No newline at end of file
+} // namespace mongo
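
The one-line change to the mock cursor above matters because a BSONObj obtained by iterating another object is a view into that object's buffer; getOwned() copies the bytes into a buffer the returned object owns. A minimal illustration of the distinction (hypothetical helper, not code from this patch):

    // 'view' aliases memory owned by 'big'; returning it without getOwned()
    // would hand the caller a dangling reference once 'big' is destroyed.
    BSONObj firstBatchDoc() {
        BSONObj big = BSON("batch" << BSON_ARRAY(BSON("a" << 1)));
        BSONObj view = big["batch"].Array()[0].Obj();
        return view.getOwned();
    }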
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index dbe257c01b8..b1aa28819d9 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -629,10 +629,17 @@ env.Library(
],
)
+env.Library('member_data',
+ [
+ 'member_data.cpp',
+ ],
+ LIBDEPS=[
+ 'replica_set_messages',
+ ])
+
env.Library('topology_coordinator',
[
'heartbeat_response_action.cpp',
- 'member_data.cpp',
'topology_coordinator.cpp',
env.Idlc('topology_coordinator.idl')[0],
],
@@ -641,6 +648,7 @@ env.Library('topology_coordinator',
'$BUILD_DIR/mongo/rpc/metadata',
'$BUILD_DIR/mongo/util/fail_point',
'isself',
+ 'member_data',
'replica_set_messages',
'repl_settings',
'rslog',
@@ -939,6 +947,8 @@ env.Library(
LIBDEPS = [
'task_runner',
'initial_sync_shared_data',
+ 'member_data',
+ 'replication_consistency_markers_impl',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp
index c1ed1686fc9..53fbbe297b3 100644
--- a/src/mongo/db/repl/all_database_cloner.cpp
+++ b/src/mongo/db/repl/all_database_cloner.cpp
@@ -35,6 +35,8 @@
#include "mongo/base/string_data.h"
#include "mongo/db/repl/all_database_cloner.h"
+#include "mongo/db/repl/replication_consistency_markers_gen.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -47,10 +49,11 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData,
ThreadPool* dbPool)
: BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool),
_connectStage("connect", this, &AllDatabaseCloner::connectStage),
+ _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage),
_listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {}
BaseCloner::ClonerStages AllDatabaseCloner::getStages() {
- return {&_connectStage, &_listDatabasesStage};
+ return {&_connectStage, &_getInitialSyncIdStage, &_listDatabasesStage};
}
Status AllDatabaseCloner::ensurePrimaryOrSecondary(
@@ -65,9 +68,12 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary(
// There is a window during startup where a node has an invalid configuration and will have
// an isMaster response the same as a removed node. So we must check to see if the node is
// removed by checking local configuration.
- auto otherNodes =
- ReplicationCoordinator::get(getGlobalServiceContext())->getOtherNodesInReplSet();
- if (std::find(otherNodes.begin(), otherNodes.end(), getSource()) == otherNodes.end()) {
+ auto memberData = ReplicationCoordinator::get(getGlobalServiceContext())->getMemberData();
+ auto syncSourceIter = std::find_if(
+ memberData.begin(), memberData.end(), [source = getSource()](const MemberData& member) {
+ return member.getHostAndPort() == source;
+ });
+ if (syncSourceIter == memberData.end()) {
Status status(ErrorCodes::NotMasterOrSecondary,
str::stream() << "Sync source " << getSource()
<< " has been removed from the replication configuration.");
@@ -76,6 +82,20 @@ Status AllDatabaseCloner::ensurePrimaryOrSecondary(
getSharedData()->setInitialSyncStatusIfOK(lk, status);
return status;
}
+
+ // We also check if the sync source has gone into initial sync itself. If so, we'll never be
+ // able to sync from it and we should abort the attempt. Because there is a window during
+ // startup where a node will report being in STARTUP2 even if it is not in initial sync,
+ // we also check to see if it has a sync source. A node in STARTUP2 will not have a sync
+ // source unless it is in initial sync.
+ if (syncSourceIter->getState().startup2() && !syncSourceIter->getSyncSource().empty()) {
+ Status status(ErrorCodes::NotMasterOrSecondary,
+ str::stream() << "Sync source " << getSource() << " has been resynced.");
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ // Setting the status in the shared data will cancel the initial sync.
+ getSharedData()->setInitialSyncStatusIfOK(lk, status);
+ return status;
+ }
return Status(ErrorCodes::NotMasterOrSecondary,
str::stream() << "Cannot connect because sync source " << getSource()
<< " is neither primary nor secondary.");
@@ -99,6 +119,36 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() {
return kContinueNormally;
}
+BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() {
+ auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion());
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ getSharedData()->setSyncSourceWireVersion(lk, wireVersion);
+ }
+
+ // Wire versions prior to resumable initial sync don't have a sync source id.
+ if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC)
+ return kContinueNormally;
+ auto initialSyncId = getClient()->findOne(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query());
+ // TODO(SERVER-47150): Remove this "if" and allow the uassert to fail.
+ if (initialSyncId.isEmpty()) {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ getSharedData()->setInitialSyncSourceId(lk, UUID::gen());
+ return kContinueNormally;
+ }
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Cannot retrieve sync source initial sync ID",
+ !initialSyncId.isEmpty());
+ InitialSyncIdDocument initialSyncIdDoc =
+ InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId);
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
+ getSharedData()->setInitialSyncSourceId(lk, initialSyncIdDoc.get_id());
+ }
+ return kContinueNormally;
+}
+
BaseCloner::AfterStageBehavior AllDatabaseCloner::listDatabasesStage() {
BSONObj res;
auto databasesArray = getClient()->getDatabaseInfos(BSONObj(), true /* nameOnly */);
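
The STARTUP2 check above is deliberately two-sided: a node also reports STARTUP2 briefly during ordinary startup, but only a node performing initial sync will report a sync source as well. Expressed as a hypothetical standalone predicate:

    // Mirrors the condition in ensurePrimaryOrSecondary(): STARTUP2 plus a
    // non-empty sync source is taken to mean the member is in initial sync.
    bool sourceIsResyncing(const MemberData& member) {
        return member.getState().startup2() && !member.getSyncSource().empty();
    }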
diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h
index 5436b6172ce..8c6a264e906 100644
--- a/src/mongo/db/repl/all_database_cloner.h
+++ b/src/mongo/db/repl/all_database_cloner.h
@@ -69,7 +69,7 @@ private:
public:
ConnectStage(std::string name, AllDatabaseCloner* cloner, ClonerRunFn stageFunc)
: ClonerStage<AllDatabaseCloner>(name, cloner, stageFunc){};
- virtual bool checkRollBackIdOnRetry() {
+ bool checkSyncSourceValidityOnRetry() final {
return false;
}
};
@@ -103,6 +103,11 @@ private:
AfterStageBehavior connectStage();
/**
+ * Stage function that gets the wire version and initial sync ID.
+ */
+ AfterStageBehavior getInitialSyncIdStage();
+
+ /**
* Stage function that retrieves database information from the sync source.
*/
AfterStageBehavior listDatabasesStage();
@@ -128,6 +133,7 @@ private:
// (MX) Write access with mutex from main flow of control, read access with mutex from other
// threads, read access allowed from main flow without mutex.
ConnectStage _connectStage; // (R)
+ ConnectStage _getInitialSyncIdStage; // (R)
ClonerStage<AllDatabaseCloner> _listDatabasesStage; // (R)
std::vector<std::string> _databases; // (X)
std::unique_ptr<DatabaseCloner> _currentDatabaseCloner; // (MX)
diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp
index 10507e1bb85..2da6f3831cb 100644
--- a/src/mongo/db/repl/all_database_cloner_test.cpp
+++ b/src/mongo/db/repl/all_database_cloner_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/repl/all_database_cloner.h"
#include "mongo/db/repl/cloner_test_fixture.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_test_fixture.h"
@@ -293,6 +294,112 @@ TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) {
ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock()));
}
+TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButSourceNodeIsDowngraded) {
+ _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC,
+ WireVersion::RESUMABLE_INITIAL_SYNC);
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
+
+ // Stop at the listDatabases stage.
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ // Wait until we get to the listDatabases stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ _clock.advance(Minutes(60));
+
+ // Bring the server up, but change the wire version to an older one.
+ unittest::log() << "Bringing mock server back up.";
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+ _mockServer->reboot();
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock()));
+}
+
+TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButInitialSyncIdChanges) {
+ // Initial Sync Ids are not checked before wire version RESUMABLE_INITIAL_SYNC.
+ _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC,
+ WireVersion::RESUMABLE_INITIAL_SYNC);
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
+
+ // Stop at the listDatabases stage.
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+
+ auto cloner = makeAllDatabaseCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ // Wait until we get to the listDatabases stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ auto beforeRBIDFailPoint =
+ globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
+ auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
+ _clock.advance(Minutes(60));
+
+ // Bring the server up.
+ unittest::log() << "Bringing mock server back up.";
+ _mockServer->reboot();
+
+ // Clear and change the initial sync ID
+ _mockServer->remove(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(),
+ Query(),
+ 0 /* ignored flags */);
+ _mockServer->insert(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(),
+ BSON("_id" << UUID::gen()));
+
+ // Allow the cloner to finish.
+ beforeRBIDFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Total retries and outage time should be available.
+ ASSERT_EQ(0, _sharedData->getRetryingOperationsCount(WithLock::withoutLock()));
+ ASSERT_EQ(1, _sharedData->getTotalRetries(WithLock::withoutLock()));
+ ASSERT_EQ(Minutes(60), _sharedData->getTotalTimeUnreachable(WithLock::withoutLock()));
+}
+
TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButTimesOut) {
auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp
index 1e1fdc45a19..02f8b1af48a 100644
--- a/src/mongo/db/repl/base_cloner.cpp
+++ b/src/mongo/db/repl/base_cloner.cpp
@@ -32,6 +32,8 @@
#include "mongo/platform/basic.h"
#include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/replication_consistency_markers_gen.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/logv2/log.h"
#include "mongo/util/scopeguard.h"
@@ -183,6 +185,54 @@ void BaseCloner::clearRetryingState() {
_retryableOp = boost::none;
}
+Status BaseCloner::checkSyncSourceIsStillValid() {
+ WireVersion wireVersion;
+ {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk);
+ // The wire version should always have been set by the time this is called.
+ invariant(wireVersionOpt);
+ wireVersion = *wireVersionOpt;
+ }
+ if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) {
+ auto status = checkInitialSyncIdIsUnchanged();
+ if (!status.isOK())
+ return status;
+ }
+ return checkRollBackIdIsUnchanged();
+}
+
+Status BaseCloner::checkInitialSyncIdIsUnchanged() {
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Sync source was downgraded and no longer supports resumable initial sync",
+ getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC);
+ BSONObj initialSyncId;
+ try {
+ initialSyncId = getClient()->findOne(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query());
+ } catch (DBException& e) {
+ if (ErrorCodes::isRetriableError(e)) {
+ auto status = e.toStatus().withContext(
+ ": failed while attempting to retrieve initial sync ID after re-connect");
+ LOGV2_DEBUG(
+ 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status);
+ return status;
+ }
+ throw;
+ }
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Cannot retrieve sync source initial sync ID",
+ !initialSyncId.isEmpty());
+ InitialSyncIdDocument initialSyncIdDoc =
+ InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId);
+
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ uassert(ErrorCodes::InitialSyncFailure,
+ "Sync source has been resynced since we started syncing from it",
+ _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id());
+ return Status::OK();
+}
+
Status BaseCloner::checkRollBackIdIsUnchanged() {
BSONObj info;
try {
@@ -267,12 +317,13 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage*
}
},
isThisStageFailPoint);
- if (stage->checkRollBackIdOnRetry()) {
- // If checkRollBackIdIsUnchanged fails without throwing, it means a network
+ if (stage->checkSyncSourceValidityOnRetry()) {
+ // If checkSyncSourceIsStillValid fails without throwing, it means a network
// error occurred and it's safe to continue (which will cause another retry).
- if (!checkRollBackIdIsUnchanged().isOK())
+ if (!checkSyncSourceIsStillValid().isOK())
continue;
- // After successfully checking the rollback ID, the client should always be OK.
+ // After successfully checking the sync source validity, the client should
+ // always be OK.
invariant(!getClient()->isFailed());
}
}
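
The contract in runStageWithRetries() is worth spelling out: the validity checks throw on a permanent problem (resync, rollback, or downgrade), which aborts the whole attempt, but return a not-OK status on a network error, which feeds back into another retry. Schematically, with the failpoints and outage accounting elided and shouldRetry standing in for the shared-data retry logic:

    while (true) {
        try {
            return stage->run();
        } catch (const DBException& e) {
            if (!shouldRetry(e))  // stand-in: retry limits live in InitialSyncSharedData
                throw;
        }
        if (stage->checkSyncSourceValidityOnRetry()) {
            // Throws for a resynced/rolled-back/downgraded source; a not-OK
            // return means a network error occurred, so simply retry again.
            if (!checkSyncSourceIsStillValid().isOK())
                continue;
            invariant(!getClient()->isFailed());
        }
    }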
diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h
index 38079eeebac..d34597b332f 100644
--- a/src/mongo/db/repl/base_cloner.h
+++ b/src/mongo/db/repl/base_cloner.h
@@ -112,11 +112,13 @@ protected:
}
/**
- * Returns true if the rollback ID should be checked before retrying.
- * This is provided because the "connect" stage must complete successfully
- * before checking rollback ID.
+ * Returns true if the sync source validity should be checked before retrying.
+ * This includes checking the sync source member state, checking the rollback ID,
+ * and checking the sync source initial sync ID.
+ * This method is provided because early stages which connect and collect
+ * the initial sync ID must complete successfully before checking sync source validity.
*/
- virtual bool checkRollBackIdOnRetry() {
+ virtual bool checkSyncSourceValidityOnRetry() {
return true;
}
@@ -226,12 +228,25 @@ private:
AfterStageBehavior runStageWithRetries(BaseClonerStage* stage);
/**
+ * Make sure the initial sync ID on the sync source has not changed. Throws an exception
+ * if it has. Returns a not-OK status if a network error occurs.
+ */
+ Status checkInitialSyncIdIsUnchanged();
+
+ /**
* Make sure the rollback ID has not changed. Throws an exception if it has. Returns
* a not-OK status if a network error occurs.
*/
Status checkRollBackIdIsUnchanged();
/**
+ * Performs validity checks on the sync source. If the sync source is no longer usable,
+ * throws an exception. Returns a not-OK status if a network error occurs or if the sync
+ * source is temporarily unusable (e.g. restarting).
+ */
+ Status checkSyncSourceIsStillValid();
+
+ /**
* Supports pausing at certain stages for the initial sync fuzzer test framework.
*/
void pauseForFuzzer(BaseClonerStage* stage);
diff --git a/src/mongo/db/repl/cloner_test_fixture.cpp b/src/mongo/db/repl/cloner_test_fixture.cpp
index 0a6191fc171..e588748e7cc 100644
--- a/src/mongo/db/repl/cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/cloner_test_fixture.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/clientcursor.h"
#include "mongo/db/repl/cloner_test_fixture.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context_test_fixture.h"
@@ -74,6 +75,11 @@ void ClonerTestFixture::setUp() {
// Required by CollectionCloner::listIndexesStage() and IndexBuildsCoordinator.
getServiceContext()->setStorageEngine(std::make_unique<StorageEngineMock>());
+
+ // Set the initial sync ID on the mock server.
+ _mockServer->insert(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(),
+ BSON("_id" << _initialSyncId));
}
void ClonerTestFixture::tearDown() {
@@ -84,5 +90,11 @@ void ClonerTestFixture::tearDown() {
unittest::Test::tearDown();
}
+void ClonerTestFixture::setInitialSyncId() {
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ _sharedData->setSyncSourceWireVersion(lk, WireVersion::RESUMABLE_INITIAL_SYNC);
+ _sharedData->setInitialSyncSourceId(lk, _initialSyncId);
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/cloner_test_fixture.h b/src/mongo/db/repl/cloner_test_fixture.h
index 726c107c273..6dddb4162da 100644
--- a/src/mongo/db/repl/cloner_test_fixture.h
+++ b/src/mongo/db/repl/cloner_test_fixture.h
@@ -56,6 +56,8 @@ protected:
void tearDown() override;
+ void setInitialSyncId();
+
StorageInterfaceMock _storageInterface;
HostAndPort _source;
std::unique_ptr<ThreadPool> _dbWorkThreadPool;
@@ -63,6 +65,7 @@ protected:
std::unique_ptr<DBClientConnection> _mockClient;
std::unique_ptr<InitialSyncSharedData> _sharedData;
ClockSourceMock _clock;
+ UUID _initialSyncId = UUID::gen();
private:
static constexpr int kInitialRollbackId = 1;
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 5fba71c2557..0527c205a39 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -59,7 +59,7 @@ public:
CollectionClonerTest() {}
protected:
- void setUp() final {
+ void setUp() override {
ClonerTestFixture::setUp();
_collectionStats = std::make_shared<CollectionMockStats>();
_standardCreateCollectionFn = [this](const NamespaceString& nss,
@@ -125,7 +125,25 @@ protected:
<< "b_1")};
};
-TEST_F(CollectionClonerTest, CountStage) {
+class CollectionClonerTestResumable : public CollectionClonerTest {
+ void setUp() final {
+ CollectionClonerTest::setUp();
+ setInitialSyncId();
+ }
+};
+
+class CollectionClonerTestNonResumable : public CollectionClonerTest {
+ void setUp() final {
+ CollectionClonerTest::setUp();
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+ stdx::lock_guard<InitialSyncSharedData> lk(*_sharedData);
+ _sharedData->setSyncSourceWireVersion(lk, WireVersion::SHARDED_TRANSACTIONS);
+ }
+};
+
+TEST_F(CollectionClonerTestResumable, CountStage) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("count");
_mockServer->setCommandReply("count", createCountResponse(100));
@@ -134,7 +152,7 @@ TEST_F(CollectionClonerTest, CountStage) {
}
// On a negative count, the CollectionCloner should use a zero count.
-TEST_F(CollectionClonerTest, CountStageNegativeCount) {
+TEST_F(CollectionClonerTestResumable, CountStageNegativeCount) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("count");
_mockServer->setCommandReply("count", createCountResponse(-100));
@@ -143,19 +161,21 @@ TEST_F(CollectionClonerTest, CountStageNegativeCount) {
}
// On NamespaceNotFound, the CollectionCloner should exit without doing anything.
-TEST_F(CollectionClonerTest, CountStageNamespaceNotFound) {
+TEST_F(CollectionClonerTestResumable, CountStageNamespaceNotFound) {
auto cloner = makeCollectionCloner();
_mockServer->setCommandReply("count", Status(ErrorCodes::NamespaceNotFound, "NoSuchUuid"));
ASSERT_OK(cloner->run());
}
-TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) {
+TEST_F(CollectionClonerTestResumable,
+ CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) {
auto cloner = makeCollectionCloner();
_mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, ""));
ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
}
-TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) {
+TEST_F(CollectionClonerTestResumable,
+ CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) {
auto cloner = makeCollectionCloner();
_mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, ""));
_mockServer->setCommandReply("count",
@@ -167,7 +187,8 @@ TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFrom
ASSERT_STRING_CONTAINS(status.reason(), "TEST error");
}
-TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) {
+TEST_F(CollectionClonerTestResumable,
+ CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("count");
_mockServer->setCommandReply("count", BSON("ok" << 1));
@@ -175,7 +196,7 @@ TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCo
ASSERT_EQUALS(ErrorCodes::NoSuchKey, status);
}
-TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
+TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNoIndexes) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("listIndexes");
_mockServer->setCommandReply("count", createCountResponse(1));
@@ -187,7 +208,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
}
// NamespaceNotFound is treated the same as no index.
-TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) {
+TEST_F(CollectionClonerTestResumable, ListIndexesReturnedNamespaceNotFound) {
auto cloner = makeCollectionCloner();
_mockServer->setCommandReply("count", createCountResponse(1));
_mockServer->setCommandReply("listIndexes",
@@ -199,7 +220,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) {
ASSERT_EQ(0, cloner->getStats().indexes);
}
-TEST_F(CollectionClonerTest, ListIndexesHasResults) {
+TEST_F(CollectionClonerTestResumable, ListIndexesHasResults) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("listIndexes");
_mockServer->setCommandReply("count", createCountResponse(1));
@@ -216,7 +237,7 @@ TEST_F(CollectionClonerTest, ListIndexesHasResults) {
ASSERT_EQ(3, cloner->getStats().indexes);
}
-TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriableError) {
+TEST_F(CollectionClonerTestResumable, CollectionClonerResendsListIndexesCommandOnRetriableError) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("listIndexes");
_mockServer->setCommandReply("count", createCountResponse(1));
@@ -233,7 +254,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsListIndexesCommandOnRetriabl
ASSERT_EQ(2, cloner->getStats().indexes);
}
-TEST_F(CollectionClonerTest, BeginCollection) {
+TEST_F(CollectionClonerTestResumable, BeginCollection) {
NamespaceString collNss;
CollectionOptions collOptions;
BSONObj collIdIndexSpec;
@@ -270,7 +291,7 @@ TEST_F(CollectionClonerTest, BeginCollection) {
}
}
-TEST_F(CollectionClonerTest, BeginCollectionFailed) {
+TEST_F(CollectionClonerTestResumable, BeginCollectionFailed) {
_storageInterface.createCollectionForBulkFn = [&](const NamespaceString& theNss,
const CollectionOptions& theOptions,
const BSONObj idIndexSpec,
@@ -285,7 +306,7 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) {
ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
}
-TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
+TEST_F(CollectionClonerTestResumable, InsertDocumentsSingleBatch) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(2));
_mockServer->setCommandReply("listIndexes",
@@ -305,7 +326,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
ASSERT_EQUALS(1u, stats.receivedBatches);
}
-TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
+TEST_F(CollectionClonerTestResumable, InsertDocumentsMultipleBatches) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(2));
_mockServer->setCommandReply("listIndexes",
@@ -327,7 +348,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
ASSERT_EQUALS(2u, stats.receivedBatches);
}
-TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) {
+TEST_F(CollectionClonerTestResumable, InsertDocumentsScheduleDBWorkFailed) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(2));
_mockServer->setCommandReply("listIndexes",
@@ -364,7 +385,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) {
clonerThread.join();
}
-TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
+TEST_F(CollectionClonerTestResumable, InsertDocumentsCallbackCanceled) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(2));
_mockServer->setCommandReply("listIndexes",
@@ -407,7 +428,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
clonerThread.join();
}
-TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
+TEST_F(CollectionClonerTestResumable, InsertDocumentsFailed) {
// Set up data for preliminary stages
_mockServer->setCommandReply("count", createCountResponse(2));
_mockServer->setCommandReply("listIndexes",
@@ -447,7 +468,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
clonerThread.join();
}
-TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
+TEST_F(CollectionClonerTestResumable, DoNotCreateIDIndexIfAutoIndexIdUsed) {
NamespaceString collNss;
CollectionOptions collOptions;
// We initialize collIndexSpecs with fake information to ensure it is overwritten by an empty
@@ -482,11 +503,8 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_EQ(collNss, _nss);
}
-TEST_F(CollectionClonerTest, NonResumableQuerySuccess) {
+TEST_F(CollectionClonerTestNonResumable, NonResumableQuerySuccess) {
// Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
-
// Set up data for preliminary stages
auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
<< "_id_");
@@ -509,11 +527,7 @@ TEST_F(CollectionClonerTest, NonResumableQuerySuccess) {
ASSERT_EQUALS(3u, stats.documentsCopied);
}
-TEST_F(CollectionClonerTest, NonResumableQueryFailure) {
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
-
+TEST_F(CollectionClonerTestNonResumable, NonResumableQueryFailure) {
// Set up data for preliminary stages
auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
<< "_id_");
@@ -553,7 +567,7 @@ TEST_F(CollectionClonerTest, NonResumableQueryFailure) {
}
// We will retry our query without having yet obtained a resume token.
-TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -615,7 +629,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetryS
}
// We will resume our query using the resume token we stored after receiving the first batch.
-TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -671,7 +685,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySu
ASSERT_EQUALS(5u, stats.documentsCopied);
}
-TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryNonRetriableError) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -715,7 +729,8 @@ TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) {
clonerThread.join();
}
-TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) {
+TEST_F(CollectionClonerTestResumable,
+ ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -760,7 +775,7 @@ TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCa
}
// We retry the query after a transient error and we immediately encounter a non-retriable one.
-TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAtRetry) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -821,7 +836,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) {
// We retry the query after a transient error, we make a bit more progress and then we encounter
// a non-retriable one.
-TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryNonTransientErrorAfterPastRetry) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
// Set up data for preliminary stages
@@ -889,7 +904,7 @@ TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) {
// We resume a query, receive some more data, then get a transient error again. The goal of this
// test is to make sure we don't forget to request the _next_ resume token when resuming a query.
-TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) {
+TEST_F(CollectionClonerTestResumable, ResumableQueryTwoResumes) {
/**
* Test runs like so:
@@ -989,11 +1004,7 @@ TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) {
// We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the
// collection no longer exists in the database.
-TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) {
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
-
+TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorDropOK) {
// Set up data for preliminary stages
auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
<< "_id_");
@@ -1042,11 +1053,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) {
// We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound,
// which means the collection still exists in the database, but we cannot resume the query.
-TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) {
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
-
+TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenOtherError) {
// Set up data for preliminary stages
auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
<< "_id_");
@@ -1097,11 +1104,7 @@ TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) {
// We receive a CursorNotFound error, but the next query succeeds, indicating that the
// collection still exists in the database, but we cannot resume the query.
-TEST_F(CollectionClonerTest, NonResumableCursorErrorThenSuccessEqualsFailure) {
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
-
+TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenSuccessEqualsFailure) {
// Set up data for preliminary stages
auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
<< "_id_");
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index d380af31aad..9301ee648a6 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -71,6 +71,7 @@ protected:
return std::move(localLoader);
};
+ setInitialSyncId();
}
std::unique_ptr<DatabaseCloner> makeDatabaseCloner() {
return std::make_unique<DatabaseCloner>(_dbName,
diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h
index 8ab165180cb..75b8ac7e5f9 100644
--- a/src/mongo/db/repl/initial_sync_shared_data.h
+++ b/src/mongo/db/repl/initial_sync_shared_data.h
@@ -34,8 +34,10 @@
#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/wire_version.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/uuid.h"
namespace mongo {
namespace repl {
@@ -86,6 +88,34 @@ public:
}
/**
+ * Sets the wire version of the sync source.
+ */
+ void setSyncSourceWireVersion(WithLock, WireVersion wireVersion) {
+ _syncSourceWireVersion = wireVersion;
+ }
+
+ /**
+ * Returns the wire version of the sync source, if previously set.
+ */
+ boost::optional<WireVersion> getSyncSourceWireVersion(WithLock) {
+ return _syncSourceWireVersion;
+ }
+
+ /**
+ * Sets the initial sync ID of the sync source.
+ */
+ void setInitialSyncSourceId(WithLock, boost::optional<UUID> syncSourceId) {
+ _initialSyncSourceId = syncSourceId;
+ }
+
+ /**
+ * Gets the previously-set initial sync ID of the sync source.
+ */
+ boost::optional<UUID> getInitialSyncSourceId(WithLock) {
+ return _initialSyncSourceId;
+ }
+
+ /**
* Returns the total time the sync source has been unreachable, including any current outage.
*/
Milliseconds getTotalTimeUnreachable(WithLock lk);
@@ -221,6 +251,12 @@ private:
// The total time across all outages in this initial sync attempt, but excluding any current
// outage, that we were retrying because we were unable to reach the sync source.
Milliseconds _totalTimeUnreachable;
+
+ // The sync source wire version at the start of data cloning.
+ boost::optional<WireVersion> _syncSourceWireVersion;
+
+ // The initial sync ID on the source at the start of data cloning.
+ boost::optional<UUID> _initialSyncSourceId;
};
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index f45de57cc67..3a1470c6e41 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -511,6 +511,7 @@ void InitialSyncer::waitForCloner_forTest() {
void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initialSyncMaxAttempts) {
// 'opCtx' is passed through from startup().
_replicationProcess->getConsistencyMarkers()->setInitialSyncFlag(opCtx);
+ _replicationProcess->getConsistencyMarkers()->clearInitialSyncId(opCtx);
auto serviceCtx = opCtx->getServiceContext();
_storage->setInitialDataTimestamp(serviceCtx, Timestamp::kAllowUnstableCheckpointsSentinel);
@@ -550,6 +551,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
+ _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
_replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx);
// All updates that represent initial sync must be completed before setting the initial data
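
Together with the consistency-marker methods introduced below, the initial sync ID follows a simple lifecycle; sketched here against a hypothetical markers pointer, with error handling omitted:

    void runInitialSyncAttempt(OperationContext* opCtx,
                               ReplicationConsistencyMarkers* markers) {
        markers->setInitialSyncFlag(opCtx);
        markers->clearInitialSyncId(opCtx);        // _setUp_inlock: drop any stale ID
        // ... clone databases, then fetch and apply oplog entries ...
        markers->setInitialSyncIdIfNotSet(opCtx);  // _tearDown_inlock: write a fresh UUID
        markers->clearInitialSyncFlag(opCtx);
    }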
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
index e873d4977c9..57cd96262c1 100644
--- a/src/mongo/db/repl/replication_consistency_markers.h
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -72,6 +72,16 @@ class StorageInterface;
* }
*
* See below for explanations of each field.
+ *
+ * The initialSyncId document, in 'local.replset.initialSyncId', is used to detect when nodes have
+ * been resynced. It is set at the end of initial sync, or at replication startup if the
+ * document does not exist.
+ *
+ * Example of all fields:
+ * {
+ * _id: <UUID>,
+ * wallTime: <Date_t>
+ * }
*/
class ReplicationConsistencyMarkers {
ReplicationConsistencyMarkers(const ReplicationConsistencyMarkers&) = delete;
@@ -254,6 +264,22 @@ public:
* or `oplogTruncateAfterPoint`.
*/
virtual Status createInternalCollections(OperationContext* opCtx) = 0;
+
+ /**
+ * Set the initial sync ID and wall time if it is not already set. This will create the
+ * collection if necessary.
+ */
+ virtual void setInitialSyncIdIfNotSet(OperationContext* opCtx) = 0;
+
+ /**
+ * Clears the initial sync ID by dropping the collection.
+ */
+ virtual void clearInitialSyncId(OperationContext* opCtx) = 0;
+
+ /**
+ * Returns the initial sync id object, or an empty object if none.
+ */
+ virtual BSONObj getInitialSyncId(OperationContext* opCtx) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_consistency_markers.idl b/src/mongo/db/repl/replication_consistency_markers.idl
index df977970a30..7610f82e1fa 100644
--- a/src/mongo/db/repl/replication_consistency_markers.idl
+++ b/src/mongo/db/repl/replication_consistency_markers.idl
@@ -71,3 +71,13 @@ structs:
type: string
description: "Always set to 'oplogTruncateAfterPoint' to easily retrieve it."
+ InitialSyncIdDocument:
+ description: A document in which the server stores data related to its most recent initial sync.
+ fields:
+ _id:
+ type: uuid
+ description: "An arbitrary unique identifier associated with the initial sync of the server."
+ wallTime:
+ type: date
+ optional: true
+ description: "The walltime at which the initial sync document was written."
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index c06c8126f1a..fe73097941c 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -47,6 +47,7 @@ namespace repl {
constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace;
constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace;
+constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace;
namespace {
const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true));
@@ -60,15 +61,18 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
storageInterface,
NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace),
NamespaceString(
- ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {}
+ ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace),
+ NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace)) {}
ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
StorageInterface* storageInterface,
NamespaceString minValidNss,
- NamespaceString oplogTruncateAfterPointNss)
+ NamespaceString oplogTruncateAfterPointNss,
+ NamespaceString initialSyncIdNss)
: _storageInterface(storageInterface),
_minValidNss(minValidNss),
- _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {}
+ _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss),
+ _initialSyncIdNss(initialSyncIdNss) {}
boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinValidDocument(
OperationContext* opCtx) const {
@@ -526,5 +530,44 @@ Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationCon
return Status::OK();
}
+void ReplicationConsistencyMarkersImpl::setInitialSyncIdIfNotSet(OperationContext* opCtx) {
+ auto status =
+ _storageInterface->createCollection(opCtx, _initialSyncIdNss, CollectionOptions());
+ if (!status.isOK() && status.code() != ErrorCodes::NamespaceExists) {
+ LOGV2_FATAL(
+ 4608500, "Failed to create collection", "namespace"_attr = _initialSyncIdNss.ns());
+ fassertFailedWithStatus(4608502, status);
+ }
+
+ auto prevId = _storageInterface->findSingleton(opCtx, _initialSyncIdNss);
+ if (prevId.getStatus() == ErrorCodes::CollectionIsEmpty) {
+ auto doc = BSON("_id" << UUID::gen() << "wallTime"
+ << opCtx->getServiceContext()->getPreciseClockSource()->now());
+ fassert(4608503,
+ _storageInterface->insertDocument(opCtx,
+ _initialSyncIdNss,
+ TimestampedBSONObj{doc, Timestamp()},
+ OpTime::kUninitializedTerm));
+ } else if (!prevId.isOK()) {
+ fassertFailedWithStatus(4608504, prevId.getStatus());
+ }
+}
+
+void ReplicationConsistencyMarkersImpl::clearInitialSyncId(OperationContext* opCtx) {
+ fassert(4608501, _storageInterface->dropCollection(opCtx, _initialSyncIdNss));
+}
+
+BSONObj ReplicationConsistencyMarkersImpl::getInitialSyncId(OperationContext* opCtx) {
+ auto idStatus = _storageInterface->findSingleton(opCtx, _initialSyncIdNss);
+ if (idStatus.isOK()) {
+ return idStatus.getValue();
+ }
+ if (idStatus.getStatus() != ErrorCodes::CollectionIsEmpty &&
+ idStatus.getStatus() != ErrorCodes::NamespaceNotFound) {
+ uassertStatusOK(idStatus);
+ }
+ return BSONObj();
+}
+
} // namespace repl
} // namespace mongo
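
A caller that wants typed access to the raw document returned by getInitialSyncId() can parse it with the IDL-generated InitialSyncIdDocument, as the cloner code earlier in this patch does. A minimal sketch:

    BSONObj raw = markers->getInitialSyncId(opCtx);
    if (!raw.isEmpty()) {
        auto doc = InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), raw);
        UUID id = doc.get_id();  // the per-initial-sync identifier compared on retry
    }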
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
index 552d03a372e..40fa2768529 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.h
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -53,11 +53,13 @@ public:
static constexpr StringData kDefaultMinValidNamespace = "local.replset.minvalid"_sd;
static constexpr StringData kDefaultOplogTruncateAfterPointNamespace =
"local.replset.oplogTruncateAfterPoint"_sd;
+ static constexpr StringData kDefaultInitialSyncIdNamespace = "local.replset.initialSyncId"_sd;
explicit ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface);
ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface,
NamespaceString minValidNss,
- NamespaceString oplogTruncateAfterNss);
+ NamespaceString oplogTruncateAfterNss,
+ NamespaceString initialSyncIdNss);
void initializeMinValidDocument(OperationContext* opCtx) override;
@@ -89,7 +91,11 @@ public:
void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
OpTime getAppliedThrough(OperationContext* opCtx) const override;
- Status createInternalCollections(OperationContext* opCtx);
+ Status createInternalCollections(OperationContext* opCtx) override;
+
+ void setInitialSyncIdIfNotSet(OperationContext* opCtx) override;
+ void clearInitialSyncId(OperationContext* opCtx) override;
+ BSONObj getInitialSyncId(OperationContext* opCtx) override;
private:
/**
@@ -124,6 +130,7 @@ private:
StorageInterface* _storageInterface;
const NamespaceString _minValidNss;
const NamespaceString _oplogTruncateAfterPointNss;
+ const NamespaceString _initialSyncIdNss;
// Protects modifying and reading _isPrimary below.
mutable Mutex _truncatePointIsPrimaryMutex =
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 8d1e7c07bc5..8767fec7057 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -56,6 +56,7 @@ using namespace mongo::repl;
NamespaceString kMinValidNss("local", "replset.minvalid");
NamespaceString kOplogTruncateAfterPointNss("local", "replset.oplogTruncateAfterPoint");
+NamespaceString kInitialSyncIdNss("local", "replset.initialSyncId");
/**
* Returns min valid document.
@@ -121,7 +122,7 @@ bool RecoveryUnitWithDurabilityTracking::waitUntilDurable(OperationContext* opCt
TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) {
ReplicationConsistencyMarkersImpl consistencyMarkers(
- getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss);
+ getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss);
auto opCtx = getOperationContext();
ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK());
consistencyMarkers.initializeMinValidDocument(opCtx);
@@ -145,7 +146,7 @@ TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) {
TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlagWorks) {
ReplicationConsistencyMarkersImpl consistencyMarkers(
- getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss);
+ getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss);
auto opCtx = getOperationContext();
ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK());
consistencyMarkers.initializeMinValidDocument(opCtx);
@@ -164,7 +165,7 @@ TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlag
TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncateAfterPoint) {
ReplicationConsistencyMarkersImpl consistencyMarkers(
- getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss);
+ getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss);
auto opCtx = getOperationContext();
ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK());
consistencyMarkers.initializeMinValidDocument(opCtx);
@@ -187,7 +188,7 @@ TEST_F(ReplicationConsistencyMarkersTest, ClearInitialSyncFlagResetsOplogTruncat
TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) {
ReplicationConsistencyMarkersImpl consistencyMarkers(
- getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss);
+ getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss);
auto opCtx = getOperationContext();
ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK());
consistencyMarkers.initializeMinValidDocument(opCtx);
@@ -242,5 +243,43 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) {
ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
}
+TEST_F(ReplicationConsistencyMarkersTest, InitialSyncId) {
+ ReplicationConsistencyMarkersImpl consistencyMarkers(
+ getStorageInterface(), kMinValidNss, kOplogTruncateAfterPointNss, kInitialSyncIdNss);
+ auto opCtx = getOperationContext();
+
+ // Initially, initialSyncId should be unset.
+ auto initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx);
+ ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset;
+
+ // Clearing an already-clear initialSyncId should be OK.
+ consistencyMarkers.clearInitialSyncId(opCtx);
+ initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx);
+ ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset;
+
+ consistencyMarkers.setInitialSyncIdIfNotSet(opCtx);
+ auto firstInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx);
+ ASSERT_FALSE(firstInitialSyncIdBson.isEmpty());
+ InitialSyncIdDocument firstInitialSyncIdDoc = InitialSyncIdDocument::parse(
+ IDLParserErrorContext("initialSyncId"), firstInitialSyncIdBson);
+
+ // Setting it twice should change nothing.
+ consistencyMarkers.setInitialSyncIdIfNotSet(opCtx);
+ ASSERT_BSONOBJ_EQ(firstInitialSyncIdBson, consistencyMarkers.getInitialSyncId(opCtx));
+
+ // Clear it; should return to empty.
+ consistencyMarkers.clearInitialSyncId(opCtx);
+ initialSyncIdShouldBeUnset = consistencyMarkers.getInitialSyncId(opCtx);
+ ASSERT(initialSyncIdShouldBeUnset.isEmpty()) << initialSyncIdShouldBeUnset;
+
+ // Set it; it should have a different UUID.
+ consistencyMarkers.setInitialSyncIdIfNotSet(opCtx);
+ auto secondInitialSyncIdBson = consistencyMarkers.getInitialSyncId(opCtx);
+ ASSERT_FALSE(secondInitialSyncIdBson.isEmpty());
+ InitialSyncIdDocument secondInitialSyncIdDoc = InitialSyncIdDocument::parse(
+ IDLParserErrorContext("initialSyncId"), secondInitialSyncIdBson);
+ ASSERT_NE(firstInitialSyncIdDoc.get_id(), secondInitialSyncIdDoc.get_id());
+}
+
} // namespace
} // namespace mongo
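
The final ASSERT_NE is the property the whole feature rests on: clearing and re-setting the marker yields a fresh UUID, so a node that cached its sync source's ID can later detect that the source went through another initial sync. A standalone sketch of that detection logic (plain strings stand in for the UUIDs; function names are illustrative):

    #include <cassert>
    #include <string>

    // True when the ID served by the sync source no longer matches the one
    // cached when cloning began, i.e. the source was resynced underneath us.
    bool sourceResyncedSince(const std::string& idAtStart, const std::string& idNow) {
        return idAtStart != idNow;
    }

    int main() {
        const std::string before = "uuid-A";  // read when the attempt started
        assert(!sourceResyncedSince(before, "uuid-A"));  // same source: keep going
        assert(sourceResyncedSince(before, "uuid-B"));   // regenerated: abort attempt
    }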
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
index 28710b460eb..875c47f9e3d 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -135,5 +135,13 @@ Status ReplicationConsistencyMarkersMock::createInternalCollections(OperationCon
return Status::OK();
}
+void ReplicationConsistencyMarkersMock::setInitialSyncIdIfNotSet(OperationContext* opCtx) {}
+
+void ReplicationConsistencyMarkersMock::clearInitialSyncId(OperationContext* opCtx) {}
+
+BSONObj ReplicationConsistencyMarkersMock::getInitialSyncId(OperationContext* opCtx) {
+ return BSONObj();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
index 7afe39a6bd7..a925e014d93 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.h
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -83,6 +83,10 @@ public:
Status createInternalCollections(OperationContext* opCtx) override;
+ void setInitialSyncIdIfNotSet(OperationContext* opCtx) override;
+ void clearInitialSyncId(OperationContext* opCtx) override;
+ BSONObj getInitialSyncId(OperationContext* opCtx) override;
+
private:
mutable Mutex _initialSyncFlagMutex =
MONGO_MAKE_LATCH("ReplicationConsistencyMarkersMock::_initialSyncFlagMutex");
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 881be41aa6d..dde8da44abb 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -745,13 +745,6 @@ public:
virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten) = 0;
/**
- * Returns a vector of the members other than ourself in the replica set, as specified in
- * the replica set config. Invalid to call if we are not in replica set mode. Returns
- * an empty vector if we do not have a valid config.
- */
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const = 0;
-
- /**
* Returns a BSONObj containing a representation of the current default write concern.
*/
virtual WriteConcernOptions getGetLastErrorDefault() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 16ba689edab..2635494cbd4 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -738,6 +738,9 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx,
auto memberState = getMemberState();
invariant(memberState.startup2() || memberState.removed());
invariant(setFollowerMode(MemberState::RS_RECOVERING));
+ // Set an initial sync ID, in case we were upgraded or restored from backup without doing
+ // an initial sync.
+ _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
_externalState->startSteadyStateReplication(opCtx, this);
return;
}
@@ -4273,24 +4276,6 @@ std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpT
return _topCoord->getHostsWrittenTo(op, durablyWritten);
}
-std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_settings.usingReplSets());
-
- std::vector<HostAndPort> nodes;
- if (_selfIndex == -1) {
- return nodes;
- }
-
- for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
- if (i == _selfIndex)
- continue;
-
- nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort());
- }
- return nodes;
-}
-
Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions& writeConcern) const {
stdx::lock_guard<Latch> lock(_mutex);
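
The setInitialSyncIdIfNotSet() call added to _startDataReplication() covers nodes that reach steady-state replication without ever having run initial sync, such as after an upgrade or a restore from backup: every potential sync source must serve some ID. A standalone model of that startup guarantee (all names hypothetical):

    #include <cassert>
    #include <optional>
    #include <string>

    struct Node {
        std::optional<std::string> initialSyncId;  // absent after upgrade/restore

        void startSteadyStateReplication() {
            if (!initialSyncId)                    // set-if-not-set: never overwrite
                initialSyncId = "generated-uuid";  // the real code uses UUID::gen()
        }
    };

    int main() {
        Node restored;                             // came from a backup, no ID yet
        restored.startSteadyStateReplication();
        assert(restored.initialSyncId);            // now always serves an ID
        const auto first = *restored.initialSyncId;
        restored.startSteadyStateReplication();    // e.g. after a clean restart
        assert(*restored.initialSyncId == first);  // restart does not change it
    }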
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index fc9e4a39888..b08d49f4429 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -261,8 +261,6 @@ public:
virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
bool durablyWritten) override;
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const override;
-
virtual WriteConcernOptions getGetLastErrorDefault() override;
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) override;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index aff27217a52..7d1498d80b2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -3013,34 +3013,6 @@ TEST_F(ReplCoordTest,
}
}
-TEST_F(ReplCoordTest, NodeReturnsNoNodesWhenGetOtherNodesInReplSetIsRunBeforeHavingAConfig) {
- start();
- ASSERT_EQUALS(0U, getReplCoord()->getOtherNodesInReplSet().size());
-}
-
-TEST_F(ReplCoordTest, NodeReturnsListOfNodesOtherThanItselfInResponseToGetOtherNodesInReplSet) {
- assertStartSuccess(BSON("_id"
- << "mySet"
- << "version" << 2 << "members"
- << BSON_ARRAY(BSON("_id" << 0 << "host"
- << "h1")
- << BSON("_id" << 1 << "host"
- << "h2")
- << BSON("_id" << 2 << "host"
- << "h3"
- << "priority" << 0 << "hidden" << true))),
- HostAndPort("h1"));
-
- std::vector<HostAndPort> otherNodes = getReplCoord()->getOtherNodesInReplSet();
- ASSERT_EQUALS(2U, otherNodes.size());
- if (otherNodes[0] == HostAndPort("h2")) {
- ASSERT_EQUALS(HostAndPort("h3"), otherNodes[1]);
- } else {
- ASSERT_EQUALS(HostAndPort("h3"), otherNodes[0]);
- ASSERT_EQUALS(HostAndPort("h2"), otherNodes[1]);
- }
-}
-
TEST_F(ReplCoordTest, AwaitIsMasterResponseReturnsCurrentTopologyVersionOnTimeOut) {
init();
assertStartSuccess(BSON("_id"
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 6ddd4d4e84f..3f27644b3c6 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -407,10 +407,6 @@ std::vector<HostAndPort> ReplicationCoordinatorMock::getHostsWrittenTo(const OpT
return std::vector<HostAndPort>();
}
-std::vector<HostAndPort> ReplicationCoordinatorMock::getOtherNodesInReplSet() const {
- return std::vector<HostAndPort>();
-}
-
Status ReplicationCoordinatorMock::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions& writeConcern) const {
return Status::OK();
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 7a4b8db79a0..970645e8154 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -227,8 +227,6 @@ public:
virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, bool durablyWritten);
- virtual std::vector<HostAndPort> getOtherNodesInReplSet() const;
-
virtual WriteConcernOptions getGetLastErrorDefault();
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index d0e52db43a2..d54c2acc1a2 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -350,10 +350,6 @@ std::vector<HostAndPort> ReplicationCoordinatorNoOp::getHostsWrittenTo(const OpT
MONGO_UNREACHABLE;
}
-std::vector<HostAndPort> ReplicationCoordinatorNoOp::getOtherNodesInReplSet() const {
- MONGO_UNREACHABLE;
-}
-
Status ReplicationCoordinatorNoOp::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions&) const {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index 382073c21fb..b897576dcf9 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -196,8 +196,6 @@ public:
std::vector<HostAndPort> getHostsWrittenTo(const OpTime&, bool) final;
- std::vector<HostAndPort> getOtherNodesInReplSet() const final;
-
Status checkReplEnabledForCommand(BSONObjBuilder*) final;
HostAndPort chooseNewSyncSource(const OpTime&) final;
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 5e8d2c68a95..a9014d89808 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -375,10 +375,6 @@ std::vector<HostAndPort> ReplicationCoordinatorEmbedded::getHostsWrittenTo(const
UASSERT_NOT_IMPLEMENTED;
}
-std::vector<HostAndPort> ReplicationCoordinatorEmbedded::getOtherNodesInReplSet() const {
- UASSERT_NOT_IMPLEMENTED;
-}
-
Status ReplicationCoordinatorEmbedded::checkIfWriteConcernCanBeSatisfied(
const WriteConcernOptions&) const {
UASSERT_NOT_IMPLEMENTED;
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 06716e97a4e..75cb9b53922 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -203,8 +203,6 @@ public:
std::vector<HostAndPort> getHostsWrittenTo(const repl::OpTime&, bool) override;
- std::vector<HostAndPort> getOtherNodesInReplSet() const override;
-
Status checkReplEnabledForCommand(BSONObjBuilder*) override;
HostAndPort chooseNewSyncSource(const repl::OpTime&) override;
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index b90b4d29d30..c142da4abea 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -46,6 +46,7 @@ namespace repl {
class CollectionInfo;
class OplogEntryBase;
class DurableReplOperation;
+class InitialSyncIdDocument;
} // namespace repl
namespace idl {
@@ -83,6 +84,7 @@ class UUID {
friend class repl::CollectionInfo;
friend class repl::OplogEntryBase;
friend class repl::DurableReplOperation;
+ friend class repl::InitialSyncIdDocument;
friend class ResumeTokenInternal;
friend class ShardCollectionTypeBase;
friend class VoteCommitIndexBuild;
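
The uuid.h hunks follow the existing pattern for IDL-generated types: UUID keeps its byte-wise constructor private and befriends each generated parser that must materialize a UUID from stored BSON. A standalone sketch of why the friendship is needed (stand-in types, not the real classes):

    #include <array>
    #include <cstdint>

    class Uuid {
    public:
        static Uuid gen() { return Uuid({}); }    // the only public way to make one
    private:
        explicit Uuid(std::array<std::uint8_t, 16> bytes) : _bytes(bytes) {}
        std::array<std::uint8_t, 16> _bytes;
        friend class InitialSyncIdDocument;       // generated parser needs the
    };                                            // private byte-wise constructor

    class InitialSyncIdDocument {
    public:
        static Uuid parseId(const std::array<std::uint8_t, 16>& raw) {
            return Uuid(raw);                     // legal only via friendship
        }
    };

    int main() {
        auto fromStorage = InitialSyncIdDocument::parseId({});
        (void)fromStorage;                        // silence unused-variable warnings
    }

Without the friend declaration, the generated parse() for InitialSyncIdDocument could not rebuild the _id UUID it reads back from local.replset.initialSyncId.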