diff options
author | Eddie Louie <eddie.louie@mongodb.com> | 2017-08-01 20:15:42 -0400 |
---|---|---|
committer | Eddie Louie <eddie.louie@mongodb.com> | 2017-08-01 20:15:42 -0400 |
commit | fd01541d77ca1455c603c411f395d6ce34fab6f1 (patch) | |
tree | 10a26d5aec28d9d912c2020a9e4c87064382cf09 /src | |
parent | babab967892f81f3107903cb41672503de791998 (diff) | |
download | mongo-fd01541d77ca1455c603c411f395d6ce34fab6f1.tar.gz |
SERVER-29506 Require $changeNotification to be the first stage.
This reverts commit babab967892f81f3107903cb41672503de791998.
Diffstat (limited to 'src')
29 files changed, 292 insertions, 183 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 3d6b8a25d16..d77c3a51a23 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -400,15 +400,10 @@ Status runAggregate(OperationContext* opCtx, expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; } - // Parse the pipeline. - auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx); - if (!statusWithPipeline.isOK()) { - return statusWithPipeline.getStatus(); - } - auto pipeline = std::move(statusWithPipeline.getValue()); + auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); - // Check that the view's collation matches the collation of any views involved - // in the pipeline. + // Check that the view's collation matches the collation of any views involved in the + // pipeline. if (!pipelineInvolvedNamespaces.empty()) { invariant(ctx); auto pipelineCollationStatus = collatorCompatibleWithPipeline( diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 9651c7cb8e8..6e58adae2bf 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -164,7 +164,7 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this && (std::next(itr) != container->end())); auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get()); - if (canSwapWithMatch() && nextMatch && !nextMatch->isTextQuery()) { + if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) { // We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the // $match does not contain a text search predicate, which we do not attempt to optimize // because such a $match must already be the first stage in the pipeline. We can attempt to diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 24f67e46a29..2545f02d8de 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -118,14 +118,39 @@ public: using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; - enum class InitialSourceType { - // Stage requires input from a preceding DocumentSource. - kNotInitialSource, - // Stage does not need an input source and should be the first stage in the pipeline. - kInitialSource, - // Similar to kInitialSource, but does not require an underlying collection to produce - // output. - kCollectionlessInitialSource + /** + * A struct describing various constraints about where this stage can run, where it must be in + * the pipeline, and things like that. + */ + struct StageConstraints { + /** + * A Position describes a requirement of the position of the stage within the pipeline. + */ + enum class PositionRequirement { kNone, kFirst, kLast }; + + // Set if this stage needs to be in a particular position of the pipeline. + PositionRequirement requiredPosition = PositionRequirement::kNone; + + bool isAllowedInsideFacetStage = true; + + // True if this stage does not generate results itself, and instead pulls inputs from an + // input DocumentSource (via 'pSource'). + bool requiresInputDocSource = true; + + // True if this stage operates on a global or database level, like $currentOp. + bool isIndependentOfAnyCollection = false; + + // True if this stage can ever be safely swapped with a subsequent $match stage, provided + // that the match does not depend on the paths returned by getModifiedPaths(). + // + // Stages that want to participate in match swapping should set this to true. Such a stage + // must also override getModifiedPaths() to provide information about which particular + // $match predicates be swapped before itself. + bool canSwapWithMatch = false; + + // True if this stage must run on the primary shard when the collection being aggregated is + // sharded. + bool mustRunOnPrimaryShardIfSharded = false; }; /** @@ -219,6 +244,14 @@ public: virtual GetNextResult getNext() = 0; /** + * Returns a struct containing information about any special constraints imposed on using this + * stage. + */ + virtual StageConstraints constraints() const { + return StageConstraints{}; + } + + /** * Informs the stage that it is no longer needed and can release its resources. After dispose() * is called the stage must still be able to handle calls to getNext(), but can return kEOF. * @@ -259,38 +292,6 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const; /** - * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an - * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce - * the input for the pipeline independent of an underlying collection. The latter are specified - * with {aggregate: 1}, e.g. $currentOp. - */ - virtual InitialSourceType getInitialSourceType() const { - return InitialSourceType::kNotInitialSource; - } - - /** - * Returns true if this stage does not require an input source. - */ - bool isInitialSource() const { - return getInitialSourceType() != InitialSourceType::kNotInitialSource; - } - - /** - * Returns true if this stage will produce the input for the pipeline independent of an - * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp. - */ - bool isCollectionlessInitialSource() const { - return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource; - } - - /** - * Returns true if the DocumentSource needs to be run on the primary shard. - */ - virtual bool needsPrimaryShard() const { - return false; - } - - /** * If DocumentSource uses additional collections, it adds the namespaces to the input vector. */ virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {} @@ -426,18 +427,6 @@ public: return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}}; } - /** - * Returns whether this stage can swap with a subsequent $match stage, provided that the match - * does not depend on the paths returned by getModifiedPaths(). - * - * Subclasses which want to participate in match swapping should override this to return true. - * Such a subclass must also override getModifiedPaths() to provide information about which - * $match predicates be swapped before itself. - */ - virtual bool canSwapWithMatch() const { - return false; - } - enum GetDepsReturn { // The full object and all metadata may be required. NOT_SUPPORTED = 0x0, diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index b8d7027fc5f..dff06f05802 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -40,6 +40,7 @@ namespace mongo { using boost::intrusive_ptr; +using boost::optional; using std::vector; using std::string; @@ -53,7 +54,52 @@ REGISTER_MULTI_STAGE_ALIAS(changeNotification, DocumentSourceChangeNotification::LiteParsed::parse, DocumentSourceChangeNotification::createFromBson); -BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) { +namespace { + +static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; + +/** + * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an + * alias) and requires itself to be the first stage in the pipeline. + */ +class DocumentSourceOplogMatch final : public DocumentSourceMatch { +public: + static intrusive_ptr<DocumentSourceOplogMatch> create( + BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceOplogMatch(std::move(filter), expCtx); + } + + const char* getSourceName() const final { + // This is used in error reporting, particularly if we find this stage in a position other + // than first, so report the name as $changeNotification. + return "$changeNotification"; + } + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.isAllowedInsideFacetStage = false; + return constraints; + } + + /** + * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can + * properly alias. + */ + Value serialize(optional<ExplainOptions::Verbosity> explain) const final { + if (explain) { + return Value(Document{{kOplogMatchExplainName, Document{}}}); + } + return Value(); + } + +private: + DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMatch(std::move(filter), expCtx) {} +}; +} // namespace + +BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. @@ -78,9 +124,7 @@ BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const Nam auto opMatch = BSON("ns" << target); // Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops. - auto matchFilter = - BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); - return BSON("$match" << matchFilter); + return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); } vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( @@ -93,11 +137,11 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - BSONObj matchObj = buildMatch(elem, expCtx->ns); + BSONObj filter = buildMatchFilter(expCtx->ns); - auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); - auto transformSource = createTransformationStage(expCtx); - return {matchSource, transformSource}; + auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx); + auto transformation = createTransformationStage(expCtx); + return {oplogMatch, transformation}; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( @@ -194,6 +238,8 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation( Document DocumentSourceChangeNotification::Transformation::serializeStageOptions( boost::optional<ExplainOptions::Verbosity> explain) const { + // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument. + // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument. return Document(); } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index e5a1186f04e..3b38a908626 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -77,7 +77,7 @@ public: /** * Produce the BSON for the $match stage based on a $changeNotification stage. */ - static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss); + static BSONObj buildMatchFilter(const NamespaceString& nss); /** * Parses a $changeNotification stage from 'elem' and produces the $match and transformation diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index 0fd14fd9a92..b3595789577 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -101,7 +101,8 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - ASSERT_EQUALS(string(result[0]->getSourceName()), "$match"); + ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get())); + ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification"); ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); // TODO: Check explain result. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp index 7f025a29606..73a182145ac 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.cpp +++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp @@ -148,10 +148,6 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() { return {Document(builder.obj())}; } -DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const { - return InitialSourceType::kInitialSource; -} - Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { return Value(Document{{getSourceName(), _collStatsSpec}}); } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 2506c1be823..6334bbe55c8 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -74,7 +74,13 @@ public: const char* getSourceName() const final; - InitialSourceType getInitialSourceType() const final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + return constraints; + } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp index 49f3dfc9d70..674d0026ad5 100644 --- a/src/mongo/db/pipeline/document_source_current_op.cpp +++ b/src/mongo/db/pipeline/document_source_current_op.cpp @@ -86,10 +86,6 @@ const char* DocumentSourceCurrentOp::getSourceName() const { return "$currentOp"; } -DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const { - return InitialSourceType::kCollectionlessInitialSource; -} - DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() { pExpCtx->checkForInterrupt(); diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 4526134c13f..cd86d027733 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -78,7 +78,14 @@ public: const char* getSourceName() const final; - InitialSourceType getInitialSourceType() const final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + constraints.isIndependentOfAnyCollection = true; + return constraints; + } static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index b3cdf802d4e..57a41996af9 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -50,8 +50,12 @@ public: return _outputSorts; } Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kInitialSource; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + return constraints; } void detachFromOperationContext() final; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 992f3daac11..aba1f5f115c 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -99,13 +99,6 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele << ": " << subPipeElem, subPipeElem.type() == BSONType::Object); - auto stageName = subPipeElem.Obj().firstElementFieldName(); - uassert( - 40331, - str::stream() << "specified stage is not allowed to be used within a $facet stage: " - << subPipeElem, - !str::equals(stageName, "$out") && !str::equals(stageName, "$facet")); - rawPipeline.push_back(subPipeElem.embeddedObject()); } @@ -244,16 +237,22 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } } -bool DocumentSourceFacet::needsPrimaryShard() const { - // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). - // This means that if any stage in any of the $facet pipelines requires the primary shard, then - // the entire $facet must happen on the merger, and the merger must be the primary shard. +DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { + StageConstraints constraints; + constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets. + for (auto&& facet : _facets) { - if (facet.pipeline->needsPrimaryShardMerger()) { - return true; + for (auto&& nestedStage : facet.pipeline->getSources()) { + if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) { + // Currently we don't split $facet to have a merger part and a shards part (see + // SERVER-24154). This means that if any stage in any of the $facet pipelines + // requires the primary shard, then the entire $facet must happen on the merger, and + // the merger must be the primary shard. + constraints.mustRunOnPrimaryShardIfSharded = true; + } } } - return false; + return constraints; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index f06f73c7b27..9ca2ea149ec 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -135,7 +135,7 @@ public: void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; void doDetachFromOperationContext() final; void doReattachToOperationContext(OperationContext* opCtx) final; - bool needsPrimaryShard() const final; + StageConstraints constraints() const final; protected: void doDispose() final; diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index a41b3a2b1ce..60aefdead64 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -111,16 +111,6 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) { ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); } -TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) { - auto ctx = getExpCtx(); - auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj())))); - ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); - - spec = BSON("$facet" << BSON( - "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj())))); - ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); -} - TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) { auto ctx = getExpCtx(); auto spec = fromjson("{$facet: {a: [{$match: {}}]}}"); @@ -178,9 +168,10 @@ class DocumentSourcePassthrough : public DocumentSourceMock { public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} - // We need this to be false so that it can be used in a $facet stage. - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kNotInitialSource; + StageConstraints constraints() const override { + StageConstraints constraints; + constraints.isAllowedInsideFacetStage = true; + return constraints; } DocumentSource::GetNextResult getNext() final { @@ -616,8 +607,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs */ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: - bool needsPrimaryShard() const final { - return true; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.mustRunOnPrimaryShardIfSharded = true; + return constraints; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { @@ -640,7 +633,7 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_TRUE(facetStage->needsPrimaryShard()); + ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -659,7 +652,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr facets.emplace_back("second", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_FALSE(facetStage->needsPrimaryShard()); + ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 5178a89c18a..5df25d6c1c6 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -46,9 +46,15 @@ public: */ Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kInitialSource; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + return constraints; } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; BSONObjSet getOutputSorts() final { return SimpleBSONObjComparator::kInstance.makeBSONObjSet( diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 6572fc4741f..489a80ea7b9 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -53,8 +53,11 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - bool canSwapWithMatch() const final { - return true; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.canSwapWithMatch = true; + constraints.mustRunOnPrimaryShardIfSharded = true; + return constraints; } GetDepsReturn getDependencies(DepsTracker* deps) const final { @@ -62,10 +65,6 @@ public: return SEE_NEXT; }; - bool needsPrimaryShard() const final { - return true; - } - void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { collections->push_back(_from); } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 7d25aca6c9f..dd62f5323bd 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -69,8 +69,12 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - virtual InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kInitialSource; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + return constraints; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index bb4fae8df2e..d5bab6fd9e7 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -94,8 +94,11 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - bool canSwapWithMatch() const final { - return true; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.canSwapWithMatch = true; + constraints.mustRunOnPrimaryShardIfSharded = true; + return constraints; } GetDepsReturn getDependencies(DepsTracker* deps) const final; @@ -104,10 +107,6 @@ public: return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); } - bool needsPrimaryShard() const final { - return true; - } - boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 0b5800515aa..2be99dd7f12 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -37,18 +37,21 @@ namespace mongo { -class DocumentSourceMatch final : public DocumentSource { +class DocumentSourceMatch : public DocumentSource { public: - // virtuals from DocumentSource + virtual ~DocumentSourceMatch() = default; + GetNextResult getNext() final; - const char* getSourceName() const final; - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; boost::intrusive_ptr<DocumentSource> optimize() final; BSONObjSet getOutputSorts() final { return pSource ? pSource->getOutputSorts() : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } + const char* getSourceName() const override; + Value serialize( + boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; + /** * Attempts to combine with any subsequent $match stages, joining the query objects with a * $and. @@ -141,10 +144,11 @@ public: const std::string& path, const boost::intrusive_ptr<ExpressionContext>& expCtx); -private: +protected: DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); +private: void addDependencies(DepsTracker* deps) const; std::unique_ptr<MatchExpression> _expression; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index ebe0a861ad6..045ddad4836 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -52,8 +52,13 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kInitialSource; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + return constraints; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 9236e55ad62..513994cd599 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -48,9 +48,15 @@ public: const char* getSourceName() const override; Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; - InitialSourceType getInitialSourceType() const override { - return InitialSourceType::kInitialSource; + + StageConstraints constraints() const override { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + return constraints; } + BSONObjSet getOutputSorts() override { return sorts; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 3b134f39d5e..9366d01a670 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -43,8 +43,13 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; - bool needsPrimaryShard() const final { - return true; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.isAllowedInsideFacetStage = false; + constraints.requiredPosition = StageConstraints::PositionRequirement::kLast; + return constraints; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index a3bea23795e..26d394d05e9 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -101,8 +101,10 @@ public: DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; - bool canSwapWithMatch() const final { - return true; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.canSwapWithMatch = true; + return constraints; } TransformerInterface::TransformerType getType() const { diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index cb1c709ecde..866a3fb955c 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -52,10 +52,12 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - bool canSwapWithMatch() const final { + StageConstraints constraints() const final { + StageConstraints constraints; // Can't swap with a $match if a limit has been absorbed, since in general match can't swap // with limit. - return !limitSrc; + constraints.canSwapWithMatch = !limitSrc; + return constraints; } BSONObjSet getOutputSorts() final { diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 53fbd33d083..fb7bd68ba39 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -46,8 +46,10 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - bool canSwapWithMatch() const final { - return true; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.canSwapWithMatch = true; + return constraints; } GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 63dae04e325..31cd015ea97 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -32,6 +32,8 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_optimizations.h" +#include <algorithm> + #include "mongo/base/error_codes.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/document_validation.h" @@ -134,14 +136,16 @@ Status Pipeline::validatePipeline() const { // {aggregate: 1} is only valid for collectionless sources, and vice-versa. const auto firstStage = _sources.front().get(); - if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) { + if (nss.isCollectionlessAggregateNS() && + !firstStage->constraints().isIndependentOfAnyCollection) { return {ErrorCodes::InvalidNamespace, str::stream() << "{aggregate: 1} is not valid for '" << firstStage->getSourceName() << "'; a collection is required."}; } - if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) { + if (!nss.isCollectionlessAggregateNS() && + firstStage->constraints().isIndependentOfAnyCollection) { return {ErrorCodes::InvalidNamespace, str::stream() << "'" << firstStage->getSourceName() << "' can only be run with {aggregate: 1}"}; @@ -154,11 +158,21 @@ Status Pipeline::validatePipeline() const { Status Pipeline::validateFacetPipeline() const { if (_sources.empty()) { - return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."}; - } else if (_sources.front()->isInitialSource()) { - return {ErrorCodes::BadValue, - str::stream() << _sources.front()->getSourceName() - << " is not allowed to be used within a $facet stage."}; + return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"}; + } + for (auto&& stage : _sources) { + auto stageConstraints = stage->constraints(); + if (!stageConstraints.isAllowedInsideFacetStage) { + return {ErrorCodes::BadValue, + str::stream() << stage->getSourceName() + << " is not allowed to be used within a $facet stage", + 40550}; + } + // We expect a stage within a $facet stage to have these properties. + invariant(stageConstraints.requiresInputDocSource); + invariant(!stageConstraints.isIndependentOfAnyCollection); + invariant(stageConstraints.requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kNone); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -170,10 +184,13 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->isInitialSource() && i != 0) { + if (stage->constraints().requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kFirst && + i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline."}; + << " is only valid as the first stage in a pipeline.", + 40549}; } auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); if (i != 0 && matchStage && matchStage->isTextQuery()) { @@ -182,8 +199,13 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { - return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; + if (stage->constraints().requiredPosition == + DocumentSource::StageConstraints::PositionRequirement::kLast && + i != _sources.size() - 1) { + return {ErrorCodes::BadValue, + str::stream() << stage->getSourceName() + << " can only be the final stage in the pipeline", + 40551}; } ++i; } @@ -373,12 +395,9 @@ BSONObj Pipeline::getInitialQuery() const { } bool Pipeline::needsPrimaryShardMerger() const { - for (auto&& source : _sources) { - if (source->needsPrimaryShard()) { - return true; - } - } - return false; + return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().mustRunOnPrimaryShardIfSharded; + }); } std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 697c639ccf7..f1222797f11 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -454,8 +454,7 @@ void PipelineD::prepareCursorSource(Collection* collection, } } - // If the first stage of the pipeline is an initial source, we don't need an input cursor. - if (!sources.empty() && sources.front()->isInitialSource()) { + if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) { return; } diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 8dbc02fc88a..1f37b30fc90 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1349,8 +1349,10 @@ class DocumentSourceCollectionlessMock : public DocumentSourceMock { public: DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {} - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kCollectionlessInitialSource; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.isIndependentOfAnyCollection = true; + return constraints; } static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() { @@ -1403,6 +1405,34 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus()); } +TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) { + const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")}; + auto ctx = getExpCtx(); + ctx->ns = NamespaceString("a.collection"); + ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); +} + +TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) { + const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), + fromjson("{$changeNotification: {}}")}; + auto ctx = getExpCtx(); + ctx->ns = NamespaceString("a.collection"); + auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); + ASSERT_EQ(parseStatus, ErrorCodes::BadValue); + ASSERT_EQ(parseStatus.location(), 40549); +} + +TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageInFacet) { + const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), + fromjson("{$changeNotification: {}}")}; + auto ctx = getExpCtx(); + ctx->ns = NamespaceString("a.collection"); + auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); + ASSERT_EQ(parseStatus, ErrorCodes::BadValue); + ASSERT_EQ(parseStatus.location(), 40550); + ASSERT(std::string::npos != parseStatus.reason().find("$changeNotification")); +} + } // namespace Namespaces namespace Dependencies { @@ -1430,8 +1460,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock { public: DocumentSourceDependencyDummy() : DocumentSourceMock({}) {} - InitialSourceType getInitialSourceType() const final { - return InitialSourceType::kNotInitialSource; + StageConstraints constraints() const final { + return StageConstraints{}; // Overrides DocumentSourceMock's required position. } }; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index ddee4d2960a..e21f8b2523c 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -219,20 +219,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, mergeCtx->inRouter = true; // explicitly *not* setting mergeCtx->tempDir - // Parse and optimize the pipeline specification. - auto pipeline = Pipeline::parse(request.getPipeline(), mergeCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - pipeline.getValue()->optimizePipeline(); + auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx)); + pipeline->optimizePipeline(); // If the first $match stage is an exact match on the shard key (with a simple collation or no // string matching), we only have to send it to one shard, so send the command to that shard. const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() { invariant(chunkMgr); - BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery(); + BSONObj firstMatchQuery = pipeline->getInitialQuery(); BSONObj shardKeyMatches = uassertStatusOK( chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery)); @@ -250,7 +245,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Don't need to split pipeline if the first $match is an exact match on shard key, unless // there is a stage that needs to be run on the primary shard. - const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger(); + const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger(); const bool needSplit = !singleShard || needPrimaryShardMerger; // Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline' @@ -258,10 +253,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard; if (needSplit) { - pipelineForTargetedShards = pipeline.getValue()->splitForSharded(); - pipelineForMergingShard = std::move(pipeline.getValue()); + pipelineForTargetedShards = pipeline->splitForSharded(); + pipelineForMergingShard = std::move(pipeline); } else { - pipelineForTargetedShards = std::move(pipeline.getValue()); + pipelineForTargetedShards = std::move(pipeline); } // Create the command for the shards. The 'fromRouter' field means produce output to be |