From 8216b783d18577097d56b067abd045e030fde96a Mon Sep 17 00:00:00 2001 From: Anton Korshunov Date: Tue, 8 Sep 2020 13:48:35 +0100 Subject: SERVER-48472 Make SBE raise a KeyNotFound error when $_resumeAfter record id is not found --- .../core/resume_query_from_non_existent_record.js | 66 +++++++++++++++++++++ src/mongo/db/exec/sbe/stages/loop_join.h | 3 +- src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 68 ++++++++++++++++++---- 3 files changed, 123 insertions(+), 14 deletions(-) create mode 100644 jstests/core/resume_query_from_non_existent_record.js diff --git a/jstests/core/resume_query_from_non_existent_record.js b/jstests/core/resume_query_from_non_existent_record.js new file mode 100644 index 00000000000..954325a5763 --- /dev/null +++ b/jstests/core/resume_query_from_non_existent_record.js @@ -0,0 +1,66 @@ +/** + * Test that an error is raised when we try to resume a query from a record which doesn't exist. + * + * @tags: [ + * assumes_against_mongod_not_mongos, + * requires_find_command, + * multiversion_incompatible, + * ] + */ + +(function() { +"use strict"; + +const collName = "resume_query_from_non_existent_record"; +const coll = db[collName]; + +coll.drop(); + +const testData = [{_id: 0, a: 1}, {_id: 1, a: 2}, {_id: 2, a: 3}]; +assert.commandWorked(coll.insert(testData)); + +// Run the initial query and request to return a resume token. We're interested only in a single +// document, so 'batchSize' is set to 1. +let res = assert.commandWorked( + db.runCommand({find: collName, hint: {$natural: 1}, batchSize: 1, $_requestResumeToken: true})); +assert.eq(1, res.cursor.firstBatch.length); +assert.contains(res.cursor.firstBatch[0], testData); +const savedData = res.cursor.firstBatch; + +// Make sure the query returned a resume token which will be used to resume the query from. +assert.hasFields(res.cursor, ["postBatchResumeToken"]); +const resumeToken = res.cursor.postBatchResumeToken; + +// Kill the cursor before attempting to resume. +assert.commandWorked(db.runCommand({killCursors: collName, cursors: [res.cursor.id]})); + +// Try to resume the query from the saved resume token. +res = assert.commandWorked(db.runCommand({ + find: collName, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: resumeToken +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.contains(res.cursor.firstBatch[0], testData); +assert.neq(savedData[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. +assert.commandWorked(db.runCommand({killCursors: collName, cursors: [res.cursor.id]})); + +// Delete a document which corresponds to the saved resume token, so that we can guarantee it does +// not exist. +assert.commandWorked(coll.remove({_id: savedData[0]._id}, {justOne: true})); + +// Try to resume the query from the same token and check that it fails to position the cursor to +// the record specified in the resume token. +assert.commandFailedWithCode(db.runCommand({ + find: collName, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: resumeToken +}), + ErrorCodes.KeyNotFound); +})(); diff --git a/src/mongo/db/exec/sbe/stages/loop_join.h b/src/mongo/db/exec/sbe/stages/loop_join.h index bf19c50b8f2..0f94d39a9c1 100644 --- a/src/mongo/db/exec/sbe/stages/loop_join.h +++ b/src/mongo/db/exec/sbe/stages/loop_join.h @@ -57,8 +57,7 @@ public: private: // Set of variables coming from the outer side. const value::SlotVector _outerProjects; - // Set of correlated variables from the outer side that are visible on the inner side. They must - // be also present in the _outerProjects. + // Set of correlated variables from the outer side that are visible on the inner side. const value::SlotVector _outerCorrelated; // If not set then this is a cross product. const std::unique_ptr _predicate; diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 05f9bcefb96..1a338abf238 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -41,6 +41,7 @@ #include "mongo/db/exec/sbe/stages/loop_join.h" #include "mongo/db/exec/sbe/stages/project.h" #include "mongo/db/exec/sbe/stages/scan.h" +#include "mongo/db/exec/sbe/stages/union.h" #include "mongo/db/query/sbe_stage_builder_filter.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/storage/oplog_hack.h" @@ -330,20 +331,63 @@ generateGenericCollScan(const Collection* collection, // Check if the scan should be started after the provided resume RecordId and construct a nested // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to - // the inner side (scan). - // - // Note that we also inject a 'skip 1' stage on top of the inner branch, as we need to start - // _after_ the resume RecordId. - // - // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on - // seekRecordIdSlot. + // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop + // join to implement the check that the record we're trying to reposition the scan exists. if (seekRecordIdSlot && !isTailableResumeBranch) { + // Project out the RecordId we want to resume from as 'seekSlot'. + auto seekSlot = slotIdGenerator->generate(); + auto projStage = sbe::makeProjectStage( + sbe::makeS(sbe::makeS(), 1, boost::none), + seekSlot, + sbe::makeE(sbe::value::TypeTags::NumberInt64, + csn->resumeAfterRecordId->repr())); + + // Construct a 'seek' branch of the 'union'. If we're succeeded to reposition the cursor, + // the branch will output the 'seekSlot' to start the real scan from, otherwise it will + // produce EOF. + auto seekBranch = + sbe::makeS(std::move(projStage), + sbe::makeS(nss, + boost::none, + boost::none, + std::vector{}, + sbe::makeSV(), + seekSlot, + forward, + yieldPolicy, + tracker), + + sbe::makeSV(seekSlot), + sbe::makeSV(seekSlot), + nullptr); + + // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch + // must have the same number of slots, and we use just one in the 'seek' branch above. This + // branch will only be executed if the 'seek' branch produces EOF, which can only happen if + // if the seek did not find the record id specified in $_resumeAfter. + auto unusedSlot = slotIdGenerator->generate(); + auto failBranch = sbe::makeProjectStage( + sbe::makeS(), + unusedSlot, + sbe::makeE( + ErrorCodes::KeyNotFound, + str::stream() << "Failed to resume collection scan: the recordId from which we are " + << "attempting to resume no longer exists in the collection: " + << csn->resumeAfterRecordId)); + + // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will + // ever produce a single call to getNext() due to a 'limit 1' sitting on top of it. + auto unionStage = sbe::makeS( + make_vector>(std::move(seekBranch), + std::move(failBranch)), + std::vector{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, + sbe::makeSV(*seekRecordIdSlot)); + + // Construct the final loop join. Note that we also inject a 'skip 1' stage on top of the + // inner branch, as we need to start _after_ the resume RecordId, and a 'limit 1' stage on + // top of the outer branch, as it should produce just a single seek recordId. stage = sbe::makeS( - sbe::makeProjectStage( - sbe::makeS(sbe::makeS(), 1, boost::none), - *seekRecordIdSlot, - sbe::makeE(sbe::value::TypeTags::NumberInt64, - csn->resumeAfterRecordId->repr())), + sbe::makeS(std::move(unionStage), 1, boost::none), sbe::makeS(std::move(stage), boost::none, 1), sbe::makeSV(), sbe::makeSV(*seekRecordIdSlot), -- cgit v1.2.1