diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-02-15 11:49:36 -0500 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-02-15 11:49:36 -0500 |
commit | 7284884f3d8f3cf1d1489579180c2637efcc42b2 (patch) | |
tree | 7e2749c6940232f2a70a182e6856bfb0c97f2454 /src/mongo/db | |
parent | f6006942e76377c9434a61e76a7803eb83430591 (diff) | |
download | mongo-7284884f3d8f3cf1d1489579180c2637efcc42b2.tar.gz |
SERVER-27543 Process OplogQueryMetadata with backwards and forwards compatibility
Diffstat (limited to 'src/mongo/db')
28 files changed, 657 insertions, 288 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index a4943f3f8a4..1874e73474b 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -80,7 +80,8 @@ public: ReplicationCoordinatorExternalState* replicationCoordinatorExternalState, BackgroundSync* bgsync); bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) override; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; private: BackgroundSync* _bgsync; @@ -94,12 +95,14 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground _bgsync(bgsync) {} bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( - const HostAndPort& source, const rpc::ReplSetMetadata& metadata) { + const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { if (_bgsync->shouldStopFetching()) { return true; } - return DataReplicatorExternalStateImpl::shouldStopFetching(source, metadata); + return DataReplicatorExternalStateImpl::shouldStopFetching(source, replMetadata, oqMetadata); } size_t getSize(const BSONObj& o) { diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index f0ae73e61ec..44f34ee208c 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -35,6 +35,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/optime_with.h" #include "mongo/db/repl/replica_set_config.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -88,16 +89,22 @@ public: /** * Forwards the parsed metadata in the query results to the replication system. + * + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ - virtual void processMetadata(const rpc::ReplSetMetadata& metadata) = 0; + virtual void processMetadata(const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0; /** * Evaluates quality of sync source. Accepts the current sync source; the last optime on this * sync source (from metadata); and whether this sync source has a sync source (also from * metadata). + * + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ virtual bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) = 0; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index 238f4717aa6..ab166f3bdc2 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -60,21 +60,46 @@ OpTimeWithTerm DataReplicatorExternalStateImpl::getCurrentTermAndLastCommittedOp return {_replicationCoordinator->getTerm(), _replicationCoordinator->getLastCommittedOpTime()}; } -void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata& metadata) { - _replicationCoordinator->processReplSetMetadata(metadata, true /*advance the commit point*/); - if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { +void DataReplicatorExternalStateImpl::processMetadata( + const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) { + OpTime newCommitPoint; + // If OplogQueryMetadata was provided, use its values, otherwise use the ones in + // ReplSetMetadata. + if (oqMetadata) { + newCommitPoint = oqMetadata->getLastOpCommitted(); + } else { + newCommitPoint = replMetadata.getLastOpCommitted(); + } + _replicationCoordinator->advanceCommitPoint(newCommitPoint); + + _replicationCoordinator->processReplSetMetadata(replMetadata); + + if ((oqMetadata && (oqMetadata->getPrimaryIndex() != rpc::OplogQueryMetadata::kNoPrimary)) || + (replMetadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary)) { _replicationCoordinator->cancelAndRescheduleElectionTimeout(); } } -bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) { +bool DataReplicatorExternalStateImpl::shouldStopFetching( + const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { // Re-evaluate quality of sync target. - if (_replicationCoordinator->shouldChangeSyncSource(source, metadata)) { - log() - << "Canceling oplog query because we have to choose a new sync source. Current source: " - << source << ", OpTime " << metadata.getLastOpVisible() - << ", its sync source index:" << metadata.getSyncSourceIndex(); + if (_replicationCoordinator->shouldChangeSyncSource(source, replMetadata, oqMetadata)) { + // If OplogQueryMetadata was provided, its values were used to determine if we should + // change sync sources. + if (oqMetadata) { + log() << "Canceling oplog query due to OplogQueryMetadata. We have to choose a new " + "sync source. Current source: " + << source << ", OpTime " << oqMetadata->getLastOpApplied() + << ", its sync source index:" << oqMetadata->getSyncSourceIndex(); + + } else { + log() << "Canceling oplog query due to ReplSetMetadata. We have to choose a new sync " + "source. Current source: " + << source << ", OpTime " << replMetadata.getLastOpVisible() + << ", its sync source index:" << replMetadata.getSyncSourceIndex(); + } return true; } return false; diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 19a08b6c702..d4ce61119a6 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -52,10 +52,12 @@ public: OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; - void processMetadata(const rpc::ReplSetMetadata& metadata) override; + void processMetadata(const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) override; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override; diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp index 1337b170f32..8cb6a42293c 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp @@ -39,8 +39,8 @@ DataReplicatorExternalStateInitialSync::DataReplicatorExternalStateInitialSync( : DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState) { } -bool DataReplicatorExternalStateInitialSync::shouldStopFetching(const HostAndPort&, - const rpc::ReplSetMetadata&) { +bool DataReplicatorExternalStateInitialSync::shouldStopFetching( + const HostAndPort&, const rpc::ReplSetMetadata&, boost::optional<rpc::OplogQueryMetadata>) { // Since initial sync does not allow for sync source changes, it should not check if there are // better sync sources. If there is a problem on the sync source, it will manifest itself in the diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h index 6ee2ce75fe4..e36fa710f47 100644 --- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h +++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h @@ -44,7 +44,8 @@ public: ReplicationCoordinatorExternalState* replicationCoordinatorExternalState); bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) override; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; }; } // namespace repl diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 45ec02b0ad2..eeed8c240f3 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -53,16 +53,30 @@ OpTimeWithTerm DataReplicatorExternalStateMock::getCurrentTermAndLastCommittedOp return {currentTerm, lastCommittedOpTime}; } -void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata& metadata) { - metadataProcessed = metadata; +void DataReplicatorExternalStateMock::processMetadata( + const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata) { + replMetadataProcessed = replMetadata; + if (oqMetadata) { + oqMetadataProcessed = oqMetadata.get(); + } metadataWasProcessed = true; } -bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) { +bool DataReplicatorExternalStateMock::shouldStopFetching( + const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { lastSyncSourceChecked = source; - syncSourceLastOpTime = metadata.getLastOpVisible(); - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + + // If OplogQueryMetadata was provided, use its values, otherwise use the ones in + // ReplSetMetadata. + if (oqMetadata) { + syncSourceLastOpTime = oqMetadata->getLastOpApplied(); + syncSourceHasSyncSource = oqMetadata->getSyncSourceIndex() != -1; + } else { + syncSourceLastOpTime = replMetadata.getLastOpVisible(); + syncSourceHasSyncSource = replMetadata.getSyncSourceIndex() != -1; + } return shouldStopFetchingResult; } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 423e8f401dc..1e32647a518 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -49,10 +49,12 @@ public: OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() override; - void processMetadata(const rpc::ReplSetMetadata& metadata) override; + void processMetadata(const rpc::ReplSetMetadata& metadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; bool shouldStopFetching(const HostAndPort& source, - const rpc::ReplSetMetadata& metadata) override; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override; @@ -71,7 +73,8 @@ public: OpTime lastCommittedOpTime; // Set by processMetadata. - rpc::ReplSetMetadata metadataProcessed; + rpc::ReplSetMetadata replMetadataProcessed; + rpc::OplogQueryMetadata oqMetadataProcessed; bool metadataWasProcessed = false; // Set by shouldStopFetching. diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 4f841f2f2b7..790dfede040 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -171,8 +171,9 @@ public: _syncSourceSelector->blacklistSyncSource(host, until); } bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata) override { - return _syncSourceSelector->shouldChangeSyncSource(currentSource, metadata); + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override { + return _syncSourceSelector->shouldChangeSyncSource(currentSource, replMetadata, oqMetadata); } void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) { diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index df08e1b2174..5378689027e 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -184,6 +184,30 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash return Status::OK(); } +/** + * Parses a QueryResponse for the OplogQueryMetadata. If there is an error it returns it. If + * no OplogQueryMetadata is provided then it returns boost::none. + * + * OplogQueryMetadata is made optional for backwards compatibility. + * TODO (SERVER-27668): Make this non-optional in mongodb 3.8. When this stops being optional + * we can remove the duplicated fields in both metadata types and begin to always use + * OplogQueryMetadata's data. + */ +StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( + Fetcher::QueryResponse queryResponse) { + boost::optional<rpc::OplogQueryMetadata> oqMetadata = boost::none; + bool receivedOplogQueryMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kOplogQueryMetadataFieldName); + if (receivedOplogQueryMetadata) { + const auto& metadataObj = queryResponse.otherFields.metadata; + auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadataObj); + if (!metadataResult.isOK()) { + return metadataResult.getStatus(); + } + oqMetadata = boost::make_optional<rpc::OplogQueryMetadata>(metadataResult.getValue()); + } + return oqMetadata; +} } // namespace StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( @@ -445,6 +469,15 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, auto opTimeWithHash = getLastOpTimeWithHashFetched(); + auto oqMetadataResult = parseOplogQueryMetadata(queryResponse); + if (!oqMetadataResult.isOK()) { + error() << "invalid oplog query metadata from sync source " << _fetcher->getSource() << ": " + << oqMetadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; + _finishCallback(oqMetadataResult.getStatus()); + return; + } + auto oqMetadata = oqMetadataResult.getValue(); + // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { auto status = checkRemoteOplogStart(documents, opTimeWithHash); @@ -471,10 +504,10 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // Process replset metadata. It is important that this happen after we've validated the // first batch, so we don't progress our knowledge of the commit point from a // response that triggers a rollback. - rpc::ReplSetMetadata metadata; - bool receivedMetadata = + rpc::ReplSetMetadata replSetMetadata; + bool receivedReplMetadata = queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { + if (receivedReplMetadata) { const auto& metadataObj = queryResponse.otherFields.metadata; auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); if (!metadataResult.isOK()) { @@ -483,8 +516,11 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _finishCallback(metadataResult.getStatus()); return; } - metadata = metadataResult.getValue(); - _dataReplicatorExternalState->processMetadata(metadata); + replSetMetadata = metadataResult.getValue(); + + // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe + // to call processMetadata() in this if block. + _dataReplicatorExternalState->processMetadata(replSetMetadata, oqMetadata); } // Increment stats. We read all of the docs in the query. @@ -511,19 +547,24 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, _lastFetched = opTimeWithHash; } - if (_dataReplicatorExternalState->shouldStopFetching(_fetcher->getSource(), metadata)) { - _finishCallback(Status(ErrorCodes::InvalidSyncSource, - str::stream() << "sync source " << _fetcher->getSource().toString() - << " (last visible optime: " - << metadata.getLastOpVisible().toString() - << "; config version: " - << metadata.getConfigVersion() - << "; sync source index: " - << metadata.getSyncSourceIndex() - << "; primary index: " - << metadata.getPrimaryIndex() - << ") is no longer valid"), - opTimeWithHash); + if (_dataReplicatorExternalState->shouldStopFetching( + _fetcher->getSource(), replSetMetadata, oqMetadata)) { + str::stream errMsg; + errMsg << "sync source " << _fetcher->getSource().toString(); + errMsg << " (config version: " << replSetMetadata.getConfigVersion(); + // If OplogQueryMetadata was provided, its values were used to determine if we should + // stop fetching from this sync source. + if (oqMetadata) { + errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString(); + errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex(); + errMsg << "; primary index: " << oqMetadata->getPrimaryIndex(); + } else { + errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString(); + errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex(); + errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex(); + } + errMsg << ") is no longer valid"; + _finishCallback(Status(ErrorCodes::InvalidSyncSource, errMsg), opTimeWithHash); return; } diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 5c6db18013e..a94dac92122 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -97,7 +97,8 @@ protected: /** * Tests checkSyncSource result handling. */ - void testSyncSourceChecking(rpc::ReplSetMetadata* metadata); + void testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata, + rpc::OplogQueryMetadata* oqMetadata); /** * Tests handling of two batches of operations returned from query. @@ -389,7 +390,7 @@ TEST_F( _checkDefaultCommandObjectFields(cmdObj); } -TEST_F(OplogFetcherTest, MetadataObjectContainsReplSetMetadataFieldUnderProtocolVersion1) { +TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersion1) { auto metadataObj = OplogFetcher(&getExecutor(), lastFetched, source, @@ -537,7 +538,7 @@ BSONObj makeCursorResponse(CursorId cursorId, return bob.obj(); } -TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { auto shutdownState = processSingleBatch( {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)), @@ -546,7 +547,17 @@ TEST_F(OplogFetcherTest, InvalidMetadataInResponseStopsTheOplogFetcher) { ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); } -TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadataFn) { +TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { + auto shutdownState = processSingleBatch( + {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1)), + Milliseconds(0)}); + + ASSERT_EQUALS(ErrorCodes::NoSuchKey, shutdownState->getStatus()); +} + +TEST_F(OplogFetcherTest, + ValidMetadataInResponseWithoutOplogMetadataShouldBeForwardedToProcessMetadataFn) { rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); @@ -557,10 +568,30 @@ TEST_F(OplogFetcherTest, VaidMetadataInResponseShouldBeForwardedToProcessMetadat ->getStatus()); ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); ASSERT_EQUALS(metadata.getPrimaryIndex(), - dataReplicatorExternalState->metadataProcessed.getPrimaryIndex()); + dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); + ASSERT_EQUALS(-1, dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); } -TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { +TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT_EQUALS(replMetadata.getPrimaryIndex(), + dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); + ASSERT_EQUALS(oqMetadata.getPrimaryIndex(), + dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); +} + +TEST_F(OplogFetcherTest, + MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); @@ -574,6 +605,30 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } +TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { + rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2); + BSONObjBuilder bob; + ASSERT_OK(replMetadata.writeToMetadata(&bob)); + ASSERT_OK(oqMetadata.writeToMetadata(&bob)); + auto metadataObj = bob.obj(); + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, + processSingleBatch( + {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), + metadataObj, + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) { + ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}), + rpc::makeEmptyMetadata(), + Milliseconds(0)}) + ->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) { ASSERT_EQUALS(ErrorCodes::RemoteOplogStale, processSingleBatch(makeCursorResponse(0, {}))->getStatus()); @@ -674,18 +729,21 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error")); } -void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) { +void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata, + rpc::OplogQueryMetadata* oqMetadata) { auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; - BSONObj metadataObj; - if (metadata) { - BSONObjBuilder bob; - ASSERT_OK(metadata->writeToMetadata(&bob)); - metadataObj = bob.obj(); + BSONObjBuilder bob; + if (replMetadata) { + ASSERT_OK(replMetadata->writeToMetadata(&bob)); } + if (oqMetadata) { + ASSERT_OK(oqMetadata->writeToMetadata(&bob)); + } + BSONObj metadataObj = bob.obj(); dataReplicatorExternalState->shouldStopFetchingResult = true; @@ -703,7 +761,7 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* metadata) { } TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { - testSyncSourceChecking(nullptr); + testSyncSourceChecking(nullptr, nullptr); // Sync source optime and "hasSyncSource" are not available if the response does not // contain metadata. @@ -712,7 +770,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetche ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); } -TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) { +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFetcher) { rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, @@ -721,7 +779,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) 2, 2); - testSyncSourceChecking(&metadata); + testSyncSourceChecking(&metadata, nullptr); // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); @@ -729,8 +787,21 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithMetadataStopsTheOplogFetcher) ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); } +TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { + rpc::ReplSetMetadata replMetadata( + lastFetched.opTime.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1); + rpc::OplogQueryMetadata oqMetadata({{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, 2); + + testSyncSourceChecking(&replMetadata, &oqMetadata); + + // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + TEST_F(OplogFetcherTest, - FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { + FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceInReplSetMetadataStopsTheOplogFetcher) { rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, @@ -739,7 +810,7 @@ TEST_F(OplogFetcherTest, 2, -1); - testSyncSourceChecking(&metadata); + testSyncSourceChecking(&metadata, nullptr); // Sync source "hasSyncSource" is derived from metadata. ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); @@ -747,6 +818,25 @@ TEST_F(OplogFetcherTest, ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); } +TEST_F(OplogFetcherTest, + FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { + rpc::ReplSetMetadata replMetadata(lastFetched.opTime.getTerm(), + {{Seconds(10000), 0}, 1}, + {{Seconds(20000), 0}, 1}, + 1, + OID::gen(), + 2, + 2); + rpc::OplogQueryMetadata oqMetadata( + {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, -1); + + testSyncSourceChecking(&replMetadata, &oqMetadata); + + // Sync source "hasSyncSource" is derived from metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); +} RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) { ShutdownState shutdownState; diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index a9d55d42259..a840b8e99ed 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -481,11 +481,16 @@ public: /** * Processes the ReplSetMetadata returned from a command run against another * replica set member and so long as the config version in the metadata matches the replica set - * config version this node currently has, updates the current term and optionally updates - * this node's notion of the commit point. + * config version this node currently has, updates the current term. + * + * This does NOT update this node's notion of the commit point. + */ + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0; + + /** + * This updates the node's notion of the commit point. */ - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint) = 0; + virtual void advanceCommitPoint(const OpTime& committedOptime) = 0; /** * Elections under protocol version 1 are triggered by a timer. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0716601d6e7..623f5251977 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -2126,13 +2126,12 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) result->append("config", _rsConfig.toBSON()); } -void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint) { +void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) { EventHandle evh; { LockGuard topoLock(_topoMutex); - evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint); + evh = _processReplSetMetadata_incallback(replMetadata); } if (evh) { @@ -2146,13 +2145,10 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { } EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback( - const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) { + const rpc::ReplSetMetadata& replMetadata) { if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { return EventHandle(); } - if (advanceCommitPoint) { - _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); - } return _updateTerm_incallback(replMetadata.getTerm()); } @@ -3133,11 +3129,13 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* txn _externalState->setGlobalTimestamp(txn->getServiceContext(), lastOpTime.getTimestamp()); } -bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata) { +bool ReplicationCoordinatorImpl::shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { LockGuard topoLock(_topoMutex); return _topCoord->shouldChangeSyncSource( - currentSource, getMyLastAppliedOpTime(), metadata, _replExecutor.now()); + currentSource, getMyLastAppliedOpTime(), replMetadata, oqMetadata, _replExecutor.now()); } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { @@ -3168,15 +3166,15 @@ void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { // need the majority to have this OpTime OpTime committedOpTime = votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()]; - _setLastCommittedOpTime_inlock(committedOpTime); + _advanceCommitPoint_inlock(committedOpTime); } -void ReplicationCoordinatorImpl::_setLastCommittedOpTime(const OpTime& committedOpTime) { +void ReplicationCoordinatorImpl::advanceCommitPoint(const OpTime& committedOpTime) { stdx::unique_lock<stdx::mutex> lk(_mutex); - _setLastCommittedOpTime_inlock(committedOpTime); + _advanceCommitPoint_inlock(committedOpTime); } -void ReplicationCoordinatorImpl::_setLastCommittedOpTime_inlock(const OpTime& committedOpTime) { +void ReplicationCoordinatorImpl::_advanceCommitPoint_inlock(const OpTime& committedOpTime) { if (committedOpTime == _lastCommittedOpTime) { return; // Hasn't changed, so ignore it. } else if (committedOpTime < _lastCommittedOpTime) { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index c481b30cdc6..9144e88b6b8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -200,8 +200,9 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint) override; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; + + virtual void advanceCommitPoint(const OpTime& committedOpTime) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -262,8 +263,10 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn) override; - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata) override; + virtual bool shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; virtual OpTime getLastCommittedOpTime() const override; @@ -590,8 +593,7 @@ private: * Updates the last committed OpTime to be "committedOpTime" if it is more recent than the * current last committed OpTime. */ - void _setLastCommittedOpTime(const OpTime& committedOpTime); - void _setLastCommittedOpTime_inlock(const OpTime& committedOpTime); + void _advanceCommitPoint_inlock(const OpTime& committedOpTime); /** * Helper to wake waiters in _replicationWaiterList that are doneWaitingForReplication. @@ -984,12 +986,13 @@ private: /** * Callback that processes the ReplSetMetadata returned from a command run against another * replica set member and so long as the config version in the metadata matches the replica set - * config version this node currently has, updates the current term and optionally updates - * this node's notion of the commit point. + * config version this node currently has, updates the current term. + * + * This does NOT update this node's notion of the commit point. + * * Returns the finish event which is invalid if the process has already finished. */ - EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint); + EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata); /** * Prepares a metadata object for ReplSetMetadata. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 6c344de33cb..4c85e836bbb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -162,11 +162,13 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( replMetadata = responseStatus; } if (replMetadata.isOK()) { + // Arbiters are the only nodes allowed to advance their commit point via heartbeats. + if (getMemberState().arbiter()) { + advanceCommitPoint(replMetadata.getValue().getLastOpCommitted()); + } // Asynchronous stepdown could happen, but it will wait for _topoMutex and execute // after this function, so we cannot and don't need to wait for it to finish. - // Arbiters are the only nodes allowed to advance their commit point via heartbeats. - bool advanceCommitPoint = getMemberState().arbiter(); - _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint); + _processReplSetMetadata_incallback(replMetadata.getValue()); } } const Date_t now = _replExecutor.now(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 0ebaa43cc77..1cc20369998 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -341,45 +341,6 @@ TEST_F(ReplCoordHBV1Test, assertMemberState(MemberState::RS_RECOVERING, "0"); } -TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) { - // Tests that an arbiter will update its committed optime from the heartbeat metadata - assertStartSuccess(fromjson("{_id:'mySet', version:1, protocolVersion:1, members:[" - "{_id:1, host:'node1:12345', arbiterOnly:true}, " - "{_id:2, host:'node2:12345'}]}"), - HostAndPort("node1", 12345)); - ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_ARBITER)); - - // calls processReplSetMetadata with the "committed" optime and verifies that the arbiter sets - // its current optime to 'expected' - auto test = [this](OpTime committedOpTime, OpTime expected) { - // process heartbeat metadata directly - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata( - BSON(rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << committedOpTime.getTimestamp() << "t" - << committedOpTime.getTerm()) - << "lastOpVisible" - << BSON("ts" << committedOpTime.getTimestamp() << "t" - << committedOpTime.getTerm()) - << "configVersion" - << 1 - << "primaryIndex" - << 1 - << "term" - << committedOpTime.getTerm() - << "syncSourceIndex" - << 1))); - ASSERT_OK(metadata.getStatus()); - getReplCoord()->processReplSetMetadata(metadata.getValue(), true); - - ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp()); - }; - - OpTime committedOpTime{Timestamp{10, 10}, 10}; - test(committedOpTime, committedOpTime); - OpTime olderOpTime{Timestamp{2, 2}, 9}; - test(olderOpTime, committedOpTime); -} - TEST_F(ReplCoordHBV1Test, IgnoreTheContentsOfMetadataWhenItsReplicaSetIdDoesNotMatchOurs) { // Tests that a secondary node will not update its committed optime from the heartbeat metadata // if the replica set ID is inconsistent with the existing configuration. diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index bb9e9a4535d..9e8a92a531f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -3837,8 +3837,8 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->processReplSetMetadata(metadata.getValue()); + ASSERT_EQUALS(0, getReplCoord()->getTerm()); // higher configVersion StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -3853,11 +3853,11 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->processReplSetMetadata(metadata2.getValue()); + ASSERT_EQUALS(0, getReplCoord()->getTerm()); } -TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMetadataIsNewer) { +TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeIsNewer) { // Ensure that LastCommittedOpTime updates when a newer OpTime comes in via ReplSetMetadata, // but not if the OpTime is older than the current LastCommittedOpTime. assertStartSuccess(BSON("_id" @@ -3887,42 +3887,19 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet ASSERT_EQUALS(1, getReplCoord()->getTerm()); OpTime time(Timestamp(10, 0), 1); + OpTime oldTime(Timestamp(9, 0), 1); getReplCoord()->createSnapshot(txn.get(), time, SnapshotName(1)); // higher OpTime, should change - StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible" - << BSON("ts" << Timestamp(10, 0) << "t" << 1) - << "configVersion" - << 2 - << "primaryIndex" - << 2 - << "term" - << 1 - << "syncSourceIndex" - << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); - ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); + getReplCoord()->advanceCommitPoint(time); + ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // lower OpTime, should not change - StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( - rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible" - << BSON("ts" << Timestamp(9, 0) << "t" << 1) - << "configVersion" - << 2 - << "primaryIndex" - << 2 - << "term" - << 1 - << "syncSourceIndex" - << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->advanceCommitPoint(oldTime); + ASSERT_EQUALS(time, getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(time, getReplCoord()->getCurrentCommittedSnapshotOpTime()); } - TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurrentPrimaryIndex) { // Ensure that the term is updated if and only if the term is greater than our current term. // Ensure that currentPrimaryIndex is never altered by ReplSetMetadata. @@ -3964,10 +3941,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // lower term, should not change StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -3982,10 +3959,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->processReplSetMetadata(metadata2.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // same term, should not change StatusWith<rpc::ReplSetMetadata> metadata3 = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -4000,10 +3977,10 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata3.getValue(), true); - ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); + getReplCoord()->processReplSetMetadata(metadata3.getValue()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); } TEST_F(ReplCoordTest, @@ -4095,23 +4072,10 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { HostAndPort("node1", 12345)); getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - // Update committed optime with ReplSetMetadata. OpTime optime1{Timestamp(10, 0), 3}; OpTime optime2{Timestamp(11, 2), 5}; - StatusWith<rpc::ReplSetMetadata> metadataToProcess = rpc::ReplSetMetadata::readFromMetadata( - BSON(rpc::kReplSetMetadataFieldName - << BSON("lastOpCommitted" << optime1.toBSON() << "lastOpVisible" << optime1.toBSON() - << "configVersion" - << 2 - << "primaryIndex" - << 2 - << "term" - << 3 - << "syncSourceIndex" - << 1))); - getReplCoord()->processReplSetMetadata(metadataToProcess.getValue(), true); - + getReplCoord()->advanceCommitPoint(optime1); getReplCoord()->setMyLastAppliedOpTime(optime2); // Get current rbid to check against. @@ -4141,7 +4105,7 @@ TEST_F(ReplCoordTest, PrepareOplogQueryMetadata) { ASSERT_EQ(replMetadata.getValue().getLastOpCommitted(), optime1); ASSERT_EQ(replMetadata.getValue().getLastOpVisible(), OpTime()); ASSERT_EQ(replMetadata.getValue().getConfigVersion(), 2); - ASSERT_EQ(replMetadata.getValue().getTerm(), 3); + ASSERT_EQ(replMetadata.getValue().getTerm(), 0); ASSERT_EQ(replMetadata.getValue().getSyncSourceIndex(), -1); ASSERT_EQ(replMetadata.getValue().getPrimaryIndex(), -1); } @@ -4884,7 +4848,8 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { // Set last committed optime via metadata. rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); - getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); + getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->advanceCommitPoint(optime); getReplCoord()->createSnapshot(txn.get(), optime, SnapshotName(1)); BSONObj cmd = unittest::assertGet(getReplCoord()->prepareReplSetUpdatePositionCommand( @@ -4892,6 +4857,9 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { auto metadata = unittest::assertGet(rpc::ReplSetMetadata::readFromMetadata(cmd)); ASSERT_EQUALS(metadata.getTerm(), getReplCoord()->getTerm()); ASSERT_EQUALS(metadata.getLastOpVisible(), optime); + + auto oqMetadataStatus = rpc::OplogQueryMetadata::readFromMetadata(cmd); + ASSERT_EQUALS(oqMetadataStatus.getStatus(), ErrorCodes::NoSuchKey); } TEST_F(ReplCoordTest, StepDownWhenHandleLivenessTimeoutMarksAMajorityOfVotingNodesDown) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 2a4432c2590..fd41930ce28 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -250,8 +250,9 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) // TODO } -void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint) {} +void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {} + +void ReplicationCoordinatorMock::advanceCommitPoint(const OpTime& committedOptime) {} void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {} @@ -382,8 +383,10 @@ void ReplicationCoordinatorMock::resetLastOpTimesFromOplog(OperationContext* txn invariant(false); } -bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata) { +bool ReplicationCoordinatorMock::shouldChangeSyncSource( + const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { invariant(false); } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 9825883ecbf..b211007a8dd 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -155,8 +155,9 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result); - void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, - bool advanceCommitPoint) override; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; + + virtual void advanceCommitPoint(const OpTime& committedOptime) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -213,7 +214,8 @@ public: virtual void resetLastOpTimesFromOplog(OperationContext* txn); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata); + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata); virtual OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 6303888dde1..c2793e42764 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -640,7 +640,7 @@ public: // New style update position command has metadata, which may inform the // upstream of a higher term. auto metadata = metadataResult.getValue(); - replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/); + replCoord->processReplSetMetadata(metadata); } // In the case of an update from a member with an invalid replica set config, diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h index 36cd7df3bf4..0c9fbc82d45 100644 --- a/src/mongo/db/repl/sync_source_selector.h +++ b/src/mongo/db/repl/sync_source_selector.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -39,6 +40,7 @@ class Timestamp; namespace rpc { class ReplSetMetadata; +class OplogQueryMetadata; } namespace repl { @@ -73,16 +75,21 @@ public: /** * Determines if a new sync source should be chosen, if a better candidate sync source is - * available. If the current sync source's last optime (visibleOpTime of metadata under - * protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion 0) is more than - * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are - * running in ProtocolVersion 1, our current sync source is not primary, has no sync source - * and only has data up to "myLastOpTime", returns true. + * available. If the current sync source's last optime (visibleOpTime or appliedOpTime of + * metadata under protocolVersion 1, but pulled from the MemberHeartbeatData in protocolVersion + * 0) is more than _maxSyncSourceLagSecs behind any syncable source, this function returns true. + * If we are running in ProtocolVersion 1, our current sync source is not primary, has no sync + * source and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. + * + * OplogQueryMetadata is optional for compatibility with 3.4 servers that do not know to + * send OplogQueryMetadata. + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& metadata) = 0; + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp index 0b697a2e234..df8e953c3cf 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.cpp +++ b/src/mongo/db/repl/sync_source_selector_mock.cpp @@ -55,8 +55,10 @@ void SyncSourceSelectorMock::setChooseNewSyncSourceHook_forTest( _chooseNewSyncSourceHook = hook; } -bool SyncSourceSelectorMock::shouldChangeSyncSource(const HostAndPort&, - const rpc::ReplSetMetadata&) { +bool SyncSourceSelectorMock::shouldChangeSyncSource( + const HostAndPort&, + const rpc::ReplSetMetadata&, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) { return false; } diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h index b53dfb6dd49..551eec9b846 100644 --- a/src/mongo/db/repl/sync_source_selector_mock.h +++ b/src/mongo/db/repl/sync_source_selector_mock.h @@ -51,7 +51,9 @@ public: void clearSyncSourceBlacklist() override; HostAndPort chooseNewSyncSource(const OpTime& ot) override; void blacklistSyncSource(const HostAndPort& host, Date_t until) override; - bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&) override; + bool shouldChangeSyncSource(const HostAndPort&, + const rpc::ReplSetMetadata&, + boost::optional<rpc::OplogQueryMetadata> oqMetadata) override; /** * Sets a function that will be run every time chooseNewSyncSource() is called. diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 7dcf76164ec..0761b800b38 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -166,10 +166,13 @@ public: * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. * * "now" is used to skip over currently blacklisted sync sources. + * + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const rpc::ReplSetMetadata& metadata, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, Date_t now) const = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index f4e9c01cea7..79dba110db4 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2431,10 +2431,12 @@ long long TopologyCoordinatorImpl::getTerm() { // TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the // replset. Passing metadata is unnecessary. -bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource, - const OpTime& myLastOpTime, - const rpc::ReplSetMetadata& metadata, - Date_t now) const { +bool TopologyCoordinatorImpl::shouldChangeSyncSource( + const HostAndPort& currentSource, + const OpTime& myLastOpTime, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, + Date_t now) const { // Methodology: // If there exists a viable sync source member other than currentSource, whose oplog has // reached an optime greater than _options.maxSyncSourceLagSecs later than currentSource's, @@ -2451,9 +2453,9 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS } if (_rsConfig.getProtocolVersion() == 1 && - metadata.getConfigVersion() != _rsConfig.getConfigVersion()) { + replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { log() << "Choosing new sync source because the config version supplied by " << currentSource - << ", " << metadata.getConfigVersion() << ", does not match ours, " + << ", " << replMetadata.getConfigVersion() << ", does not match ours, " << _rsConfig.getConfigVersion(); return true; } @@ -2468,8 +2470,22 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS invariant(currentSourceIndex != _selfIndex); - OpTime currentSourceOpTime = - std::max(metadata.getLastOpVisible(), _hbdata.at(currentSourceIndex).getAppliedOpTime()); + // If OplogQueryMetadata was provided, use its values, otherwise use the ones in + // ReplSetMetadata. + OpTime currentSourceOpTime; + int syncSourceIndex = -1; + int primaryIndex = -1; + if (oqMetadata) { + currentSourceOpTime = std::max(oqMetadata->getLastOpApplied(), + _hbdata.at(currentSourceIndex).getAppliedOpTime()); + syncSourceIndex = oqMetadata->getSyncSourceIndex(); + primaryIndex = oqMetadata->getPrimaryIndex(); + } else { + currentSourceOpTime = std::max(replMetadata.getLastOpVisible(), + _hbdata.at(currentSourceIndex).getAppliedOpTime()); + syncSourceIndex = replMetadata.getSyncSourceIndex(); + primaryIndex = replMetadata.getPrimaryIndex(); + } if (currentSourceOpTime.isNull()) { // Haven't received a heartbeat from the sync source yet, so can't tell if we should @@ -2479,16 +2495,15 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS // Change sync source if they are not ahead of us, and don't have a sync source, // unless they are primary. - if (_rsConfig.getProtocolVersion() == 1 && metadata.getSyncSourceIndex() == -1 && - currentSourceOpTime <= myLastOpTime && metadata.getPrimaryIndex() != currentSourceIndex) { + if (_rsConfig.getProtocolVersion() == 1 && syncSourceIndex == -1 && + currentSourceOpTime <= myLastOpTime && primaryIndex != currentSourceIndex) { std::stringstream logMessage; logMessage << "Choosing new sync source because our current sync source, " << currentSource.toString() << ", has an OpTime (" << currentSourceOpTime << ") which is not ahead of ours (" << myLastOpTime << "), it does not have a sync source, and it's not the primary"; - if (metadata.getPrimaryIndex() >= 0) { - logMessage << " (" << _rsConfig.getMemberAt(metadata.getPrimaryIndex()).getHostAndPort() - << " is)"; + if (primaryIndex >= 0) { + logMessage << " (" << _rsConfig.getMemberAt(primaryIndex).getHostAndPort() << " is)"; } else { logMessage << " (sync source does not know the primary)"; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index f57d945465e..96708c90b48 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -158,7 +158,8 @@ public: virtual void clearSyncSourceBlacklist(); virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, const OpTime& myLastOpTime, - const rpc::ReplSetMetadata& metadata, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, Date_t now) const; virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); virtual void setElectionSleepUntil(Date_t newTime); diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index b61f8907b41..3519dd5e8d5 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/logger/logger.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -52,6 +53,7 @@ using std::unique_ptr; using mongo::rpc::ReplSetMetadata; +using mongo::rpc::OplogQueryMetadata; namespace mongo { namespace repl { @@ -148,11 +150,22 @@ protected: } // Make the metadata coming from sync source. Only set visibleOpTime. - ReplSetMetadata makeMetadata(OpTime opTime = OpTime()) { - return ReplSetMetadata( - _topo->getTerm(), OpTime(), opTime, _currentConfig.getConfigVersion(), OID(), -1, -1); + ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime()) { + return ReplSetMetadata(_topo->getTerm(), + OpTime(), + visibleOpTime, + _currentConfig.getConfigVersion(), + OID(), + -1, + -1); } + // Make the metadata coming from sync source. Only set lastAppliedOpTime. + OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime()) { + return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, -1, -1); + } + + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -719,10 +732,10 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target - ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), OpTime(), makeMetadata(), now())); - ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), OpTime(), makeMetadata(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("h2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("h3"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); getTopoCoord().chooseNewSyncSource( now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -5349,8 +5362,8 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host4" since "host4" is absent from the config of version 10. ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); - ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host4"), OpTime(), metadata, makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatUs) { @@ -5358,7 +5371,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenMemberHasYetToHeartbeatU // "host2" since we do not yet have a heartbeat (and as a result do not yet have an optime) // for "host2" ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbeatButNotMetadata) { @@ -5388,9 +5401,21 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - auto metadata = makeMetadata(lastOpTimeApplied); - ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } @@ -5422,9 +5447,23 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - auto metadata = makeMetadata(fresherLastOpTimeApplied); ASSERT_FALSE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), OpTime(), metadata, now())); + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(fresherLastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(fresherLastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } @@ -5456,7 +5495,7 @@ TEST_F(HeartbeatResponseTest, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } @@ -5490,18 +5529,18 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhileFresherMemberIsBlack // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } @@ -5535,7 +5574,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -5565,7 +5604,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberIsNotRea // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotBuildIndexes) { @@ -5610,7 +5649,7 @@ TEST_F(HeartbeatResponseTest, ShouldNotChangeSyncSourceWhenFresherMemberDoesNotB // set up complete, time for actual check ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTest, @@ -5661,7 +5700,7 @@ TEST_F(HeartbeatResponseTest, // set up complete, time for actual check startCapturingLogMessages(); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 216bbceabc7..a171f7ca89e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/logger/logger.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -53,6 +54,7 @@ using std::unique_ptr; using mongo::rpc::ReplSetMetadata; +using mongo::rpc::OplogQueryMetadata; namespace mongo { namespace repl { @@ -149,20 +151,28 @@ protected: _currentConfig = config; } - // Make the metadata coming from sync source. + // Make the ReplSetMetadata coming from sync source. // Only set visibleOpTime, primaryIndex and syncSourceIndex - ReplSetMetadata makeMetadata(OpTime opTime = OpTime(), - int primaryIndex = -1, - int syncSourceIndex = -1) { + ReplSetMetadata makeReplSetMetadata(OpTime visibleOpTime = OpTime(), + int primaryIndex = -1, + int syncSourceIndex = -1) { return ReplSetMetadata(_topo->getTerm(), OpTime(), - opTime, + visibleOpTime, _currentConfig.getConfigVersion(), OID(), primaryIndex, syncSourceIndex); } + // Make the OplogQueryMetadata coming from sync source. + // Only set lastAppliedOpTime, primaryIndex and syncSourceIndex + OplogQueryMetadata makeOplogQueryMetadata(OpTime lastAppliedOpTime = OpTime(), + int primaryIndex = -1, + int syncSourceIndex = -1) { + return OplogQueryMetadata(OpTime(), lastAppliedOpTime, -1, primaryIndex, syncSourceIndex); + } + HeartbeatResponseAction receiveUpHeartbeat(const HostAndPort& member, const std::string& setName, MemberState memberState, @@ -771,10 +781,20 @@ TEST_F(TopoCoordTest, ChooseRequestedSyncSourceOnlyTheFirstTimeAfterTheSyncSourc getTopoCoord().setForceSyncSourceIndex(1); // force should cause shouldChangeSyncSource() to return true // even if the currentSource is the force target + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(oldOpTime), + now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("h3"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(newOpTime), + now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h2"), OpTime(), makeMetadata(oldOpTime), now())); + HostAndPort("h2"), OpTime(), makeReplSetMetadata(oldOpTime), boost::none, now())); ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("h3"), OpTime(), makeMetadata(newOpTime), now())); + HostAndPort("h3"), OpTime(), makeReplSetMetadata(newOpTime), boost::none, now())); getTopoCoord().chooseNewSyncSource( now()++, OpTime(), TopologyCoordinator::ChainingPreference::kUseConfiguration); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); @@ -3229,8 +3249,21 @@ TEST_F(HeartbeatResponseTestV1, // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } @@ -3271,8 +3304,16 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); // Show we like host2 while it is primary. - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 1), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied, 1), + now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(lastOpTimeApplied, 1), + boost::none, + now())); // Show that we also like host2 while it has a sync source. nextAction = receiveUpHeartbeat(HostAndPort("host2"), @@ -3282,8 +3323,17 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied, 2, 2), now())); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied, 2, 2), + now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(lastOpTimeApplied, 2, 2), + boost::none, + now())); // Show that we do not like it when it is not PRIMARY and lacks a sync source and lacks progress // beyond our own. @@ -3294,14 +3344,30 @@ TEST_F(HeartbeatResponseTestV1, lastOpTimeApplied, lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, makeMetadata(lastOpTimeApplied), now())); + ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + ASSERT(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); // Sometimes the heartbeat is stale and the metadata says it's the primary. Trust the metadata. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( HostAndPort("host2"), lastOpTimeApplied, - makeMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), + makeReplSetMetadata(), + makeOplogQueryMetadata( + lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), + now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(lastOpTimeApplied, 1 /* host2 is primary */, -1 /* no sync source */), + boost::none, now())); // But if it is secondary and has some progress beyond our own, we still like it. @@ -3313,8 +3379,18 @@ TEST_F(HeartbeatResponseTestV1, newerThanLastOpTimeApplied, newerThanLastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), lastOpTimeApplied, makeMetadata(newerThanLastOpTimeApplied), now())); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(), + makeOplogQueryMetadata(newerThanLastOpTimeApplied), + now())); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + lastOpTimeApplied, + makeReplSetMetadata(newerThanLastOpTimeApplied), + boost::none, + now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown) { @@ -3345,8 +3421,17 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsDown // set up complete, time for actual check nextAction = receiveDownHeartbeat(HostAndPort("host3"), "rs0", lastOpTimeApplied); ASSERT_NO_ACTION(nextAction.getAction()); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBlackListed) { @@ -3377,19 +3462,47 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhileFresherMemberIsBla getTopoCoord().blacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); // unblacklist with too early a time (node should remained blacklisted) getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(90)); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); // unblacklist and it should succeed getTopoCoord().unblacklistSyncSource(HostAndPort("host3"), now() + Milliseconds(100)); startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } @@ -3421,8 +3534,20 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsFreshByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); + + startCapturingLogMessages(); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } @@ -3454,8 +3579,22 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceIfNodeIsStaleByHeartbea // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(fresherLastOpTimeApplied), now())); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(fresherLastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_FALSE( + getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(fresherLastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(0, countLogLinesContaining("Choosing new sync source")); } @@ -3486,8 +3625,21 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenFresherMemberExists) { // set up complete, time for actual check startCapturingLogMessages(); - ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); + + // set up complete, time for actual check + startCapturingLogMessages(); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source")); } @@ -3496,15 +3648,15 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberHasYetToHeart // In this test, the TopologyCoordinator should not tell us to change sync sources away from // "host2" since we do not use the member's heartbeatdata in pv1. ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(), now())); + HostAndPort("host2"), OpTime(), makeReplSetMetadata(), makeOplogQueryMetadata(), now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenMemberNotInConfig) { // In this test, the TopologyCoordinator should tell us to change sync sources away from // "host4" since "host4" is absent from the config of version 10. - ReplSetMetadata metadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); - ASSERT_TRUE( - getTopoCoord().shouldChangeSyncSource(HostAndPort("host4"), OpTime(), metadata, now())); + ReplSetMetadata replMetadata(0, OpTime(), OpTime(), 10, OID(), -1, -1); + ASSERT_TRUE(getTopoCoord().shouldChangeSyncSource( + HostAndPort("host4"), OpTime(), replMetadata, makeOplogQueryMetadata(), now())); } // TODO(dannenberg) figure out what this is trying to test.. @@ -4433,8 +4585,17 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberDoesNo ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + // set up complete, time for actual check + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); } TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotReadable) { @@ -4463,8 +4624,18 @@ TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceWhenFresherMemberIsNotR ASSERT_NO_ACTION(nextAction.getAction()); // set up complete, time for actual check - ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource( - HostAndPort("host2"), OpTime(), makeMetadata(lastOpTimeApplied), now())); + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(), + makeOplogQueryMetadata(lastOpTimeApplied), + now())); + + // set up complete, time for actual check + ASSERT_FALSE(getTopoCoord().shouldChangeSyncSource(HostAndPort("host2"), + OpTime(), + makeReplSetMetadata(lastOpTimeApplied), + boost::none, + now())); } class HeartbeatResponseTestOneRetryV1 : public HeartbeatResponseTestV1 { |