diff options
author | Eddie Louie <eddie.louie@mongodb.com> | 2017-08-01 19:47:26 -0400 |
---|---|---|
committer | Eddie Louie <eddie.louie@mongodb.com> | 2017-08-01 19:47:26 -0400 |
commit | babab967892f81f3107903cb41672503de791998 (patch) | |
tree | 2b5565bde7d88b251911215be7569766a25d5fdd /src/mongo/db/pipeline | |
parent | 2a76bd75d75197d3604643ff2b11d0a8f23c14f9 (diff) | |
download | mongo-babab967892f81f3107903cb41672503de791998.tar.gz |
Revert "SERVER-29506 Require $changeNotification to be the first stage."
This reverts commit 2431e1356823d898ef8af16997d6f63b65b385a5.
Diffstat (limited to 'src/mongo/db/pipeline')
27 files changed, 163 insertions, 282 deletions
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 6e58adae2bf..9651c7cb8e8 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -164,7 +164,7 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this && (std::next(itr) != container->end())); auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); - if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) { + if (canSwapWithMatch() && nextMatch && !nextMatch->isTextQuery()) { // We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the // $match does not contain a text search predicate, which we do not attempt to optimize // because such a $match must already be the first stage in the pipeline. We can attempt to diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 2545f02d8de..24f67e46a29 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -118,39 +118,14 @@ public: using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; - /** - * A struct describing various constraints about where this stage can run, where it must be in - * the pipeline, and things like that. - */ - struct StageConstraints { - /** - * A Position describes a requirement of the position of the stage within the pipeline. - */ - enum class PositionRequirement { kNone, kFirst, kLast }; - - // Set if this stage needs to be in a particular position of the pipeline. - PositionRequirement requiredPosition = PositionRequirement::kNone; - - bool isAllowedInsideFacetStage = true; - - // True if this stage does not generate results itself, and instead pulls inputs from an - // input DocumentSource (via 'pSource'). - bool requiresInputDocSource = true; - - // True if this stage operates on a global or database level, like $currentOp. - bool isIndependentOfAnyCollection = false; - - // True if this stage can ever be safely swapped with a subsequent $match stage, provided - // that the match does not depend on the paths returned by getModifiedPaths(). - // - // Stages that want to participate in match swapping should set this to true. Such a stage - // must also override getModifiedPaths() to provide information about which particular - // $match predicates be swapped before itself. - bool canSwapWithMatch = false; - - // True if this stage must run on the primary shard when the collection being aggregated is - // sharded. - bool mustRunOnPrimaryShardIfSharded = false; + enum class InitialSourceType { + // Stage requires input from a preceding DocumentSource. + kNotInitialSource, + // Stage does not need an input source and should be the first stage in the pipeline. + kInitialSource, + // Similar to kInitialSource, but does not require an underlying collection to produce + // output. + kCollectionlessInitialSource }; /** @@ -244,14 +219,6 @@ public: virtual GetNextResult getNext() = 0; /** - * Returns a struct containing information about any special constraints imposed on using this - * stage. - */ - virtual StageConstraints constraints() const { - return StageConstraints{}; - } - - /** * Informs the stage that it is no longer needed and can release its resources. After dispose() * is called the stage must still be able to handle calls to getNext(), but can return kEOF. * @@ -292,6 +259,38 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; /** + * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an + * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce + * the input for the pipeline independent of an underlying collection. The latter are specified + * with {aggregate: 1}, e.g. $currentOp. + */ + virtual InitialSourceType getInitialSourceType() const { + return InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage does not require an input source. + */ + bool isInitialSource() const { + return getInitialSourceType() != InitialSourceType::kNotInitialSource; + } + + /** + * Returns true if this stage will produce the input for the pipeline independent of an + * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp. + */ + bool isCollectionlessInitialSource() const { + return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource; + } + + /** + * Returns true if the DocumentSource needs to be run on the primary shard. + */ + virtual bool needsPrimaryShard() const { + return false; + } + + /** * If DocumentSource uses additional collections, it adds the namespaces to the input vector. */ virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {} @@ -427,6 +426,18 @@ public: return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}}; } + /** + * Returns whether this stage can swap with a subsequent $match stage, provided that the match + * does not depend on the paths returned by getModifiedPaths(). + * + * Subclasses which want to participate in match swapping should override this to return true. + * Such a subclass must also override getModifiedPaths() to provide information about which + * $match predicates be swapped before itself. + */ + virtual bool canSwapWithMatch() const { + return false; + } + enum GetDepsReturn { // The full object and all metadata may be required. NOT_SUPPORTED = 0x0, diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index dff06f05802..b8d7027fc5f 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -40,7 +40,6 @@ namespace mongo { using boost::intrusive_ptr; -using boost::optional; using std::vector; using std::string; @@ -54,52 +53,7 @@ REGISTER_MULTI_STAGE_ALIAS(changeNotification, DocumentSourceChangeNotification::LiteParsed::parse, DocumentSourceChangeNotification::createFromBson); -namespace { - -static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; - -/** - * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an - * alias) and requires itself to be the first stage in the pipeline. - */ -class DocumentSourceOplogMatch final : public DocumentSourceMatch { -public: - static intrusive_ptr<DocumentSourceOplogMatch> create( - BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceOplogMatch(std::move(filter), expCtx); - } - - const char* getSourceName() const final { - // This is used in error reporting, particularly if we find this stage in a position other - // than first, so report the name as $changeNotification. - return "$changeNotification"; - } - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.isAllowedInsideFacetStage = false; - return constraints; - } - - /** - * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can - * properly alias. - */ - Value serialize(optional<ExplainOptions::Verbosity> explain) const final { - if (explain) { - return Value(Document{{kOplogMatchExplainName, Document{}}}); - } - return Value(); - } - -private: - DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSourceMatch(std::move(filter), expCtx) {} -}; -} // namespace - -BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) { +BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. @@ -124,7 +78,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString auto opMatch = BSON("ns" << target); // Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops. - return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + auto matchFilter = + BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + return BSON("$match" << matchFilter); } vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( @@ -137,11 +93,11 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - BSONObj filter = buildMatchFilter(expCtx->ns); + BSONObj matchObj = buildMatch(elem, expCtx->ns); - auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx); - auto transformation = createTransformationStage(expCtx); - return {oplogMatch, transformation}; + auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); + auto transformSource = createTransformationStage(expCtx); + return {matchSource, transformSource}; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( @@ -238,8 +194,6 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation( Document DocumentSourceChangeNotification::Transformation::serializeStageOptions( boost::optional<ExplainOptions::Verbosity> explain) const { - // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument. - // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument. return Document(); } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index 3b38a908626..e5a1186f04e 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -77,7 +77,7 @@ public: /** * Produce the BSON for the $match stage based on a $changeNotification stage. */ - static BSONObj buildMatchFilter(const NamespaceString& nss); + static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss); /** * Parses a $changeNotification stage from 'elem' and produces the $match and transformation diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index b3595789577..0fd14fd9a92 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -101,8 +101,7 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get())); - ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification"); + ASSERT_EQUALS(string(result[0]->getSourceName()), "$match"); ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); // TODO: Check explain result. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 73a182145ac..7f025a29606 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -148,6 +148,10 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { return {Document(builder.obj())}; } +DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const { + return InitialSourceType::kInitialSource; +} + Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { return Value(Document{{getSourceName(), _collStatsSpec}}); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 6334bbe55c8..2506c1be823 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -74,13 +74,7 @@ public: const char* getSourceName() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; - } + InitialSourceType getInitialSourceType() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 674d0026ad5..49f3dfc9d70 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -86,6 +86,10 @@ const char* DocumentSourceCurrentOp::getSourceName() const { return "$currentOp"; } +DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const { + return InitialSourceType::kCollectionlessInitialSource; +} + DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { pExpCtx->checkForInterrupt(); diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index cd86d027733..4526134c13f 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -78,14 +78,7 @@ public: const char* getSourceName() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - constraints.isIndependentOfAnyCollection = true; - return constraints; - } + InitialSourceType getInitialSourceType() const final; static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 57a41996af9..b3cdf802d4e 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -50,12 +50,8 @@ public: return _outputSorts; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } void detachFromOperationContext() final; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index aba1f5f115c..992f3daac11 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -99,6 +99,13 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele << ": " << subPipeElem, subPipeElem.type() == BSONType::Object); + auto stageName = subPipeElem.Obj().firstElementFieldName(); + uassert( + 40331, + str::stream() << "specified stage is not allowed to be used within a $facet stage: " + << subPipeElem, + !str::equals(stageName, "$out") && !str::equals(stageName, "$facet")); + rawPipeline.push_back(subPipeElem.embeddedObject()); } @@ -237,22 +244,16 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } } -DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets. - +bool DocumentSourceFacet::needsPrimaryShard() const { + // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). + // This means that if any stage in any of the $facet pipelines requires the primary shard, then + // the entire $facet must happen on the merger, and the merger must be the primary shard. for (auto&& facet : _facets) { - for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) { - // Currently we don't split $facet to have a merger part and a shards part (see - // SERVER-24154). This means that if any stage in any of the $facet pipelines - // requires the primary shard, then the entire $facet must happen on the merger, and - // the merger must be the primary shard. - constraints.mustRunOnPrimaryShardIfSharded = true; - } + if (facet.pipeline->needsPrimaryShardMerger()) { + return true; } } - return constraints; + return false; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 9ca2ea149ec..f06f73c7b27 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -135,7 +135,7 @@ public: void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; void doDetachFromOperationContext() final; void doReattachToOperationContext(OperationContext* opCtx) final; - StageConstraints constraints() const final; + bool needsPrimaryShard() const final; protected: void doDispose() final; diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 60aefdead64..a41b3a2b1ce 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -111,6 +111,16 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) { ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); } +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON( + "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) { auto ctx = getExpCtx(); auto spec = fromjson("{$facet: {a: [{$match: {}}]}}"); @@ -168,10 +178,9 @@ class DocumentSourcePassthrough : public DocumentSourceMock { public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} - StageConstraints constraints() const override { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = true; - return constraints; + // We need this to be false so that it can be used in a $facet stage. + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } DocumentSource::GetNextResult getNext() final { @@ -607,10 +616,8 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs */ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool needsPrimaryShard() const final { + return true; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { @@ -633,7 +640,7 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT_TRUE(facetStage->needsPrimaryShard()); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -652,7 +659,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr facets.emplace_back("second", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT_FALSE(facetStage->needsPrimaryShard()); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 5df25d6c1c6..5178a89c18a 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -46,15 +46,9 @@ public: */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; BSONObjSet getOutputSorts() final { return SimpleBSONObjComparator::kInstance.makeBSONObjSet( diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 489a80ea7b9..6572fc4741f 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -53,11 +53,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final { @@ -65,6 +62,10 @@ public: return SEE_NEXT; }; + bool needsPrimaryShard() const final { + return true; + } + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { collections->push_back(_from); } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index dd62f5323bd..7d25aca6c9f 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -69,12 +69,8 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + virtual InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d5bab6fd9e7..bb4fae8df2e 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -94,11 +94,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final; @@ -107,6 +104,10 @@ public: return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); } + bool needsPrimaryShard() const final { + return true; + } + boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 2be99dd7f12..0b5800515aa 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -37,21 +37,18 @@ namespace mongo { -class DocumentSourceMatch : public DocumentSource { +class DocumentSourceMatch final : public DocumentSource { public: - virtual ~DocumentSourceMatch() = default; - + // virtuals from DocumentSource GetNextResult getNext() final; + const char* getSourceName() const final; + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; boost::intrusive_ptr<DocumentSource> optimize() final; BSONObjSet getOutputSorts() final { return pSource ? pSource->getOutputSorts() : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } - const char* getSourceName() const override; - Value serialize( - boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - /** * Attempts to combine with any subsequent $match stages, joining the query objects with a * $and. @@ -144,11 +141,10 @@ public: const std::string& path, const boost::intrusive_ptr<ExpressionContext>& expCtx); -protected: +private: DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); -private: void addDependencies(DepsTracker* deps) const; std::unique_ptr<MatchExpression> _expression; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 045ddad4836..ebe0a861ad6 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -52,13 +52,8 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kInitialSource; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 513994cd599..9236e55ad62 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -48,15 +48,9 @@ public: const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - - StageConstraints constraints() const override { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; - return constraints; + InitialSourceType getInitialSourceType() const override { + return InitialSourceType::kInitialSource; } - BSONObjSet getOutputSorts() override { return sorts; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 9366d01a670..3b134f39d5e 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -43,13 +43,8 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; - - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; - constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = StageConstraints::PositionRequirement::kLast; - return constraints; + bool needsPrimaryShard() const final { + return true; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 26d394d05e9..a3bea23795e 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -101,10 +101,8 @@ public: DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } TransformerInterface::TransformerType getType() const { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 866a3fb955c..cb1c709ecde 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -52,12 +52,10 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - StageConstraints constraints() const final { - StageConstraints constraints; + bool canSwapWithMatch() const final { // Can't swap with a $match if a limit has been absorbed, since in general match can't swap // with limit. - constraints.canSwapWithMatch = !limitSrc; - return constraints; + return !limitSrc; } BSONObjSet getOutputSorts() final { diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index fb7bd68ba39..53fbd33d083 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -46,10 +46,8 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.canSwapWithMatch = true; - return constraints; + bool canSwapWithMatch() const final { + return true; } GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 31cd015ea97..63dae04e325 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -32,8 +32,6 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_optimizations.h" -#include <algorithm> - #include "mongo/base/error_codes.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/document_validation.h" @@ -136,16 +134,14 @@ Status Pipeline::validatePipeline() const { // {aggregate: 1} is only valid for collectionless sources, and vice-versa. const auto firstStage = _sources.front().get(); - if (nss.isCollectionlessAggregateNS() && - !firstStage->constraints().isIndependentOfAnyCollection) { + if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { return {ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" << firstStage->getSourceName() << "'; a collection is required."}; } - if (!nss.isCollectionlessAggregateNS() && - firstStage->constraints().isIndependentOfAnyCollection) { + if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { return {ErrorCodes::InvalidNamespace, str::stream() << "'" << firstStage->getSourceName() << "' can only be run with {aggregate: 1}"}; @@ -158,21 +154,11 @@ Status Pipeline::validatePipeline() const { Status Pipeline::validateFacetPipeline() const { if (_sources.empty()) { - return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"}; - } - for (auto&& stage : _sources) { - auto stageConstraints = stage->constraints(); - if (!stageConstraints.isAllowedInsideFacetStage) { - return {ErrorCodes::BadValue, - str::stream() << stage->getSourceName() - << " is not allowed to be used within a $facet stage", - 40550}; - } - // We expect a stage within a $facet stage to have these properties. - invariant(stageConstraints.requiresInputDocSource); - invariant(!stageConstraints.isIndependentOfAnyCollection); - invariant(stageConstraints.requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kNone); + return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."}; + } else if (_sources.front()->isInitialSource()) { + return {ErrorCodes::BadValue, + str::stream() << _sources.front()->getSourceName() + << " is not allowed to be used within a $facet stage."}; } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -184,13 +170,10 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kFirst && - i != 0) { + if (stage->isInitialSource() && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline.", - 40549}; + << " is only valid as the first stage in a pipeline."}; } auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); if (i != 0 && matchStage && matchStage->isTextQuery()) { @@ -199,13 +182,8 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kLast && - i != _sources.size() - 1) { - return {ErrorCodes::BadValue, - str::stream() << stage->getSourceName() - << " can only be the final stage in the pipeline", - 40551}; + if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { + return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; } ++i; } @@ -395,9 +373,12 @@ BSONObj Pipeline::getInitialQuery() const { } bool Pipeline::needsPrimaryShardMerger() const { - return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().mustRunOnPrimaryShardIfSharded; - }); + for (auto&& source : _sources) { + if (source->needsPrimaryShard()) { + return true; + } + } + return false; } std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f1222797f11..697c639ccf7 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -454,7 +454,8 @@ void PipelineD::prepareCursorSource(Collection* collection, } } - if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { + // If the first stage of the pipeline is an initial source, we don't need an input cursor. + if (!sources.empty() && sources.front()->isInitialSource()) { return; } diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 1f37b30fc90..8dbc02fc88a 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1349,10 +1349,8 @@ class DocumentSourceCollectionlessMock : public DocumentSourceMock { public: DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {} - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.isIndependentOfAnyCollection = true; - return constraints; + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kCollectionlessInitialSource; } static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() { @@ -1405,34 +1403,6 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus()); } -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); -} - -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), - fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::BadValue); - ASSERT_EQ(parseStatus.location(), 40549); -} - -TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageInFacet) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), - fromjson("{$changeNotification: {}}")}; - auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::BadValue); - ASSERT_EQ(parseStatus.location(), 40550); - ASSERT(std::string::npos != parseStatus.reason().find("$changeNotification")); -} - } // namespace Namespaces namespace Dependencies { @@ -1460,8 +1430,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock { public: DocumentSourceDependencyDummy() : DocumentSourceMock({}) {} - StageConstraints constraints() const final { - return StageConstraints{}; // Overrides DocumentSourceMock's required position. + InitialSourceType getInitialSourceType() const final { + return InitialSourceType::kNotInitialSource; } }; |