summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Charlie Swanson <charlie.swanson@mongodb.com> 2017-07-12 11:43:48 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com> 2017-08-01 17:16:14 -0400
commit 2431e1356823d898ef8af16997d6f63b65b385a5 (patch)
tree 6df997de8e8e0378d4a2d711fce1d28be772293b /src
parent 718cf09aa21b36e9436a675c8645770826078da7 (diff)
download mongo-2431e1356823d898ef8af16997d6f63b65b385a5.tar.gz
SERVER-29506 Require $changeNotification to be the first stage.
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/commands/run_aggregate.cpp 11
-rw-r--r-- src/mongo/db/pipeline/document_source.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source.h 93
-rw-r--r-- src/mongo/db/pipeline/document_source_change_notification.cpp 62
-rw-r--r-- src/mongo/db/pipeline/document_source_change_notification.h 2
-rw-r--r-- src/mongo/db/pipeline/document_source_change_notification_test.cpp 3
-rw-r--r-- src/mongo/db/pipeline/document_source_coll_stats.cpp 4
-rw-r--r-- src/mongo/db/pipeline/document_source_coll_stats.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_current_op.cpp 4
-rw-r--r-- src/mongo/db/pipeline/document_source_current_op.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_facet.cpp 27
-rw-r--r-- src/mongo/db/pipeline/document_source_facet.h 2
-rw-r--r-- src/mongo/db/pipeline/document_source_facet_test.cpp 27
-rw-r--r-- src/mongo/db/pipeline/document_source_geo_near.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_graph_lookup.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_index_stats.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_match.h 14
-rw-r--r-- src/mongo/db/pipeline/document_source_merge_cursors.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_mock.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_out.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_single_document_transformation.h 6
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.h 6
-rw-r--r-- src/mongo/db/pipeline/document_source_unwind.h 6
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 53
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.cpp 3
-rw-r--r-- src/mongo/db/pipeline/pipeline_test.cpp 38
-rw-r--r-- src/mongo/s/commands/cluster_aggregate.cpp 19
29 files changed, 292 insertions, 183 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 3d6b8a25d16..d77c3a51a23 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -400,15 +400,10 @@ Status runAggregate(OperationContext* opCtx,
expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
}
- // Parse the pipeline.
- auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
- if (!statusWithPipeline.isOK()) {
- return statusWithPipeline.getStatus();
- }
- auto pipeline = std::move(statusWithPipeline.getValue());
+ auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
- // Check that the view's collation matches the collation of any views involved
- // in the pipeline.
+ // Check that the view's collation matches the collation of any views involved in the
+ // pipeline.
if (!pipelineInvolvedNamespaces.empty()) {
invariant(ctx);
auto pipelineCollationStatus = collatorCompatibleWithPipeline(
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 9651c7cb8e8..6e58adae2bf 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -164,7 +164,7 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this && (std::next(itr) != container->end()));
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
- if (canSwapWithMatch() && nextMatch && !nextMatch->isTextQuery()) {
+ if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) {
// We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the
// $match does not contain a text search predicate, which we do not attempt to optimize
// because such a $match must already be the first stage in the pipeline. We can attempt to
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 24f67e46a29..2545f02d8de 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -118,14 +118,39 @@ public:
using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
- enum class InitialSourceType {
- // Stage requires input from a preceding DocumentSource.
- kNotInitialSource,
- // Stage does not need an input source and should be the first stage in the pipeline.
- kInitialSource,
- // Similar to kInitialSource, but does not require an underlying collection to produce
- // output.
- kCollectionlessInitialSource
+ /**
+ * A struct describing various constraints about where this stage can run, where it must be in
+ * the pipeline, and things like that.
+ */
+ struct StageConstraints {
+ /**
+ * A PositionRequirement describes where in the pipeline the stage is required to appear.
+ */
+ enum class PositionRequirement { kNone, kFirst, kLast };
+
+ // Set if this stage needs to be in a particular position of the pipeline.
+ PositionRequirement requiredPosition = PositionRequirement::kNone;
+
+ bool isAllowedInsideFacetStage = true;
+
+ // True if this stage does not generate results itself, and instead pulls inputs from an
+ // input DocumentSource (via 'pSource').
+ bool requiresInputDocSource = true;
+
+ // True if this stage operates on a global or database level, like $currentOp.
+ bool isIndependentOfAnyCollection = false;
+
+ // True if this stage can ever be safely swapped with a subsequent $match stage, provided
+ // that the match does not depend on the paths returned by getModifiedPaths().
+ //
+ // Stages that want to participate in match swapping should set this to true. Such a stage
+ // must also override getModifiedPaths() to provide information about which particular
+ // $match predicates can be swapped before itself.
+ bool canSwapWithMatch = false;
+
+ // True if this stage must run on the primary shard when the collection being aggregated is
+ // sharded.
+ bool mustRunOnPrimaryShardIfSharded = false;
};
/**
@@ -219,6 +244,14 @@ public:
virtual GetNextResult getNext() = 0;
/**
+ * Returns a struct containing information about any special constraints imposed on using this
+ * stage.
+ */
+ virtual StageConstraints constraints() const {
+ return StageConstraints{};
+ }
+
+ /**
* Informs the stage that it is no longer needed and can release its resources. After dispose()
* is called the stage must still be able to handle calls to getNext(), but can return kEOF.
*
@@ -259,38 +292,6 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
/**
- * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an
- * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce
- * the input for the pipeline independent of an underlying collection. The latter are specified
- * with {aggregate: 1}, e.g. $currentOp.
- */
- virtual InitialSourceType getInitialSourceType() const {
- return InitialSourceType::kNotInitialSource;
- }
-
- /**
- * Returns true if this stage does not require an input source.
- */
- bool isInitialSource() const {
- return getInitialSourceType() != InitialSourceType::kNotInitialSource;
- }
-
- /**
- * Returns true if this stage will produce the input for the pipeline independent of an
- * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp.
- */
- bool isCollectionlessInitialSource() const {
- return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource;
- }
-
- /**
- * Returns true if the DocumentSource needs to be run on the primary shard.
- */
- virtual bool needsPrimaryShard() const {
- return false;
- }
-
- /**
* If DocumentSource uses additional collections, it adds the namespaces to the input vector.
*/
virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {}
@@ -426,18 +427,6 @@ public:
return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}};
}
- /**
- * Returns whether this stage can swap with a subsequent $match stage, provided that the match
- * does not depend on the paths returned by getModifiedPaths().
- *
- * Subclasses which want to participate in match swapping should override this to return true.
- * Such a subclass must also override getModifiedPaths() to provide information about which
- * $match predicates be swapped before itself.
- */
- virtual bool canSwapWithMatch() const {
- return false;
- }
-
enum GetDepsReturn {
// The full object and all metadata may be required.
NOT_SUPPORTED = 0x0,
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index b8d7027fc5f..dff06f05802 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -40,6 +40,7 @@
namespace mongo {
using boost::intrusive_ptr;
+using boost::optional;
using std::vector;
using std::string;
@@ -53,7 +54,52 @@ REGISTER_MULTI_STAGE_ALIAS(changeNotification,
DocumentSourceChangeNotification::LiteParsed::parse,
DocumentSourceChangeNotification::createFromBson);
-BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) {
+namespace {
+
+static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
+
+/**
+ * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an
+ * alias) and requires itself to be the first stage in the pipeline.
+ */
+class DocumentSourceOplogMatch final : public DocumentSourceMatch {
+public:
+ static intrusive_ptr<DocumentSourceOplogMatch> create(
+ BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceOplogMatch(std::move(filter), expCtx);
+ }
+
+ const char* getSourceName() const final {
+ // This is used in error reporting, particularly if we find this stage in a position other
+ // than first, so report the name as $changeNotification.
+ return "$changeNotification";
+ }
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
+ }
+
+ /**
+ * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can
+ * properly alias.
+ */
+ Value serialize(optional<ExplainOptions::Verbosity> explain) const final {
+ if (explain) {
+ return Value(Document{{kOplogMatchExplainName, Document{}}});
+ }
+ return Value();
+ }
+
+private:
+ DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMatch(std::move(filter), expCtx) {}
+};
+} // namespace
+
+BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) {
auto target = nss.ns();
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
@@ -78,9 +124,7 @@ BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const Nam
auto opMatch = BSON("ns" << target);
// Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops.
- auto matchFilter =
- BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
- return BSON("$match" << matchFilter);
+ return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
}
vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
@@ -93,11 +137,11 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
"Only default collation is allowed when using a $changeNotification stage.",
!expCtx->getCollator());
- BSONObj matchObj = buildMatch(elem, expCtx->ns);
+ BSONObj filter = buildMatchFilter(expCtx->ns);
- auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
- auto transformSource = createTransformationStage(expCtx);
- return {matchSource, transformSource};
+ auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx);
+ auto transformation = createTransformationStage(expCtx);
+ return {oplogMatch, transformation};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
@@ -194,6 +238,8 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation(
Document DocumentSourceChangeNotification::Transformation::serializeStageOptions(
boost::optional<ExplainOptions::Verbosity> explain) const {
+ // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument.
+ // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument.
return Document();
}
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
index e5a1186f04e..3b38a908626 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ b/src/mongo/db/pipeline/document_source_change_notification.h
@@ -77,7 +77,7 @@ public:
/**
* Produce the BSON for the $match stage based on a $changeNotification stage.
*/
- static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss);
+ static BSONObj buildMatchFilter(const NamespaceString& nss);
/**
* Parses a $changeNotification stage from 'elem' and produces the $match and transformation
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 0fd14fd9a92..b3595789577 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -101,7 +101,8 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get()));
+ ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification");
ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 7f025a29606..73a182145ac 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -148,10 +148,6 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
return {Document(builder.obj())};
}
-DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const {
- return InitialSourceType::kInitialSource;
-}
-
Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
return Value(Document{{getSourceName(), _collStatsSpec}});
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 2506c1be823..6334bbe55c8 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -74,7 +74,13 @@ public:
const char* getSourceName() const final;
- InitialSourceType getInitialSourceType() const final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
+ }
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 49f3dfc9d70..674d0026ad5 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -86,10 +86,6 @@ const char* DocumentSourceCurrentOp::getSourceName() const {
return "$currentOp";
}
-DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const {
- return InitialSourceType::kCollectionlessInitialSource;
-}
-
DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
pExpCtx->checkForInterrupt();
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 4526134c13f..cd86d027733 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -78,7 +78,14 @@ public:
const char* getSourceName() const final;
- InitialSourceType getInitialSourceType() const final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ constraints.isIndependentOfAnyCollection = true;
+ return constraints;
+ }
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index b3cdf802d4e..57a41996af9 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -50,8 +50,12 @@ public:
return _outputSorts;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kInitialSource;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ return constraints;
}
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 992f3daac11..aba1f5f115c 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -99,13 +99,6 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele
<< ": "
<< subPipeElem,
subPipeElem.type() == BSONType::Object);
- auto stageName = subPipeElem.Obj().firstElementFieldName();
- uassert(
- 40331,
- str::stream() << "specified stage is not allowed to be used within a $facet stage: "
- << subPipeElem,
- !str::equals(stageName, "$out") && !str::equals(stageName, "$facet"));
-
rawPipeline.push_back(subPipeElem.embeddedObject());
}
@@ -244,16 +237,22 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx)
}
}
-bool DocumentSourceFacet::needsPrimaryShard() const {
- // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
- // This means that if any stage in any of the $facet pipelines requires the primary shard, then
- // the entire $facet must happen on the merger, and the merger must be the primary shard.
+DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
+ StageConstraints constraints;
+ constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets.
+
for (auto&& facet : _facets) {
- if (facet.pipeline->needsPrimaryShardMerger()) {
- return true;
+ for (auto&& nestedStage : facet.pipeline->getSources()) {
+ if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) {
+ // Currently we don't split $facet to have a merger part and a shards part (see
+ // SERVER-24154). This means that if any stage in any of the $facet pipelines
+ // requires the primary shard, then the entire $facet must happen on the merger, and
+ // the merger must be the primary shard.
+ constraints.mustRunOnPrimaryShardIfSharded = true;
+ }
}
}
- return false;
+ return constraints;
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index f06f73c7b27..9ca2ea149ec 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -135,7 +135,7 @@ public:
void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final;
void doDetachFromOperationContext() final;
void doReattachToOperationContext(OperationContext* opCtx) final;
- bool needsPrimaryShard() const final;
+ StageConstraints constraints() const final;
protected:
void doDispose() final;
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index a41b3a2b1ce..60aefdead64 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -111,16 +111,6 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) {
ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
}
-TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) {
- auto ctx = getExpCtx();
- auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj()))));
- ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
-
- spec = BSON("$facet" << BSON(
- "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj()))));
- ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
-}
-
TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) {
auto ctx = getExpCtx();
auto spec = fromjson("{$facet: {a: [{$match: {}}]}}");
@@ -178,9 +168,10 @@ class DocumentSourcePassthrough : public DocumentSourceMock {
public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
- // We need this to be false so that it can be used in a $facet stage.
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kNotInitialSource;
+ StageConstraints constraints() const override {
+ StageConstraints constraints;
+ constraints.isAllowedInsideFacetStage = true;
+ return constraints;
}
DocumentSource::GetNextResult getNext() final {
@@ -616,8 +607,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
*/
class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
- bool needsPrimaryShard() const final {
- return true;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.mustRunOnPrimaryShardIfSharded = true;
+ return constraints;
}
static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
@@ -640,7 +633,7 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
facets.emplace_back("needsPrimaryShard", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_TRUE(facetStage->needsPrimaryShard());
+ ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
}
TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
@@ -659,7 +652,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
facets.emplace_back("second", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_FALSE(facetStage->needsPrimaryShard());
+ ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 5178a89c18a..5df25d6c1c6 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -46,9 +46,15 @@ public:
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kInitialSource;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
}
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
BSONObjSet getOutputSorts() final {
return SimpleBSONObjComparator::kInstance.makeBSONObjSet(
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 6572fc4741f..489a80ea7b9 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -53,8 +53,11 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- bool canSwapWithMatch() const final {
- return true;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.canSwapWithMatch = true;
+ constraints.mustRunOnPrimaryShardIfSharded = true;
+ return constraints;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final {
@@ -62,10 +65,6 @@ public:
return SEE_NEXT;
};
- bool needsPrimaryShard() const final {
- return true;
- }
-
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
collections->push_back(_from);
}
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 7d25aca6c9f..dd62f5323bd 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -69,8 +69,12 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- virtual InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kInitialSource;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index bb4fae8df2e..d5bab6fd9e7 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -94,8 +94,11 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- bool canSwapWithMatch() const final {
- return true;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.canSwapWithMatch = true;
+ constraints.mustRunOnPrimaryShardIfSharded = true;
+ return constraints;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final;
@@ -104,10 +107,6 @@ public:
return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
}
- bool needsPrimaryShard() const final {
- return true;
- }
-
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 0b5800515aa..2be99dd7f12 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -37,18 +37,21 @@
namespace mongo {
-class DocumentSourceMatch final : public DocumentSource {
+class DocumentSourceMatch : public DocumentSource {
public:
- // virtuals from DocumentSource
+ virtual ~DocumentSourceMatch() = default;
+
GetNextResult getNext() final;
- const char* getSourceName() const final;
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
BSONObjSet getOutputSorts() final {
return pSource ? pSource->getOutputSorts()
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
+ const char* getSourceName() const override;
+ Value serialize(
+ boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
+
/**
* Attempts to combine with any subsequent $match stages, joining the query objects with a
* $and.
@@ -141,10 +144,11 @@ public:
const std::string& path,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
-private:
+protected:
DocumentSourceMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+private:
void addDependencies(DepsTracker* deps) const;
std::unique_ptr<MatchExpression> _expression;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index ebe0a861ad6..045ddad4836 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -52,8 +52,13 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kInitialSource;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 9236e55ad62..513994cd599 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -48,9 +48,15 @@ public:
const char* getSourceName() const override;
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
- InitialSourceType getInitialSourceType() const override {
- return InitialSourceType::kInitialSource;
+
+ StageConstraints constraints() const override {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
}
+
BSONObjSet getOutputSorts() override {
return sorts;
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 3b134f39d5e..9366d01a670 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -43,8 +43,13 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
- bool needsPrimaryShard() const final {
- return true;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.isAllowedInsideFacetStage = false;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kLast;
+ return constraints;
}
// Virtuals for SplittableDocumentSource
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index a3bea23795e..26d394d05e9 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -101,8 +101,10 @@ public:
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
GetModPathsReturn getModifiedPaths() const final;
- bool canSwapWithMatch() const final {
- return true;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.canSwapWithMatch = true;
+ return constraints;
}
TransformerInterface::TransformerType getType() const {
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index cb1c709ecde..866a3fb955c 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -52,10 +52,12 @@ public:
return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
- bool canSwapWithMatch() const final {
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
// Can't swap with a $match if a limit has been absorbed, since in general match can't swap
// with limit.
- return !limitSrc;
+ constraints.canSwapWithMatch = !limitSrc;
+ return constraints;
}
BSONObjSet getOutputSorts() final {
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 53fbd33d083..fb7bd68ba39 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -46,8 +46,10 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- bool canSwapWithMatch() const final {
- return true;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.canSwapWithMatch = true;
+ return constraints;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 63dae04e325..31cd015ea97 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
+#include <algorithm>
+
#include "mongo/base/error_codes.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
@@ -134,14 +136,16 @@ Status Pipeline::validatePipeline() const {
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStage = _sources.front().get();
- if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
+ if (nss.isCollectionlessAggregateNS() &&
+ !firstStage->constraints().isIndependentOfAnyCollection) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
<< firstStage->getSourceName()
<< "'; a collection is required."};
}
- if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
+ if (!nss.isCollectionlessAggregateNS() &&
+ firstStage->constraints().isIndependentOfAnyCollection) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "'" << firstStage->getSourceName()
<< "' can only be run with {aggregate: 1}"};
@@ -154,11 +158,21 @@ Status Pipeline::validatePipeline() const {
Status Pipeline::validateFacetPipeline() const {
if (_sources.empty()) {
- return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."};
- } else if (_sources.front()->isInitialSource()) {
- return {ErrorCodes::BadValue,
- str::stream() << _sources.front()->getSourceName()
- << " is not allowed to be used within a $facet stage."};
+ return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"};
+ }
+ for (auto&& stage : _sources) {
+ auto stageConstraints = stage->constraints();
+ if (!stageConstraints.isAllowedInsideFacetStage) {
+ return {ErrorCodes::BadValue,
+ str::stream() << stage->getSourceName()
+ << " is not allowed to be used within a $facet stage",
+ 40550};
+ }
+ // We expect a stage within a $facet stage to have these properties.
+ invariant(stageConstraints.requiresInputDocSource);
+ invariant(!stageConstraints.isIndependentOfAnyCollection);
+ invariant(stageConstraints.requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kNone);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -170,10 +184,13 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->isInitialSource() && i != 0) {
+ if (stage->constraints().requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kFirst &&
+ i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
- << " is only valid as the first stage in a pipeline."};
+ << " is only valid as the first stage in a pipeline.",
+ 40549};
}
auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
if (i != 0 && matchStage && matchStage->isTextQuery()) {
@@ -182,8 +199,13 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) {
- return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"};
+ if (stage->constraints().requiredPosition ==
+ DocumentSource::StageConstraints::PositionRequirement::kLast &&
+ i != _sources.size() - 1) {
+ return {ErrorCodes::BadValue,
+ str::stream() << stage->getSourceName()
+ << " can only be the final stage in the pipeline",
+ 40551};
}
++i;
}
@@ -373,12 +395,9 @@ BSONObj Pipeline::getInitialQuery() const {
}
bool Pipeline::needsPrimaryShardMerger() const {
- for (auto&& source : _sources) {
- if (source->needsPrimaryShard()) {
- return true;
- }
- }
- return false;
+ return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().mustRunOnPrimaryShardIfSharded;
+ });
}
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 697c639ccf7..f1222797f11 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -454,8 +454,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
}
}
- // If the first stage of the pipeline is an initial source, we don't need an input cursor.
- if (!sources.empty() && sources.front()->isInitialSource()) {
+ if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) {
return;
}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 8dbc02fc88a..1f37b30fc90 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1349,8 +1349,10 @@ class DocumentSourceCollectionlessMock : public DocumentSourceMock {
public:
DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kCollectionlessInitialSource;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.isIndependentOfAnyCollection = true;
+ return constraints;
}
static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() {
@@ -1403,6 +1405,34 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus());
}
+TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) {
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")};
+ auto ctx = getExpCtx();
+ ctx->ns = NamespaceString("a.collection");
+ ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+}
+
+TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) {
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
+ fromjson("{$changeNotification: {}}")};
+ auto ctx = getExpCtx();
+ ctx->ns = NamespaceString("a.collection");
+ auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus();
+ ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
+ ASSERT_EQ(parseStatus.location(), 40549);
+}
+
+TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageInFacet) {
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
+ fromjson("{$changeNotification: {}}")};
+ auto ctx = getExpCtx();
+ ctx->ns = NamespaceString("a.collection");
+ auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus();
+ ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
+ ASSERT_EQ(parseStatus.location(), 40550);
+ ASSERT(std::string::npos != parseStatus.reason().find("$changeNotification"));
+}
+
} // namespace Namespaces
namespace Dependencies {
@@ -1430,8 +1460,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock {
public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
- InitialSourceType getInitialSourceType() const final {
- return InitialSourceType::kNotInitialSource;
+ StageConstraints constraints() const final {
+ return StageConstraints{}; // Overrides DocumentSourceMock's required position.
}
};
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index ddee4d2960a..e21f8b2523c 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -219,20 +219,15 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
mergeCtx->inRouter = true;
// explicitly *not* setting mergeCtx->tempDir
- // Parse and optimize the pipeline specification.
- auto pipeline = Pipeline::parse(request.getPipeline(), mergeCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- pipeline.getValue()->optimizePipeline();
+ auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
+ pipeline->optimizePipeline();
// If the first $match stage is an exact match on the shard key (with a simple collation or no
// string matching), we only have to send it to one shard, so send the command to that shard.
const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() {
invariant(chunkMgr);
- BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery();
+ BSONObj firstMatchQuery = pipeline->getInitialQuery();
BSONObj shardKeyMatches = uassertStatusOK(
chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery));
@@ -250,7 +245,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Don't need to split pipeline if the first $match is an exact match on shard key, unless
// there is a stage that needs to be run on the primary shard.
- const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger();
+ const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger();
const bool needSplit = !singleShard || needPrimaryShardMerger;
// Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline'
@@ -258,10 +253,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard;
if (needSplit) {
- pipelineForTargetedShards = pipeline.getValue()->splitForSharded();
- pipelineForMergingShard = std::move(pipeline.getValue());
+ pipelineForTargetedShards = pipeline->splitForSharded();
+ pipelineForMergingShard = std::move(pipeline);
} else {
- pipelineForTargetedShards = std::move(pipeline.getValue());
+ pipelineForTargetedShards = std::move(pipeline);
}
// Create the command for the shards. The 'fromRouter' field means produce output to be