diff options
36 files changed, 189 insertions, 347 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml index f102b4e6ae1..230702c629c 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml @@ -14,7 +14,6 @@ selector: - jstests/aggregation/sources/facet/use_cases.js # Cannot specify write concern when # secondaryThrottle is not set. - jstests/aggregation/testSlave.js # Majority read on secondary requires afterOpTime. - - jstests/aggregation/sources/out/*.js # Cannot specify write concern when secondaryThrottle is not set. executor: config: diff --git a/jstests/aggregation/bugs/server11675.js b/jstests/aggregation/bugs/server11675.js index 168b745f201..63b6081fab5 100644 --- a/jstests/aggregation/bugs/server11675.js +++ b/jstests/aggregation/bugs/server11675.js @@ -198,7 +198,8 @@ var server11675 = function() { // Error checking // $match, but wrong position - assertErrorCode(t, [{$sort: {text: 1}}, {$match: {$text: {$search: 'apple banana'}}}], 17313); + assertErrorCode( + t, [{$sort: {text: 1}}, {$match: {$text: {$search: 'apple banana'}}}], ErrorCodes.BadValue); // wrong $stage, but correct position assertErrorCode(t, diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js index 24d05af628e..fcad201f24e 100644 --- a/jstests/aggregation/extras/utils.js +++ b/jstests/aggregation/extras/utils.js @@ -272,8 +272,8 @@ function assertErrorCode(coll, pipe, code, errmsg) { cursor.itcount(); }, [], "expected error: " + code); - assert.eq(error.code, code, tojson(error)); + assert.eq(error.code, code); } else { - assert.eq(cursorRes.code, code, tojson(cursorRes)); + assert.eq(cursorRes.code, code); } } diff --git a/jstests/aggregation/sources/changeNotification/required_as_first_stage.js b/jstests/aggregation/sources/changeNotification/required_as_first_stage.js deleted file mode 100644 index dc53770aacb..00000000000 --- a/jstests/aggregation/sources/changeNotification/required_as_first_stage.js +++ /dev/null @@ -1,32 +0,0 @@ -// Tests that the $changeNotification stage can only be present as the first stage in the pipeline. -(function() { - "use strict"; - load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - - let testName = "change_notification_required_as_first_stage"; - var rst = new ReplSetTest({name: testName, nodes: 1}); - rst.startSet(); - rst.initiate(); - - const coll = rst.getPrimary().getDB("test").getCollection(testName); - - assertErrorCode(coll, [{$indexStats: {}}, {$changeNotification: {}}], 40549); - assertErrorCode( - coll, - [{$indexStats: {}}, {$changeNotification: {}}, {$match: {test: "this is an extra stage"}}], - 40549); - - // Test that a $changeNotification stage is not allowed within a $facet stage. - assertErrorCode(coll, [{$facet: {testPipe: [{$changeNotification: {}}]}}], 40550); - assertErrorCode(coll, - [{ - $facet: { - testPipe: [ - {$indexStats: {}}, - {$changeNotification: {}}, - {$match: {test: "this is an extra stage"}} - ] - } - }], - 40550); -}()); diff --git a/jstests/aggregation/sources/collStats/count.js b/jstests/aggregation/sources/collStats/count.js index d90bd33f22f..4072a9d56fc 100644 --- a/jstests/aggregation/sources/collStats/count.js +++ b/jstests/aggregation/sources/collStats/count.js @@ -16,7 +16,7 @@ // Test that $collStats must be first stage. let pipeline = [{$match: {}}, {$collStats: {}}]; - assertErrorCode(coll, pipeline, 40549); + assertErrorCode(coll, pipeline, ErrorCodes.BadValue); // Test that an error is returned if count is not an object. pipeline = [{$collStats: {count: 1}}]; diff --git a/jstests/aggregation/sources/out/required_last_position.js b/jstests/aggregation/sources/out/required_last_position.js deleted file mode 100644 index ebdf4df9fa7..00000000000 --- a/jstests/aggregation/sources/out/required_last_position.js +++ /dev/null @@ -1,17 +0,0 @@ -// Tests that $out can only be used as the last stage. -(function() { - "use strict"; - - load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - - const coll = db.require_out_last; - coll.drop(); - - // Test that $out is allowed as the last (and only) stage. - assert.doesNotThrow(() => coll.aggregate([{$out: "out_collection"}])); - - // Test that $out is not allowed to have a stage after it. - assertErrorCode(coll, [{$out: "out_collection"}, {$match: {x: true}}], 40551); - assertErrorCode( - coll, [{$project: {x: 0}}, {$out: "out_collection"}, {$match: {x: true}}], 40551); -}()); diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js index e31abafb702..5ecb7ef5f74 100644 --- a/jstests/sharding/aggregation_currentop.js +++ b/jstests/sharding/aggregation_currentop.js @@ -234,7 +234,7 @@ assert.commandFailedWithCode( adminDB.runCommand( {aggregate: 1, pipeline: [{$currentOp: {}}, {$currentOp: {}}], cursor: {}}), - 40549); + ErrorCodes.BadValue); // Test that $currentOp fails when run on admin without {aggregate: 1}. assert.commandFailedWithCode( diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index d77c3a51a23..3d6b8a25d16 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -400,10 +400,15 @@ Status runAggregate(OperationContext* opCtx, expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; } - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + // Parse the pipeline. + auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx); + if (!statusWithPipeline.isOK()) { + return statusWithPipeline.getStatus(); + } + auto pipeline = std::move(statusWithPipeline.getValue()); - // Check that the view's collation matches the collation of any views involved in the - // pipeline. + // Check that the view's collation matches the collation of any views involved + // in the pipeline. if (!pipelineInvolvedNamespaces.empty()) { invariant(ctx); auto pipelineCollationStatus = collatorCompatibleWithPipeline( diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 6e58adae2bf..9651c7cb8e8 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -164,7 +164,7 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this && (std::next(itr) != container->end())); auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); - if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) { + if (canSwapWithMatch() && nextMatch && !nextMatch->isTextQuery()) { // We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the // $match does not contain a text search predicate, which we do not attempt to optimize // because such a $match must already be the first stage in the pipeline. We can attempt to diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 2545f02d8de..24f67e46a29 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -118,39 +118,14 @@ public: using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; - /** - * A struct describing various constraints about where this stage can run, where it must be in - * the pipeline, and things like that. - */ - struct StageConstraints { - /** - * A Position describes a requirement of the position of the stage within the pipeline. - */ - enum class PositionRequirement { kNone, kFirst, kLast }; - - // Set if this stage needs to be in a particular position of the pipeline. - PositionRequirement requiredPosition = PositionRequirement::kNone; - - bool isAllowedInsideFacetStage = true; - - // True if this stage does not generate results itself, and instead pulls inputs from an - // input DocumentSource (via 'pSource'). - bool requiresInputDocSource = true; - - // True if this stage operates on a global or database level, like $currentOp. - bool isIndependentOfAnyCollection = false; - - // True if this stage can ever be safely swapped with a subsequent $match stage, provided - // that the match does not depend on the paths returned by getModifiedPaths(). - // - // Stages that want to participate in match swapping should set this to true. Such a stage - // must also override getModifiedPaths() to provide information about which particular - // $match predicates be swapped before itself. - bool canSwapWithMatch = false; - - // True if this stage must run on the primary shard when the collection being aggregated is - // sharded. - bool mustRunOnPrimaryShardIfSharded = false; + enum class InitialSourceType { + // Stage requires input from a preceding DocumentSource. + kNotInitialSource, + // Stage does not need an input source and should be the first stage in the pipeline. + kInitialSource, + // Similar to kInitialSource, but does not require an underlying collection to produce + // output. + kCollectionlessInitialSource }; /** @@ -244,14 +219,6 @@ public: virtual GetNextResult getNext() = 0; /** - * Returns a struct containing information about any special constraints imposed on using this - * stage. - */ - virtual StageConstraints constraints() const { - return StageConstraints{}; - } - - /** * Informs the stage that it is no longer needed and can release its resources. After dispose() * is called the stage must still be able to handle calls to getNext(), but can return kEOF. * @@ -292,6 +259,38 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; /** + * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an + * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce + * the input for the pipeline independent of an underlying collection. The latter are specified + * with {aggregate: 1}, e.g. $currentOp. + */ + virtual InitialSourceType getInitialSourceType() const { + return InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage does not require an input source. + */ + bool isInitialSource() const { + return getInitialSourceType() != InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage will produce the input for the pipeline independent of an + * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp. + */ + bool isCollectionlessInitialSource() const { + return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource; + } + + /** + * Returns true if the DocumentSource needs to be run on the primary shard. + */ + virtual bool needsPrimaryShard() const { + return false; + } + + /** * If DocumentSource uses additional collections, it adds the namespaces to the input vector. */ virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {} @@ -427,6 +426,18 @@ public: return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}}; } + /** + * Returns whether this stage can swap with a subsequent $match stage, provided that the match + * does not depend on the paths returned by getModifiedPaths(). + * + * Subclasses which want to participate in match swapping should override this to return true. + * Such a subclass must also override getModifiedPaths() to provide information about which + * $match predicates be swapped before itself. + */ + virtual bool canSwapWithMatch() const { + return false; + } + enum GetDepsReturn { // The full object and all metadata may be required. NOT_SUPPORTED = 0x0, diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index dff06f05802..b8d7027fc5f 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -40,7 +40,6 @@ namespace mongo { using boost::intrusive_ptr; -using boost::optional; using std::vector; using std::string; @@ -54,52 +53,7 @@ REGISTER_MULTI_STAGE_ALIAS(changeNotification, DocumentSourceChangeNotification::LiteParsed::parse, DocumentSourceChangeNotification::createFromBson); -namespace { - -static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; - -/** - * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an - * alias) and requires itself to be the first stage in the pipeline. - */ -class DocumentSourceOplogMatch final : public DocumentSourceMatch { -public: - static intrusive_ptr<DocumentSourceOplogMatch> create( - BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceOplogMatch(std::move(filter), expCtx); - } - - const char* getSourceName() const final { - // This is used in error reporting, particularly if we find this stage in a position other - // than first, so report the name as $changeNotification. - return "$changeNotification"; - } - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.isAllowedInsideFacetStage = false; - return constraints; - } - - /** - * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can - * properly alias. - */ - Value serialize(optional<ExplainOptions::Verbosity> explain) const final { - if (explain) { - return Value(Document{{kOplogMatchExplainName, Document{}}}); - } - return Value(); - } - -private: - DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceMatch(std::move(filter), expCtx) {} -}; -} // namespace - -BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) { +BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. @@ -124,7 +78,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString auto opMatch = BSON("ns" << target); // Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops. - return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + auto matchFilter = + BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + return BSON("$match" << matchFilter); } vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( @@ -137,11 +93,11 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - BSONObj filter = buildMatchFilter(expCtx->ns); + BSONObj matchObj = buildMatch(elem, expCtx->ns); - auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx); - auto transformation = createTransformationStage(expCtx); - return {oplogMatch, transformation}; + auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); + auto transformSource = createTransformationStage(expCtx); + return {matchSource, transformSource}; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( @@ -238,8 +194,6 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation( Document DocumentSourceChangeNotification::Transformation::serializeStageOptions( boost::optional<ExplainOptions::Verbosity> explain) const { - // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument. - // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument. return Document(); } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index 3b38a908626..e5a1186f04e 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -77,7 +77,7 @@ public: /** * Produce the BSON for the $match stage based on a $changeNotification stage. */ - static BSONObj buildMatchFilter(const NamespaceString& nss); + static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss); /** * Parses a $changeNotification stage from 'elem' and produces the $match and transformation diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index b3595789577..0fd14fd9a92 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -101,8 +101,7 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get())); - ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification"); + ASSERT_EQUALS(string(result[0]->getSourceName()), "$match"); ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); // TODO: Check explain result. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 73a182145ac..7f025a29606 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -148,6 +148,10 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { return {Document(builder.obj())}; } +DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const { + return InitialSourceType::kInitialSource; +} + Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { return Value(Document{{getSourceName(), _collStatsSpec}}); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 6334bbe55c8..2506c1be823 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -74,13 +74,7 @@ public: const char* getSourceName() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; - } + InitialSourceType getInitialSourceType() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 674d0026ad5..49f3dfc9d70 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -86,6 +86,10 @@ const char* DocumentSourceCurrentOp::getSourceName() const { return "$currentOp"; } +DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const { + return InitialSourceType::kCollectionlessInitialSource; +} + DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { pExpCtx->checkForInterrupt(); diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index cd86d027733..4526134c13f 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -78,14 +78,7 @@ public: const char* getSourceName() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - constraints.isIndependentOfAnyCollection = true; - return constraints; - } + InitialSourceType getInitialSourceType() const final; static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 57a41996af9..b3cdf802d4e 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -50,12 +50,8 @@ public: return _outputSorts; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } void detachFromOperationContext() final; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index aba1f5f115c..992f3daac11 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -99,6 +99,13 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele << ": " << subPipeElem, subPipeElem.type() == BSONType::Object); + auto stageName = subPipeElem.Obj().firstElementFieldName(); + uassert( + 40331, + str::stream() << "specified stage is not allowed to be used within a $facet stage: " + << subPipeElem, + !str::equals(stageName, "$out") && !str::equals(stageName, "$facet")); + rawPipeline.push_back(subPipeElem.embeddedObject()); } @@ -237,22 +244,16 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } } -DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets. - +bool DocumentSourceFacet::needsPrimaryShard() const { + // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). + // This means that if any stage in any of the $facet pipelines requires the primary shard, then + // the entire $facet must happen on the merger, and the merger must be the primary shard. for (auto&& facet : _facets) { - for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) { - // Currently we don't split $facet to have a merger part and a shards part (see - // SERVER-24154). This means that if any stage in any of the $facet pipelines - // requires the primary shard, then the entire $facet must happen on the merger, and - // the merger must be the primary shard. - constraints.mustRunOnPrimaryShardIfSharded = true; - } + if (facet.pipeline->needsPrimaryShardMerger()) { + return true; } } - return constraints; + return false; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 9ca2ea149ec..f06f73c7b27 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -135,7 +135,7 @@ public: void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; void doDetachFromOperationContext() final; void doReattachToOperationContext(OperationContext* opCtx) final; - StageConstraints constraints() const final; + bool needsPrimaryShard() const final; protected: void doDispose() final; diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 60aefdead64..a41b3a2b1ce 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -111,6 +111,16 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) { ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); } +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON( + "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) { auto ctx = getExpCtx(); auto spec = fromjson("{$facet: {a: [{$match: {}}]}}"); @@ -168,10 +178,9 @@ class DocumentSourcePassthrough : public DocumentSourceMock { public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} - StageConstraints constraints() const override { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = true; - return constraints; + // We need this to be false so that it can be used in a $facet stage. + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } DocumentSource::GetNextResult getNext() final { @@ -607,10 +616,8 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs */ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool needsPrimaryShard() const final { + return true; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { @@ -633,7 +640,7 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT_TRUE(facetStage->needsPrimaryShard()); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -652,7 +659,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr facets.emplace_back("second", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT_FALSE(facetStage->needsPrimaryShard()); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 5df25d6c1c6..5178a89c18a 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -46,15 +46,9 @@ public: */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; BSONObjSet getOutputSorts() final { return SimpleBSONObjComparator::kInstance.makeBSONObjSet( diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 489a80ea7b9..6572fc4741f 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -53,11 +53,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final { @@ -65,6 +62,10 @@ public: return SEE_NEXT; }; + bool needsPrimaryShard() const final { + return true; + } + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { collections->push_back(_from); } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index dd62f5323bd..7d25aca6c9f 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -69,12 +69,8 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + virtual InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d5bab6fd9e7..bb4fae8df2e 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -94,11 +94,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final; @@ -107,6 +104,10 @@ public: return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); } + bool needsPrimaryShard() const final { + return true; + } + boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 2be99dd7f12..0b5800515aa 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -37,21 +37,18 @@ namespace mongo { -class DocumentSourceMatch : public DocumentSource { +class DocumentSourceMatch final : public DocumentSource { public: - virtual ~DocumentSourceMatch() = default; - + // virtuals from DocumentSource GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; boost::intrusive_ptr<DocumentSource> optimize() final; BSONObjSet getOutputSorts() final { return pSource ? pSource->getOutputSorts() : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } - const char* getSourceName() const override; - Value serialize( - boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - /** * Attempts to combine with any subsequent $match stages, joining the query objects with a * $and. @@ -144,11 +141,10 @@ public: const std::string& path, const boost::intrusive_ptr<ExpressionContext>& expCtx); -protected: +private: DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); -private: void addDependencies(DepsTracker* deps) const; std::unique_ptr<MatchExpression> _expression; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 045ddad4836..ebe0a861ad6 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -52,13 +52,8 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 513994cd599..9236e55ad62 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -48,15 +48,9 @@ public: const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - - StageConstraints constraints() const override { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const override { + return InitialSourceType::kInitialSource; } - BSONObjSet getOutputSorts() override { return sorts; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 9366d01a670..3b134f39d5e 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -43,13 +43,8 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; - constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = StageConstraints::PositionRequirement::kLast; - return constraints; + bool needsPrimaryShard() const final { + return true; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 26d394d05e9..a3bea23795e 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -101,10 +101,8 @@ public: DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } TransformerInterface::TransformerType getType() const { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 866a3fb955c..cb1c709ecde 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -52,12 +52,10 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - StageConstraints constraints() const final { - StageConstraints constraints; + bool canSwapWithMatch() const final { // Can't swap with a $match if a limit has been absorbed, since in general match can't swap // with limit. - constraints.canSwapWithMatch = !limitSrc; - return constraints; + return !limitSrc; } BSONObjSet getOutputSorts() final { diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index fb7bd68ba39..53fbd33d083 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -46,10 +46,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 31cd015ea97..63dae04e325 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -32,8 +32,6 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_optimizations.h" -#include <algorithm> - #include "mongo/base/error_codes.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/document_validation.h" @@ -136,16 +134,14 @@ Status Pipeline::validatePipeline() const { // {aggregate: 1} is only valid for collectionless sources, and vice-versa. const auto firstStage = _sources.front().get(); - if (nss.isCollectionlessAggregateNS() && - !firstStage->constraints().isIndependentOfAnyCollection) { + if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { return {ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" << firstStage->getSourceName() << "'; a collection is required."}; } - if (!nss.isCollectionlessAggregateNS() && - firstStage->constraints().isIndependentOfAnyCollection) { + if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { return {ErrorCodes::InvalidNamespace, str::stream() << "'" << firstStage->getSourceName() << "' can only be run with {aggregate: 1}"}; @@ -158,21 +154,11 @@ Status Pipeline::validatePipeline() const { Status Pipeline::validateFacetPipeline() const { if (_sources.empty()) { - return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"}; - } - for (auto&& stage : _sources) { - auto stageConstraints = stage->constraints(); - if (!stageConstraints.isAllowedInsideFacetStage) { - return {ErrorCodes::BadValue, - str::stream() << stage->getSourceName() - << " is not allowed to be used within a $facet stage", - 40550}; - } - // We expect a stage within a $facet stage to have these properties. - invariant(stageConstraints.requiresInputDocSource); - invariant(!stageConstraints.isIndependentOfAnyCollection); - invariant(stageConstraints.requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kNone); + return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."}; + } else if (_sources.front()->isInitialSource()) { + return {ErrorCodes::BadValue, + str::stream() << _sources.front()->getSourceName() + << " is not allowed to be used within a $facet stage."}; } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -184,13 +170,10 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kFirst && - i != 0) { + if (stage->isInitialSource() && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline.", - 40549}; + << " is only valid as the first stage in a pipeline."}; } auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); if (i != 0 && matchStage && matchStage->isTextQuery()) { @@ -199,13 +182,8 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kLast && - i != _sources.size() - 1) { - return {ErrorCodes::BadValue, - str::stream() << stage->getSourceName() - << " can only be the final stage in the pipeline", - 40551}; + if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { + return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; } ++i; } @@ -395,9 +373,12 @@ BSONObj Pipeline::getInitialQuery() const { } bool Pipeline::needsPrimaryShardMerger() const { - return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().mustRunOnPrimaryShardIfSharded; - }); + for (auto&& source : _sources) { + if (source->needsPrimaryShard()) { + return true; + } + } + return false; } std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f1222797f11..697c639ccf7 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -454,7 +454,8 @@ void PipelineD::prepareCursorSource(Collection* collection, } } - if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { + // If the first stage of the pipeline is an initial source, we don't need an input cursor. + if (!sources.empty() && sources.front()->isInitialSource()) { return; } diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 1f37b30fc90..8dbc02fc88a 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1349,10 +1349,8 @@ class DocumentSourceCollectionlessMock : public DocumentSourceMock { public: DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {} - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.isIndependentOfAnyCollection = true; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kCollectionlessInitialSource; } static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() { @@ -1405,34 +1403,6 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); -} - -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), - fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::BadValue); - ASSERT_EQ(parseStatus.location(), 40549); -} - -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageInFacet) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), - fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::BadValue); - ASSERT_EQ(parseStatus.location(), 40550); - ASSERT(std::string::npos != parseStatus.reason().find("$changeNotification")); -} - } // namespace Namespaces namespace Dependencies { @@ -1460,8 +1430,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock { public: DocumentSourceDependencyDummy() : DocumentSourceMock({}) {} - StageConstraints constraints() const final { - return StageConstraints{}; // Overrides DocumentSourceMock's required position. + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } }; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index e21f8b2523c..ddee4d2960a 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -219,15 +219,20 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, mergeCtx->inRouter = true; // explicitly *not* setting mergeCtx->tempDir - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx)); - pipeline->optimizePipeline(); + // Parse and optimize the pipeline specification. + auto pipeline = Pipeline::parse(request.getPipeline(), mergeCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + pipeline.getValue()->optimizePipeline(); // If the first $match stage is an exact match on the shard key (with a simple collation or no // string matching), we only have to send it to one shard, so send the command to that shard. const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() { invariant(chunkMgr); - BSONObj firstMatchQuery = pipeline->getInitialQuery(); + BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery(); BSONObj shardKeyMatches = uassertStatusOK( chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery)); @@ -245,7 +250,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Don't need to split pipeline if the first $match is an exact match on shard key, unless // there is a stage that needs to be run on the primary shard. - const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger(); + const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger(); const bool needSplit = !singleShard || needPrimaryShardMerger; // Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline' @@ -253,10 +258,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard; if (needSplit) { - pipelineForTargetedShards = pipeline->splitForSharded(); - pipelineForMergingShard = std::move(pipeline); + pipelineForTargetedShards = pipeline.getValue()->splitForSharded(); + pipelineForMergingShard = std::move(pipeline.getValue()); } else { - pipelineForTargetedShards = std::move(pipeline); + pipelineForTargetedShards = std::move(pipeline.getValue()); } // Create the command for the shards. The 'fromRouter' field means produce output to be |