author     Gabriel Marks <gabriel.marks@mongodb.com>          2022-01-04 23:12:00 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-01-04 23:41:46 +0000
commit     a29535aaa54d6ca2ec7f87cea34ac4138ae8803d (patch)
tree       2c82ecfea8edb3975a64a73a578c21a8ea73965a
parent     fdca8a8d628d5480e3f552f86ce89aa0d234741f (diff)
download   mongo-a29535aaa54d6ca2ec7f87cea34ac4138ae8803d.tar.gz
SERVER-61505 Remove WireVersion::RESUMABLE_INITIAL_SYNC
-rw-r--r--  src/mongo/db/repl/all_database_cloner.cpp               |   9
-rw-r--r--  src/mongo/db/repl/all_database_cloner_test.cpp          |  52
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp                 | 101
-rw-r--r--  src/mongo/db/repl/collection_cloner.h                   |   9
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp            | 222
-rw-r--r--  src/mongo/db/repl/initial_sync_base_cloner.cpp          |  19
-rw-r--r--  src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp  |   1
-rw-r--r--  src/mongo/db/repl/initial_sync_shared_data.h            |  17
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp               |   5
9 files changed, 30 insertions(+), 405 deletions(-)
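Orientation (not part of the patch): SERVER-61505 drops the wire-version gate around resumable initial sync, on the assumption that every supported sync source now reports WireVersion::RESUMABLE_INITIAL_SYNC or newer, so the gated branches and the stored sync-source wire version can be deleted. A minimal standalone C++ sketch of that simplification follows; SyncSource, checkSourceStillValidOld and checkSourceStillValidNew are hypothetical names for illustration, not MongoDB APIs.

// Standalone sketch, not MongoDB source: illustrates removing a wire-version
// gate once every supported sync source implements the newer feature.
#include <iostream>
#include <optional>
#include <string>

enum class WireVersion { SHARDED_TRANSACTIONS = 8, RESUMABLE_INITIAL_SYNC = 9 };

struct SyncSource {                         // hypothetical stand-in for the sync source client
    WireVersion maxWireVersion;
    std::optional<std::string> initialSyncId;  // only populated on newer sources
};

// Before: the initial sync id check was gated on the source's wire version.
bool checkSourceStillValidOld(const SyncSource& s, const std::string& expectedId) {
    if (s.maxWireVersion < WireVersion::RESUMABLE_INITIAL_SYNC)
        return true;  // old source: nothing to compare, skip the check
    return s.initialSyncId && *s.initialSyncId == expectedId;
}

// After: every supported source provides an initial sync id, so the gate
// (and the stored sync-source wire version) can be removed outright.
bool checkSourceStillValidNew(const SyncSource& s, const std::string& expectedId) {
    return s.initialSyncId && *s.initialSyncId == expectedId;
}

int main() {
    SyncSource src{WireVersion::RESUMABLE_INITIAL_SYNC, std::string("abc123")};
    std::cout << checkSourceStillValidOld(src, "abc123") << ' '
              << checkSourceStillValidNew(src, "abc123") << '\n';  // prints "1 1"
}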
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp
index d18f67cc39f..b5856b7aef0 100644
--- a/src/mongo/db/repl/all_database_cloner.cpp
+++ b/src/mongo/db/repl/all_database_cloner.cpp
@@ -122,15 +122,6 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() {
}
BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() {
- auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion());
- {
- stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
- getSharedData()->setSyncSourceWireVersion(lk, wireVersion);
- }
-
- // Wire versions prior to resumable initial sync don't have a sync source id.
- if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC)
- return kContinueNormally;
auto initialSyncId = getClient()->findOne(
NamespaceString{ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace},
BSONObj{});
diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp
index b77caa19e47..ad8aa737c78 100644
--- a/src/mongo/db/repl/all_database_cloner_test.cpp
+++ b/src/mongo/db/repl/all_database_cloner_test.cpp
@@ -295,59 +295,7 @@ TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) {
ASSERT_EQ(Minutes(60), getSharedData()->getTotalTimeUnreachable(WithLock::withoutLock()));
}
-TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButSourceNodeIsDowngraded) {
- _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC,
- WireVersion::RESUMABLE_INITIAL_SYNC);
- auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
- _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
-
- // Stop at the listDatabases stage.
- auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
-
- auto cloner = makeAllDatabaseCloner();
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- ASSERT_NOT_OK(cloner->run());
- });
-
- // Wait until we get to the listDatabases stage.
- beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
-
- // Bring the server down.
- _mockServer->shutdown();
-
- auto beforeRBIDFailPoint =
- globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage");
- auto timesEnteredRBID = beforeRBIDFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}"));
- beforeStageFailPoint->setMode(FailPoint::off, 0);
- beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1);
- _clock.advance(Minutes(60));
-
- // Bring the server up, but change the wire version to an older one.
- LOGV2(21053, "Bringing mock server back up.");
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
- _mockServer->reboot();
-
- // Allow the cloner to finish.
- beforeRBIDFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-
- // Total retries and outage time should be available.
- ASSERT_EQ(0, getSharedData()->getRetryingOperationsCount(WithLock::withoutLock()));
- ASSERT_EQ(1, getSharedData()->getTotalRetries(WithLock::withoutLock()));
- ASSERT_EQ(Minutes(60), getSharedData()->getTotalTimeUnreachable(WithLock::withoutLock()));
-}
-
TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButInitialSyncIdChanges) {
- // Initial Sync Ids are not checked before wire version RESUMABLE_INITIAL_SYNC.
- _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC,
- WireVersion::RESUMABLE_INITIAL_SYNC);
auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
_mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}"));
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 08701e697f6..f38cb4e64c6 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -103,9 +103,6 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss,
invariant(collectionOptions.uuid);
_sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid);
_stats.ns = _sourceNss.ns();
-
- // Find out whether the sync source supports resumable queries.
- _resumeSupported = (getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC);
}
BaseCloner::ClonerStages CollectionCloner::getStages() {
@@ -320,72 +317,32 @@ void CollectionCloner::runQuery() {
// Non-resumable query.
Query query;
- if (_resumeSupported) {
- if (_resumeToken) {
- // Resume the query from where we left off.
- LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query");
- query.requestResumeToken(true).resumeAfter(_resumeToken.get());
- } else {
- // New attempt at a resumable query.
- LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query");
- query.requestResumeToken(true);
- }
- query.hint(BSON("$natural" << 1));
+ if (_resumeToken) {
+ // Resume the query from where we left off.
+ LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query");
+ query.requestResumeToken(true).resumeAfter(_resumeToken.get());
+ } else {
+ // New attempt at a resumable query.
+ LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query");
+ query.requestResumeToken(true);
}
+ query.hint(BSON("$natural" << 1));
// We reset this every time we retry or resume a query.
// We distinguish the first batch from the rest so that we only store the remote cursor id
// the first time we get it.
_firstBatchOfQueryRound = true;
- try {
- getClient()->query_DEPRECATED(
- [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
- _sourceDbAndUuid,
- BSONObj{},
- query,
- nullptr /* fieldsToReturn */,
- QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
- (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
- _collectionClonerBatchSize,
- ReadConcernArgs::kImplicitDefault);
- } catch (...) {
- auto status = exceptionToStatus();
-
- // If the collection was dropped at any point, we can just move on to the next cloner.
- // This applies to both resumable (4.4) and non-resumable (4.2) queries.
- if (status == ErrorCodes::NamespaceNotFound) {
- throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit.
- }
-
- // Wire version 4.2 only.
- if (!_resumeSupported) {
- // If we lost our cursor last round, the only time we can continue is if we find out
- // this round that the collection was dropped on the source (that scenario is covered
- // right above). If that is not the case, then the cloner would have more work to do,
- // but since we cannot resume the query, we must abort initial sync.
- if (_lostNonResumableCursor) {
- abortNonResumableClone(status);
- }
-
- // Collection has changed upstream. This will trigger the code block above next round,
- // (unless we find out the collection was dropped via getting a NamespaceNotFound).
- if (_queryStage.isCursorError(status)) {
- LOGV2(21135,
- "Lost cursor during non-resumable query: {error}",
- "Lost cursor during non-resumable query",
- "error"_attr = status);
- _lostNonResumableCursor = true;
- throw;
- }
- // Any other errors (including network errors, but excluding NamespaceNotFound) result
- // in immediate failure.
- abortNonResumableClone(status);
- }
-
- // Re-throw all query errors for resumable (4.4) queries.
- throw;
- }
+ getClient()->query_DEPRECATED(
+ [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
+ _sourceDbAndUuid,
+ BSONObj{},
+ query,
+ nullptr /* fieldsToReturn */,
+ QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
+ (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
+ _collectionClonerBatchSize,
+ ReadConcernArgs::kImplicitDefault);
}
void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
@@ -408,7 +365,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor");
}
- if (_firstBatchOfQueryRound && _resumeSupported) {
+ if (_firstBatchOfQueryRound) {
// Store the cursorId of the remote cursor.
_remoteCursorId = iter.getCursorId();
}
@@ -433,10 +390,8 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
uassertStatusOK(newStatus);
}
- if (_resumeSupported) {
- // Store the resume token for this batch.
- _resumeToken = iter.getPostBatchResumeToken();
- }
+ // Store the resume token for this batch.
+ _resumeToken = iter.getPostBatchResumeToken();
initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
@@ -511,18 +466,6 @@ void CollectionCloner::waitForDatabaseWorkToComplete() {
_dbWorkTaskRunner.join();
}
-// Throws.
-void CollectionCloner::abortNonResumableClone(const Status& status) {
- invariant(!_resumeSupported);
- LOGV2(21141,
- "Error during non-resumable clone: {error}",
- "Error during non-resumable clone",
- "error"_attr = status);
- std::string message = str::stream()
- << "Collection clone failed and is not resumable. nss: " << _sourceNss;
- uasserted(ErrorCodes::InitialSyncFailure, message);
-}
-
CollectionCloner::Stats CollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 5ca0057fba1..80d8a9d72bc 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -226,12 +226,6 @@ private:
*/
void runQuery();
- /**
- * Used to terminate the clone when we encounter a fatal error during a non-resumable query.
- * Throws.
- */
- void abortNonResumableClone(const Status& status);
-
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
@@ -266,9 +260,6 @@ private:
// like _documentsToInsert, is destroyed after those threads exit.
TaskRunner _dbWorkTaskRunner; // (R)
- // Does the sync source support resumable queries? (wire version 4.4+)
- bool _resumeSupported = false; // (X)
-
// The resumeToken used to resume after network error.
boost::optional<BSONObj> _resumeToken; // (X)
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 99bec69d0c2..64db34fd042 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -79,9 +79,6 @@ protected:
};
_storageInterface.createCollectionForBulkFn = _standardCreateCollectionFn;
-
- _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC,
- WireVersion::RESUMABLE_INITIAL_SYNC);
_mockServer->assignCollectionUuid(_nss.ns(), _collUuid);
_mockServer->setCommandReply("replSetGetRBID",
BSON("ok" << 1 << "rbid" << getSharedData()->getRollBackId()));
@@ -146,17 +143,6 @@ class CollectionClonerTestResumable : public CollectionClonerTest {
}
};
-class CollectionClonerTestNonResumable : public CollectionClonerTest {
- void setUp() final {
- CollectionClonerTest::setUp();
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
- WireVersion::SHARDED_TRANSACTIONS);
- stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
- getSharedData()->setSyncSourceWireVersion(lk, WireVersion::SHARDED_TRANSACTIONS);
- }
-};
-
TEST_F(CollectionClonerTestResumable, CollectionClonerPassesThroughErrorFromCollStatsCommand) {
auto cloner = makeCollectionCloner();
cloner->setStopAfterStage_forTest("count");
@@ -581,73 +567,6 @@ TEST_F(CollectionClonerTestResumable, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_EQ(collNss, _nss);
}
-TEST_F(CollectionClonerTestNonResumable, NonResumableQuerySuccess) {
- // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
- // Set up data for preliminary stages
- auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
- << "_id_");
- // The collection cloner pre-stage makes a remote call to collStats to store in-progress
- // metrics.
- setMockServerReplies(BSON("size" << 10),
- createCountResponse(3),
- createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
-
- ASSERT_OK(cloner->run());
-
- ASSERT_EQUALS(3, _collectionStats->insertCount);
- ASSERT_TRUE(_collectionStats->commitCalled);
- auto stats = cloner->getStats();
- ASSERT_EQUALS(3u, stats.documentsCopied);
-}
-
-TEST_F(CollectionClonerTestNonResumable, NonResumableQueryFailure) {
- // Set up data for preliminary stages
- auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
- << "_id_");
- // The collection cloner pre-stage makes a remote call to collStats to store in-progress
- // metrics.
- setMockServerReplies(BSON("size" << 10),
- createCountResponse(3),
- createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
-
- auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- auto status = cloner->run();
- ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
- ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
- });
-
- // Wait until we get to the query stage.
- beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
-
- // Bring the server down.
- _mockServer->shutdown();
-
- // Let us begin with the query stage.
- beforeStageFailPoint->setMode(FailPoint::off, 0);
-
- clonerThread.join();
-}
-
// We will retry our query without having yet obtained a resume token.
TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) {
_mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
@@ -1087,147 +1006,6 @@ TEST_F(CollectionClonerTestResumable, ResumableQueryTwoResumes) {
ASSERT_EQUALS(7u, stats.documentsCopied);
}
-// We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the
-// collection no longer exists in the database.
-TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorDropOK) {
- // Set up data for preliminary stages
- auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
- << "_id_");
- setMockServerReplies(BSON("size" << 10),
- createCountResponse(3),
- createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
-
- auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
- auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- ASSERT_OK(cloner->run());
- });
-
- // Wait until we get to the query stage.
- beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
-
- // This will cause the next batch to fail once (transiently), but we do not support resume.
- auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
- failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'QueryPlanKilled'}"));
-
- // Let us begin with the query stage.
- beforeStageFailPoint->setMode(FailPoint::off, 0);
- beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
-
- // Follow-up the QueryPlanKilled error with a NamespaceNotFound.
- failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'NamespaceNotFound'}"));
-
- beforeRetryFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-}
-
-// We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound,
-// which means the collection still exists in the database, but we cannot resume the query.
-TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenOtherError) {
- // Set up data for preliminary stages
- auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
- << "_id_");
- setMockServerReplies(BSON("size" << 10),
- createCountResponse(3),
- createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
-
- auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
- auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- auto status = cloner->run();
- ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
- ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
- });
-
- // Wait until we get to the query stage.
- beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
-
- // This will cause the next batch to fail once (transiently), but we do not support resume.
- auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
- failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'OperationFailed'}"));
-
- // Let us begin with the query stage.
- beforeStageFailPoint->setMode(FailPoint::off, 0);
- beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
-
- // Follow up the OperationFailed error with an UnknownError (not NamespaceNotFound).
- failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
-
- beforeRetryFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-}
-
-// We receive a CursorNotFound error, but the next query succeeds, indicating that the
-// collection still exists in the database, but we cannot resume the query.
-TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenSuccessEqualsFailure) {
- // Set up data for preliminary stages
- auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
- << "_id_");
- setMockServerReplies(BSON("size" << 10),
- createCountResponse(3),
- createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
-
- auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
- auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
- FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
-
- // Set up documents to be returned from upstream node.
- _mockServer->insert(_nss.ns(), BSON("_id" << 1));
- _mockServer->insert(_nss.ns(), BSON("_id" << 2));
- _mockServer->insert(_nss.ns(), BSON("_id" << 3));
-
- auto cloner = makeCollectionCloner();
- cloner->setBatchSize_forTest(2);
-
- // Run the cloner in a separate thread.
- stdx::thread clonerThread([&] {
- Client::initThread("ClonerRunner");
- auto status = cloner->run();
- ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
- ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
- });
-
- // Wait until we get to the query stage.
- beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
-
- // This will cause the next batch to fail once (transiently), but we do not support resume.
- auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
- failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'CursorNotFound'}"));
-
- // Let us begin with the query stage. We let the next retry succeed this time.
- beforeStageFailPoint->setMode(FailPoint::off, 0);
- clonerThread.join();
-}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/initial_sync_base_cloner.cpp b/src/mongo/db/repl/initial_sync_base_cloner.cpp
index c08d102ca9c..f91c75aa92d 100644
--- a/src/mongo/db/repl/initial_sync_base_cloner.cpp
+++ b/src/mongo/db/repl/initial_sync_base_cloner.cpp
@@ -104,27 +104,14 @@ void InitialSyncBaseCloner::handleStageAttemptFailed(BaseClonerStage* stage, Sta
}
Status InitialSyncBaseCloner::checkSyncSourceIsStillValid() {
- WireVersion wireVersion;
- {
- stdx::lock_guard<ReplSyncSharedData> lk(*getSharedData());
- auto wireVersionOpt = getSharedData()->getSyncSourceWireVersion(lk);
- // The wire version should always have been set by the time this is called.
- invariant(wireVersionOpt);
- wireVersion = *wireVersionOpt;
- }
- if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) {
- auto status = checkInitialSyncIdIsUnchanged();
- if (!status.isOK())
- return status;
- }
+ auto status = checkInitialSyncIdIsUnchanged();
+ if (!status.isOK())
+ return status;
return checkRollBackIdIsUnchanged();
}
Status InitialSyncBaseCloner::checkInitialSyncIdIsUnchanged() {
- uassert(ErrorCodes::InitialSyncFailure,
- "Sync source was downgraded and no longer supports resumable initial sync",
- getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC);
BSONObj initialSyncId;
try {
initialSyncId = getClient()->findOne(
diff --git a/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp b/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp
index c0c407f3bc3..60142aa7ba2 100644
--- a/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp
@@ -53,7 +53,6 @@ InitialSyncSharedData* InitialSyncClonerTestFixture::getSharedData() {
void InitialSyncClonerTestFixture::setInitialSyncId() {
stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
- getSharedData()->setSyncSourceWireVersion(lk, WireVersion::RESUMABLE_INITIAL_SYNC);
getSharedData()->setInitialSyncSourceId(lk, _initialSyncId);
}
diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h
index 0644f151997..e8d6a9d1722 100644
--- a/src/mongo/db/repl/initial_sync_shared_data.h
+++ b/src/mongo/db/repl/initial_sync_shared_data.h
@@ -61,20 +61,6 @@ public:
}
/**
- * Sets the wire version of the sync source.
- */
- void setSyncSourceWireVersion(WithLock, WireVersion wireVersion) {
- _syncSourceWireVersion = wireVersion;
- }
-
- /**
- * Returns the wire version of the sync source, if previously set.
- */
- boost::optional<WireVersion> getSyncSourceWireVersion(WithLock) {
- return _syncSourceWireVersion;
- }
-
- /**
* Sets the initial sync ID of the sync source.
*/
void setInitialSyncSourceId(WithLock, boost::optional<UUID> syncSourceId) {
@@ -209,9 +195,6 @@ private:
// Operation that may currently be retrying.
RetryableOperation _retryableOp;
- // The sync source wire version at the start of data cloning.
- boost::optional<WireVersion> _syncSourceWireVersion;
-
// The initial sync ID on the source at the start of data cloning.
boost::optional<UUID> _initialSyncSourceId;
};
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 8d304ba1ebf..3875c64ef9b 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/oplog_fetcher_mock.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/replication_recovery_mock.h"
@@ -385,6 +386,10 @@ protected:
_mockServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
_options1.uuid = UUID::gen();
+ _mockServer->insert(
+ ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(),
+ BSON("_id" << UUID::gen()));
+
reset();
launchExecutorThread();