diff options
author | Judah Schvimer <judah@mongodb.com> | 2020-09-29 21:30:16 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-13 21:38:46 +0000 |
commit | 9b519f5f01cfa98636b061785acdaf58a3dfcf77 (patch) | |
tree | c55eda44e8422f64e1be4b90a1feebd37fb4fdfa | |
parent | 6831f81b2b84325a2ef4f8c74c71d478eeadb34a (diff) | |
download | mongo-9b519f5f01cfa98636b061785acdaf58a3dfcf77.tar.gz |
SERVER-51227 Make find/getMore cmd with $_requestResumeToken on oplog report latest oplog entry ts seen while generating the response batch
-rw-r--r-- | jstests/replsets/resume_after_against_oplog.js | 306 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/query/planner_access.cpp | 9 |
4 files changed, 288 insertions, 38 deletions
diff --git a/jstests/replsets/resume_after_against_oplog.js b/jstests/replsets/resume_after_against_oplog.js index 07f32522552..fbaa5b07006 100644 --- a/jstests/replsets/resume_after_against_oplog.js +++ b/jstests/replsets/resume_after_against_oplog.js @@ -1,59 +1,293 @@ /** - * Tests that using the $_resumeAfter option in a query against the oplog does not invariant. + * Tests the behavior of the 'postBatchResumeToken' and '$_resumeAfter' fields in 'find' and + * 'getMore' requests and responses on the oplog. + * + * @tags: [requires_fcv_47] */ (function() { "use strict"; -const testName = "resume_after_against_oplog"; -const rst = new ReplSetTest({nodes: 1, name: testName}); +const rst = new ReplSetTest({nodes: 1}); rst.startSet(); rst.initiate(); const node = rst.getPrimary(); const dbName = "test"; -const collName = testName; +const collName = jsTestName(); jsTestLog("Inserting some data"); // We will query the oplog for the entries corresponding to those inserts. -const testData = [{_id: 0, ans: 42}, {_id: 1, ans: 42}]; +const testData = [{_id: 0, ans: 42}, {_id: 1, ans: 42}, {_id: 2, ans: 42}]; assert.commandWorked(node.getDB(dbName).getCollection(collName).insert(testData)); const localDb = node.getDB("local"); +const kNullTS = new Timestamp(0, 0); +function assertExpectedResumeTokenFormat(res, isOplog = true) { + assert.hasFields(res.cursor, ["postBatchResumeToken"]); + const resumeToken = res.cursor.postBatchResumeToken; + assert.eq(resumeToken.hasOwnProperty("ts"), isOplog, res); + assert.eq(resumeToken.hasOwnProperty("$recordId"), !isOplog, res); + return resumeToken; +} + +// --------------------------------------------------------------------------------------- jsTestLog("Running initial query on the oplog"); -const firstRes = assert.commandWorked(localDb.runCommand({ - find: "oplog.rs", - filter: {op: "i", "o.ans": 42}, - hint: {$natural: 1}, - batchSize: 1, - $_requestResumeToken: true -})); - -const firstDoc = firstRes.cursor.firstBatch[0]; -assert.eq(firstDoc.o._id, 0); -assert.hasFields(firstRes.cursor, ["postBatchResumeToken"]); -const resumeToken = firstRes.cursor.postBatchResumeToken; - -// Kill the cursor before attempting to resume. -assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [firstRes.cursor.id]})); - -jsTestLog("Resuming oplog collection scan from the last recordId we saw"); -const secondRes = assert.commandWorked(localDb.runCommand({ - find: "oplog.rs", - filter: {op: "i", "o.ans": 42}, - hint: {$natural: 1}, - batchSize: 1, - $_requestResumeToken: true, - $_resumeAfter: resumeToken -})); - -const secondDoc = secondRes.cursor.firstBatch[0]; -assert.eq(secondDoc.o._id, 1); - -// Make sure the second result differs from the first. -assert.neq(firstDoc, secondDoc); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 1, res); + assert.eq(res.cursor.firstBatch[0].o._id, 0, res); + + // Assert resume token is non-null. + const resumeToken1 = assertExpectedResumeTokenFormat(res); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1); + + // Kill the cursor before attempting to resume. + assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [res.cursor.id]})); + + jsTestLog("Resuming oplog collection scan fails"); + // This fails because '$_resumeAfter' expects a '$recordId' resume token, not a 'ts' resume + // token. This is appropriate since resuming on the oplog should occur by timestamp, not by + // record id. + assert.commandFailedWithCode(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: resumeToken1 + }), + ErrorCodes.BadValue); + + // Confirm that when we restart our oplog scan after the resume token, we see + // the next expected document and the PBRT advances past the last observed token. + const res2 = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42, ts: {"$gt": resumeToken1.ts}}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + })); + + assert.eq(res2.cursor.firstBatch.length, 1, res); + assert.eq(res2.cursor.firstBatch[0].o._id, 1, res); + + const resumeToken2 = assertExpectedResumeTokenFormat(res2); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1); + + const res3 = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42, ts: {"$gt": resumeToken2.ts}}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + })); + + assert.eq(res3.cursor.firstBatch.length, 1, res); + assert.eq(res3.cursor.firstBatch[0].o._id, 2, res); + + const resumeToken3 = assertExpectedResumeTokenFormat(res3); + assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1); +} +// --------------------------------------------------------------------------------------- +jsTestLog("Running initial tailable query on the oplog"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 1, + tailable: true, + awaitData: true, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 1, res); + assert.eq(res.cursor.firstBatch[0].o._id, 0, res); + + // Resume token should be non-null. + const resumeToken1 = assertExpectedResumeTokenFormat(res); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1); + + const cursorId = res.cursor.id; + + jsTest.log("Ensure that postBatchResumeToken attribute is returned for getMore command"); + const resGetMore1 = + assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"})); + + assert.eq(resGetMore1.cursor.nextBatch.length, 2, resGetMore1); + assert.eq(resGetMore1.cursor.nextBatch[0].o._id, 1, resGetMore1); + assert.eq(resGetMore1.cursor.nextBatch[1].o._id, 2, resGetMore1); + + // Resume token should be greater than the find command's. + const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1); + + jsTest.log( + "Ensure that postBatchResumeToken attribute is returned for getMore command with no results"); + const resGetMore2 = assert.commandWorked( + localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100})); + + // The results are exhausted, but the cursor stays alive. + assert.eq(resGetMore2.cursor.nextBatch.length, 0, resGetMore2); + assert.eq(resGetMore2.cursor.id, cursorId, resGetMore2); + + // Resume token should be the same as the first getMore. + const resumeToken3 = assertExpectedResumeTokenFormat(resGetMore2); + assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 0); + + // Kill the tailable cursor. + assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [cursorId]})); +} +// --------------------------------------------------------------------------------------- +jsTest.log("Run find command on oplog with $_requestResumeToken disabled"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: false + })); + + assert(!res.cursor.hasOwnProperty("postBatchResumeToken"), res); +} +// --------------------------------------------------------------------------------------- +jsTest.log("Run find command on oplog with $_requestResumeToken defaulted to disabled"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 1, + })); + + assert(!res.cursor.hasOwnProperty("postBatchResumeToken"), res); +} +// --------------------------------------------------------------------------------------- +jsTest.log("Run find command on non-oplog with $_requestResumeToken requested"); +{ + const res = assert.commandWorked(node.getDB(dbName).runCommand({ + find: collName, + filter: {}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 1, res); + assert.eq(res.cursor.firstBatch[0]._id, 0, res); + + assert.hasFields(res.cursor, ["postBatchResumeToken"]); + assertExpectedResumeTokenFormat(res, false /* isOplog */); +} +// --------------------------------------------------------------------------------------- +jsTestLog("Running query on the oplog with no results"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 43}, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 0, res); + + // Resume token should be non-null. + const resumeToken = assertExpectedResumeTokenFormat(res); + assert.eq(timestampCmp(resumeToken.ts, kNullTS), 1); +} +// --------------------------------------------------------------------------------------- +jsTestLog("Running tailable query on the oplog with no results"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", ns: `${dbName}.${collName}`, "o.ans": 43}, + hint: {$natural: 1}, + batchSize: 1, + tailable: true, + awaitData: true, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 0, res); + + // Resume token should be non-null. + const resumeToken1 = assertExpectedResumeTokenFormat(res); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 1); + + const cursorId = res.cursor.id; + + jsTest.log("Run a tailable getMore with no results"); + const resGetMore1 = assert.commandWorked( + localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100})); + + assert.eq(resGetMore1.cursor.nextBatch.length, 0, resGetMore1); + + // Resume token should be equal to the find command's. + const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 0); + + // Insert dummy data so the next getMore should have a higher resume token. + assert.commandWorked(node.getDB(dbName).getCollection(collName + "_other").insert({dummy: 1})); + + jsTest.log("Run another tailable getMore with no results"); + const resGetMore2 = assert.commandWorked( + localDb.runCommand({getMore: cursorId, collection: "oplog.rs", maxTimeMS: 100})); + + assert.eq(resGetMore2.cursor.nextBatch.length, 0, resGetMore2); + + // Resume token should be greater than the last getMore's. + const resumeToken3 = assertExpectedResumeTokenFormat(resGetMore2); + assert.eq(timestampCmp(resumeToken3.ts, resumeToken2.ts), 1); + + // Kill the tailable cursor. + assert.commandWorked(localDb.runCommand({killCursors: "oplog.rs", cursors: [cursorId]})); +} + +// --------------------------------------------------------------------------------------- +jsTestLog("Running query on the oplog with an empty batch"); +{ + const res = assert.commandWorked(localDb.runCommand({ + find: "oplog.rs", + filter: {op: "i", "o.ans": 42}, + hint: {$natural: 1}, + batchSize: 0, + $_requestResumeToken: true + })); + + assert.eq(res.cursor.firstBatch.length, 0, res); + + // Resume token should be null because the batch size is 0. + const resumeToken1 = assertExpectedResumeTokenFormat(res); + assert.eq(timestampCmp(resumeToken1.ts, kNullTS), 0); + + const cursorId = res.cursor.id; + + jsTest.log("Run a getMore that should return data"); + const resGetMore1 = + assert.commandWorked(localDb.runCommand({getMore: cursorId, collection: "oplog.rs"})); + + assert.eq(resGetMore1.cursor.nextBatch.length, 3, resGetMore1); + assert.eq(resGetMore1.cursor.nextBatch[0].o._id, 0, resGetMore1); + assert.eq(resGetMore1.cursor.nextBatch[1].o._id, 1, resGetMore1); + assert.eq(resGetMore1.cursor.nextBatch[2].o._id, 2, resGetMore1); + assert.eq(resGetMore1.cursor.id, 0, resGetMore1); + + // Resume token should be greater than the find command's. + const resumeToken2 = assertExpectedResumeTokenFormat(resGetMore1); + assert.eq(timestampCmp(resumeToken2.ts, resumeToken1.ts), 1); +} rst.stopSet(); })(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 96d612b60e5..9e1c0f711fc 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -455,6 +455,7 @@ public: BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; std::uint64_t numResults = 0; + bool stashedResult = false; try { while (!FindCommon::enoughForFirstBatch(originalQR, numResults) && @@ -463,6 +464,7 @@ public: // later. if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) { exec->enqueue(obj); + stashedResult = true; break; } @@ -490,6 +492,13 @@ public: throw; } + // For empty batches, or in the case where the final result was added to the batch + // rather than being stashed, we update the PBRT to ensure that it is the most recent + // available. + if (!stashedResult) { + firstBatch.setPostBatchResumeToken(exec->getPostBatchResumeToken()); + } + // Set up the cursor for getMore. CursorId cursorId = 0; if (shouldSaveCursor(opCtx, collection, state, exec.get())) { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 64edcbfff86..8a21e28dcfc 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -352,6 +352,8 @@ public: if (state == PlanExecutor::IS_EOF) { // The latest oplog timestamp may advance even when there are no results. Ensure // that we have the latest postBatchResumeToken produced by the plan executor. + // The getMore command does not accept a batchSize of 0, so empty batches are + // always caused by hitting EOF and do not need to be handled separately. nextBatch->setPostBatchResumeToken(exec->getPostBatchResumeToken()); } diff --git a/src/mongo/db/query/planner_access.cpp b/src/mongo/db/query/planner_access.cpp index db4779c2500..1ece9c238cc 100644 --- a/src/mongo/db/query/planner_access.cpp +++ b/src/mongo/db/query/planner_access.cpp @@ -231,8 +231,13 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan( } } - // Extract and assign the 'requestResumeToken' field. - csn->requestResumeToken = query.getQueryRequest().getRequestResumeToken(); + // If the client requested a resume token and we are scanning the oplog, prepare + // the collection scan to return timestamp-based tokens. Otherwise, we should + // return generic RecordId-based tokens. + if (query.getQueryRequest().getRequestResumeToken()) { + csn->shouldTrackLatestOplogTimestamp = query.nss().isOplog(); + csn->requestResumeToken = !query.nss().isOplog(); + } // Extract and assign the RecordId from the 'resumeAfter' token, if present. const BSONObj& resumeAfterObj = query.getQueryRequest().getResumeAfter(); |