diff options
10 files changed, 191 insertions, 17 deletions
diff --git a/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js b/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js index 8877aa99740..e0264413151 100644 --- a/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js +++ b/jstests/aggregation/sources/unionWith/unionWith_invalid_usage.js @@ -1,5 +1,9 @@ /** * Tests for invalid usages of $unionWith, or invalid stages within the $unionWith sub-pipeline. + * @tags: [ + * # Some stages we're checking are only supported with a single read concern. + * assumes_read_concern_unchanged + * ] */ (function() { "use strict"; diff --git a/jstests/core/api_version_new_50_language_features.js b/jstests/core/api_version_new_50_language_features.js index a2c703cd8b3..553ce5447ee 100644 --- a/jstests/core/api_version_new_50_language_features.js +++ b/jstests/core/api_version_new_50_language_features.js @@ -21,12 +21,6 @@ coll.drop(); assert.commandWorked(coll.insert({a: 1, date: new ISODate()})); const unstablePipelines = [ - [{ - $setWindowFields: { - sortBy: {_id: 1}, - output: {runningCount: {$sum: 1, window: {documents: ["unbounded", "current"]}}} - } - }], [{$set: {x: {$dateTrunc: {date: "$date", unit: "second", binSize: 5}}}}], [{$set: {x: {$dateAdd: {startDate: "$date", unit: "day", amount: 1}}}}], [{$set: {x: {$dateSubtract: {startDate: "$date", unit: "day", amount: 1}}}}], @@ -37,7 +31,7 @@ const unstablePipelines = [ [{$set: {x: {$tsIncrement: new Timestamp(0, 0)}}}], ]; -function assertAggregateFailsWithAPIStrict(pipeline) { +function assertAggregateFailsWithAPIStrict(pipeline, errorCodes) { assert.commandFailedWithCode(db.runCommand({ aggregate: collName, pipeline: pipeline, @@ -45,15 +39,11 @@ function assertAggregateFailsWithAPIStrict(pipeline) { apiStrict: true, apiVersion: "1" }), - ErrorCodes.APIStrictError, + errorCodes, pipeline); } -for (let pipeline of unstablePipelines) { - // Assert error thrown when running a pipeline with stages not in API Version 1. - assertAggregateFailsWithAPIStrict(pipeline); - - // Assert error thrown when creating a view on a pipeline with stages not in API Version 1. +function assertViewFailsWithAPIStrict(pipeline) { assert.commandFailedWithCode(db.runCommand({ create: 'new_50_feature_view', viewOn: collName, @@ -63,6 +53,14 @@ for (let pipeline of unstablePipelines) { }), ErrorCodes.APIStrictError, pipeline); +} + +for (let pipeline of unstablePipelines) { + // Assert error thrown when running a pipeline with stages not in API Version 1. + assertAggregateFailsWithAPIStrict(pipeline, ErrorCodes.APIStrictError); + + // Assert error thrown when creating a view on a pipeline with stages not in API Version 1. + assertViewFailsWithAPIStrict(pipeline); // Assert error is not thrown when running without apiStrict=true. assert.commandWorked(db.runCommand({ @@ -73,6 +71,22 @@ for (let pipeline of unstablePipelines) { })); } +// $setWindowFields is not supported in transactions or with read concern snapshot. Test separately +// and check for all the error codes that can occur depending on what passthrough we are in. +const setWindowFieldsPipeline = [{ + $setWindowFields: { + sortBy: {_id: 1}, + output: {runningCount: {$sum: 1, window: {documents: ["unbounded", "current"]}}} + } +}]; +assertAggregateFailsWithAPIStrict(setWindowFieldsPipeline, [ + ErrorCodes.APIStrictError, + ErrorCodes.InvalidOptions, + ErrorCodes.OperationNotSupportedInTransaction +]); + +assertViewFailsWithAPIStrict(setWindowFieldsPipeline); + // Creating a collection with the unstable validator is not allowed with apiStrict:true. assert.commandFailedWithCode(db.runCommand({ create: 'new_50_features_validator', diff --git a/jstests/noPassthrough/set_window_fields_read_concern_snapshot.js b/jstests/noPassthrough/set_window_fields_read_concern_snapshot.js new file mode 100644 index 00000000000..ac99450555b --- /dev/null +++ b/jstests/noPassthrough/set_window_fields_read_concern_snapshot.js @@ -0,0 +1,76 @@ +/** + * Test that $setWindowFields is not supported with readConcern snapshot and in transactions. + */ +(function() { +"use strict"; + +const rst = new ReplSetTest({nodes: 2}); +rst.startSet(); +rst.initiate(); +const rstPrimary = rst.getPrimary(); +const testDB = rstPrimary.getDB(jsTestName() + "_db"); +const coll = testDB[jsTestName() + "_coll"]; +coll.drop(); + +assert.commandWorked(coll.insert({_id: 0, val: 0, partition: 1})); + +const rsStatus = rst.status(); +const lastClusterTime = rsStatus.optimes.lastCommittedOpTime.ts; + +let pipeline = [ + { + $setWindowFields: { + partitionBy: "$partition", + sortBy: {partition: 1}, + output: {sum: {$sum: "$val", window: {documents: [-21, 21]}}} + } + }, +]; +let aggregationCommand = { + aggregate: coll.getName(), + pipeline: pipeline, + allowDiskUse: true, + readConcern: {level: "snapshot", atClusterTime: lastClusterTime}, + cursor: {} +}; + +// Run outside of a transaction. Fail because read concern snapshot is specified. +assert.commandFailedWithCode(testDB.runCommand(aggregationCommand), ErrorCodes.InvalidOptions); +// Make sure that a $setWindowFields in a subpipeline with readConcern snapshot fails. +const lookupPipeline = [{$lookup: {from: coll.getName(), pipeline: pipeline, as: "newField"}}]; +aggregationCommand = { + aggregate: coll.getName(), + pipeline: lookupPipeline, + allowDiskUse: true, + readConcern: {level: "snapshot", atClusterTime: lastClusterTime}, + cursor: {} +}; +assert.commandFailedWithCode(testDB.runCommand(aggregationCommand), ErrorCodes.InvalidOptions); + +// Repeat in a transaction. +let session = rstPrimary.startSession(); +session.startTransaction({readConcern: {level: "snapshot"}}); +const sessionDB = session.getDatabase(testDB.getName()); +const sessionColl = sessionDB.getCollection(coll.getName()); +aggregationCommand = { + aggregate: coll.getName(), + pipeline: pipeline, + allowDiskUse: true, + cursor: {}, +}; +assert.commandFailedWithCode(sessionColl.runCommand(aggregationCommand), + ErrorCodes.OperationNotSupportedInTransaction); +// Transaction state is now unusual, abort it and start a new one. +session.abortTransaction(); +session.startTransaction({readConcern: {level: "snapshot"}}); +// Repeat the subpipeline test in a transaction. +aggregationCommand = { + aggregate: coll.getName(), + pipeline: lookupPipeline, + allowDiskUse: true, + cursor: {} +}; +assert.commandFailedWithCode(sessionColl.runCommand(aggregationCommand), + ErrorCodes.OperationNotSupportedInTransaction); +rst.stopSet(); +})(); diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index a3623fca678..05a87f1e340 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -90,11 +90,17 @@ public: ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, bool isImplicitDefault) const final { - return { + ReadConcernSupportResult result = { {level == repl::ReadConcernLevel::kLinearizableReadConcern, {ErrorCodes::InvalidOptions, "{} cannot be used with a 'linearizable' read concern level"_format(kStageName)}}, Status::OK()}; + auto pipelineReadConcern = LiteParsedDocumentSourceNestedPipelines::supportsReadConcern( + level, isImplicitDefault); + // Merge the result from the sub-pipeline into the $merge specific read concern result + // to preserve the $merge errors over the internal pipeline errors. + result.merge(pipelineReadConcern); + return result; } PrivilegeVector requiredPrivileges(bool isMongos, diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp index 0df6d6d5927..7a9fe901c8f 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp @@ -75,14 +75,14 @@ bool modifiedSortPaths(const SortPattern& pat, const DocumentSource::GetModPaths REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION( setWindowFields, - LiteParsedDocumentSourceDefault::parse, + document_source_set_window_fields::LiteParsedSetWindowFields::parse, document_source_set_window_fields::createFromBson, AllowedWithApiStrict::kNeverInVersion1, multiversion::FeatureCompatibilityVersion::kFullyDowngradedTo_5_0); REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION( _internalSetWindowFields, - LiteParsedDocumentSourceDefault::parse, + document_source_set_window_fields::LiteParsedSetWindowFields::parse, DocumentSourceInternalSetWindowFields::createFromBson, AllowedWithApiStrict::kNeverInVersion1, multiversion::FeatureCompatibilityVersion::kFullyDowngradedTo_5_0); diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h index f854de92653..313961216c4 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.h +++ b/src/mongo/db/pipeline/document_source_set_window_fields.h @@ -89,6 +89,39 @@ std::list<boost::intrusive_ptr<DocumentSource>> create( boost::optional<boost::intrusive_ptr<Expression>> partitionBy, const boost::optional<SortPattern>& sortBy, std::vector<WindowFunctionStatement> outputFields); + +class LiteParsedSetWindowFields : public LiteParsedDocumentSource { +public: + static std::unique_ptr<LiteParsedSetWindowFields> parse(const NamespaceString& nss, + const BSONElement& spec) { + return std::make_unique<LiteParsedSetWindowFields>(spec.fieldName()); + } + + explicit LiteParsedSetWindowFields(std::string parseTimeName) + : LiteParsedDocumentSource(std::move(parseTimeName)) {} + + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, + bool isImplicitDefault) const { + // $setWindowFields cannot spill to disk if read concern is set to "snapshot". + // TODO SERVER-59772 Enable $setWindowFields with read concern "snapshot". + return {{level == repl::ReadConcernLevel::kSnapshotReadConcern && !isImplicitDefault, + {ErrorCodes::InvalidOptions, + str::stream() << "Aggregation stage " << kStageName + << " cannot run with readConcern '" + << repl::readConcernLevels::toString( + repl::ReadConcernLevel::kSnapshotReadConcern)}}, + // The default read concern can't be snapshot. + boost::none}; + } + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + + PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final { + return {}; + } +}; } // namespace document_source_set_window_fields class DocumentSourceInternalSetWindowFields final : public DocumentSource { @@ -131,7 +164,9 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kWritesTmpData, FacetRequirement::kAllowed, - TransactionRequirement::kAllowed, + // $setWindowFields does not work inside transactions. + // TODO SERVER-59772 Enable $setWindowFields inside transactions. + TransactionRequirement::kNotAllowed, LookupRequirement::kAllowed, UnionRequirement::kAllowed); } diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.cpp b/src/mongo/db/pipeline/lite_parsed_document_source.cpp index 9b28609c9d7..121b916a389 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.cpp +++ b/src/mongo/db/pipeline/lite_parsed_document_source.cpp @@ -132,4 +132,19 @@ bool LiteParsedDocumentSourceNestedPipelines::allowShardedForeignCollection( }); } +ReadConcernSupportResult LiteParsedDocumentSourceNestedPipelines::supportsReadConcern( + repl::ReadConcernLevel level, bool isImplicitDefault) const { + // Assume that the document source holding the pipeline has no constraints of its own, so + // return the strictest of the constraints on the sub-pipelines. + auto result = ReadConcernSupportResult::allSupportedAndDefaultPermitted(); + for (auto& pipeline : _pipelines) { + result.merge(pipeline.sourcesSupportReadConcern(level, isImplicitDefault)); + // If both result statuses are already not OK, stop checking. + if (!result.readConcernSupport.isOK() && !result.defaultReadConcernPermit.isOK()) { + break; + } + } + return result; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index ecc56c170ab..f5fc5fbcb06 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -295,6 +295,13 @@ public: return _pipelines; } + /** + * Check the read concern constraints of all sub-pipelines. If the stage that owns the + * sub-pipelines has its own constraints this should be overridden to take those into account. + */ + ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level, + bool isImplicitDefault) const override; + protected: boost::optional<NamespaceString> _foreignNss; std::vector<LiteParsedPipeline> _pipelines; diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp index 94f29755d9b..ff8491c88b7 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp @@ -73,6 +73,16 @@ ReadConcernSupportResult LiteParsedPipeline::supportsReadConcern( // 3. If either the specified or default readConcern have not already been rejected, determine // whether the pipeline stages support them. If not, we record the first error we encounter. + result.merge(sourcesSupportReadConcern(level, isImplicitDefault)); + + return result; +} + +ReadConcernSupportResult LiteParsedPipeline::sourcesSupportReadConcern( + repl::ReadConcernLevel level, bool isImplicitDefault) const { + // Start by assuming that we will support both readConcern and cluster-wide default. + ReadConcernSupportResult result = ReadConcernSupportResult::allSupportedAndDefaultPermitted(); + for (auto&& spec : _stageSpecs) { // If both result statuses are already not OK, stop checking further stages. if (!result.readConcernSupport.isOK() && !result.defaultReadConcernPermit.isOK()) { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 0a7f24b83a5..eecaaa937e1 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -136,6 +136,13 @@ public: bool enableMajorityReadConcern) const; /** + * Checks that all of the stages in this pipeline are allowed to run with the specified read + * concern level. Does not do any pipeline global checks. + */ + ReadConcernSupportResult sourcesSupportReadConcern(repl::ReadConcernLevel level, + bool isImplicitDefault) const; + + /** * Verifies that this pipeline is allowed to run in a multi-document transaction. This ensures * that each stage is compatible, and throws a UserException if not. This should only be called * if the caller has determined the current operation is part of a transaction. |