diff options
author | Gabriel Marks <gabriel.marks@mongodb.com> | 2022-01-04 23:12:00 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-04 23:41:46 +0000 |
commit | a29535aaa54d6ca2ec7f87cea34ac4138ae8803d (patch) | |
tree | 2c82ecfea8edb3975a64a73a578c21a8ea73965a | |
parent | fdca8a8d628d5480e3f552f86ce89aa0d234741f (diff) | |
download | mongo-a29535aaa54d6ca2ec7f87cea34ac4138ae8803d.tar.gz |
SERVER-61505 Remove WireVersion::RESUMABLE_INITIAL_SYNC
-rw-r--r-- | src/mongo/db/repl/all_database_cloner.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/all_database_cloner_test.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 222 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync_base_cloner.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_sync_shared_data.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 5 |
9 files changed, 30 insertions, 405 deletions
diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index d18f67cc39f..b5856b7aef0 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -122,15 +122,6 @@ BaseCloner::AfterStageBehavior AllDatabaseCloner::connectStage() { } BaseCloner::AfterStageBehavior AllDatabaseCloner::getInitialSyncIdStage() { - auto wireVersion = static_cast<WireVersion>(getClient()->getMaxWireVersion()); - { - stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); - getSharedData()->setSyncSourceWireVersion(lk, wireVersion); - } - - // Wire versions prior to resumable initial sync don't have a sync source id. - if (wireVersion < WireVersion::RESUMABLE_INITIAL_SYNC) - return kContinueNormally; auto initialSyncId = getClient()->findOne( NamespaceString{ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace}, BSONObj{}); diff --git a/src/mongo/db/repl/all_database_cloner_test.cpp b/src/mongo/db/repl/all_database_cloner_test.cpp index b77caa19e47..ad8aa737c78 100644 --- a/src/mongo/db/repl/all_database_cloner_test.cpp +++ b/src/mongo/db/repl/all_database_cloner_test.cpp @@ -295,59 +295,7 @@ TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButRollBackIdChanges) { ASSERT_EQ(Minutes(60), getSharedData()->getTotalTimeUnreachable(WithLock::withoutLock())); } -TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButSourceNodeIsDowngraded) { - _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, - WireVersion::RESUMABLE_INITIAL_SYNC); - auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); - _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); - _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); - - // Stop at the listDatabases stage. - auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); - - auto cloner = makeAllDatabaseCloner(); - - // Run the cloner in a separate thread. - stdx::thread clonerThread([&] { - Client::initThread("ClonerRunner"); - ASSERT_NOT_OK(cloner->run()); - }); - - // Wait until we get to the listDatabases stage. - beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); - - // Bring the server down. - _mockServer->shutdown(); - - auto beforeRBIDFailPoint = - globalFailPointRegistry().find("hangBeforeCheckingRollBackIdClonerStage"); - auto timesEnteredRBID = beforeRBIDFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'AllDatabaseCloner', stage: 'listDatabases'}")); - beforeStageFailPoint->setMode(FailPoint::off, 0); - beforeRBIDFailPoint->waitForTimesEntered(timesEnteredRBID + 1); - _clock.advance(Minutes(60)); - - // Bring the server up, but change the wire version to an older one. - LOGV2(21053, "Bringing mock server back up."); - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - _mockServer->reboot(); - - // Allow the cloner to finish. - beforeRBIDFailPoint->setMode(FailPoint::off, 0); - clonerThread.join(); - - // Total retries and outage time should be available. - ASSERT_EQ(0, getSharedData()->getRetryingOperationsCount(WithLock::withoutLock())); - ASSERT_EQ(1, getSharedData()->getTotalRetries(WithLock::withoutLock())); - ASSERT_EQ(Minutes(60), getSharedData()->getTotalTimeUnreachable(WithLock::withoutLock())); -} - TEST_F(AllDatabaseClonerTest, RetriesListDatabasesButInitialSyncIdChanges) { - // Initial Sync Ids are not checked before wire version RESUMABLE_INITIAL_SYNC. - _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, - WireVersion::RESUMABLE_INITIAL_SYNC); auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); _mockServer->setCommandReply("listDatabases", fromjson("{ok:1, databases:[]}")); diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 08701e697f6..f38cb4e64c6 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -103,9 +103,6 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, invariant(collectionOptions.uuid); _sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid); _stats.ns = _sourceNss.ns(); - - // Find out whether the sync source supports resumable queries. - _resumeSupported = (getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); } BaseCloner::ClonerStages CollectionCloner::getStages() { @@ -320,72 +317,32 @@ void CollectionCloner::runQuery() { // Non-resumable query. Query query; - if (_resumeSupported) { - if (_resumeToken) { - // Resume the query from where we left off. - LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query"); - query.requestResumeToken(true).resumeAfter(_resumeToken.get()); - } else { - // New attempt at a resumable query. - LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query"); - query.requestResumeToken(true); - } - query.hint(BSON("$natural" << 1)); + if (_resumeToken) { + // Resume the query from where we left off. + LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query"); + query.requestResumeToken(true).resumeAfter(_resumeToken.get()); + } else { + // New attempt at a resumable query. + LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query"); + query.requestResumeToken(true); } + query.hint(BSON("$natural" << 1)); // We reset this every time we retry or resume a query. // We distinguish the first batch from the rest so that we only store the remote cursor id // the first time we get it. _firstBatchOfQueryRound = true; - try { - getClient()->query_DEPRECATED( - [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, - _sourceDbAndUuid, - BSONObj{}, - query, - nullptr /* fieldsToReturn */, - QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | - (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize, - ReadConcernArgs::kImplicitDefault); - } catch (...) { - auto status = exceptionToStatus(); - - // If the collection was dropped at any point, we can just move on to the next cloner. - // This applies to both resumable (4.4) and non-resumable (4.2) queries. - if (status == ErrorCodes::NamespaceNotFound) { - throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit. - } - - // Wire version 4.2 only. - if (!_resumeSupported) { - // If we lost our cursor last round, the only time we can can continue is if we find out - // this round that the collection was dropped on the source (that scenario is covered - // right above). If that is not the case, then the cloner would have more work to do, - // but since we cannot resume the query, we must abort initial sync. - if (_lostNonResumableCursor) { - abortNonResumableClone(status); - } - - // Collection has changed upstream. This will trigger the code block above next round, - // (unless we find out the collection was dropped via getting a NamespaceNotFound). - if (_queryStage.isCursorError(status)) { - LOGV2(21135, - "Lost cursor during non-resumable query: {error}", - "Lost cursor during non-resumable query", - "error"_attr = status); - _lostNonResumableCursor = true; - throw; - } - // Any other errors (including network errors, but excluding NamespaceNotFound) result - // in immediate failure. - abortNonResumableClone(status); - } - - // Re-throw all query errors for resumable (4.4) queries. - throw; - } + getClient()->query_DEPRECATED( + [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, + _sourceDbAndUuid, + BSONObj{}, + query, + nullptr /* fieldsToReturn */, + QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | + (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), + _collectionClonerBatchSize, + ReadConcernArgs::kImplicitDefault); } void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { @@ -408,7 +365,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor"); } - if (_firstBatchOfQueryRound && _resumeSupported) { + if (_firstBatchOfQueryRound) { // Store the cursorId of the remote cursor. _remoteCursorId = iter.getCursorId(); } @@ -433,10 +390,8 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { uassertStatusOK(newStatus); } - if (_resumeSupported) { - // Store the resume token for this batch. - _resumeToken = iter.getPostBatchResumeToken(); - } + // Store the resume token for this batch. + _resumeToken = iter.getPostBatchResumeToken(); initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf( [&](const BSONObj&) { @@ -511,18 +466,6 @@ void CollectionCloner::waitForDatabaseWorkToComplete() { _dbWorkTaskRunner.join(); } -// Throws. -void CollectionCloner::abortNonResumableClone(const Status& status) { - invariant(!_resumeSupported); - LOGV2(21141, - "Error during non-resumable clone: {error}", - "Error during non-resumable clone", - "error"_attr = status); - std::string message = str::stream() - << "Collection clone failed and is not resumable. nss: " << _sourceNss; - uasserted(ErrorCodes::InitialSyncFailure, message); -} - CollectionCloner::Stats CollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 5ca0057fba1..80d8a9d72bc 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -226,12 +226,6 @@ private: */ void runQuery(); - /** - * Used to terminate the clone when we encounter a fatal error during a non-resumable query. - * Throws. - */ - void abortNonResumableClone(const Status& status); - // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // @@ -266,9 +260,6 @@ private: // like _documentsToInsert, is destroyed after those threads exit. TaskRunner _dbWorkTaskRunner; // (R) - // Does the sync source support resumable queries? (wire version 4.4+) - bool _resumeSupported = false; // (X) - // The resumeToken used to resume after network error. boost::optional<BSONObj> _resumeToken; // (X) diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 99bec69d0c2..64db34fd042 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -79,9 +79,6 @@ protected: }; _storageInterface.createCollectionForBulkFn = _standardCreateCollectionFn; - - _mockClient->setWireVersions(WireVersion::RESUMABLE_INITIAL_SYNC, - WireVersion::RESUMABLE_INITIAL_SYNC); _mockServer->assignCollectionUuid(_nss.ns(), _collUuid); _mockServer->setCommandReply("replSetGetRBID", BSON("ok" << 1 << "rbid" << getSharedData()->getRollBackId())); @@ -146,17 +143,6 @@ class CollectionClonerTestResumable : public CollectionClonerTest { } }; -class CollectionClonerTestNonResumable : public CollectionClonerTest { - void setUp() final { - CollectionClonerTest::setUp(); - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, - WireVersion::SHARDED_TRANSACTIONS); - stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); - getSharedData()->setSyncSourceWireVersion(lk, WireVersion::SHARDED_TRANSACTIONS); - } -}; - TEST_F(CollectionClonerTestResumable, CollectionClonerPassesThroughErrorFromCollStatsCommand) { auto cloner = makeCollectionCloner(); cloner->setStopAfterStage_forTest("count"); @@ -581,73 +567,6 @@ TEST_F(CollectionClonerTestResumable, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_EQ(collNss, _nss); } -TEST_F(CollectionClonerTestNonResumable, NonResumableQuerySuccess) { - // Set client wireVersion to 4.2, where we do not yet support resumable cloning. - // Set up data for preliminary stages - auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" - << "_id_"); - // The collection cloner pre-stage makes a remote call to collStats to store in-progress - // metrics. - setMockServerReplies(BSON("size" << 10), - createCountResponse(3), - createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); - - // Set up documents to be returned from upstream node. - _mockServer->insert(_nss.ns(), BSON("_id" << 1)); - _mockServer->insert(_nss.ns(), BSON("_id" << 2)); - _mockServer->insert(_nss.ns(), BSON("_id" << 3)); - - auto cloner = makeCollectionCloner(); - - ASSERT_OK(cloner->run()); - - ASSERT_EQUALS(3, _collectionStats->insertCount); - ASSERT_TRUE(_collectionStats->commitCalled); - auto stats = cloner->getStats(); - ASSERT_EQUALS(3u, stats.documentsCopied); -} - -TEST_F(CollectionClonerTestNonResumable, NonResumableQueryFailure) { - // Set up data for preliminary stages - auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" - << "_id_"); - // The collection cloner pre-stage makes a remote call to collStats to store in-progress - // metrics. - setMockServerReplies(BSON("size" << 10), - createCountResponse(3), - createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); - - auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); - auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - // Set up documents to be returned from upstream node. - _mockServer->insert(_nss.ns(), BSON("_id" << 1)); - _mockServer->insert(_nss.ns(), BSON("_id" << 2)); - _mockServer->insert(_nss.ns(), BSON("_id" << 3)); - - auto cloner = makeCollectionCloner(); - - // Run the cloner in a separate thread. - stdx::thread clonerThread([&] { - Client::initThread("ClonerRunner"); - auto status = cloner->run(); - ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); - ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); - }); - - // Wait until we get to the query stage. - beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); - - // Bring the server down. - _mockServer->shutdown(); - - // Let us begin with the query stage. - beforeStageFailPoint->setMode(FailPoint::off, 0); - - clonerThread.join(); -} - // We will retry our query without having yet obtained a resume token. TEST_F(CollectionClonerTestResumable, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); @@ -1087,147 +1006,6 @@ TEST_F(CollectionClonerTestResumable, ResumableQueryTwoResumes) { ASSERT_EQUALS(7u, stats.documentsCopied); } -// We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the -// collection no longer exists in the database. -TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorDropOK) { - // Set up data for preliminary stages - auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" - << "_id_"); - setMockServerReplies(BSON("size" << 10), - createCountResponse(3), - createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); - - auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); - auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); - auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - // Set up documents to be returned from upstream node. - _mockServer->insert(_nss.ns(), BSON("_id" << 1)); - _mockServer->insert(_nss.ns(), BSON("_id" << 2)); - _mockServer->insert(_nss.ns(), BSON("_id" << 3)); - - auto cloner = makeCollectionCloner(); - - // Run the cloner in a separate thread. - stdx::thread clonerThread([&] { - Client::initThread("ClonerRunner"); - ASSERT_OK(cloner->run()); - }); - - // Wait until we get to the query stage. - beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); - - // This will cause the next batch to fail once (transiently), but we do not support resume. - auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); - failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'QueryPlanKilled'}")); - - // Let us begin with the query stage. - beforeStageFailPoint->setMode(FailPoint::off, 0); - beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); - - // Follow-up the QueryPlanKilled error with a NamespaceNotFound. - failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'NamespaceNotFound'}")); - - beforeRetryFailPoint->setMode(FailPoint::off, 0); - clonerThread.join(); -} - -// We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound, -// which means the collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenOtherError) { - // Set up data for preliminary stages - auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" - << "_id_"); - setMockServerReplies(BSON("size" << 10), - createCountResponse(3), - createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); - - auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); - auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); - auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - // Set up documents to be returned from upstream node. - _mockServer->insert(_nss.ns(), BSON("_id" << 1)); - _mockServer->insert(_nss.ns(), BSON("_id" << 2)); - _mockServer->insert(_nss.ns(), BSON("_id" << 3)); - - auto cloner = makeCollectionCloner(); - - // Run the cloner in a separate thread. - stdx::thread clonerThread([&] { - Client::initThread("ClonerRunner"); - auto status = cloner->run(); - ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); - ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); - }); - - // Wait until we get to the query stage. - beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); - - // This will cause the next batch to fail once (transiently), but we do not support resume. - auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); - failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'OperationFailed'}")); - - // Let us begin with the query stage. - beforeStageFailPoint->setMode(FailPoint::off, 0); - beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); - - // Follow-up the QueryPlanKilled error with a NamespaceNotFound. - failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}")); - - beforeRetryFailPoint->setMode(FailPoint::off, 0); - clonerThread.join(); -} - -// We receive a CursorNotFound error, but the next query succeeds, indicating that the -// collection still exists in the database, but we cannot resume the query. -TEST_F(CollectionClonerTestNonResumable, NonResumableCursorErrorThenSuccessEqualsFailure) { - // Set up data for preliminary stages - auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" - << "_id_"); - setMockServerReplies(BSON("size" << 10), - createCountResponse(3), - createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); - - auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); - auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( - FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); - - // Set up documents to be returned from upstream node. - _mockServer->insert(_nss.ns(), BSON("_id" << 1)); - _mockServer->insert(_nss.ns(), BSON("_id" << 2)); - _mockServer->insert(_nss.ns(), BSON("_id" << 3)); - - auto cloner = makeCollectionCloner(); - cloner->setBatchSize_forTest(2); - - // Run the cloner in a separate thread. - stdx::thread clonerThread([&] { - Client::initThread("ClonerRunner"); - auto status = cloner->run(); - ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); - ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); - }); - - // Wait until we get to the query stage. - beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); - - // This will cause the next batch to fail once (transiently), but we do not support resume. - auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); - failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'CursorNotFound'}")); - - // Let us begin with the query stage. We let the next retry succeed this time. - beforeStageFailPoint->setMode(FailPoint::off, 0); - clonerThread.join(); -} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/initial_sync_base_cloner.cpp b/src/mongo/db/repl/initial_sync_base_cloner.cpp index c08d102ca9c..f91c75aa92d 100644 --- a/src/mongo/db/repl/initial_sync_base_cloner.cpp +++ b/src/mongo/db/repl/initial_sync_base_cloner.cpp @@ -104,27 +104,14 @@ void InitialSyncBaseCloner::handleStageAttemptFailed(BaseClonerStage* stage, Sta } Status InitialSyncBaseCloner::checkSyncSourceIsStillValid() { + auto status = checkInitialSyncIdIsUnchanged(); + if (!status.isOK()) + return status; - WireVersion wireVersion; - { - stdx::lock_guard<ReplSyncSharedData> lk(*getSharedData()); - auto wireVersionOpt = getSharedData()->getSyncSourceWireVersion(lk); - // The wire version should always have been set by the time this is called. - invariant(wireVersionOpt); - wireVersion = *wireVersionOpt; - } - if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { - auto status = checkInitialSyncIdIsUnchanged(); - if (!status.isOK()) - return status; - } return checkRollBackIdIsUnchanged(); } Status InitialSyncBaseCloner::checkInitialSyncIdIsUnchanged() { - uassert(ErrorCodes::InitialSyncFailure, - "Sync source was downgraded and no longer supports resumable initial sync", - getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); BSONObj initialSyncId; try { initialSyncId = getClient()->findOne( diff --git a/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp b/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp index c0c407f3bc3..60142aa7ba2 100644 --- a/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/initial_sync_cloner_test_fixture.cpp @@ -53,7 +53,6 @@ InitialSyncSharedData* InitialSyncClonerTestFixture::getSharedData() { void InitialSyncClonerTestFixture::setInitialSyncId() { stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); - getSharedData()->setSyncSourceWireVersion(lk, WireVersion::RESUMABLE_INITIAL_SYNC); getSharedData()->setInitialSyncSourceId(lk, _initialSyncId); } diff --git a/src/mongo/db/repl/initial_sync_shared_data.h b/src/mongo/db/repl/initial_sync_shared_data.h index 0644f151997..e8d6a9d1722 100644 --- a/src/mongo/db/repl/initial_sync_shared_data.h +++ b/src/mongo/db/repl/initial_sync_shared_data.h @@ -61,20 +61,6 @@ public: } /** - * Sets the wire version of the sync source. - */ - void setSyncSourceWireVersion(WithLock, WireVersion wireVersion) { - _syncSourceWireVersion = wireVersion; - } - - /** - * Returns the wire version of the sync source, if previously set. - */ - boost::optional<WireVersion> getSyncSourceWireVersion(WithLock) { - return _syncSourceWireVersion; - } - - /** * Sets the initial sync ID of the sync source. */ void setInitialSyncSourceId(WithLock, boost::optional<UUID> syncSourceId) { @@ -209,9 +195,6 @@ private: // Operation that may currently be retrying. RetryableOperation _retryableOp; - // The sync source wire version at the start of data cloning. - boost::optional<WireVersion> _syncSourceWireVersion; - // The initial sync ID on the source at the start of data cloning. boost::optional<UUID> _initialSyncSourceId; }; diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 8d304ba1ebf..3875c64ef9b 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/oplog_fetcher_mock.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/replication_recovery_mock.h" @@ -385,6 +386,10 @@ protected: _mockServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); _options1.uuid = UUID::gen(); + _mockServer->insert( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), + BSON("_id" << UUID::gen())); + reset(); launchExecutorThread(); |