summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2020-09-29 21:30:16 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-13 21:38:46 +0000
commit9b519f5f01cfa98636b061785acdaf58a3dfcf77 (patch)
treec55eda44e8422f64e1be4b90a1feebd37fb4fdfa
parent6831f81b2b84325a2ef4f8c74c71d478eeadb34a (diff)
downloadmongo-9b519f5f01cfa98636b061785acdaf58a3dfcf77.tar.gz
SERVER-51227 Make find/getMore cmd with $_requestResumeToken on oplog report latest oplog entry ts seen while generating the response batch
-rw-r--r--jstests/replsets/resume_after_against_oplog.js306
-rw-r--r--src/mongo/db/commands/find_cmd.cpp9
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp2
-rw-r--r--src/mongo/db/query/planner_access.cpp9
4 files changed, 288 insertions, 38 deletions
diff --git a/jstests/replsets/resume_after_against_oplog.js b/jstests/replsets/resume_after_against_oplog.js
index 07f32522552..fbaa5b07006 100644
--- a/jstests/replsets/resume_after_against_oplog.js
+++ b/jstests/replsets/resume_after_against_oplog.js
@@ -1,59 +1,293 @@
/**
- * Tests that using the $_resumeAfter option in a query against the oplog does not invariant.
+ * Tests the behavior of the 'postBatchResumeToken' and '$_resumeAfter' fields in 'find' and
+ * 'getMore' requests and responses on the oplog.
+ *
+ * @tags: [requires_fcv_47]
*/
(function() {
"use strict";
-const testName = "resume_after_against_oplog";
-const rst = new ReplSetTest({nodes: 1, name: testName});
+const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const node = rst.getPrimary();
const dbName = "test";
-const collName = testName;
+const collName = jsTestName();
jsTestLog("Inserting some data");
// We will query the oplog for the entries corresponding to those inserts.
-const testData = [{_id: 0, ans: 42}, {_id: 1, ans: 42}];
+const testData = [{_id: 0, ans: 42}, {_id: 1, ans: 42}, {_id: 2, ans: 42}];
assert.commandWorked(node.getDB(dbName).getCollection(collName).insert(testData));
const localDb = node.getDB("local");
+const kNullTS = new Timestamp(0, 0);
+function assertExpectedResumeTokenFormat(res, isOplog = true) {
+ assert.hasFields(res.cursor, ["postBatchResumeToken"]);
+ const resumeToken = res.cursor.postBatchResumeToken;
+ assert.eq(resumeToken.hasOwnProperty("ts"), isOplog, res);
+ assert.eq(resumeToken.hasOwnProperty("$recordId"), !isOplog, res);
+ return resumeToken;
+}
+
+// ---------------------------------------------------------------------------------------
jsTestLog("Running initial query on the oplog");
-const firstRes = assert.commandWorked(localDb.runCommand({
- find: "oplog.rs",
- filter: {op: "i", "o.ans": 42},
- hint: {$natural: 1},
- batchSize: 1,
- $_requestResumeToken: true
-}));
-
-const firstDoc = firstRes.cursor.firstBatch[0];
-assert.eq(firstDoc.o._id, 0);
-assert.hasFields(firstRes.cursor, ["postBatchResumeToken"]);
-const resumeToken = firstRes.cursor.postBatchResumeToken;
-
-// Kill the cursor before attempting to resume.
-assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [firstRes.cursor.id]}));
-
-jsTestLog("Resuming oplog collection scan from the last recordId we saw");
-const secondRes = assert.commandWorked(localDb.runCommand({
- find: "oplog.rs",
- filter: {op: "i", "o.ans": 42},
- hint: {$natural: 1},
- batchSize: 1,
- $_requestResumeToken: true,
- $_resumeAfter: resumeToken
-}));
-
-const secondDoc = secondRes.cursor.firstBatch[0];
-assert.eq(secondDoc.o._id, 1);
-
-// Make sure the second result differs from the first.
-assert.neq(firstDoc, secondDoc);
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 1, res);
+ assert.eq(res.cursor.firstBatch[0].o._id, 0, res);
+
+ // Assert resume token is non-null.
+ const resumeToken1 = assertExpectedResumeTokenFormat(res);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1);
+
+ // Kill the cursor before attempting to resume.
+ assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [res.cursor.id]}));
+
+ jsTestLog("Resuming oplog collection scan fails");
+ // This fails because '$_resumeAfter' expects a '$recordId' resume token, not a 'ts' resume
+ // token. This is appropriate since resuming on the oplog should occur by timestamp, not by
+ // record id.
+ assert.commandFailedWithCode(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true,
+ $_resumeAfter: resumeToken1
+ }),
+ ErrorCodes.BadValue);
+
+ // Confirm that when we restart our oplog scan after the resume token, we see
+ // the next expected document and the PBRT advances past the last observed token.
+ const res2 = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42, ts: {"$gt": resumeToken1.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true,
+ }));
+
+ assert.eq(res2.cursor.firstBatch.length, 1, res);
+ assert.eq(res2.cursor.firstBatch[0].o._id, 1, res);
+
+ const resumeToken2 = assertExpectedResumeTokenFormat(res2);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1);
+
+ const res3 = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42, ts: {"$gt": resumeToken2.ts}},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true,
+ }));
+
+ assert.eq(res3.cursor.firstBatch.length, 1, res);
+ assert.eq(res3.cursor.firstBatch[0].o._id, 2, res);
+
+ const resumeToken3 = assertExpectedResumeTokenFormat(res3);
+ assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1);
+}
+// ---------------------------------------------------------------------------------------
+jsTestLog("Running initial tailable query on the oplog");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 1,
+ tailable: true,
+ awaitData: true,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 1, res);
+ assert.eq(res.cursor.firstBatch[0].o._id, 0, res);
+
+ // Resume token should be non-null.
+ const resumeToken1 = assertExpectedResumeTokenFormat(res);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1);
+
+ const cursorId = res.cursor.id;
+
+ jsTest.log("Ensure that postBatchResumeToken attribute is returned for getMore command");
+ const resGetMore1 =
+ assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"}));
+
+ assert.eq(resGetMore1.cursor.nextBatch.length, 2, resGetMore1);
+ assert.eq(resGetMore1.cursor.nextBatch[0].o._id, 1, resGetMore1);
+ assert.eq(resGetMore1.cursor.nextBatch[1].o._id, 2, resGetMore1);
+
+ // Resume token should be greater than the find command's.
+ const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1);
+
+ jsTest.log(
+ "Ensure that postBatchResumeToken attribute is returned for getMore command with no results");
+ const resGetMore2 = assert.commandWorked(
+ localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100}));
+
+ // The results are exhausted, but the cursor stays alive.
+ assert.eq(resGetMore2.cursor.nextBatch.length, 0, resGetMore2);
+ assert.eq(resGetMore2.cursor.id, cursorId, resGetMore2);
+
+ // Resume token should be the same as the first getMore.
+ const resumeToken3 = assertExpectedResumeTokenFormat(resGetMore2);
+ assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 0);
+
+ // Kill the tailable cursor.
+ assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [cursorId]}));
+}
+// ---------------------------------------------------------------------------------------
+jsTest.log("Run find command on oplog with $_requestResumeToken disabled");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: false
+ }));
+
+ assert(!res.cursor.hasOwnProperty("postBatchResumeToken"), res);
+}
+// ---------------------------------------------------------------------------------------
+jsTest.log("Run find command on oplog with $_requestResumeToken defaulted to disabled");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 1,
+ }));
+
+ assert(!res.cursor.hasOwnProperty("postBatchResumeToken"), res);
+}
+// ---------------------------------------------------------------------------------------
+jsTest.log("Run find command on non-oplog with $_requestResumeToken requested");
+{
+ const res = assert.commandWorked(node.getDB(dbName).runCommand({
+ find: collName,
+ filter: {},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 1, res);
+ assert.eq(res.cursor.firstBatch[0]._id, 0, res);
+
+ assert.hasFields(res.cursor, ["postBatchResumeToken"]);
+ assertExpectedResumeTokenFormat(res, false /* isOplog */);
+}
+// ---------------------------------------------------------------------------------------
+jsTestLog("Running query on the oplog with no results");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 43},
+ hint: {$natural: 1},
+ batchSize: 1,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 0, res);
+
+ // Resume token should be non-null.
+ const resumeToken = assertExpectedResumeTokenFormat(res);
+ assert.eq(timestampCmp(resumeToken.ts, kNullTS), 1);
+}
+// ---------------------------------------------------------------------------------------
+jsTestLog("Running tailable query on the oplog with no results");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", ns: `${dbName}.${collName}`, "o.ans": 43},
+ hint: {$natural: 1},
+ batchSize: 1,
+ tailable: true,
+ awaitData: true,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 0, res);
+
+ // Resume token should be non-null.
+ const resumeToken1 = assertExpectedResumeTokenFormat(res);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1);
+
+ const cursorId = res.cursor.id;
+
+ jsTest.log("Run a tailable getMore with no results");
+ const resGetMore1 = assert.commandWorked(
+ localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100}));
+
+ assert.eq(resGetMore1.cursor.nextBatch.length, 0, resGetMore1);
+
+ // Resume token should be equal to the find command's.
+ const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 0);
+
+ // Insert dummy data so the next getMore should have a higher resume token.
+ assert.commandWorked(node.getDB(dbName).getCollection(collName + "_other").insert({dummy: 1}));
+
+ jsTest.log("Run another tailable getMore with no results");
+ const resGetMore2 = assert.commandWorked(
+ localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100}));
+
+ assert.eq(resGetMore2.cursor.nextBatch.length, 0, resGetMore2);
+
+ // Resume token should be greater than the last getMore's.
+ const resumeToken3 = assertExpectedResumeTokenFormat(resGetMore2);
+ assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1);
+
+ // Kill the tailable cursor.
+ assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [cursorId]}));
+}
+
+// ---------------------------------------------------------------------------------------
+jsTestLog("Running query on the oplog with an empty batch");
+{
+ const res = assert.commandWorked(localDb.runCommand({
+ find: "oplog.rs",
+ filter: {op: "i", "o.ans": 42},
+ hint: {$natural: 1},
+ batchSize: 0,
+ $_requestResumeToken: true
+ }));
+
+ assert.eq(res.cursor.firstBatch.length, 0, res);
+
+ // Resume token should be null because the batch size is 0.
+ const resumeToken1 = assertExpectedResumeTokenFormat(res);
+ assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 0);
+
+ const cursorId = res.cursor.id;
+
+ jsTest.log("Run a getMore that should return data");
+ const resGetMore1 =
+ assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"}));
+
+ assert.eq(resGetMore1.cursor.nextBatch.length, 3, resGetMore1);
+ assert.eq(resGetMore1.cursor.nextBatch[0].o._id, 0, resGetMore1);
+ assert.eq(resGetMore1.cursor.nextBatch[1].o._id, 1, resGetMore1);
+ assert.eq(resGetMore1.cursor.nextBatch[2].o._id, 2, resGetMore1);
+ assert.eq(resGetMore1.cursor.id, 0, resGetMore1);
+
+ // Resume token should be greater than the find command's.
+ const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1);
+ assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1);
+}
rst.stopSet();
})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 96d612b60e5..9e1c0f711fc 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -455,6 +455,7 @@ public:
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
std::uint64_t numResults = 0;
+ bool stashedResult = false;
try {
while (!FindCommon::enoughForFirstBatch(originalQR, numResults) &&
@@ -463,6 +464,7 @@ public:
// later.
if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) {
exec->enqueue(obj);
+ stashedResult = true;
break;
}
@@ -490,6 +492,13 @@ public:
throw;
}
+ // For empty batches, or in the case where the final result was added to the batch
+ // rather than being stashed, we update the PBRT to ensure that it is the most recent
+ // available.
+ if (!stashedResult) {
+ firstBatch.setPostBatchResumeToken(exec->getPostBatchResumeToken());
+ }
+
// Set up the cursor for getMore.
CursorId cursorId = 0;
if (shouldSaveCursor(opCtx, collection, state, exec.get())) {
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 64edcbfff86..8a21e28dcfc 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -352,6 +352,8 @@ public:
if (state == PlanExecutor::IS_EOF) {
// The latest oplog timestamp may advance even when there are no results. Ensure
// that we have the latest postBatchResumeToken produced by the plan executor.
+ // The getMore command does not accept a batchSize of 0, so empty batches are
+ // always caused by hitting EOF and do not need to be handled separately.
nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken());
}
diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp
index db4779c2500..1ece9c238cc 100644
--- a/src/mongo/db/query/planner_access.cpp
+++ b/src/mongo/db/query/planner_access.cpp
@@ -231,8 +231,13 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
}
}
- // Extract and assign the 'requestResumeToken' field.
- csn->requestResumeToken = query.getQueryRequest().getRequestResumeToken();
+ // If the client requested a resume token and we are scanning the oplog, prepare
+ // the collection scan to return timestamp-based tokens. Otherwise, we should
+ // return generic RecordId-based tokens.
+ if (query.getQueryRequest().getRequestResumeToken()) {
+ csn->shouldTrackLatestOplogTimestamp = query.nss().isOplog();
+ csn->requestResumeToken = !query.nss().isOplog();
+ }
// Extract and assign the RecordId from the 'resumeAfter' token, if present.
const BSONObj& resumeAfterObj = query.getQueryRequest().getResumeAfter();