diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2020-03-25 12:41:11 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 17:28:45 +0000 |
commit | b36c69c5930d25a8f5ae348a2b2fb24f27f925e6 (patch) | |
tree | 267ef20d6a33e8dccdca58ef91f9cab47b4870cc /src/mongo/db | |
parent | e8a9c1d087efa92501510d6ca340eec082300e6d (diff) | |
download | mongo-b36c69c5930d25a8f5ae348a2b2fb24f27f925e6.tar.gz |
SERVER-46120 Make OplogQueryMetadata non-optional
Diffstat (limited to 'src/mongo/db')
23 files changed, 84 insertions, 269 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 0d25c62ca9f..eed57307446 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -111,7 +111,7 @@ public: BackgroundSync* bgsync); bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; private: BackgroundSync* _bgsync; @@ -127,7 +127,7 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { + const rpc::OplogQueryMetadata& oqMetadata) { if (_bgsync->shouldStopFetching()) { return true; } diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index 51b7f65c20e..6215015be2c 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -94,12 +94,10 @@ public: * Evaluates quality of sync source. Accepts the current sync source; the last optime on this * sync source (from metadata); and whether this sync source has a sync source (also from * metadata). - * - * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ virtual bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0; + const rpc::OplogQueryMetadata& oqMetadata) = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 4596bddc980..443cb43adbe 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -95,33 +95,19 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata bool DataReplicatorExternalStateImpl::shouldStopFetching( const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { + const rpc::OplogQueryMetadata& oqMetadata) { // Re-evaluate quality of sync target. if (_replicationCoordinator->shouldChangeSyncSource(source, replMetadata, oqMetadata)) { - // If OplogQueryMetadata was provided, its values were used to determine if we should - // change sync sources. - if (oqMetadata) { - LOGV2(21150, - "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " - "sync source. Current source: {syncSource}, OpTime {lastAppliedOpTime}, " - "its sync source index:{syncSourceIndex}", - "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " - "sync source", - "syncSource"_attr = source, - "lastAppliedOpTime"_attr = oqMetadata->getLastOpApplied(), - "syncSourceIndex"_attr = oqMetadata->getSyncSourceIndex()); - - } else { - LOGV2(21151, - "Canceling oplog query due to ReplSetMetadata. We have to choose a new sync " - "source. Current source: {syncSource}, OpTime {lastVisibleOpTime}, its " - "sync source index:{syncSourceIndex}", - "Canceling oplog query due to ReplSetMetadata. We have to choose a new sync " - "source", - "syncSource"_attr = source, - "lastVisibleOpTime"_attr = replMetadata.getLastOpVisible(), - "syncSourceIndex"_attr = replMetadata.getSyncSourceIndex()); - } + LOGV2(21150, + "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " + "sync source. Current source: {syncSource}, OpTime {lastAppliedOpTime}, " + "its sync source index:{syncSourceIndex}", + "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " + "sync source", + "syncSource"_attr = source, + "lastAppliedOpTime"_attr = oqMetadata.getLastOpApplied(), + "syncSourceIndex"_attr = oqMetadata.getSyncSourceIndex()); + return true; } return false; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 4be02bc2b97..f866e9078f5 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -56,7 +56,7 @@ public: bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp index c6f573e5115..793f0cc5953 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp @@ -40,8 +40,9 @@ DataReplicatorExternalStateInitialSync::DataReplicatorExternalStateInitialSync( : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState) { } -bool DataReplicatorExternalStateInitialSync::shouldStopFetching( - const HostAndPort&, const rpc::ReplSetMetadata&, boost::optional<rpc::OplogQueryMetadata>) { +bool DataReplicatorExternalStateInitialSync::shouldStopFetching(const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata&) { // Since initial sync does not allow for sync source changes, it should not check if there are // better sync sources. If there is a problem on the sync source, it will manifest itself in the diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h index 038accdd4ff..d35757e60d0 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h @@ -46,7 +46,7 @@ public: bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 10b4ae6fd3d..3a7124f4a4c 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -92,18 +92,10 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata bool DataReplicatorExternalStateMock::shouldStopFetching( const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { + const rpc::OplogQueryMetadata& oqMetadata) { lastSyncSourceChecked = source; - - // If OplogQueryMetadata was provided, use its values, otherwise use the ones in - // ReplSetMetadata. - if (oqMetadata) { - syncSourceLastOpTime = oqMetadata->getLastOpApplied(); - syncSourceHasSyncSource = oqMetadata->getSyncSourceIndex() != -1; - } else { - syncSourceLastOpTime = replMetadata.getLastOpVisible(); - syncSourceHasSyncSource = replMetadata.getSyncSourceIndex() != -1; - } + syncSourceLastOpTime = oqMetadata.getLastOpApplied(); + syncSourceHasSyncSource = oqMetadata.getSyncSourceIndex() != -1; return shouldStopFetchingResult; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 7aee165ce85..55a353237e0 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -53,7 +53,7 @@ public: bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override; diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 6344cefc503..a22e52b30b3 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -158,7 +158,7 @@ public: } bool shouldChangeSyncSource(const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override { + const rpc::OplogQueryMetadata& oqMetadata) override { return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata); } diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 58a874c3b1a..1aa4774f89d 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -129,22 +129,20 @@ Milliseconds calculateAwaitDataTimeout(const ReplSetConfig& config) { * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point * as ours, but still cannot be behind ours. * - * TODO (SERVER-27668): Make remoteLastOpApplied, and remoteRBID non-optional. - * * Returns OplogStartMissing if we cannot find the optime of the last fetched operation in * the remote oplog. */ Status checkRemoteOplogStart(const OplogFetcher::Documents& documents, OpTime lastFetched, - boost::optional<OpTime> remoteLastOpApplied, + OpTime remoteLastOpApplied, int requiredRBID, - boost::optional<int> remoteRBID, + int remoteRBID, bool requireFresherSyncSource) { // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back // since that could cause it to not have our required minValid point. The cursor will be // killed if the upstream node rolls back so we don't need to keep checking once the cursor // is established. - if (remoteRBID && (*remoteRBID != requiredRBID)) { + if (remoteRBID != requiredRBID) { return Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after choosing it as a sync source. Choosing " "new sync source."); @@ -153,10 +151,10 @@ Status checkRemoteOplogStart(const OplogFetcher::Documents& documents, // Sometimes our remoteLastOpApplied may be stale; if we received a document with an // opTime later than remoteLastApplied, we can assume the remote is at least up to that // opTime. - if (remoteLastOpApplied && !documents.empty()) { + if (!documents.empty()) { const auto docOpTime = OpTime::parseFromOplogEntry(documents.back()); if (docOpTime.isOK()) { - remoteLastOpApplied = std::max(*remoteLastOpApplied, docOpTime.getValue()); + remoteLastOpApplied = std::max(remoteLastOpApplied, docOpTime.getValue()); } } @@ -164,10 +162,10 @@ Status checkRemoteOplogStart(const OplogFetcher::Documents& documents, // failed to detect the rollback if it occurred between sync source selection (when we check the // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the // sync source is now behind us, choose a new sync source to prevent going into rollback. - if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched)) { + if (remoteLastOpApplied < lastFetched) { return Status(ErrorCodes::InvalidSyncSource, str::stream() - << "Sync source's last applied OpTime " << remoteLastOpApplied->toString() + << "Sync source's last applied OpTime " << remoteLastOpApplied.toString() << " is older than our last fetched OpTime " << lastFetched.toString() << ". Choosing new sync source."); } @@ -181,12 +179,12 @@ Status checkRemoteOplogStart(const OplogFetcher::Documents& documents, // problematic to check this condition for initial sync, since the 'lastFetched' OpTime will // almost always equal the 'remoteLastApplied', since we fetch the sync source's last applied // OpTime to determine where to start our OplogFetcher. - if (requireFresherSyncSource && remoteLastOpApplied && *remoteLastOpApplied <= lastFetched) { + if (requireFresherSyncSource && remoteLastOpApplied <= lastFetched) { return Status(ErrorCodes::InvalidSyncSource, str::stream() << "Sync source must be ahead of me. My last fetched oplog optime: " << lastFetched.toString() << ", latest oplog optime of sync source: " - << remoteLastOpApplied->toString()); + << remoteLastOpApplied.toString()); } // At this point we know that our sync source has our minValid and is not behind us, so if our @@ -215,31 +213,6 @@ Status checkRemoteOplogStart(const OplogFetcher::Documents& documents, } return Status::OK(); } - -/** - * Parses the cursor's metadata response for the OplogQueryMetadata. If there is an error it returns - * it. If no OplogQueryMetadata is provided then it returns boost::none. - * - * OplogQueryMetadata is made optional for backwards compatibility. - * TODO SERVER-27668: Make this non-optional. When this stops being optional we can remove the - * duplicated fields in both metadata types and begin to always use OplogQueryMetadata's data. - */ -StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( - const BSONObj& metadata) { - boost::optional<rpc::OplogQueryMetadata> oqMetadata = boost::none; - - bool receivedOplogQueryMetadata = metadata.hasElement(rpc::kOplogQueryMetadataFieldName); - if (receivedOplogQueryMetadata) { - auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadata); - if (!metadataResult.isOK()) { - return metadataResult.getStatus(); - } - - oqMetadata = boost::make_optional(metadataResult.getValue()); - } - - return oqMetadata; -} } // namespace @@ -770,7 +743,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { LOGV2_DEBUG(21271, 2, "Oplog fetcher read 0 operations from remote oplog"); } - auto oqMetadataResult = parseOplogQueryMetadata(_metadataObj); + auto oqMetadataResult = rpc::OplogQueryMetadata::readFromMetadata(_metadataObj); if (!oqMetadataResult.isOK()) { LOGV2_ERROR(21278, "invalid oplog query metadata from sync source {syncSource}: " @@ -787,14 +760,11 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { auto lastFetched = _getLastOpTimeFetched(); if (_firstBatch) { - auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; - auto remoteLastApplied = - oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; auto status = checkRemoteOplogStart(documents, lastFetched, - remoteLastApplied, + oqMetadata.getLastOpApplied(), _requiredRBID, - remoteRBID, + oqMetadata.getRBID(), _requireFresherSyncSource); if (!status.isOK()) { // Stop oplog fetcher and execute rollback if necessary. @@ -850,8 +820,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe // to call processMetadata() in this if block. - invariant(oqMetadata); - _dataReplicatorExternalState->processMetadata(replSetMetadata, *oqMetadata); + _dataReplicatorExternalState->processMetadata(replSetMetadata, oqMetadata); } // Increment stats. We read all of the docs in the query. @@ -873,17 +842,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) { str::stream errMsg; errMsg << "sync source " << _source.toString(); errMsg << " (config version: " << replSetMetadata.getConfigVersion(); - // If OplogQueryMetadata was provided, its values were used to determine if we should - // stop fetching from this sync source. - if (oqMetadata) { - errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString(); - errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex(); - errMsg << "; primary index: " << oqMetadata->getPrimaryIndex(); - } else { - errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString(); - errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex(); - errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex(); - } + errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString(); + errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex(); + errMsg << "; primary index: " << oqMetadata.getPrimaryIndex(); errMsg << ") is no longer valid"; return Status(ErrorCodes::InvalidSyncSource, errMsg); } @@ -932,7 +893,7 @@ bool OplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(OplogFetch void OplogFetcher::OplogFetcherRestartDecisionDefault::fetchSuccessful(OplogFetcher* fetcher) { _numRestarts = 0; -}; +} OplogFetcher::OplogFetcherRestartDecision::~OplogFetcherRestartDecision(){}; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 0191fc2ada0..2c80bbd5931 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -306,8 +306,8 @@ protected: /** * Tests checkSyncSource result handling. */ - void testSyncSourceChecking(boost::optional<const rpc::ReplSetMetadata&> replMetadata, - boost::optional<const rpc::OplogQueryMetadata&> oqMetadata); + void testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata); void validateLastBatch(bool skipFirstDoc, OplogFetcher::Documents docs, OpTime lastFetched); @@ -453,9 +453,8 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Messag return shutdownState; } -void OplogFetcherTest::testSyncSourceChecking( - boost::optional<const rpc::ReplSetMetadata&> replMetadata, - boost::optional<const rpc::OplogQueryMetadata&> oqMetadata) { +void OplogFetcherTest::testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata) { auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); @@ -824,14 +823,12 @@ TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } -DEATH_TEST_REGEX_F(OplogFetcherTest, - ValidMetadataInResponseWithoutOplogMetadataInvariants, - "Invariant failure.*oqMetadata") { +TEST_F(OplogFetcherTest, ValidMetadataInResponseWithoutOplogMetadataStopsTheOplogFetcher) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry(lastFetched); auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none); - - processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { @@ -936,17 +933,6 @@ TEST_F(OplogFetcherTest, ASSERT(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(OplogFetcherTest, - MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { - CursorId cursorId = 22LL; - auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none); - auto entry = makeNoopOplogEntry(Seconds(456)); - - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); -} - TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { CursorId cursorId = 22LL; auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); @@ -958,14 +944,6 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } -TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { - CursorId cursorId = 0LL; - auto entry = makeNoopOplogEntry(lastFetched); - - ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, {}))->getStatus()); - ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); -} - TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) { ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, processSingleBatch(Message())->getStatus()); } @@ -1448,8 +1426,9 @@ TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) { CursorId cursorId = 22LL; + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch(makeFirstBatch(cursorId, {}, {}))->getStatus()); + processSingleBatch(makeFirstBatch(cursorId, {}, {metadataObj}))->getStatus()); } TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { @@ -1695,16 +1674,6 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocument ASSERT_EQ(Status(ErrorCodes::InternalError, "my custom error"), shutdownState->getStatus()); } -TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { - testSyncSourceChecking(boost::none, boost::none); - - // Sync source optime and "hasSyncSource" are not available if the response does not - // contain metadata. - ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); - ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime); - ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); -} - TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { testSyncSourceChecking(replSetMetadata, oqMetadata); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index e9eb073f8e7..80d548b7716 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -4471,10 +4471,9 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC _reportUpstream_inlock(std::move(lock)); } -bool ReplicationCoordinatorImpl::shouldChangeSyncSource( - const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { +bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata) { stdx::lock_guard<Latch> lock(_mutex); return _topCoord->shouldChangeSyncSource( currentSource, replMetadata, oqMetadata, _replExecutor->now()); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 56cf436a91e..fc9e4a39888 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -276,10 +276,9 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* opCtx, DataConsistency consistency) override; - virtual bool shouldChangeSyncSource( - const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata) override; virtual OpTime getLastCommittedOpTime() const override; virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 171db3035cd..6ddd4d4e84f 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -450,10 +450,9 @@ bool ReplicationCoordinatorMock::lastOpTimesWereReset() const { return _resetLastOpTimesCalled; } -bool ReplicationCoordinatorMock::shouldChangeSyncSource( - const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { +bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index e5a8b1edb7a..7a4b8db79a0 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -243,7 +243,7 @@ public: virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata); + const rpc::OplogQueryMetadata& oqMetadata); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index 60334f18742..d0e52db43a2 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -387,7 +387,7 @@ void ReplicationCoordinatorNoOp::resetLastOpTimesFromOplog(OperationContext*, Da bool ReplicationCoordinatorNoOp::shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&, - boost::optional<rpc::OplogQueryMetadata>) { + const rpc::OplogQueryMetadata&) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 89ec7039f39..382073c21fb 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -208,7 +208,7 @@ public: bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&, - boost::optional<rpc::OplogQueryMetadata>) final; + const rpc::OplogQueryMetadata&) final; OpTime getLastCommittedOpTime() const final; diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index c21a5e82a14..ab159a2a9d8 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -83,14 +83,10 @@ public: * source and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. - * - * OplogQueryMetadata is optional for compatibility with 3.4 servers that do not know to - * send OplogQueryMetadata. - * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0; + const rpc::OplogQueryMetadata& oqMetadata) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp index f55cb4bb9fe..67c3a44f67d 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.cpp +++ b/src/mongo/db/repl/sync_source_selector_mock.cpp @@ -56,10 +56,9 @@ void SyncSourceSelectorMock::setChooseNewSyncSourceHook_forTest( _chooseNewSyncSourceHook = hook; } -bool SyncSourceSelectorMock::shouldChangeSyncSource( - const HostAndPort&, - const rpc::ReplSetMetadata&, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) { +bool SyncSourceSelectorMock::shouldChangeSyncSource(const HostAndPort&, + const rpc::ReplSetMetadata&, + const rpc::OplogQueryMetadata& oqMetadata) { return false; } diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h index 9bf7bf4daba..78c548ab702 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.h +++ b/src/mongo/db/repl/sync_source_selector_mock.h @@ -55,7 +55,7 @@ public: void blacklistSyncSource(const HostAndPort& host, Date_t until) override; bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&, - boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; + const rpc::OplogQueryMetadata& oqMetadata) override; /** * Sets a function that will be run every time chooseNewSyncSource() is called. diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index c901b7098f8..e1ffa9a6339 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -2882,11 +2882,10 @@ long long TopologyCoordinator::getTerm() const { // TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the // replset. Passing metadata is unnecessary. -bool TopologyCoordinator::shouldChangeSyncSource( - const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const { +bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's, @@ -2940,30 +2939,14 @@ bool TopologyCoordinator::shouldChangeSyncSource( invariant(currentSourceIndex != _selfIndex); - // If OplogQueryMetadata was provided, use its values, otherwise use the ones in - // ReplSetMetadata. - OpTime currentSourceOpTime; - int syncSourceIndex = -1; - int primaryIndex = -1; - if (oqMetadata) { - currentSourceOpTime = - std::max(oqMetadata->getLastOpApplied(), - _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); - syncSourceIndex = oqMetadata->getSyncSourceIndex(); - primaryIndex = oqMetadata->getPrimaryIndex(); - } else { - currentSourceOpTime = - std::max(replMetadata.getLastOpVisible(), - _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); - syncSourceIndex = replMetadata.getSyncSourceIndex(); - primaryIndex = replMetadata.getPrimaryIndex(); - } + OpTime currentSourceOpTime = + std::max(oqMetadata.getLastOpApplied(), + _memberData.at(currentSourceIndex).getHeartbeatAppliedOpTime()); - if (currentSourceOpTime.isNull()) { - // Haven't received a heartbeat from the sync source yet, so can't tell if we should - // change. - return false; - } + fassert(4612000, !currentSourceOpTime.isNull()); + + int syncSourceIndex = oqMetadata.getSyncSourceIndex(); + int primaryIndex = oqMetadata.getPrimaryIndex(); // Change sync source if they are not ahead of us, and don't have a sync source, // unless they are primary. diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 8b1444c8d45..0e9f90aef21 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -228,12 +228,10 @@ public: * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. - * - * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ bool shouldChangeSyncSource(const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, + const rpc::OplogQueryMetadata& oqMetadata, Date_t now) const; /** diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 7363a4e1385..88f4ee2b3d7 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -988,10 +988,6 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc HostAndPort("h2"), makeReplSetMetadata(), makeOplogQueryMetadata(oldOpTime), now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( HostAndPort("h3"), makeReplSetMetadata(), makeOplogQueryMetadata(newOpTime), now())); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h2"), makeReplSetMetadata(oldOpTime), boost::none, now())); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h3"), makeReplSetMetadata(newOpTime), boost::none, now())); getTopoCoord().chooseNewSyncSource(now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration, @@ -4095,13 +4091,6 @@ TEST_F(HeartbeatResponseTestV1, now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); - - // set up complete, time for actual check - startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied), boost::none, now())); - stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } TEST_F(HeartbeatResponseTestV1, @@ -4120,8 +4109,6 @@ TEST_F(HeartbeatResponseTestV1, makeReplSetMetadata(), makeOplogQueryMetadata(lastOpTimeApplied, 1), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied, 1), boost::none, now())); // Show that we also like host2 while it has a sync source. nextAction = receiveUpHeartbeat( @@ -4132,8 +4119,6 @@ TEST_F(HeartbeatResponseTestV1, makeReplSetMetadata(), makeOplogQueryMetadata(lastOpTimeApplied, 2, 2), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied, 2, 2), boost::none, now())); // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress // beyond our own. @@ -4144,8 +4129,6 @@ TEST_F(HeartbeatResponseTestV1, makeReplSetMetadata(), makeOplogQueryMetadata(lastOpTimeApplied), now())); - ASSERT(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied), boost::none, now())); // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( @@ -4154,11 +4137,6 @@ TEST_F(HeartbeatResponseTestV1, makeOplogQueryMetadata( lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), - makeReplSetMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), - boost::none, - now())); // But if it is secondary and has some progress beyond our own, we still like it. OpTime newerThanLastOpTimeApplied = OpTime(Timestamp(500, 0), 0); @@ -4173,8 +4151,6 @@ TEST_F(HeartbeatResponseTestV1, makeReplSetMetadata(), makeOplogQueryMetadata(newerThanLastOpTimeApplied), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(newerThanLastOpTimeApplied), boost::none, now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { @@ -4211,9 +4187,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown makeReplSetMetadata(), makeOplogQueryMetadata(syncSourceOpTime), now())); - - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) { @@ -4244,8 +4217,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla makeReplSetMetadata(), makeOplogQueryMetadata(syncSourceOpTime), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); @@ -4253,8 +4224,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla makeReplSetMetadata(), makeOplogQueryMetadata(syncSourceOpTime), now())); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); @@ -4265,12 +4234,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); - - startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); - stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbeatButNotMetadata) { @@ -4299,12 +4262,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); - - startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied), boost::none, now())); - stopCapturingLogMessages(); - ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbeatButNotMetadata) { @@ -4334,12 +4291,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); - // set up complete, time for actual check - startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(fresherLastOpTimeApplied), boost::none, now())); - stopCapturingLogMessages(); - ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) { @@ -4367,20 +4318,16 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) { now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); - - // set up complete, time for actual check - startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(lastOpTimeApplied), boost::none, now())); - stopCapturingLogMessages(); - ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeartbeatUs) { // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since we do not use the member's heartbeatdata in pv1. - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); + OpTime lastOpTimeApplied = OpTime(Timestamp(4, 0), 0); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) { @@ -6870,9 +6817,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo makeReplSetMetadata(), makeOplogQueryMetadata(syncSourceOpTime), now())); - // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -6901,10 +6845,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR makeReplSetMetadata(), makeOplogQueryMetadata(syncSourceOpTime), now())); - - // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), makeReplSetMetadata(syncSourceOpTime), boost::none, now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfSyncSourceHasDifferentConfigVersion) { @@ -6938,11 +6878,6 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfSyncSourceHasDifferen makeReplSetMetadata(OpTime(), -1, -1, 8 /* different config version */), makeOplogQueryMetadata(syncSourceOpTime), now())); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), - makeReplSetMetadata(syncSourceOpTime, -1, -1, 8), - boost::none, - now())); } class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 { |