diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2020-02-09 23:47:48 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-11 03:09:41 +0000 |
commit | 9f40a2e96b3ad0e5fc84956c153a938ed531e91f (patch) | |
tree | f74a00b47f9e094f79424126273fb07bd22c48c2 /src/mongo/db | |
parent | 6cc6194661eced3b02a5a853c382276bf258871c (diff) | |
download | mongo-9f40a2e96b3ad0e5fc84956c153a938ed531e91f.tar.gz |
SERVER-46053: NewOplogFetcher should send term and lastCommittedOpTime in getMore requests
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 67 |
2 files changed, 68 insertions, 5 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 087bb34bbaa..f6006f67ed2 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -960,6 +960,12 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() { // the awaitData timeout with a network buffer. _setSocketTimeout(durationCount<Milliseconds>(_awaitDataTimeout)); } else { + auto lastCommittedWithCurrentTerm = + _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(); + if (lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm) { + _cursor->setCurrentTermAndLastCommittedOpTime(lastCommittedWithCurrentTerm.value, + lastCommittedWithCurrentTerm.opTime); + } _cursor->more(); } diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index ed5c69a4b0f..ced49a76235 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -1109,11 +1109,27 @@ void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) { // TODO SERVER-45470: Test the metadata sent. } -void validateGetMoreCommand(Message m, int cursorId, int timeout, bool exhaustSupported = true) { +void validateGetMoreCommand(Message m, + int cursorId, + int timeout, + OpTimeWithTerm lastCommittedWithCurrentTerm, + bool exhaustSupported = true) { auto msg = mongo::OpMsg::parse(m); ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore"); ASSERT_EQ(cursorId, msg.body.getIntField("getMore")); ASSERT_EQUALS(timeout, msg.body.getIntField("maxTimeMS")); + + // In unittests, lastCommittedWithCurrentTerm should always be default to valid and non-null. + // The case when currentTerm is kUninitializedTerm is tested separately in + // GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm. + invariant(lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm); + invariant(!lastCommittedWithCurrentTerm.opTime.isNull()); + ASSERT_EQUALS(lastCommittedWithCurrentTerm.value, msg.body["term"].numberLong()); + ASSERT_EQUALS(lastCommittedWithCurrentTerm.opTime.getTimestamp(), + msg.body["lastKnownCommittedOpTime"]["ts"].timestamp()); + ASSERT_EQUALS(lastCommittedWithCurrentTerm.opTime.getTerm(), + msg.body["lastKnownCommittedOpTime"]["t"].numberLong()); + if (exhaustSupported) { ASSERT_TRUE(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); } else { @@ -1627,6 +1643,41 @@ TEST_F(NewOplogFetcherTest, ASSERT_FALSE(queryObj.hasField("term")); } +TEST_F( + NewOplogFetcherTest, + GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { + dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; + + ShutdownState shutdownState; + + // Create an oplog fetcher without any retries. + auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState)); + + CursorId cursorId = 22LL; + auto firstEntry = makeNoopOplogEntry(lastFetched); + auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata); + + auto conn = oplogFetcher->getDBClientConnection_forTest(); + + // First batch for the initial find command. + processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true); + + // Second and terminating batch (with cursorId 0) for the getMore command. + auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()}); + auto m = processSingleRequestResponse( + conn, makeSubsequentBatch(0LL, {secondEntry}, metadataObj, false /* moreToCome */), false); + + auto msg = mongo::OpMsg::parse(m); + ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore"); + // Test that the getMore query does not contain the term or the lastKnownCommittedOpTime field. + ASSERT_FALSE(msg.body.hasField("term")); + ASSERT_FALSE(msg.body.hasField("lastKnownCommittedOpTime")); + + oplogFetcher->join(); + + ASSERT_OK(shutdownState.getStatus()); +} + TEST_F(NewOplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) { auto config = _createConfig(); auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest(); @@ -1836,8 +1887,10 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"}, true); - validateGetMoreCommand( - m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest())); + validateGetMoreCommand(m, + cursorId, + durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()), + dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime()); // Check the socket timeout is equal to the retried find max time plus the network buffer. ASSERT_EQUALS(retriedMaxFindTimeDouble + oplogNetworkTimeoutBufferSeconds.load(), @@ -1879,8 +1932,10 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) { makeSubsequentBatch(cursorId, thirdBatch, metadataObj, true /* moreToCome */), true); - validateGetMoreCommand( - m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest())); + validateGetMoreCommand(m, + cursorId, + durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()), + dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime()); // Update lastFetched since it should have been updated after getting the last batch. lastFetched = oplogFetcher->getLastOpTimeFetched_forTest(); @@ -2050,6 +2105,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) { validateGetMoreCommand(m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()), + dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(), false /* exhaustSupported */); // Update lastFetched since it should have been updated after getting the last batch. @@ -2069,6 +2125,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) { validateGetMoreCommand(m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()), + dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(), false /* exhaustSupported */); // Update lastFetched since it should have been updated after getting the last batch. |