summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorLingzhi Deng <lingzhi.deng@mongodb.com>2020-02-09 23:47:48 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-11 03:09:41 +0000
commit9f40a2e96b3ad0e5fc84956c153a938ed531e91f (patch)
treef74a00b47f9e094f79424126273fb07bd22c48c2 /src/mongo/db
parent6cc6194661eced3b02a5a853c382276bf258871c (diff)
downloadmongo-9f40a2e96b3ad0e5fc84956c153a938ed531e91f.tar.gz
SERVER-46053: NewOplogFetcher should send term and lastCommittedOpTime in getMore requests
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp6
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp67
2 files changed, 68 insertions, 5 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 087bb34bbaa..f6006f67ed2 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -960,6 +960,12 @@ StatusWith<NewOplogFetcher::Documents> NewOplogFetcher::_getNextBatch() {
// the awaitData timeout with a network buffer.
_setSocketTimeout(durationCount<Milliseconds>(_awaitDataTimeout));
} else {
+ auto lastCommittedWithCurrentTerm =
+ _dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime();
+ if (lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm) {
+ _cursor->setCurrentTermAndLastCommittedOpTime(lastCommittedWithCurrentTerm.value,
+ lastCommittedWithCurrentTerm.opTime);
+ }
_cursor->more();
}
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index ed5c69a4b0f..ced49a76235 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -1109,11 +1109,27 @@ void validateFindCommand(Message m, OpTime lastFetched, int findTimeout) {
// TODO SERVER-45470: Test the metadata sent.
}
-void validateGetMoreCommand(Message m, int cursorId, int timeout, bool exhaustSupported = true) {
+void validateGetMoreCommand(Message m,
+ int cursorId,
+ int timeout,
+ OpTimeWithTerm lastCommittedWithCurrentTerm,
+ bool exhaustSupported = true) {
auto msg = mongo::OpMsg::parse(m);
ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore");
ASSERT_EQ(cursorId, msg.body.getIntField("getMore"));
ASSERT_EQUALS(timeout, msg.body.getIntField("maxTimeMS"));
+
+ // In unittests, lastCommittedWithCurrentTerm should always be default to valid and non-null.
+ // The case when currentTerm is kUninitializedTerm is tested separately in
+ // GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm.
+ invariant(lastCommittedWithCurrentTerm.value != OpTime::kUninitializedTerm);
+ invariant(!lastCommittedWithCurrentTerm.opTime.isNull());
+ ASSERT_EQUALS(lastCommittedWithCurrentTerm.value, msg.body["term"].numberLong());
+ ASSERT_EQUALS(lastCommittedWithCurrentTerm.opTime.getTimestamp(),
+ msg.body["lastKnownCommittedOpTime"]["ts"].timestamp());
+ ASSERT_EQUALS(lastCommittedWithCurrentTerm.opTime.getTerm(),
+ msg.body["lastKnownCommittedOpTime"]["t"].numberLong());
+
if (exhaustSupported) {
ASSERT_TRUE(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
} else {
@@ -1627,6 +1643,41 @@ TEST_F(NewOplogFetcherTest,
ASSERT_FALSE(queryObj.hasField("term"));
}
+TEST_F(
+ NewOplogFetcherTest,
+ GetMoreQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) {
+ dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm;
+
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher without any retries.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState));
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto metadataObj = makeOplogBatchMetadata(boost::none, staleOqMetadata);
+
+ auto conn = oplogFetcher->getDBClientConnection_forTest();
+
+ // First batch for the initial find command.
+ processSingleRequestResponse(conn, makeFirstBatch(cursorId, {firstEntry}, metadataObj), true);
+
+ // Second and terminating batch (with cursorId 0) for the getMore command.
+ auto secondEntry = makeNoopOplogEntry({{Seconds(124), 0}, lastFetched.getTerm()});
+ auto m = processSingleRequestResponse(
+ conn, makeSubsequentBatch(0LL, {secondEntry}, metadataObj, false /* moreToCome */), false);
+
+ auto msg = mongo::OpMsg::parse(m);
+ ASSERT_EQ(mongo::StringData(msg.body.firstElement().fieldName()), "getMore");
+ // Test that the getMore query does not contain the term or the lastKnownCommittedOpTime field.
+ ASSERT_FALSE(msg.body.hasField("term"));
+ ASSERT_FALSE(msg.body.hasField("lastKnownCommittedOpTime"));
+
+ oplogFetcher->join();
+
+ ASSERT_OK(shutdownState.getStatus());
+}
+
TEST_F(NewOplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeout) {
auto config = _createConfig();
auto timeout = makeOplogFetcher()->getAwaitDataTimeout_forTest();
@@ -1836,8 +1887,10 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"},
true);
- validateGetMoreCommand(
- m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()));
+ validateGetMoreCommand(m,
+ cursorId,
+ durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()),
+ dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime());
// Check the socket timeout is equal to the retried find max time plus the network buffer.
ASSERT_EQUALS(retriedMaxFindTimeDouble + oplogNetworkTimeoutBufferSeconds.load(),
@@ -1879,8 +1932,10 @@ TEST_F(NewOplogFetcherTest, SuccessfullyRecreateCursorAfterFailedBatch) {
makeSubsequentBatch(cursorId, thirdBatch, metadataObj, true /* moreToCome */),
true);
- validateGetMoreCommand(
- m, cursorId, durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()));
+ validateGetMoreCommand(m,
+ cursorId,
+ durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()),
+ dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime());
// Update lastFetched since it should have been updated after getting the last batch.
lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
@@ -2050,6 +2105,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
validateGetMoreCommand(m,
cursorId,
durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()),
+ dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(),
false /* exhaustSupported */);
// Update lastFetched since it should have been updated after getting the last batch.
@@ -2069,6 +2125,7 @@ TEST_F(NewOplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
validateGetMoreCommand(m,
cursorId,
durationCount<Milliseconds>(oplogFetcher->getAwaitDataTimeout_forTest()),
+ dataReplicatorExternalState->getCurrentTermAndLastCommittedOpTime(),
false /* exhaustSupported */);
// Update lastFetched since it should have been updated after getting the last batch.