diff options
author | XueruiFa <xuerui.fa@mongodb.com> | 2020-06-22 18:31:41 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-01 16:09:42 +0000 |
commit | 5ffbd8f8322651b4953f29da0cde9e31eab039d4 (patch) | |
tree | b81486a2abdcf60825b1b9427d964499c366d94b /src/mongo/db | |
parent | e4f92ef12f233e86be9de019f16db4f5dde47ad5 (diff) | |
download | mongo-5ffbd8f8322651b4953f29da0cde9e31eab039d4.tar.gz |
SERVER-47270: Only run the SyncSourceResolver if the syncing node is in rollback-via-refetch
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_process.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 73 |
10 files changed, 165 insertions, 132 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 4a6f2928365..b1bdb3eff66 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -491,12 +491,20 @@ void BackgroundSync::_produce() { // "lastFetched" not used. Already set in _enqueueDocuments. Status fetcherReturnStatus = Status::OK(); + int syncSourceRBID = syncSourceResp.rbid; + DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( _replCoord, _replicationCoordinatorExternalState, this); OplogFetcher* oplogFetcher; try { - auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) { + auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus, + &syncSourceRBID](const Status& status, int rbid) { fetcherReturnStatus = status; + // If the syncSourceResp rbid is uninitialized, syncSourceRBID will be set to the + // rbid obtained in the oplog fetcher. + if (syncSourceRBID == ReplicationProcess::kUninitializedRollbackId) { + syncSourceRBID = rbid; + } }; // The construction of OplogFetcher has to be outside bgsync mutex, because it calls // replication coordinator. 
@@ -574,8 +582,7 @@ void BackgroundSync::_produce() { } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) { auto opCtx = cc().makeOperationContext(); auto storageInterface = StorageInterface::get(opCtx.get()); - _runRollback( - opCtx.get(), fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface); + _runRollback(opCtx.get(), fetcherReturnStatus, source, syncSourceRBID, storageInterface); if (bgSyncHangAfterRunRollback.shouldFail()) { LOGV2(21095, "bgSyncHangAfterRunRollback failpoint is set"); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 08dc0b83be5..ccd3bd67b68 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1151,24 +1151,24 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> } const auto& config = configResult.getValue(); - _oplogFetcher = - _createOplogFetcherFn(_exec, - beginFetchingOpTime, - _syncSource, - config, - std::make_unique<OplogFetcherRestartDecisionInitialSyncer>( - _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts), - _rollbackChecker->getBaseRBID(), - false /* requireFresherSyncSource */, - _dataReplicatorExternalState.get(), - [=](OplogFetcher::Documents::const_iterator first, - OplogFetcher::Documents::const_iterator last, - const OplogFetcher::DocumentsInfo& info) { - return _enqueueDocuments(first, last, info); - }, - [=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); }, - initialSyncOplogFetcherBatchSize, - OplogFetcher::StartingPoint::kEnqueueFirstDoc); + _oplogFetcher = _createOplogFetcherFn( + _exec, + beginFetchingOpTime, + _syncSource, + config, + std::make_unique<OplogFetcherRestartDecisionInitialSyncer>( + _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts), + _rollbackChecker->getBaseRBID(), + false /* requireFresherSyncSource */, + _dataReplicatorExternalState.get(), + [=](OplogFetcher::Documents::const_iterator first, + 
OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { + return _enqueueDocuments(first, last, info); + }, + [=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); }, + initialSyncOplogFetcherBatchSize, + OplogFetcher::StartingPoint::kEnqueueFirstDoc); LOGV2_DEBUG(21178, 2, diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 7c664838887..43be886e67e 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -307,8 +307,7 @@ Milliseconds OplogFetcher::_getRetriedFindMaxTime() const { void OplogFetcher::_finishCallback(Status status) { invariant(isActive()); - - _onShutdownCallbackFn(status); + _onShutdownCallbackFn(status, _requiredRBID); decltype(_onShutdownCallbackFn) onShutdownCallbackFn; decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; @@ -764,15 +763,19 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum int remoteRBID) { using namespace fmt::literals; - // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back - // since that could cause it to not have our required minValid point. The cursor will be - // killed if the upstream node rolls back so we don't need to keep checking once the cursor - // is established. - if (remoteRBID != _requiredRBID) { + // Once we establish our cursor, if we use rollback-via-refetch, we need to ensure that our + // upstream node hasn't rolled back since that could cause it to not have our required minValid + // point. The cursor will be killed if the upstream node rolls back so we don't need to keep + // checking once the cursor is established. If we do not use rollback-via-refetch, this check is + // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case. 
+ if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId && + remoteRBID != _requiredRBID) { return Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after choosing it as a sync source. Choosing " "new sync source."); } + // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down. + _requiredRBID = remoteRBID; // Sometimes our remoteLastOpApplied may be stale; if we received a document with an // opTime later than remoteLastApplied, we can assume the remote is at least up to that diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 0292dc5e7b0..4c3a765a780 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -41,6 +41,7 @@ #include "mongo/db/repl/abstract_async_component.h" #include "mongo/db/repl/data_replicator_external_state.h" #include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -84,10 +85,13 @@ public: * The status will be Status::OK() if we have processed the last batch of operations from the * cursor. * + * rbid will be set to the rollback id of the oplog query metadata for the first batch fetched + * from the sync source. + * * This function will be called 0 times if startup() fails and at most once after startup() * returns success. */ - using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus)>; + using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus, int rbid)>; /** * Container for BSON documents extracted from cursor results. @@ -350,8 +354,7 @@ private: * Checks the first batch of results from query. * 'documents' are the first batch of results returned from tailing the remote oplog. * 'remoteLastOpApplied' is the last OpTime applied on the sync source. - * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. 
This is - * optional for compatibility with 3.4 servers that do not send OplogQueryMetadata. + * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. * * Returns TooStaleToSyncFromSource if we are too stale to sync from our source. * Returns OplogStartMissing if we should go into rollback. @@ -377,7 +380,8 @@ private: // Namespace of the oplog to read. const NamespaceString _nss = NamespaceString::kRsOplogNamespace; - // Rollback ID that the sync source is required to have after the first batch. + // Rollback ID that the sync source is required to have after the first batch. If the value is + // uninitialized, the oplog fetcher has not contacted the sync source yet. int _requiredRBID; // Indicates whether the current batch is the first received via this cursor. diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp index 271cf8b772a..0331fffe950 100644 --- a/src/mongo/db/repl/oplog_fetcher_mock.cpp +++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp @@ -61,7 +61,7 @@ OplogFetcherMock::OplogFetcherMock( // Pass a dummy EnqueueDocumentsFn to the base OplogFetcher. [](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); }, // Pass a dummy OnShutdownCallbackFn to the base OplogFetcher. - [](const auto& a) {}, + [](const auto& a, const int b) {}, batchSize, startingPoint), _oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)), @@ -210,7 +210,7 @@ void OplogFetcherMock::_finishCallback(Status status) { invariant(isActive()); // Call _onShutdownCallbackFn outside of the mutex. 
- _onShutdownCallbackFn(status); + _onShutdownCallbackFn(status, ReplicationProcess::kUninitializedRollbackId); decltype(_onShutdownCallbackFn) onShutdownCallbackFn; decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 3987c4a6437..61380e4c244 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -250,12 +250,18 @@ public: Status getStatus() const; /** + * Returns the rbid at shutdown. + */ + int getRBID() const; + + /** * Use this for oplog fetcher shutdown callback. */ - void operator()(const Status& status); + void operator()(const Status& status, int rbid); private: Status _status = executor::TaskExecutorTest::getDetectableErrorStatus(); + int _rbid = ReplicationProcess::kUninitializedRollbackId; }; ShutdownState::ShutdownState() = default; @@ -264,8 +270,13 @@ Status ShutdownState::getStatus() const { return _status; } -void ShutdownState::operator()(const Status& status) { +int ShutdownState::getRBID() const { + return _rbid; +} + +void ShutdownState::operator()(const Status& status, int rbid) { _status = status; + _rbid = rbid; } class OplogFetcherTest : public executor::ThreadPoolExecutorTest, @@ -274,7 +285,7 @@ protected: static const OpTime remoteNewerOpTime; static const OpTime staleOpTime; static const Date_t staleWallTime; - static const int rbid = 2; + static const int remoteRBID = 2; static const int primaryIndex = 2; static const int syncSourceIndex = 2; static const rpc::OplogQueryMetadata oqMetadata; @@ -292,17 +303,21 @@ protected: OplogFetcher::OnShutdownCallbackFn fn, int numRestarts = 0, bool requireFresherSyncSource = true, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc); + OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, + int requiredRBID = ReplicationProcess::kUninitializedRollbackId); 
std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated( OplogFetcher::OnShutdownCallbackFn fn, int numRestarts = 0, bool requireFresherSyncSource = true, - OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc); + OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc, + int requiredRBID = ReplicationProcess::kUninitializedRollbackId); - std::unique_ptr<ShutdownState> processSingleBatch(const Message& response, - bool shouldShutdown = false, - bool requireFresherSyncSource = true, - bool lastFetchedShouldAdvance = false); + std::unique_ptr<ShutdownState> processSingleBatch( + const Message& response, + bool shouldShutdown = false, + bool requireFresherSyncSource = true, + bool lastFetchedShouldAdvance = false, + int requiredRBID = ReplicationProcess::kUninitializedRollbackId); /** * Tests checkSyncSource result handling. @@ -324,15 +339,15 @@ protected: std::unique_ptr<MockRemoteDBServer> _mockServer; }; -const int OplogFetcherTest::rbid; +const int OplogFetcherTest::remoteRBID; const OpTime OplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2); const rpc::OplogQueryMetadata OplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata( - {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex); + {staleOpTime, staleWallTime}, remoteNewerOpTime, remoteRBID, primaryIndex, syncSourceIndex); const OpTime OplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0); const Date_t OplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); const rpc::OplogQueryMetadata OplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata( - {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex); + {staleOpTime, staleWallTime}, staleOpTime, remoteRBID, primaryIndex, syncSourceIndex); const rpc::ReplSetMetadata OplogFetcherTest::replSetMetadata = rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, 0, OID(), syncSourceIndex, 
false); @@ -369,16 +384,17 @@ void OplogFetcherTest::setUp() { } std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher() { - return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {}); + return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status, int) {}); } std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCreated( OplogFetcher::OnShutdownCallbackFn fn, int numRestarts, bool requireFresherSyncSource, - OplogFetcher::StartingPoint startingPoint) { + OplogFetcher::StartingPoint startingPoint, + int requiredRBID) { auto oplogFetcher = makeOplogFetcherWithDifferentExecutor( - &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint); + &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint, requiredRBID); auto waitForConnCreatedFailPoint = globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled"); @@ -400,14 +416,15 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe OplogFetcher::OnShutdownCallbackFn fn, int numRestarts, bool requireFresherSyncSource, - OplogFetcher::StartingPoint startingPoint) { + OplogFetcher::StartingPoint startingPoint, + int requiredRBID) { auto oplogFetcher = std::make_unique<OplogFetcher>( executor, lastFetched, source, _createConfig(), std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), - rbid, + requiredRBID, requireFresherSyncSource, dataReplicatorExternalState.get(), enqueueDocumentsFn, @@ -425,12 +442,17 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Message& response, bool shouldShutdown, bool requireFresherSyncSource, - bool lastFetchedShouldAdvance) { + bool lastFetchedShouldAdvance, + int requiredRBID) { auto shutdownState = std::make_unique<ShutdownState>(); // Create an oplog fetcher with no retries. 
- auto oplogFetcher = getOplogFetcherAfterConnectionCreated( - std::ref(*shutdownState), 0, requireFresherSyncSource); + auto oplogFetcher = + getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState), + 0, + requireFresherSyncSource, + OplogFetcher::StartingPoint::kSkipFirstDoc, + requiredRBID); // Update lastFetched before it is updated by getting the next batch. lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); @@ -515,8 +537,8 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToSche taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; }; // The onShutdownFn should not be called because the oplog fetcher should fail during startup. - auto oplogFetcher = - makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, [](Status) { MONGO_UNREACHABLE; }); + auto oplogFetcher = makeOplogFetcherWithDifferentExecutor( + &taskExecutorMock, [](Status, int) { MONGO_UNREACHABLE; }); // Last optime fetched should match values passed to constructor. 
ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); @@ -549,6 +571,7 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQuerySchedu oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) { @@ -571,6 +594,7 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQ oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) { @@ -596,6 +620,7 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQu oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, @@ -620,6 +645,7 @@ TEST_F(OplogFetcherTest, // This is the error that the connection throws if shutdown while blocked on the network. 
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, @@ -662,6 +688,7 @@ TEST_F(OplogFetcherTest, oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } bool sharedCallbackStateDestroyed = false; @@ -693,7 +720,7 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) { auto status = getDetectableErrorStatus(); auto oplogFetcher = getOplogFetcherAfterConnectionCreated( - [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus) { + [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus, int rbid) { status = shutdownStatus, callbackInvoked = true; }); @@ -861,12 +888,21 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); - rpc::OplogQueryMetadata oplogQueryMetadata( - {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, primaryIndex, syncSourceIndex); + rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime}, + remoteNewerOpTime, + remoteRBID + 1, + primaryIndex, + syncSourceIndex); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); - ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, - processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + auto shutdownState = processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj), + false /* shouldShutdown */, + true /* requireFresherSyncSource */, + false /* lastFetchedShouldAdvance */, + remoteRBID /* requiredRBID */); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); + ASSERT_EQUALS(remoteRBID, shutdownState->getRBID()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); 
ASSERT(lastEnqueuedDocuments.empty()); @@ -889,7 +925,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead auto entry = makeNoopOplogEntry(lastFetched); rpc::OplogQueryMetadata oplogQueryMetadata( - {staleOpTime, staleWallTime}, lastFetched, rbid, primaryIndex, syncSourceIndex); + {staleOpTime, staleWallTime}, lastFetched, remoteRBID, primaryIndex, syncSourceIndex); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, @@ -935,7 +971,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { CursorId cursorId = 0LL; rpc::OplogQueryMetadata oplogQueryMetadata( - {staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); + {staleOpTime, staleWallTime}, lastFetched, remoteRBID, 2, 2); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); auto entry = makeNoopOplogEntry(lastFetched); @@ -1044,11 +1080,12 @@ TEST_F(OplogFetcherTest, RemoteFirstOplogEntryWithExtraFieldsReturnsOplogStartMi << "field"); _mockServer->insert(nss.ns(), remoteFirstOplogEntry); + auto shutdownState = processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj})); + // We should have parsed the OpTime correctly and realized that we have not fallen off the sync // source's oplog, so we should go into rollback. 
- ASSERT_EQUALS( - ErrorCodes::OplogStartMissing, - processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj}))->getStatus()); + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, shutdownState->getStatus()); + ASSERT_EQUALS(remoteRBID, shutdownState->getRBID()); } TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) { @@ -1070,6 +1107,7 @@ TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogF oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus()); + ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, @@ -1497,6 +1535,7 @@ TEST_F(OplogFetcherTest, OplogFetcherWorksWithoutExhaust) { oplogFetcher->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); + ASSERT_EQUALS(remoteRBID, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) { @@ -1528,6 +1567,7 @@ TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) true /* skipFirstDoc */, firstBatch, oplogFetcher->getLastOpTimeFetched_forTest()); ASSERT_OK(shutdownState.getStatus()); + ASSERT_EQ(remoteRBID, shutdownState.getRBID()); } TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { @@ -1797,7 +1837,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { rpc::OplogQueryMetadata oplogQueryMetadata( - {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1); + {staleOpTime, staleWallTime}, remoteNewerOpTime, remoteRBID, primaryIndex, -1); testSyncSourceChecking(replSetMetadata, oplogQueryMetadata); // Sync source "hasSyncSource" is derived from metadata. 
diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h index 82c298d363d..a6f2b019f3c 100644 --- a/src/mongo/db/repl/replication_process.h +++ b/src/mongo/db/repl/replication_process.h @@ -63,7 +63,7 @@ class ReplicationProcess { ReplicationProcess& operator=(const ReplicationProcess&) = delete; public: - static const int kUninitializedRollbackId = -1; + constexpr static int kUninitializedRollbackId = -1; // Operation Context binding. static ReplicationProcess* get(ServiceContext* service); diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index 4b4a9a178a2..ba7f06c24ea 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -37,7 +37,6 @@ #include "mongo/db/jsobj.h" #include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/sync_source_selector.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -372,6 +371,13 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } + // If we should not proceed with the rollback-via-refetch checks, we can safely return the + // candidate with an uninitialized rbid. + if (_requiredOpTime.isNull()) { + _finishCallback(candidate, ReplicationProcess::kUninitializedRollbackId).ignore(); + return; + } + auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen); if (!status.isOK()) { _finishCallback(status).ignore(); @@ -434,18 +440,12 @@ void SyncSourceResolver::_rbidRequestCallback( return; } - if (!_requiredOpTime.isNull()) { - // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. - // Unittest requires that this kind of failure be handled specially. 
- auto status = - _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid)); - if (!status.isOK()) { - _finishCallback(status).transitional_ignore(); - } - return; + // Schedule fetcher to look for '_requiredOpTime' in the remote oplog. + // Unittest requires that this kind of failure be handled specially. + auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid)); + if (!status.isOK()) { + _finishCallback(status).ignore(); } - - _finishCallback(candidate, rbid).ignore(); } Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( @@ -570,9 +570,7 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) { SyncSourceResolverResponse response; response.syncSourceStatus = std::move(hostAndPort); - if (rbid != ReplicationProcess::kUninitializedRollbackId) { - response.rbid = rbid; - } + response.rbid = rbid; return _finishCallback(response); } diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index a98db3d8773..3cdeead4f65 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -37,6 +37,7 @@ #include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" @@ -73,8 +74,9 @@ struct SyncSourceResolverResponse { // Rollback ID of the selected sync source. // The rbid is fetched before the required optime so callers can be sure that as long as the - // rbid is the same, the required optime is still present. - int rbid; + // rbid is the same, the required optime is still present. The rbid will remain set to + // 'kUninitializedRollbackId' if _requiredOpTime is null. 
+ int rbid = ReplicationProcess::kUninitializedRollbackId; bool isOK() { return syncSourceStatus.isOK(); diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index 1e4380d2fca..2d6543c43cb 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -71,7 +71,7 @@ private: }; class SyncSourceResolverTest : public executor::ThreadPoolExecutorTest { -private: +public: void setUp() override; void tearDown() override; @@ -129,6 +129,19 @@ std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver( const NamespaceString nss("local.oplog.rs"); +const OpTime requiredOpTime(Timestamp(200, 1U), 1LL); +class SyncSourceResolverRequiredOpTimeTest : public SyncSourceResolverTest { +public: + void setUp() override; +}; + +void SyncSourceResolverRequiredOpTimeTest::setUp() { + SyncSourceResolverTest::setUp(); + // Initialize a resolver with a non-null _requiredOpTime. This makes the sync source + // resolver run the RBID and requiredOpTime checks. 
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); +} + BSONObj makeCursorResponse(CursorId cursorId, const NamespaceString& nss, std::vector<BSONObj> docs, @@ -348,7 +361,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate1); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -369,7 +381,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate1); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -420,7 +431,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -486,7 +496,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -552,7 +561,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -577,7 +585,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -603,7 +610,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); 
_resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -629,7 +635,6 @@ TEST_F(SyncSourceResolverTest, _scheduleFirstOplogEntryFetcherResponse( getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2)); - _scheduleRBIDResponse(getNet(), candidate2); _resolver->join(); ASSERT_FALSE(_resolver->isActive()); @@ -650,8 +655,6 @@ TEST_F(SyncSourceResolverTest, SyncSourceResolverWillSucceedWithExtraFields) { {BSON("ts" << Timestamp(1, 1) << "t" << 1 << "note" << "a")}); - _scheduleRBIDResponse(getNet(), candidate1); - _resolver->join(); ASSERT_FALSE(_resolver->isActive()); ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); @@ -705,12 +708,8 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, {_makeOplogEntry(requiredOpTime.getTimestamp(), requiredOpTime.getTerm())}); } -const OpTime requiredOpTime(Timestamp(200, 1U), 1LL); - -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverWillCheckForRequiredOpTimeIfRequiredOpTimeIsProvided) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); @@ -731,10 +730,8 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQ(_response.rbid, 7); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); @@ -770,7 +767,7 @@ TEST_F(SyncSourceResolverTest, } TEST_F( - SyncSourceResolverTest, + SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) { auto requireOpTimeWithUninitializedTerm = 
OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm); @@ -806,10 +803,8 @@ TEST_F( ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); @@ -840,10 +835,8 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); HostAndPort candidate2("node2", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); @@ -878,10 +871,8 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsScheduleErrorWhenSchedulingRBIDCommandFails) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - _shouldFailRequest = [](const executor::RemoteCommandRequest& request) { return request.cmdObj.firstElementFieldName() == "replSetGetRBID"_sd; }; @@ -899,10 +890,8 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - _shouldFailRequest = [](const executor::RemoteCommandRequest& request) { // Fail find commands reading the oplog 
with filter containing a "ts" predicate. if (StringData{request.cmdObj.getStringField("find")} != @@ -934,10 +923,8 @@ TEST_F(SyncSourceResolverTest, } TEST_F( - SyncSourceResolverTest, + SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); @@ -957,10 +944,8 @@ TEST_F( } TEST_F( - SyncSourceResolverTest, + SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); @@ -978,10 +963,8 @@ TEST_F( ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRBIDCommand) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); @@ -1002,10 +985,8 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus)); } -TEST_F(SyncSourceResolverTest, +TEST_F(SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterRBIDCommandNotOk) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); @@ -1033,10 +1014,8 @@ TEST_F(SyncSourceResolverTest, } TEST_F( - SyncSourceResolverTest, + 
SyncSourceResolverRequiredOpTimeTest, SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) { - _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); - HostAndPort candidate1("node1", 12345); _selector->setChooseNewSyncSourceResult_forTest(candidate1); ASSERT_OK(_resolver->startup()); |