diff options
author | Anton Korshunov <anton.korshunov@mongodb.com> | 2020-09-08 13:48:35 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-11 10:38:26 +0000 |
commit | 8216b783d18577097d56b067abd045e030fde96a (patch) | |
tree | ceae02dd801fdc5a2cdae854bd7a84bdf1162377 | |
parent | 8222bf355090bc81de603d34765de4e32550d6ce (diff) | |
download | mongo-8216b783d18577097d56b067abd045e030fde96a.tar.gz |
SERVER-48472 Make SBE raise a KeyNotFound error when $_resumeAfter record id is not found
-rw-r--r-- | jstests/core/resume_query_from_non_existent_record.js | 66 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/loop_join.h | 3 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_coll_scan.cpp | 68 |
3 files changed, 123 insertions, 14 deletions
diff --git a/jstests/core/resume_query_from_non_existent_record.js b/jstests/core/resume_query_from_non_existent_record.js new file mode 100644 index 00000000000..954325a5763 --- /dev/null +++ b/jstests/core/resume_query_from_non_existent_record.js @@ -0,0 +1,66 @@ +/** + * Test that an error is raised when we try to resume a query from a record which doesn't exist. + * + * @tags: [ + * assumes_against_mongod_not_mongos, + * requires_find_command, + * multiversion_incompatible, + * ] + */ + +(function() { +"use strict"; + +const collName = "resume_query_from_non_existent_record"; +const coll = db[collName]; + +coll.drop(); + +const testData = [{_id: 0, a: 1}, {_id: 1, a: 2}, {_id: 2, a: 3}]; +assert.commandWorked(coll.insert(testData)); + +// Run the initial query and request to return a resume token. We're interested only in a single +// document, so 'batchSize' is set to 1. +let res = assert.commandWorked( + db.runCommand({find: collName, hint: {$natural: 1}, batchSize: 1, $_requestResumeToken: true})); +assert.eq(1, res.cursor.firstBatch.length); +assert.contains(res.cursor.firstBatch[0], testData); +const savedData = res.cursor.firstBatch; + +// Make sure the query returned a resume token which will be used to resume the query from. +assert.hasFields(res.cursor, ["postBatchResumeToken"]); +const resumeToken = res.cursor.postBatchResumeToken; + +// Kill the cursor before attempting to resume. +assert.commandWorked(db.runCommand({killCursors: collName, cursors: [res.cursor.id]})); + +// Try to resume the query from the saved resume token. +res = assert.commandWorked(db.runCommand({ + find: collName, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: resumeToken +})); +assert.eq(1, res.cursor.firstBatch.length); +assert.contains(res.cursor.firstBatch[0], testData); +assert.neq(savedData[0], res.cursor.firstBatch[0]); + +// Kill the cursor before attempting to resume. 
+assert.commandWorked(db.runCommand({killCursors: collName, cursors: [res.cursor.id]})); + +// Delete a document which corresponds to the saved resume token, so that we can guarantee it does +// not exist. +assert.commandWorked(coll.remove({_id: savedData[0]._id}, {justOne: true})); + +// Try to resume the query from the same token and check that it fails to position the cursor to +// the record specified in the resume token. +assert.commandFailedWithCode(db.runCommand({ + find: collName, + hint: {$natural: 1}, + batchSize: 1, + $_requestResumeToken: true, + $_resumeAfter: resumeToken +}), + ErrorCodes.KeyNotFound); +})(); diff --git a/src/mongo/db/exec/sbe/stages/loop_join.h b/src/mongo/db/exec/sbe/stages/loop_join.h index bf19c50b8f2..0f94d39a9c1 100644 --- a/src/mongo/db/exec/sbe/stages/loop_join.h +++ b/src/mongo/db/exec/sbe/stages/loop_join.h @@ -57,8 +57,7 @@ public: private: // Set of variables coming from the outer side. const value::SlotVector _outerProjects; - // Set of correlated variables from the outer side that are visible on the inner side. They must - // be also present in the _outerProjects. + // Set of correlated variables from the outer side that are visible on the inner side. const value::SlotVector _outerCorrelated; // If not set then this is a cross product. 
const std::unique_ptr<EExpression> _predicate; diff --git a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp index 05f9bcefb96..1a338abf238 100644 --- a/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp +++ b/src/mongo/db/query/sbe_stage_builder_coll_scan.cpp @@ -41,6 +41,7 @@ #include "mongo/db/exec/sbe/stages/loop_join.h" #include "mongo/db/exec/sbe/stages/project.h" #include "mongo/db/exec/sbe/stages/scan.h" +#include "mongo/db/exec/sbe/stages/union.h" #include "mongo/db/query/sbe_stage_builder_filter.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/storage/oplog_hack.h" @@ -330,20 +331,63 @@ generateGenericCollScan(const Collection* collection, // Check if the scan should be started after the provided resume RecordId and construct a nested // loop join sub-tree to project out the resume RecordId as a seekRecordIdSlot and feed it to - // the inner side (scan). - // - // Note that we also inject a 'skip 1' stage on top of the inner branch, as we need to start - // _after_ the resume RecordId. - // - // TODO SERVER-48472: raise KeyNotFound error if we cannot position the cursor on - // seekRecordIdSlot. + // the inner side (scan). We will also construct a union sub-tree as an outer side of the loop + // join to implement the check that the record we're trying to reposition the scan to exists. if (seekRecordIdSlot && !isTailableResumeBranch) { + // Project out the RecordId we want to resume from as 'seekSlot'. + auto seekSlot = slotIdGenerator->generate(); + auto projStage = sbe::makeProjectStage( + sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none), + seekSlot, + sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64, + csn->resumeAfterRecordId->repr())); + + // Construct a 'seek' branch of the 'union'. 
If we succeed in repositioning the cursor, + // the branch will output the 'seekSlot' to start the real scan from, otherwise it will + // produce EOF. + auto seekBranch = + sbe::makeS<sbe::LoopJoinStage>(std::move(projStage), + sbe::makeS<sbe::ScanStage>(nss, + boost::none, + boost::none, + std::vector<std::string>{}, + sbe::makeSV(), + seekSlot, + forward, + yieldPolicy, + tracker), + + sbe::makeSV(seekSlot), + sbe::makeSV(seekSlot), + nullptr); + + // Construct a 'fail' branch of the union. The 'unusedSlot' is needed as each union branch + // must have the same number of slots, and we use just one in the 'seek' branch above. This + // branch will only be executed if the 'seek' branch produces EOF, which can only happen + // if the seek did not find the record id specified in $_resumeAfter. + auto unusedSlot = slotIdGenerator->generate(); + auto failBranch = sbe::makeProjectStage( + sbe::makeS<sbe::CoScanStage>(), + unusedSlot, + sbe::makeE<sbe::EFail>( + ErrorCodes::KeyNotFound, + str::stream() << "Failed to resume collection scan: the recordId from which we are " + << "attempting to resume no longer exists in the collection: " + << csn->resumeAfterRecordId)); + + // Construct a union stage from the 'seek' and 'fail' branches. Note that this stage will + // only ever see a single call to getNext() due to a 'limit 1' sitting on top of it. + auto unionStage = sbe::makeS<sbe::UnionStage>( + make_vector<std::unique_ptr<sbe::PlanStage>>(std::move(seekBranch), + std::move(failBranch)), + std::vector<sbe::value::SlotVector>{sbe::makeSV(seekSlot), sbe::makeSV(unusedSlot)}, + sbe::makeSV(*seekRecordIdSlot)); + + // Construct the final loop join. Note that we also inject a 'skip 1' stage on top of the + // inner branch, as we need to start _after_ the resume RecordId, and a 'limit 1' stage on + // top of the outer branch, as it should produce just a single seek recordId. 
stage = sbe::makeS<sbe::LoopJoinStage>( - sbe::makeProjectStage( - sbe::makeS<sbe::LimitSkipStage>(sbe::makeS<sbe::CoScanStage>(), 1, boost::none), - *seekRecordIdSlot, - sbe::makeE<sbe::EConstant>(sbe::value::TypeTags::NumberInt64, - csn->resumeAfterRecordId->repr())), + sbe::makeS<sbe::LimitSkipStage>(std::move(unionStage), 1, boost::none), sbe::makeS<sbe::LimitSkipStage>(std::move(stage), boost::none, 1), sbe::makeSV(), sbe::makeSV(*seekRecordIdSlot), |