From ed1e2b4d2a4987e3744484f9482fdc7a0e119e94 Mon Sep 17 00:00:00 2001 From: David Storch Date: Wed, 28 Feb 2018 18:08:21 -0500 Subject: SERVER-33541 Add readConcern level 'snapshot' support for aggregation. --- jstests/noPassthrough/readConcern_snapshot.js | 8 +- .../read_concern_snapshot_aggregation.js | 216 +++++++++++++++++++++ .../read_concern_snapshot_yielding.js | 13 ++ jstests/noPassthrough/snapshot_reads.js | 33 ++-- src/mongo/db/commands/find_cmd.cpp | 16 +- src/mongo/db/pipeline/document_source.h | 33 ++++ .../db/pipeline/document_source_bucket_auto.h | 3 +- .../db/pipeline/document_source_change_stream.cpp | 2 + .../pipeline/document_source_check_resume_token.h | 2 + src/mongo/db/pipeline/document_source_coll_stats.h | 3 +- src/mongo/db/pipeline/document_source_current_op.h | 3 +- src/mongo/db/pipeline/document_source_cursor.cpp | 10 +- src/mongo/db/pipeline/document_source_cursor.h | 3 +- src/mongo/db/pipeline/document_source_facet.cpp | 3 +- .../db/pipeline/document_source_facet_test.cpp | 6 +- src/mongo/db/pipeline/document_source_geo_near.h | 3 +- .../db/pipeline/document_source_graph_lookup.h | 3 +- src/mongo/db/pipeline/document_source_group.h | 3 +- .../db/pipeline/document_source_index_stats.h | 3 +- ...document_source_internal_inhibit_optimization.h | 3 +- .../document_source_internal_split_pipeline.h | 3 +- src/mongo/db/pipeline/document_source_limit.h | 3 +- .../pipeline/document_source_list_local_cursors.h | 3 +- .../pipeline/document_source_list_local_sessions.h | 3 +- .../db/pipeline/document_source_list_sessions.h | 3 +- src/mongo/db/pipeline/document_source_lookup.h | 3 +- .../document_source_lookup_change_post_image.h | 1 + src/mongo/db/pipeline/document_source_match.h | 1 + .../db/pipeline/document_source_merge_cursors.h | 5 +- src/mongo/db/pipeline/document_source_mock.h | 3 +- src/mongo/db/pipeline/document_source_out.h | 3 +- src/mongo/db/pipeline/document_source_redact.h | 1 + src/mongo/db/pipeline/document_source_sample.h | 3 +- .../document_source_sample_from_random_cursor.h | 3 +- .../document_source_sequential_document_cache.h | 3 +- ...ocument_source_single_document_transformation.h | 3 + src/mongo/db/pipeline/document_source_skip.h | 3 +- src/mongo/db/pipeline/document_source_sort.h | 1 + .../db/pipeline/document_source_tee_consumer.h | 3 +- src/mongo/db/pipeline/document_source_unwind.h | 3 +- src/mongo/db/pipeline/pipeline.cpp | 39 ++-- src/mongo/db/pipeline/pipeline.h | 24 ++- src/mongo/db/pipeline/pipeline_d.cpp | 3 +- src/mongo/db/pipeline/pipeline_test.cpp | 86 ++++++-- src/mongo/db/query/find.cpp | 3 +- src/mongo/db/query/get_executor.cpp | 33 +++- src/mongo/db/query/get_executor.h | 10 +- src/mongo/db/service_entry_point_common.cpp | 3 +- src/mongo/s/query/document_source_router_adapter.h | 3 +- 49 files changed, 518 insertions(+), 109 deletions(-) create mode 100644 jstests/noPassthrough/read_concern_snapshot_aggregation.js diff --git a/jstests/noPassthrough/readConcern_snapshot.js b/jstests/noPassthrough/readConcern_snapshot.js index a45bdc5a4bd..2257f6641bf 100644 --- a/jstests/noPassthrough/readConcern_snapshot.js +++ b/jstests/noPassthrough/readConcern_snapshot.js @@ -117,16 +117,14 @@ assert.commandWorked(sessionDb.runCommand( {find: collName, readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)})); - // readConcern 'snapshot' is not supported by aggregate. - // TODO SERVER-33354: Add snapshot support for aggregate. - assert.commandFailedWithCode(sessionDb.runCommand({ + // readConcern 'snapshot' is supported by aggregate. + assert.commandWorked(sessionDb.runCommand({ aggregate: collName, pipeline: [], cursor: {}, readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++) - }), - ErrorCodes.InvalidOptions); + })); // readConcern 'snapshot' is supported by count. assert.commandWorked(sessionDb.runCommand( diff --git a/jstests/noPassthrough/read_concern_snapshot_aggregation.js b/jstests/noPassthrough/read_concern_snapshot_aggregation.js new file mode 100644 index 00000000000..9dfa9e3d816 --- /dev/null +++ b/jstests/noPassthrough/read_concern_snapshot_aggregation.js @@ -0,0 +1,216 @@ +/** + * Tests for the aggregate command's support for readConcern level "snapshot". + * + * @tags: [requires_replication] + */ +(function() { + "use strict"; + + const kAdminDB = "admin"; + const kCollName = "coll"; + const kConfigDB = "config"; + const kDBName = "test"; + const kIllegalStageForSnapshotReadCode = 50742; + const kWCMajority = {writeConcern: {w: "majority"}}; + + let rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + let session = + rst.getPrimary().getDB(kDBName).getMongo().startSession({causalConsistency: false}); + let sessionDB = session.getDatabase(kDBName); + if (!sessionDB.serverStatus().storageEngine.supportsSnapshotReadConcern) { + rst.stopSet(); + return; + } + + let txnNumber = NumberLong(0); + assert.commandWorked(sessionDB.runCommand({create: kCollName})); + + function testSnapshotAggFailsWithCode(coll, pipeline, code) { + let cmd = {aggregate: coll, pipeline: pipeline, cursor: {}}; + + let cmdAsSnapshotRead = Object.extend({}, cmd); + cmdAsSnapshotRead.txnNumber = NumberLong(++txnNumber); + cmdAsSnapshotRead.readConcern = {level: "snapshot"}; + assert.commandFailedWithCode(sessionDB.runCommand(cmdAsSnapshotRead), code); + + // As a sanity check, also make sure that the command succeeds when run without a txn number + // and without a readConcern. + assert.commandWorked(sessionDB.runCommand(cmd)); + } + + // Test that $changeStream is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(kCollName, [{$changeStream: {}}], ErrorCodes.InvalidOptions); + + // Test that $collStats is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(kCollName, [{$collStats: {}}], kIllegalStageForSnapshotReadCode); + + // Test that $geoNear is disallowed with snapshot reads. + assert.commandWorked(sessionDB.getCollection(kCollName).createIndex({a: "2dsphere"})); + testSnapshotAggFailsWithCode(kCollName, + [{ + $geoNear: { + near: {type: "Point", coordinates: [0, 0]}, + distanceField: "distanceField", + spherical: true, + key: "a" + } + }], + kIllegalStageForSnapshotReadCode); + + // Test that $indexStats is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(kCollName, [{$indexStats: {}}], kIllegalStageForSnapshotReadCode); + + // Test that $listLocalCursors is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(1, [{$listLocalCursors: {}}], ErrorCodes.InvalidOptions); + + // Test that $listLocalSessions is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(1, [{$listLocalSessions: {}}], ErrorCodes.InvalidOptions); + + // Test that $out is disallowed with snapshot reads. + testSnapshotAggFailsWithCode(kCollName, [{$out: "out"}], ErrorCodes.InvalidOptions); + + // Test that $listSessions is disallowed with snapshot reads. This stage must be run against + // 'system.sessions' in the config database. + sessionDB = session.getDatabase(kConfigDB); + testSnapshotAggFailsWithCode( + "system.sessions", [{$listSessions: {}}], kIllegalStageForSnapshotReadCode); + + // Test that $currentOp is disallowed with snapshot reads. We have to reassign 'sessionDB' to + // refer to the admin database, because $currentOp pipelines are required to run against + // 'admin'. + sessionDB = session.getDatabase(kAdminDB); + testSnapshotAggFailsWithCode(1, [{$currentOp: {}}], ErrorCodes.InvalidOptions); + sessionDB = session.getDatabase(kDBName); + + // Helper for testing that aggregation stages which involve a local and foreign collection + // ($lookup and $graphLookup) obey the expected readConcern "snapshot" isolation semantics. + // + // Inserts 'localDocsPre' into the 'local' collection and 'foreignDocsPre' into the 'foreign' + // collection. Then runs the first batch of 'pipeline', before inserting 'localDocsPost' into + // 'local' and 'foreignDocsPost' into 'foreign'. Iterates the remainder of the aggregation + // cursor and verifies that the result set matches 'expectedResults'. + function testLookupReadConcernSnapshotIsolation( + {localDocsPre, foreignDocsPre, localDocsPost, foreignDocsPost, pipeline, expectedResults}) { + let localColl = sessionDB.local; + let foreignColl = sessionDB.foreign; + localColl.drop(); + foreignColl.drop(); + assert.commandWorked(localColl.insert(localDocsPre, kWCMajority)); + assert.commandWorked(foreignColl.insert(foreignDocsPre, kWCMajority)); + let cmdRes = sessionDB.runCommand({ + aggregate: localColl.getName(), + pipeline: pipeline, + // TODO SERVER-33698: Remove this workaround once cursor establishment commands with + // batchSize:0 correctly open a snapshot for the read transaction. + cursor: {batchSize: 1}, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(++txnNumber) + }); + assert.commandWorked(cmdRes); + assert.neq(0, cmdRes.cursor.id); + assert.eq(1, cmdRes.cursor.firstBatch.length); + + assert.commandWorked(localColl.insert(localDocsPost, kWCMajority)); + assert.commandWorked(foreignColl.insert(foreignDocsPost, kWCMajority)); + let results = + new DBCommandCursor(sessionDB, cmdRes, undefined, undefined, NumberLong(txnNumber)) + .toArray(); + assert.eq(results, expectedResults); + } + + // Test that snapshot isolation works with $lookup using localField/foreignField syntax. + testLookupReadConcernSnapshotIsolation({ + localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}], + foreignDocsPre: [{_id: 1}], + localDocsPost: [{_id: 3}], + foreignDocsPost: [{_id: 2}, {_id: 3}], + pipeline: [ + {$lookup: {from: "foreign", localField: "_id", foreignField: "_id", as: "as"}}, + {$sort: {_id: 1}} + ], + expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}] + }); + + // Test that snapshot isolation works with $lookup into a nested pipeline. + testLookupReadConcernSnapshotIsolation({ + localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}], + foreignDocsPre: [{_id: 1}], + localDocsPost: [{_id: 3}], + foreignDocsPost: [{_id: 2}, {_id: 3}], + pipeline: [ + { + $lookup: { + from: "foreign", + as: "as", + let : {localId: "$_id"}, + pipeline: [{$match: {$expr: {$eq: ["$_id", "$$localId"]}}}] + } + }, + {$sort: {_id: 1}} + ], + expectedResults: [{_id: 0, as: []}, {_id: 1, as: [{_id: 1}]}, {_id: 2, as: []}] + }); + + // Test that snapshot isolation works with $graphLookup. + testLookupReadConcernSnapshotIsolation({ + localDocsPre: [{_id: 0}, {_id: 1}, {_id: 2}], + foreignDocsPre: [{_id: 1, linkTo: 2}], + localDocsPost: [{_id: 3}], + foreignDocsPost: [{_id: 2, linkTo: 3}, {_id: 3}], + pipeline: [ + { + $graphLookup: { + from: "foreign", + as: "as", + startWith: "$_id", + connectFromField: "linkTo", + connectToField: "_id" + } + }, + {$sort: {_id: 1}} + ], + expectedResults: + [{_id: 0, as: []}, {_id: 1, as: [{_id: 1, linkTo: 2}]}, {_id: 2, as: []}] + }); + + // Test that snapshot reads are legal for $facet. + let coll = sessionDB.getCollection(kCollName); + coll.drop(); + assert.commandWorked(coll.insert( + [ + {group1: 1, group2: 1, val: 1}, + {group1: 1, group2: 2, val: 2}, + {group1: 2, group2: 2, val: 8} + ], + kWCMajority)); + + let cmdRes = sessionDB.runCommand({ + aggregate: kCollName, + pipeline: [ + { + $facet: { + g1: [{$group: {_id: "$group1", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}], + g2: [{$group: {_id: "$group2", sum: {$sum: "$val"}}}, {$sort: {_id: 1}}] + } + }, + {$unwind: "$g1"}, + {$unwind: "$g2"}, + {$sort: {"g1._id": 1, "g2._id": 1}} + ], + cursor: {}, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(++txnNumber) + }); + assert.commandWorked(cmdRes); + assert.eq(0, cmdRes.cursor.id); + assert.eq(cmdRes.cursor.firstBatch, [ + {g1: {_id: 1, sum: 3}, g2: {_id: 1, sum: 1}}, + {g1: {_id: 1, sum: 3}, g2: {_id: 2, sum: 10}}, + {g1: {_id: 2, sum: 8}, g2: {_id: 1, sum: 1}}, + {g1: {_id: 2, sum: 8}, g2: {_id: 2, sum: 10}} + ]); + + rst.stopSet(); +}()); diff --git a/jstests/noPassthrough/read_concern_snapshot_yielding.js b/jstests/noPassthrough/read_concern_snapshot_yielding.js index aa953e11927..dc600721b59 100644 --- a/jstests/noPassthrough/read_concern_snapshot_yielding.js +++ b/jstests/noPassthrough/read_concern_snapshot_yielding.js @@ -237,6 +237,19 @@ res.cursor.nextBatch.length, TestData.numDocs - initialFindBatchSize, tojson(res)); }, {"originatingCommand.filter": {x: 1}}, {op: "getmore"}); + // Test aggregate. + testCommand(function() { + const res = assert.commandWorked(db.runCommand({ + aggregate: "coll", + pipeline: [{$match: {x: 1}}], + readConcern: {level: "snapshot"}, + cursor: {}, + lsid: TestData.sessionId, + txnNumber: NumberLong(TestData.txnNumber) + })); + assert.eq(res.cursor.firstBatch.length, TestData.numDocs, tojson(res)); + }, {"command.pipeline": [{$match: {x: 1}}]}, {"command.pipeline": [{$match: {x: 1}}]}); + // Test update. // We cannot provide a 'profilerFilter' because profiling is turned off for write commands in // transactions. diff --git a/jstests/noPassthrough/snapshot_reads.js b/jstests/noPassthrough/snapshot_reads.js index efa80492b78..65a37fdc4db 100644 --- a/jstests/noPassthrough/snapshot_reads.js +++ b/jstests/noPassthrough/snapshot_reads.js @@ -20,7 +20,7 @@ } const secondaryDB = rst.getSecondary().getDB(dbName); - function runTest({useCausalConsistency, readFromSecondary}) { + function runTest({useCausalConsistency, readFromSecondary, establishCursorCmd}) { primaryDB.coll.drop(); let readDB = primaryDB; @@ -43,14 +43,14 @@ let txnNumber = 0; + // Augment the cursor-establishing command with the proper readConcern and transaction + // number. + let cursorCmd = Object.extend({}, establishCursorCmd); + cursorCmd.readConcern = {level: "snapshot"}; + cursorCmd.txnNumber = NumberLong(txnNumber); + // Establish a snapshot cursor, fetching the first 5 documents. - let res = assert.commandWorked(sessionDb.runCommand({ - find: collName, - sort: {_id: 1}, - batchSize: 5, - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(txnNumber) - })); + let res = assert.commandWorked(sessionDb.runCommand(cursorCmd)); assert(res.hasOwnProperty("cursor")); assert(res.cursor.hasOwnProperty("firstBatch")); @@ -132,10 +132,19 @@ session.endSession(); } - runTest({useCausalConsistency: false, readFromSecondary: false}); - runTest({useCausalConsistency: true, readFromSecondary: false}); - runTest({useCausalConsistency: false, readFromSecondary: true}); - runTest({useCausalConsistency: true, readFromSecondary: true}); + // Test snapshot reads using find. + let findCmd = {find: collName, sort: {_id: 1}, batchSize: 5}; + runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: findCmd}); + runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: findCmd}); + runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: findCmd}); + runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: findCmd}); + + // Test snapshot reads using aggregate. + let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 5}}; + runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: aggCmd}); + runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: aggCmd}); + runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: aggCmd}); + runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: aggCmd}); rst.stopSet(); })(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 7cd41a2a6a8..9f4e12cd062 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -197,13 +197,7 @@ public: Collection* const collection = ctx->getCollection(); // We have a parsed query. Time to get the execution plan for it. - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - auto yieldPolicy = - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO; - auto statusWithPlanExecutor = - getExecutorFind(opCtx, collection, nss, std::move(cq), yieldPolicy); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } @@ -315,13 +309,7 @@ public: Collection* const collection = ctx->getCollection(); // Get the execution plan for the query. - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - auto yieldPolicy = - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO; - auto statusWithPlanExecutor = - getExecutorFind(opCtx, collection, nss, std::move(cq), yieldPolicy); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); if (!statusWithPlanExecutor.isOK()) { return CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 17622862af1..f1ee7d5717c 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -187,18 +187,26 @@ public: */ enum class FacetRequirement { kAllowed, kNotAllowed }; + /** + * Indicates whether or not this stage is legal when the read concern for the aggregate has + * readConcern level "snapshot" or is running inside of a multi-document transaction. + */ + enum class TransactionRequirement { kNotAllowed, kAllowed }; + StageConstraints( StreamType streamType, PositionRequirement requiredPosition, HostTypeRequirement hostRequirement, DiskUseRequirement diskRequirement, FacetRequirement facetRequirement, + TransactionRequirement transactionRequirement, ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist) : requiredPosition(requiredPosition), hostRequirement(hostRequirement), diskRequirement(diskRequirement), changeStreamRequirement(changeStreamRequirement), facetRequirement(facetRequirement), + transactionRequirement(transactionRequirement), streamType(streamType) { // Stages which are allowed to run in $facet must not have any position requirements. invariant( @@ -219,6 +227,18 @@ public: // A stage which is whitelisted for $changeStream cannot have a position requirement. invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist && requiredPosition != PositionRequirement::kNone)); + + // Change stream stages should not be permitted with readConcern level "snapshot" or + // inside of a multi-document transaction. + if (isChangeStreamStage()) { + invariant(!isAllowedInTransaction()); + } + + // Stages which write data to user collections should not be permitted with readConcern + // level "snapshot" or inside of a multi-document transaction. + if (diskRequirement == DiskUseRequirement::kWritesPersistentData) { + invariant(!isAllowedInTransaction()); + } } /** @@ -262,6 +282,14 @@ public: return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage; } + /** + * Returns true if this stage is legal when the readConcern level is "snapshot" or when this + * aggregation is being run within a multi-document transaction. + */ + bool isAllowedInTransaction() const { + return transactionRequirement == TransactionRequirement::kAllowed; + } + // Indicates whether this stage needs to be at a particular position in the pipeline. const PositionRequirement requiredPosition; @@ -280,6 +308,10 @@ public: // Indicates whether this stage may run inside a $facet stage. const FacetRequirement facetRequirement; + // Indicates whether this stage is legal when the readConcern level is "snapshot" or the + // aggregate is running inside of a multi-document transaction. + const TransactionRequirement transactionRequirement; + // Indicates whether this is a streaming or blocking stage. const StreamType streamType; @@ -309,6 +341,7 @@ public: using DiskUseRequirement = StageConstraints::DiskUseRequirement; using FacetRequirement = StageConstraints::FacetRequirement; using StreamType = StageConstraints::StreamType; + using TransactionRequirement = StageConstraints::TransactionRequirement; /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 77fd0173d9c..a5b041d741e 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -53,7 +53,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 6f4ac3ffc57..f4cee26c9c2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -106,6 +106,7 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints( HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } @@ -157,6 +158,7 @@ public: : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index a0d5b643d96..3401f9e7ba1 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -66,6 +66,7 @@ public: HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } @@ -104,6 +105,7 @@ public: : HostTypeRequirement::kMongoS), DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage}; } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 5b7bf719a14..17cd974cb60 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -79,7 +79,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 14e95983f70..cb180a99085 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -83,7 +83,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index b090f195bbe..10d049cef28 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -210,8 +210,9 @@ Value DocumentSourceCursor::serialize(boost::optional { auto opCtx = pExpCtx->opCtx; - AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS); + auto lockMode = getLockModeForQuery(opCtx); + AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode); + Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode); auto collection = dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr; @@ -266,8 +267,9 @@ void DocumentSourceCursor::cleanupExecutor() { // already have been marked as killed when the collection was dropped, and we won't need to // access the CursorManager to properly dispose of it. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS); + auto lockMode = getLockModeForQuery(opCtx); + AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode); + Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode); auto collection = dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr; auto cursorManager = collection ? collection->getCursorManager() : nullptr; _exec->dispose(opCtx, cursorManager); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 6f4e4a1fc01..fc9b37f9403 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -58,7 +58,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 050ffa9f331..bc49ab6371f 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -263,7 +263,8 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints( PositionRequirement::kNone, host, mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed}; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index c706a5ca31e..25f8cdc7a07 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -216,7 +216,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } DocumentSource::GetNextResult getNext() final { @@ -671,7 +672,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } static boost::intrusive_ptr create() { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 376e4a03ea0..f5a7f0cf67d 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -54,7 +54,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 9b5882e3f52..527c71a21c2 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -58,7 +58,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 0b7120c3fc0..af262fd739e 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -74,7 +74,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index b485db6d748..68ebc5ecfd1 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -74,7 +74,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index 806779641af..9fbf57fe8cc 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -57,7 +57,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index d2f0e89754a..bf93c58c100 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -71,7 +71,8 @@ public: PositionRequirement::kNone, _mergeType, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index f58321289d8..3c8c21c2511 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -53,7 +53,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h index 4746c4933e7..7bd9e3ffed6 100644 --- a/src/mongo/db/pipeline/document_source_list_local_cursors.h +++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h @@ -87,7 +87,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 9674feb6e66..e6e87e0c75c 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -98,7 +98,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index 57ab2e2eb5d..32db9a6157c 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -90,7 +90,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed}; } static boost::intrusive_ptr createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 530c62f985c..c9f8872f580 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -110,7 +110,8 @@ public: HostTypeRequirement::kPrimaryShard, mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 416088fb742..8f377b46986 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -70,6 +70,7 @@ public: : HostTypeRequirement::kMongoS, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, ChangeStreamRequirement::kChangeStreamStage); constraints.canSwapWithMatch = true; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 4614ee5d50b..4deb71b7118 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -56,6 +56,7 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index e23772f411b..cb74f4bd9ef 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -86,7 +86,10 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + // TODO SERVER-33683: Permit $mergeCursors with readConcern + // level "snapshot". + TransactionRequirement::kNotAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 70f8bbc2d8e..cc81d533fb2 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -54,7 +54,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 26e49e9f54c..ad6659f42b0 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -49,7 +49,8 @@ public: PositionRequirement::kLast, HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed}; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index c6c83794aec..af21e236d17 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -46,6 +46,7 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, ChangeStreamRequirement::kWhitelist}; } diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index deb3e3b033e..1ac496305ee 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -48,7 +48,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetDepsReturn getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 9dc724dbcfb..bf7f75252ed 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -49,7 +49,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kAnyShard, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed}; } static boost::intrusive_ptr create( diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index 48a98a5b0c3..102731d4a32 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -55,7 +55,8 @@ public: : PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed); constraints.requiresInputDocSource = (_cache->isBuilding()); return constraints; diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 22689123099..bffa484c46b 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -113,6 +113,9 @@ public: (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed), + (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation + ? TransactionRequirement::kNotAllowed + : TransactionRequirement::kAllowed), (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation ? ChangeStreamRequirement::kChangeStreamStage : ChangeStreamRequirement::kWhitelist)); diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index cc3bca48d1b..760e887f8b4 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -55,7 +55,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index a9b2ed14c2f..d2ad1cd9ec4 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -69,6 +69,7 @@ public: HostTypeRequirement::kNone, _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData, _mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed, + TransactionRequirement::kAllowed, _mergingPresorted ? ChangeStreamRequirement::kWhitelist : ChangeStreamRequirement::kBlacklist); diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index 397c5d28128..75b33ea8415 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -58,7 +58,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index a2c68f9c8e0..763d0523642 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -51,7 +51,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 0eb286bca28..fafcd0d910f 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -120,11 +120,7 @@ StatusWith> Pipeline::createTopLevelO std::unique_ptr pipeline(new Pipeline(std::move(stages), expCtx), PipelineDeleter(expCtx->opCtx)); try { - if (isFacetPipeline) { - pipeline->validateFacetPipeline(); - } else { - pipeline->validatePipeline(); - } + pipeline->validate(isFacetPipeline); } catch (const DBException& ex) { return ex.toStatus(); } @@ -133,7 +129,17 @@ StatusWith> Pipeline::createTopLevelO return std::move(pipeline); } -void Pipeline::validatePipeline() const { +void Pipeline::validate(bool isFacetPipeline) const { + if (isFacetPipeline) { + validateFacetPipeline(); + } else { + validateTopLevelPipeline(); + } + + validateCommon(); +} + +void Pipeline::validateTopLevelPipeline() const { // Verify that the specified namespace is valid for the initial stage of this pipeline. const NamespaceString& nss = pCtx->ns; @@ -173,9 +179,6 @@ void Pipeline::validatePipeline() const { } } } - - // Verify that each stage is in a legal position within the pipeline. - ensureAllStagesAreInLegalPositions(); } void Pipeline::validateFacetPipeline() const { @@ -194,14 +197,12 @@ void Pipeline::validateFacetPipeline() const { invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); invariant(!stageConstraints.isIndependentOfAnyCollection); } - - // Facet pipelines cannot have any stages which are initial sources. We've already validated the - // first stage, and the 'ensureAllStagesAreInLegalPositions' method checks that there are no - // initial sources in positions 1...N, so we can just return its result directly. - ensureAllStagesAreInLegalPositions(); } -void Pipeline::ensureAllStagesAreInLegalPositions() const { +void Pipeline::validateCommon() const { + // TODO SERVER-33551: Don't use presence of WUOW to decide whether we are in a snapshot read or + // multi-doc transaction. + const bool isSnapshotReadOrTxn = static_cast(pCtx->opCtx->getWriteUnitOfWork()); size_t i = 0; for (auto&& stage : _sources) { auto constraints = stage->constraints(_splitState); @@ -229,6 +230,14 @@ void Pipeline::ensureAllStagesAreInLegalPositions() const { uassert(40644, str::stream() << stage->getSourceName() << " can only be run on mongoS", !(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos)); + + if (isSnapshotReadOrTxn) { + uassert(50742, + str::stream() << "Stage not supported with readConcern level \"snapshot\" " + "or inside of a multi-document transaction: " + << stage->getSourceName(), + constraints.isAllowedInTransaction()); + } } } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 6aba6d38d66..bcb7ee64521 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -338,21 +338,29 @@ private: * is present then it must come last in the pipeline, while initial stages such as $indexStats * must be at the start. */ - void validatePipeline() const; + void validate(bool isFacetPipeline) const; /** - * Throws if the $facet pipeline fails any of a set of semantic checks. For example, the - * pipeline cannot be empty and may not contain any initial stages. + * Performs validation checking specific to top-level pipelines. Throws if the pipeline is + * invalid. + */ + void validateTopLevelPipeline() const; + + /** + * Performs validation checking specific to nested $facet pipelines. Throws if the pipeline is + * invalid. */ void validateFacetPipeline() const; /** - * Helper method which validates that each stage in pipeline is in a legal position. For - * example, $out must be at the end, while a $match stage with a text query must be at the - * start. Note that this method accepts an initial source as the first stage, which is illegal - * for $facet pipelines. + * Performs common validation for top-level or facet pipelines. Throws if the pipeline is + * invalid. + * + * Includes checking for illegal stage positioning. For example, $out must be at the end, while + * a $match stage with a text query must be at the start. Note that this method accepts an + * initial source as the first stage, which is illegal for $facet pipelines. */ - void ensureAllStagesAreInLegalPositions() const; + void validateCommon() const; /** * Returns Status::OK if the pipeline can run on mongoS, or an error with a message explaining diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index b0718930f2c..ae130fa7595 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -214,8 +214,7 @@ StatusWith> attemptToGetExe return {cq.getStatus()}; } - return getExecutorFind( - opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts); + return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts); } BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 305e0f20d0f..19df0485811 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2181,7 +2181,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kMongoS, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed}; } static boost::intrusive_ptr create() { @@ -2337,9 +2338,10 @@ TEST(PipelineInitialSource, MatchInitialQuery) { ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4)); } -namespace Namespaces { +// Contains test cases for validation done on pipeline creation. +namespace pipeline_validate { -using PipelineInitialSourceNSTest = AggregationContextFixture; +using PipelineValidateTest = AggregationContextFixture; class DocumentSourceCollectionlessMock : public DocumentSourceMock { public: @@ -2350,7 +2352,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed); + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed); constraints.isIndependentOfAnyCollection = true; constraints.requiresInputDocSource = false; return constraints; @@ -2361,7 +2364,7 @@ public: } }; -TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) { +TEST_F(PipelineValidateTest, AggregateOneNSNotValidForEmptyPipeline) { const std::vector rawPipeline = {}; auto ctx = getExpCtx(); @@ -2370,7 +2373,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) { ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) { +TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) { const std::vector rawPipeline = {fromjson("{$match: {}}")}; auto ctx = getExpCtx(); @@ -2379,7 +2382,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequires ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectionless) { +TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless) { auto collectionlessSource = DocumentSourceCollectionlessMock::create(); auto ctx = getExpCtx(); @@ -2388,7 +2391,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectio ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollectionless) { +TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless) { auto collectionlessSource = DocumentSourceCollectionlessMock::create(); auto ctx = getExpCtx(); @@ -2397,7 +2400,7 @@ TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollecti ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) { +TEST_F(PipelineValidateTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) { const std::vector rawPipeline = {fromjson("{$match: {}}")}; auto ctx = getExpCtx(); @@ -2406,7 +2409,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) { +TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) { const std::vector rawPipeline = {fromjson("{$changeStream: {}}")}; auto ctx = getExpCtx(); setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); @@ -2414,7 +2417,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) { ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) { +TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) { const std::vector rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeStream: {}}")}; auto ctx = getExpCtx(); @@ -2424,7 +2427,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) { ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40602)); } -TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) { +TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) { const std::vector rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeStream: {}}")}; auto ctx = getExpCtx(); @@ -2435,7 +2438,61 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStageInFacet ASSERT(std::string::npos != parseStatus.reason().find("$changeStream")); } -} // namespace Namespaces +class DocumentSourceDisallowedWithSnapshotReads : public DocumentSourceMock { +public: + DocumentSourceDisallowedWithSnapshotReads() : DocumentSourceMock({}) {} + + StageConstraints constraints(Pipeline::SplitState pipeState) const final { + return StageConstraints{StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed, + TransactionRequirement::kNotAllowed}; + } + + static boost::intrusive_ptr create() { + return new DocumentSourceDisallowedWithSnapshotReads(); + } +}; + +TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalWithSnapshotReads) { + BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level" + << "snapshot")); + auto ctx = getExpCtx(); + auto&& readConcernArgs = repl::ReadConcernArgs::get(ctx->opCtx); + ASSERT_OK(readConcernArgs.initialize(readConcernSnapshot["readConcern"])); + ASSERT(readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern); + ctx->opCtx->setWriteUnitOfWork(stdx::make_unique(ctx->opCtx)); + + // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline + // creation fails with the expected error code. + auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx); + auto illegalStage = DocumentSourceDisallowedWithSnapshotReads::create(); + auto pipeline = Pipeline::create({matchStage, illegalStage}, ctx); + ASSERT_NOT_OK(pipeline.getStatus()); + ASSERT_EQ(pipeline.getStatus(), ErrorCodes::duplicateCodeForTest(50742)); +} + +TEST_F(PipelineValidateTest, FacetPipelineValidatedForStagesIllegalWithSnapshotReads) { + BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level" + << "snapshot")); + auto ctx = getExpCtx(); + auto&& readConcernArgs = repl::ReadConcernArgs::get(ctx->opCtx); + ASSERT_OK(readConcernArgs.initialize(readConcernSnapshot["readConcern"])); + ASSERT(readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern); + ctx->opCtx->setWriteUnitOfWork(stdx::make_unique(ctx->opCtx)); + + // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline + // creation fails with the expected error code. + auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx); + auto illegalStage = DocumentSourceDisallowedWithSnapshotReads::create(); + auto pipeline = Pipeline::createFacetPipeline({matchStage, illegalStage}, ctx); + ASSERT_NOT_OK(pipeline.getStatus()); + ASSERT_EQ(pipeline.getStatus(), ErrorCodes::duplicateCodeForTest(50742)); +} + +} // namespace pipeline_validate namespace Dependencies { @@ -2468,7 +2525,8 @@ public: PositionRequirement::kNone, HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed}; + FacetRequirement::kAllowed, + TransactionRequirement::kAllowed}; } }; diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index cf2a8fcfe36..85370429156 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -572,8 +572,7 @@ std::string runQuery(OperationContext* opCtx, } // We have a parsed query. Time to get the execution plan for it. - auto exec = uassertStatusOK( - getExecutorFind(opCtx, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO)); + auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq))); const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest(); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 111d2884174..4074dab4ca4 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -641,9 +641,7 @@ StatusWith> getOplogStartHack( opCtx, std::move(ws), std::move(cs), std::move(cq), collection, PlanExecutor::YIELD_AUTO); } -} // namespace - -StatusWith> getExecutorFind( +StatusWith> _getExecutorFind( OperationContext* opCtx, Collection* collection, const NamespaceString& nss, @@ -665,6 +663,35 @@ StatusWith> getExecutorFind( return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); } +} // namespace + +StatusWith> getExecutorFind( + OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + unique_ptr canonicalQuery, + size_t plannerOptions) { + auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); + auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern + ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO; + return _getExecutorFind( + opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions); +} + +StatusWith> getExecutorLegacyFind( + OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + std::unique_ptr canonicalQuery) { + return _getExecutorFind(opCtx, + collection, + nss, + std::move(canonicalQuery), + PlanExecutor::YIELD_AUTO, + QueryPlannerParams::DEFAULT); +} + namespace { /** diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 34857084bf9..6a801c1a81b 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -91,9 +91,17 @@ StatusWith> getExecutorFind Collection* collection, const NamespaceString& nss, std::unique_ptr canonicalQuery, - PlanExecutor::YieldPolicy yieldPolicy, size_t plannerOptions = QueryPlannerParams::DEFAULT); +/** + * Returns a plan executor for a legacy OP_QUERY find. + */ +StatusWith> getExecutorLegacyFind( + OperationContext* opCtx, + Collection* collection, + const NamespaceString& nss, + std::unique_ptr canonicalQuery); + /** * If possible, turn the provided QuerySolution into a QuerySolution that uses a DistinctNode * to provide results for the distinct command. diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 08b766b01e7..99b54a425c8 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -129,9 +129,10 @@ const StringMap sessionCheckoutWhitelist = {{"aggregate", 1}, // The command names for which readConcern level snapshot is allowed. The getMore command is // implicitly allowed to operate on a cursor which was opened under readConcern level snapshot. -const StringMap readConcernSnapshotWhitelist = {{"find", 1}, +const StringMap readConcernSnapshotWhitelist = {{"aggregate", 1}, {"count", 1}, {"delete", 1}, + {"find", 1}, {"geoSearch", 1}, {"insert", 1}, {"parallelCollectionScan", 1}, diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h index 1520713edd5..5c1a6a0935c 100644 --- a/src/mongo/s/query/document_source_router_adapter.h +++ b/src/mongo/s/query/document_source_router_adapter.h @@ -49,7 +49,8 @@ public: PositionRequirement::kFirst, HostTypeRequirement::kMongoS, DiskUseRequirement::kNoDiskUse, - FacetRequirement::kNotAllowed}; + FacetRequirement::kNotAllowed, + TransactionRequirement::kAllowed}; } GetNextResult getNext() final; -- cgit v1.2.1