diff options
author | Samyukta Lanka <samy.lanka@mongodb.com> | 2020-02-12 23:19:35 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-12 23:19:35 +0000 |
commit | 0a4a011397d7e399cbe93a2e2da60aec84997533 (patch) | |
tree | f204d0c765e9ef04d11e405706b227660af5366b /src | |
parent | 994fdd99bb6adb2cf9c7dd4061c2035188c2c8da (diff) | |
download | mongo-0a4a011397d7e399cbe93a2e2da60aec84997533.tar.gz |
SERVER-45470 Process metadata with each new batch in the new oplog fetcher
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 114 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 283 |
3 files changed, 365 insertions, 39 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 7f7e8c11d0d..5cd9e5c967c 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -248,6 +248,31 @@ StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( } return oqMetadata; } + +/** + * Parses the cursor's metadata response for the OplogQueryMetadata. If there is an error it returns + * it. If no OplogQueryMetadata is provided then it returns boost::none. + * + * OplogQueryMetadata is made optional for backwards compatibility. + * TODO SERVER-27668: Make this non-optional. When this stops being optional we can remove the + * duplicated fields in both metadata types and begin to always use OplogQueryMetadata's data. + */ +StatusWith<boost::optional<rpc::OplogQueryMetadata>> parseOplogQueryMetadata( + const BSONObj& metadata) { + boost::optional<rpc::OplogQueryMetadata> oqMetadata = boost::none; + + bool receivedOplogQueryMetadata = metadata.hasElement(rpc::kOplogQueryMetadataFieldName); + if (receivedOplogQueryMetadata) { + auto metadataResult = rpc::OplogQueryMetadata::readFromMetadata(metadata); + if (!metadataResult.isOK()) { + return metadataResult.getStatus(); + } + + oqMetadata = boost::make_optional(metadataResult.getValue()); + } + + return oqMetadata; +} } // namespace StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( @@ -708,6 +733,22 @@ Mutex* NewOplogFetcher::_getMutex() noexcept { return &_mutex; } +std::string NewOplogFetcher::toString() { + stdx::lock_guard lock(_mutex); + str::stream output; + output << "OplogFetcher -"; + output << " last optime fetched: " << _lastFetched.toString(); + output << " source: " << _source.toString(); + output << " namespace: " << _nss.toString(); + output << " active: " << _isActive_inlock(); + output << " shutting down?:" << _isShuttingDown_inlock(); + output << " first batch: " << _firstBatch; + output << " initial find timeout: " << _getInitialFindMaxTime(); + output << " retried find timeout: " << _getRetriedFindMaxTime(); + output << " awaitData timeout: " << _awaitDataTimeout; + return output; +} + OpTime NewOplogFetcher::getLastOpTimeFetched_forTest() const { return _getLastOpTimeFetched(); } @@ -809,6 +850,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call return; } + _setMetadataWriterAndReader(); _createNewCursor(true /* initialFind */); while (true) { @@ -874,7 +916,7 @@ Status NewOplogFetcher::_connect() { uassertStatusOK(_conn->connect(_source, "OplogFetcher")); uassertStatusOK(replAuthenticate(_conn.get()) .withContext(str::stream() - << "OplogFecther failed to authenticate to " + << "OplogFetcher failed to authenticate to " << _source)); // Reset any state needed to track restarts on successful connection. _oplogFetcherRestartDecision->fetchSuccessful(this); @@ -890,6 +932,23 @@ Status NewOplogFetcher::_connect() { return connectStatus; } +void NewOplogFetcher::_setMetadataWriterAndReader() { + invariant(_conn); + + _conn->setRequestMetadataWriter([this](OperationContext* opCtx, BSONObjBuilder* metadataBob) { + *metadataBob << rpc::kReplSetMetadataFieldName << 1; + *metadataBob << rpc::kOplogQueryMetadataFieldName << 1; + metadataBob->appendElements(ReadPreferenceSetting::secondaryPreferredMetadata()); + return Status::OK(); + }); + + _conn->setReplyMetadataReader( + [this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) { + _metadataObj = metadataObj.getOwned(); + return Status::OK(); + }); +} + BSONObj NewOplogFetcher::_makeFindQuery(long long findTimeout) const { BSONObjBuilder queryBob; @@ -1039,15 +1098,21 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { LOG(2) << "oplog fetcher read 0 operations from remote oplog"; } - // TODO SERVER-45470: parse metadata. + auto oqMetadataResult = parseOplogQueryMetadata(_metadataObj); + if (!oqMetadataResult.isOK()) { + error() << "invalid oplog query metadata from sync source " << _source << ": " + << oqMetadataResult.getStatus() << ": " << _metadataObj; + return oqMetadataResult.getStatus(); + } + auto oqMetadata = oqMetadataResult.getValue(); // This lastFetched value is the last OpTime from the previous batch. auto lastFetched = _getLastOpTimeFetched(); if (_firstBatch) { - // TODO SERVER-45470: use metadata to populate remoteRBID and remoteLastApplied. - auto remoteRBID = boost::none; - auto remoteLastApplied = boost::none; + auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none; + auto remoteLastApplied = + oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none; auto status = checkRemoteOplogStart(documents, lastFetched, remoteLastApplied, @@ -1083,7 +1148,25 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { } auto info = validateResult.getValue(); - // TODO SERVER-45470: Process replset metadata. + // Process replset metadata. It is important that this happen after we've validated the + // first batch, so we don't progress our knowledge of the commit point from a + // response that triggers a rollback. + rpc::ReplSetMetadata replSetMetadata; + bool receivedReplMetadata = _metadataObj.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedReplMetadata) { + auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(_metadataObj); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << _source << ": " + << metadataResult.getStatus() << ": " << _metadataObj; + return metadataResult.getStatus(); + } + replSetMetadata = metadataResult.getValue(); + + // We will only ever have OplogQueryMetadata if we have ReplSetMetadata, so it is safe + // to call processMetadata() in this if block. + invariant(oqMetadata); + _dataReplicatorExternalState->processMetadata(replSetMetadata, *oqMetadata); + } // Increment stats. We read all of the docs in the query. opsReadStats.increment(info.networkDocumentCount); @@ -1100,7 +1183,24 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) { // of this fetcher. _startingPoint = StartingPoint::kSkipFirstDoc; - // TODO SERVER-45470: check for invalid sync source. + if (_dataReplicatorExternalState->shouldStopFetching(_source, replSetMetadata, oqMetadata)) { + str::stream errMsg; + errMsg << "sync source " << _source.toString(); + errMsg << " (config version: " << replSetMetadata.getConfigVersion(); + // If OplogQueryMetadata was provided, its values were used to determine if we should + // stop fetching from this sync source. + if (oqMetadata) { + errMsg << "; last applied optime: " << oqMetadata->getLastOpApplied().toString(); + errMsg << "; sync source index: " << oqMetadata->getSyncSourceIndex(); + errMsg << "; primary index: " << oqMetadata->getPrimaryIndex(); + } else { + errMsg << "; last visible optime: " << replSetMetadata.getLastOpVisible().toString(); + errMsg << "; sync source index: " << replSetMetadata.getSyncSourceIndex(); + errMsg << "; primary index: " << replSetMetadata.getPrimaryIndex(); + } + errMsg << ") is no longer valid"; + return Status(ErrorCodes::InvalidSyncSource, errMsg); + } // We have now processed the batch and should move forward our view of _lastFetched. if (documents.size() > 0) { diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 0c64c44ede3..69af79c6c6f 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -391,7 +391,7 @@ public: /** * Prints out the status and settings of the oplog fetcher. */ - std::string toString() const; + std::string toString(); // ================== Test support API =================== @@ -481,6 +481,11 @@ private: Status _connect(); /** + * Sets the RequestMetadataWriter and ReplyMetadataReader on the connection. + */ + void _setMetadataWriterAndReader(); + + /** * Executes a `find` query on the sync source's oplog and establishes a tailable, awaitData, * exhaust cursor. * diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index ced49a76235..063f32f0762 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -1092,6 +1092,14 @@ bool blockedOnNetworkSoon(MockDBClientConnection* conn) { return false; } +void validateMetadataRequest(OpMsg msg) { + ASSERT_EQUALS(1, msg.body.getIntField("$replData")); + ASSERT_EQUALS(1, msg.body.getIntField("$oplogQueryData")); + ASSERT_BSONOBJ_EQ(BSON("mode" + << "secondaryPreferred"), + msg.body.getObjectField("$readPreference")); +} + void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) { auto msg = mongo::OpMsg::parse(m); ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "find"); @@ -1106,7 +1114,8 @@ void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) { << "local" << "afterClusterTime" << Timestamp(0, 1)), msg.body.getObjectField("readConcern")); - // TODO SERVER-45470: Test the metadata sent. + + validateMetadataRequest(msg); } void validateGetMoreCommand(Message m, @@ -1135,6 +1144,8 @@ void validateGetMoreCommand(Message m, } else { ASSERT_FALSE(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); } + + validateMetadataRequest(msg); } // Simulate a response to a single outgoing client request and return the client request. Use this @@ -1180,7 +1191,9 @@ protected: static const int rbid = 2; static const int primaryIndex = 2; static const int syncSourceIndex = 2; + static const rpc::OplogQueryMetadata oqMetadata; static const rpc::OplogQueryMetadata staleOqMetadata; + static const rpc::ReplSetMetadata replSetMetadata; // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use. const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10; @@ -1204,7 +1217,8 @@ protected: std::unique_ptr<ShutdownState> processSingleBatch(const Message& response, bool shouldShutdown = false, - bool requireFresherSyncSource = true); + bool requireFresherSyncSource = true, + bool lastFetchedShouldAdvance = false); /** * Tests checkSyncSource result handling. @@ -1226,12 +1240,19 @@ protected: std::unique_ptr<MockRemoteDBServer> _mockServer; }; -const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(124, 1), 2); +const int NewOplogFetcherTest::rbid; +const OpTime NewOplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2); +const rpc::OplogQueryMetadata NewOplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex); + const OpTime NewOplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0); const Date_t NewOplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs()); const rpc::OplogQueryMetadata NewOplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata( {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex); +const rpc::ReplSetMetadata NewOplogFetcherTest::replSetMetadata = + rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, OID(), primaryIndex, syncSourceIndex); + void NewOplogFetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); launchExecutorThread(); @@ -1296,7 +1317,7 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer source, _createConfig(), std::make_unique<NewOplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts), - -1, + rbid, requireFresherSyncSource, dataReplicatorExternalState.get(), enqueueDocumentsFn, @@ -1312,7 +1333,10 @@ std::unique_ptr<NewOplogFetcher> NewOplogFetcherTest::makeOplogFetcherWithDiffer } std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch( - const Message& response, bool shouldShutdown, bool requireFresherSyncSource) { + const Message& response, + bool shouldShutdown, + bool requireFresherSyncSource, + bool lastFetchedShouldAdvance) { auto shutdownState = std::make_unique<ShutdownState>(); // Create an oplog fetcher with no retries. @@ -1335,6 +1359,10 @@ std::unique_ptr<ShutdownState> NewOplogFetcherTest::processSingleBatch( } oplogFetcher->join(); + if (!lastFetchedShouldAdvance) { + ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest()); + } + return shutdownState; } @@ -1350,8 +1378,7 @@ void NewOplogFetcherTest::testSyncSourceChecking( dataReplicatorExternalState->shouldStopFetchingResult = true; auto shutdownState = - processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj), - true /* shouldShutdown */); + processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj)); ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); } @@ -1520,7 +1547,7 @@ TEST_F(NewOplogFetcherTest, CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); // Update lastFetched before it is updated by getting the next batch. lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); @@ -1655,7 +1682,7 @@ TEST_F( CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto conn = oplogFetcher->getDBClientConnection_forTest(); @@ -1692,6 +1719,166 @@ TEST_F(NewOplogFetcherTest, AwaitDataTimeoutSmallerWhenFailPointSet) { failPoint->setMode(FailPoint::off); } +TEST_F(NewOplogFetcherTest, InvalidReplSetMetadataInResponseStopsTheOplogFetcher) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(lastFetched); + auto metadataObj = + BSON(rpc::kReplSetMetadataFieldName << BSON("invalid_repl_metadata_field" << 1)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); +} + +TEST_F(NewOplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(lastFetched); + auto metadataObj = + BSON(rpc::kOplogQueryMetadataFieldName << BSON("invalid_oq_metadata_field" << 1)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); +} + +DEATH_TEST_F(NewOplogFetcherTest, + ValidMetadataInResponseWithoutOplogMetadataInvariants, + "Invariant failure oqMetadata") { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(lastFetched); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none); + + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj)); +} + +TEST_F(NewOplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) { + CursorId cursorId = 0LL; + auto entry = makeNoopOplogEntry(lastFetched); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); + + ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + ASSERT_TRUE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT_EQUALS(replSetMetadata.getPrimaryIndex(), + dataReplicatorExternalState->replMetadataProcessed.getPrimaryIndex()); + ASSERT_EQUALS(oqMetadata.getPrimaryIndex(), + dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex()); +} + +TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(lastFetched); + + rpc::OplogQueryMetadata oplogQueryMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, primaryIndex, syncSourceIndex); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(staleOpTime); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(NewOplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(lastFetched); + + rpc::OplogQueryMetadata oplogQueryMetadata( + {staleOpTime, staleWallTime}, lastFetched, rbid, primaryIndex, syncSourceIndex); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(NewOplogFetcherTest, + MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) { + CursorId cursorId = 22LL; + auto entry = makeNoopOplogEntry(staleOpTime); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata); + + ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj), + false /* shouldShutdown */, + false /* requireFresherSyncSource */) + ->getStatus()); + + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); + ASSERT(lastEnqueuedDocuments.empty()); +} + +TEST_F(NewOplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButMetadataIsStale) { + // This tests the case where the sync source metadata is behind us but we get a document which + // is equal to us. Since that means the metadata is stale and can be ignored, we should accept + // this sync source. + CursorId cursorId = 0LL; + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, staleOqMetadata); + auto entry = makeNoopOplogEntry(lastFetched); + + ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj), + false /* shouldShutdown */, + false /* requireFresherSyncSource */) + ->getStatus()); + + ASSERT(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(NewOplogFetcherTest, + MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { + CursorId cursorId = 0LL; + rpc::OplogQueryMetadata oplogQueryMetadata( + {staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2); + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata); + auto entry = makeNoopOplogEntry(lastFetched); + + ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj), + false /* shouldShutdown */, + false /* requireFresherSyncSource */) + ->getStatus()); + ASSERT(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(NewOplogFetcherTest, + MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { + CursorId cursorId = 22LL; + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, boost::none); + auto entry = makeNoopOplogEntry(Seconds(456)); + + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, + processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(NewOplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { + CursorId cursorId = 22LL; + auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata); + auto entry = makeNoopOplogEntry(Seconds(456)); + + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj}))->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + +TEST_F(NewOplogFetcherTest, EmptyMetadataIsNotProcessed) { + CursorId cursorId = 0LL; + auto entry = makeNoopOplogEntry(lastFetched); + + ASSERT_OK(processSingleBatch(makeFirstBatch(cursorId, {entry}, {}))->getStatus()); + ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); +} + TEST_F(NewOplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) { ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, processSingleBatch(Message())->getStatus()); } @@ -1745,7 +1932,7 @@ TEST_F(NewOplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Update lastFetched before it is updated by getting the next batch. @@ -1783,7 +1970,7 @@ TEST_F(NewOplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownO CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Update lastFetched before it is updated by getting the next batch. @@ -1853,7 +2040,7 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Update lastFetched before it is updated by getting the next batch. @@ -1997,7 +2184,7 @@ TEST_F(NewOplogFetcherTest, SuccessfulBatchResetsNumRestarts) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Update lastFetched before it is updated by getting the next batch. @@ -2070,7 +2257,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Update lastFetched before it is updated by getting the next batch. @@ -2149,7 +2336,7 @@ TEST_F(NewOplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatu CursorId cursorId = 0LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto firstBatch = {firstEntry, secondEntry}; // Creating the cursor will succeed, but the oplog fetcher will shut down after receiving this @@ -2181,7 +2368,7 @@ TEST_F(NewOplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissin TEST_F(NewOplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) { CursorId cursorId = 22LL; - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS( ErrorCodes::InvalidBSON, processSingleBatch(makeFirstBatch(cursorId, {BSONObj()}, metadataObj))->getStatus()); @@ -2192,7 +2379,7 @@ TEST_F( LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { CursorId cursorId = 22LL; auto entry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus()); } @@ -2201,7 +2388,7 @@ TEST_F(NewOplogFetcherTest, MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS( ErrorCodes::NoSuchKey, processSingleBatch(makeFirstBatch(cursorId, @@ -2215,7 +2402,7 @@ TEST_F(NewOplogFetcherTest, TEST_F(NewOplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) { CursorId cursorId = 22LL; - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, processSingleBatch(makeFirstBatch(cursorId, {makeNoopOplogEntry(lastFetched), @@ -2232,11 +2419,13 @@ TEST_F(NewOplogFetcherTest, auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto shutdownState = processSingleBatch( makeFirstBatch(cursorId, {firstEntry, secondEntry, thirdEntry}, metadataObj), - true /* shouldShutdown */); + true /* shouldShutdown */, + true /* requireFresherSyncSource */, + true /* lastFetchedShouldAdvance */); ASSERT_EQUALS(2U, lastEnqueuedDocuments.size()); ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); @@ -2291,7 +2480,7 @@ TEST_F(NewOplogFetcherTest, CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); // Update lastFetched before it is updated by getting the next batch. lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); @@ -2366,7 +2555,7 @@ TEST_F(NewOplogFetcherTest, auto firstEntry = makeNoopOplogEntry({{Seconds(123), 0}, lastFetched.getTerm()}); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); // Update lastFetched before it is updated by getting the next batch. lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); @@ -2409,7 +2598,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); enqueueDocumentsFn = [](NewOplogFetcher::Documents::const_iterator, NewOplogFetcher::Documents::const_iterator, @@ -2422,6 +2611,38 @@ TEST_F(NewOplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromEnqueueDocum ASSERT_EQ(Status(ErrorCodes::InternalError, "my custom error"), shutdownState->getStatus()); } +TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetcher) { + testSyncSourceChecking(boost::none, boost::none); + + // Sync source optime and "hasSyncSource" are not available if the response does not + // contain metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(OpTime(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + +TEST_F(NewOplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { + testSyncSourceChecking(replSetMetadata, oqMetadata); + + // Sync source optime and "hasSyncSource" can be set if the respone contains metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + +TEST_F(NewOplogFetcherTest, + FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { + rpc::OplogQueryMetadata oplogQueryMetadata( + {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1); + testSyncSourceChecking(replSetMetadata, oplogQueryMetadata); + + // Sync source "hasSyncSource" is derived from metadata. + ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked); + ASSERT_EQUALS(oplogQueryMetadata.getLastOpApplied(), + dataReplicatorExternalState->syncSourceLastOpTime); + ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); +} + TEST_F(NewOplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = BSON("o" << BSON("msg" @@ -2646,7 +2867,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherResetsNumRestartsOnSuccessfulConnection) CursorId cursorId = 0LL; auto firstEntry = makeNoopOplogEntry(lastFetched); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); // Allow the cursor re-initialization to succeed. But the OplogFetcher will shut down with an OK // status after receiving this batch because the cursor id is 0. @@ -2671,7 +2892,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherCanAutoReconnect) { auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); // Simulate closing the cursor and the OplogFetcher should exit with an OK status. processSingleRequestResponse(conn, makeFirstBatch(0LL, {firstEntry}, metadataObj)); @@ -2711,7 +2932,7 @@ TEST_F(NewOplogFetcherTest, DisconnectsOnErrorsDuringExhaustStream) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto conn = oplogFetcher->getDBClientConnection_forTest(); // First batch for the initial find command. @@ -2757,17 +2978,17 @@ TEST_F(NewOplogFetcherTest, GetMoreEmptyBatch) { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); - auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + auto metadataObj = makeOplogBatchMetadata(boost::none, oqMetadata); auto conn = oplogFetcher->getDBClientConnection_forTest(); // Creating the cursor will succeed. - auto m = - processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj)); + auto m = processSingleRequestResponse( + conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true); // Empty batch from first getMore. processSingleRequestResponse( - conn, makeSubsequentBatch(cursorId, {}, metadataObj, true /* moreToCome */)); + conn, makeSubsequentBatch(cursorId, {}, metadataObj, true /* moreToCome */), true); // Terminating empty batch from exhaust stream with cursorId 0. processSingleExhaustResponse(oplogFetcher->getDBClientConnection_forTest(), |