summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEddie Louie <eddie.louie@mongodb.com>2017-08-01 19:47:26 -0400
committerEddie Louie <eddie.louie@mongodb.com>2017-08-01 19:47:26 -0400
commitbabab967892f81f3107903cb41672503de791998 (patch)
tree2b5565bde7d88b251911215be7569766a25d5fdd /src
parent2a76bd75d75197d3604643ff2b11d0a8f23c14f9 (diff)
downloadmongo-babab967892f81f3107903cb41672503de791998.tar.gz
Revert "SERVER-29506 Require $changeNotification to be the first stage."
This reverts commit 2431e1356823d898ef8af16997d6f63b65b385a5.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source.h93
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp62
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.h2
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h8
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h9
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h8
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp27
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h2
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp27
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h10
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h11
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h11
-rw-r--r--src/mongo/db/pipeline/document_source_match.h14
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h9
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h10
-rw-r--r--src/mongo/db/pipeline/document_source_out.h9
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h6
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h6
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h6
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp53
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp3
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp38
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp19
29 files changed, 183 insertions, 292 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index d77c3a51a23..3d6b8a25d16 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -400,10 +400,15 @@ Status runAggregate(OperationContext* opCtx,
expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
}
- auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
+ // Parse the pipeline.
+ auto statusWithPipeline = Pipeline::parse(request.getPipeline(), expCtx);
+ if (!statusWithPipeline.isOK()) {
+ return statusWithPipeline.getStatus();
+ }
+ auto pipeline = std::move(statusWithPipeline.getValue());
- // Check that the view's collation matches the collation of any views involved in the
- // pipeline.
+ // Check that the view's collation matches the collation of any views involved
+ // in the pipeline.
if (!pipelineInvolvedNamespaces.empty()) {
invariant(ctx);
auto pipelineCollationStatus = collatorCompatibleWithPipeline(
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 6e58adae2bf..9651c7cb8e8 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -164,7 +164,7 @@ Pipeline::SourceContainer::iterator DocumentSource::optimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this && (std::next(itr) != container->end()));
auto nextMatch = dynamic_cast<DocumentSourceMatch*>((*std::next(itr)).get());
- if (constraints().canSwapWithMatch && nextMatch && !nextMatch->isTextQuery()) {
+ if (canSwapWithMatch() && nextMatch && !nextMatch->isTextQuery()) {
// We're allowed to swap with a $match and the stage after us is a $match. Furthermore, the
// $match does not contain a text search predicate, which we do not attempt to optimize
// because such a $match must already be the first stage in the pipeline. We can attempt to
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 2545f02d8de..24f67e46a29 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -118,39 +118,14 @@ public:
using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
- /**
- * A struct describing various constraints about where this stage can run, where it must be in
- * the pipeline, and things like that.
- */
- struct StageConstraints {
- /**
- * A Position describes a requirement of the position of the stage within the pipeline.
- */
- enum class PositionRequirement { kNone, kFirst, kLast };
-
- // Set if this stage needs to be in a particular position of the pipeline.
- PositionRequirement requiredPosition = PositionRequirement::kNone;
-
- bool isAllowedInsideFacetStage = true;
-
- // True if this stage does not generate results itself, and instead pulls inputs from an
- // input DocumentSource (via 'pSource').
- bool requiresInputDocSource = true;
-
- // True if this stage operates on a global or database level, like $currentOp.
- bool isIndependentOfAnyCollection = false;
-
- // True if this stage can ever be safely swapped with a subsequent $match stage, provided
- // that the match does not depend on the paths returned by getModifiedPaths().
- //
- // Stages that want to participate in match swapping should set this to true. Such a stage
- // must also override getModifiedPaths() to provide information about which particular
- // $match predicates be swapped before itself.
- bool canSwapWithMatch = false;
-
- // True if this stage must run on the primary shard when the collection being aggregated is
- // sharded.
- bool mustRunOnPrimaryShardIfSharded = false;
+ enum class InitialSourceType {
+ // Stage requires input from a preceding DocumentSource.
+ kNotInitialSource,
+ // Stage does not need an input source and should be the first stage in the pipeline.
+ kInitialSource,
+ // Similar to kInitialSource, but does not require an underlying collection to produce
+ // output.
+ kCollectionlessInitialSource
};
/**
@@ -244,14 +219,6 @@ public:
virtual GetNextResult getNext() = 0;
/**
- * Returns a struct containing information about any special constraints imposed on using this
- * stage.
- */
- virtual StageConstraints constraints() const {
- return StageConstraints{};
- }
-
- /**
* Informs the stage that it is no longer needed and can release its resources. After dispose()
* is called the stage must still be able to handle calls to getNext(), but can return kEOF.
*
@@ -292,6 +259,38 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const;
/**
+ * Subclasses should return InitialSourceType::kInitialSource if the stage does not require an
+ * input source, or InitialSourceType::kCollectionlessInitialSource if the stage will produce
+ * the input for the pipeline independent of an underlying collection. The latter are specified
+ * with {aggregate: 1}, e.g. $currentOp.
+ */
+ virtual InitialSourceType getInitialSourceType() const {
+ return InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage does not require an input source.
+ */
+ bool isInitialSource() const {
+ return getInitialSourceType() != InitialSourceType::kNotInitialSource;
+ }
+
+ /**
+ * Returns true if this stage will produce the input for the pipeline independent of an
+ * underlying collection. These are specified with {aggregate: 1}, e.g. $currentOp.
+ */
+ bool isCollectionlessInitialSource() const {
+ return getInitialSourceType() == InitialSourceType::kCollectionlessInitialSource;
+ }
+
+ /**
+ * Returns true if the DocumentSource needs to be run on the primary shard.
+ */
+ virtual bool needsPrimaryShard() const {
+ return false;
+ }
+
+ /**
* If DocumentSource uses additional collections, it adds the namespaces to the input vector.
*/
virtual void addInvolvedCollections(std::vector<NamespaceString>* collections) const {}
@@ -427,6 +426,18 @@ public:
return {GetModPathsReturn::Type::kNotSupported, std::set<std::string>{}, {}};
}
+ /**
+ * Returns whether this stage can swap with a subsequent $match stage, provided that the match
+ * does not depend on the paths returned by getModifiedPaths().
+ *
+ * Subclasses which want to participate in match swapping should override this to return true.
+ * Such a subclass must also override getModifiedPaths() to provide information about which
+ * $match predicates be swapped before itself.
+ */
+ virtual bool canSwapWithMatch() const {
+ return false;
+ }
+
enum GetDepsReturn {
// The full object and all metadata may be required.
NOT_SUPPORTED = 0x0,
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index dff06f05802..b8d7027fc5f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -40,7 +40,6 @@
namespace mongo {
using boost::intrusive_ptr;
-using boost::optional;
using std::vector;
using std::string;
@@ -54,52 +53,7 @@ REGISTER_MULTI_STAGE_ALIAS(changeNotification,
DocumentSourceChangeNotification::LiteParsed::parse,
DocumentSourceChangeNotification::createFromBson);
-namespace {
-
-static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
-
-/**
- * A custom subclass of DocumentSourceMatch which does not serialize itself (since it came from an
- * alias) and requires itself to be the first stage in the pipeline.
- */
-class DocumentSourceOplogMatch final : public DocumentSourceMatch {
-public:
- static intrusive_ptr<DocumentSourceOplogMatch> create(
- BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceOplogMatch(std::move(filter), expCtx);
- }
-
- const char* getSourceName() const final {
- // This is used in error reporting, particularly if we find this stage in a position other
- // than first, so report the name as $changeNotification.
- return "$changeNotification";
- }
-
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
- }
-
- /**
- * Only serialize this stage for explain purposes, otherwise keep it hidden so that we can
- * properly alias.
- */
- Value serialize(optional<ExplainOptions::Verbosity> explain) const final {
- if (explain) {
- return Value(Document{{kOplogMatchExplainName, Document{}}});
- }
- return Value();
- }
-
-private:
- DocumentSourceOplogMatch(BSONObj filter, const intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSourceMatch(std::move(filter), expCtx) {}
-};
-} // namespace
-
-BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) {
+BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) {
auto target = nss.ns();
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
@@ -124,7 +78,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString
auto opMatch = BSON("ns" << target);
// Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops.
- return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
+ auto matchFilter =
+ BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
+ return BSON("$match" << matchFilter);
}
vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
@@ -137,11 +93,11 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
"Only default collation is allowed when using a $changeNotification stage.",
!expCtx->getCollator());
- BSONObj filter = buildMatchFilter(expCtx->ns);
+ BSONObj matchObj = buildMatch(elem, expCtx->ns);
- auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx);
- auto transformation = createTransformationStage(expCtx);
- return {oplogMatch, transformation};
+ auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
+ auto transformSource = createTransformationStage(expCtx);
+ return {matchSource, transformSource};
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
@@ -238,8 +194,6 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation(
Document DocumentSourceChangeNotification::Transformation::serializeStageOptions(
boost::optional<ExplainOptions::Verbosity> explain) const {
- // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument.
- // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument.
return Document();
}
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
index 3b38a908626..e5a1186f04e 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ b/src/mongo/db/pipeline/document_source_change_notification.h
@@ -77,7 +77,7 @@ public:
/**
* Produce the BSON for the $match stage based on a $changeNotification stage.
*/
- static BSONObj buildMatchFilter(const NamespaceString& nss);
+ static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss);
/**
* Parses a $changeNotification stage from 'elem' and produces the $match and transformation
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index b3595789577..0fd14fd9a92 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -101,8 +101,7 @@ TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get()));
- ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification");
+ ASSERT_EQUALS(string(result[0]->getSourceName()), "$match");
ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
// TODO: Check explain result.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.cpp b/src/mongo/db/pipeline/document_source_coll_stats.cpp
index 73a182145ac..7f025a29606 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.cpp
+++ b/src/mongo/db/pipeline/document_source_coll_stats.cpp
@@ -148,6 +148,10 @@ DocumentSource::GetNextResult DocumentSourceCollStats::getNext() {
return {Document(builder.obj())};
}
+DocumentSource::InitialSourceType DocumentSourceCollStats::getInitialSourceType() const {
+ return InitialSourceType::kInitialSource;
+}
+
Value DocumentSourceCollStats::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
return Value(Document{{getSourceName(), _collStatsSpec}});
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 6334bbe55c8..2506c1be823 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -74,13 +74,7 @@ public:
const char* getSourceName() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
- }
+ InitialSourceType getInitialSourceType() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_current_op.cpp b/src/mongo/db/pipeline/document_source_current_op.cpp
index 674d0026ad5..49f3dfc9d70 100644
--- a/src/mongo/db/pipeline/document_source_current_op.cpp
+++ b/src/mongo/db/pipeline/document_source_current_op.cpp
@@ -86,6 +86,10 @@ const char* DocumentSourceCurrentOp::getSourceName() const {
return "$currentOp";
}
+DocumentSource::InitialSourceType DocumentSourceCurrentOp::getInitialSourceType() const {
+ return InitialSourceType::kCollectionlessInitialSource;
+}
+
DocumentSource::GetNextResult DocumentSourceCurrentOp::getNext() {
pExpCtx->checkForInterrupt();
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index cd86d027733..4526134c13f 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -78,14 +78,7 @@ public:
const char* getSourceName() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- constraints.isIndependentOfAnyCollection = true;
- return constraints;
- }
+ InitialSourceType getInitialSourceType() const final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 57a41996af9..b3cdf802d4e 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -50,12 +50,8 @@ public:
return _outputSorts;
}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
-
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- return constraints;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
void detachFromOperationContext() final;
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index aba1f5f115c..992f3daac11 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -99,6 +99,13 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele
<< ": "
<< subPipeElem,
subPipeElem.type() == BSONType::Object);
+ auto stageName = subPipeElem.Obj().firstElementFieldName();
+ uassert(
+ 40331,
+ str::stream() << "specified stage is not allowed to be used within a $facet stage: "
+ << subPipeElem,
+ !str::equals(stageName, "$out") && !str::equals(stageName, "$facet"));
+
rawPipeline.push_back(subPipeElem.embeddedObject());
}
@@ -237,22 +244,16 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx)
}
}
-DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets.
-
+bool DocumentSourceFacet::needsPrimaryShard() const {
+ // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
+ // This means that if any stage in any of the $facet pipelines requires the primary shard, then
+ // the entire $facet must happen on the merger, and the merger must be the primary shard.
for (auto&& facet : _facets) {
- for (auto&& nestedStage : facet.pipeline->getSources()) {
- if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) {
- // Currently we don't split $facet to have a merger part and a shards part (see
- // SERVER-24154). This means that if any stage in any of the $facet pipelines
- // requires the primary shard, then the entire $facet must happen on the merger, and
- // the merger must be the primary shard.
- constraints.mustRunOnPrimaryShardIfSharded = true;
- }
+ if (facet.pipeline->needsPrimaryShardMerger()) {
+ return true;
}
}
- return constraints;
+ return false;
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 9ca2ea149ec..f06f73c7b27 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -135,7 +135,7 @@ public:
void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final;
void doDetachFromOperationContext() final;
void doReattachToOperationContext(OperationContext* opCtx) final;
- StageConstraints constraints() const final;
+ bool needsPrimaryShard() const final;
protected:
void doDispose() final;
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 60aefdead64..a41b3a2b1ce 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -111,6 +111,16 @@ TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) {
ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
}
+TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj()))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON(
+ "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj()))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
TEST_F(DocumentSourceFacetTest, ShouldSucceedWhenNamespaceIsCollectionless) {
auto ctx = getExpCtx();
auto spec = fromjson("{$facet: {a: [{$match: {}}]}}");
@@ -168,10 +178,9 @@ class DocumentSourcePassthrough : public DocumentSourceMock {
public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
- StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = true;
- return constraints;
+ // We need this to be false so that it can be used in a $facet stage.
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
DocumentSource::GetNextResult getNext() final {
@@ -607,10 +616,8 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
*/
class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
- return constraints;
+ bool needsPrimaryShard() const final {
+ return true;
}
static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
@@ -633,7 +640,7 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
facets.emplace_back("needsPrimaryShard", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT_TRUE(facetStage->needsPrimaryShard());
}
TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
@@ -652,7 +659,7 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
facets.emplace_back("second", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT_FALSE(facetStage->needsPrimaryShard());
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 5df25d6c1c6..5178a89c18a 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -46,15 +46,9 @@ public:
*/
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
-
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
BSONObjSet getOutputSorts() final {
return SimpleBSONObjComparator::kInstance.makeBSONObjSet(
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 489a80ea7b9..6572fc4741f 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -53,11 +53,8 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
- return constraints;
+ bool canSwapWithMatch() const final {
+ return true;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final {
@@ -65,6 +62,10 @@ public:
return SEE_NEXT;
};
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
collections->push_back(_from);
}
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index dd62f5323bd..7d25aca6c9f 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -69,12 +69,8 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ virtual InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d5bab6fd9e7..bb4fae8df2e 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -94,11 +94,8 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
- return constraints;
+ bool canSwapWithMatch() const final {
+ return true;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final;
@@ -107,6 +104,10 @@ public:
return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
}
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return nullptr;
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 2be99dd7f12..0b5800515aa 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -37,21 +37,18 @@
namespace mongo {
-class DocumentSourceMatch : public DocumentSource {
+class DocumentSourceMatch final : public DocumentSource {
public:
- virtual ~DocumentSourceMatch() = default;
-
+ // virtuals from DocumentSource
GetNextResult getNext() final;
+ const char* getSourceName() const final;
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
BSONObjSet getOutputSorts() final {
return pSource ? pSource->getOutputSorts()
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
- const char* getSourceName() const override;
- Value serialize(
- boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
-
/**
* Attempts to combine with any subsequent $match stages, joining the query objects with a
* $and.
@@ -144,11 +141,10 @@ public:
const std::string& path,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
-protected:
+private:
DocumentSourceMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-private:
void addDependencies(DepsTracker* deps) const;
std::unique_ptr<MatchExpression> _expression;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 045ddad4836..ebe0a861ad6 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -52,13 +52,8 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
-
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kInitialSource;
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 513994cd599..9236e55ad62 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -48,15 +48,9 @@ public:
const char* getSourceName() const override;
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
-
- StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ InitialSourceType getInitialSourceType() const override {
+ return InitialSourceType::kInitialSource;
}
-
BSONObjSet getOutputSorts() override {
return sorts;
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 9366d01a670..3b134f39d5e 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -43,13 +43,8 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
-
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
- constraints.isAllowedInsideFacetStage = false;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kLast;
- return constraints;
+ bool needsPrimaryShard() const final {
+ return true;
}
// Virtuals for SplittableDocumentSource
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 26d394d05e9..a3bea23795e 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -101,10 +101,8 @@ public:
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.canSwapWithMatch = true;
- return constraints;
+ bool canSwapWithMatch() const final {
+ return true;
}
TransformerInterface::TransformerType getType() const {
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 866a3fb955c..cb1c709ecde 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -52,12 +52,10 @@ public:
return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
- StageConstraints constraints() const final {
- StageConstraints constraints;
+ bool canSwapWithMatch() const final {
// Can't swap with a $match if a limit has been absorbed, since in general match can't swap
// with limit.
- constraints.canSwapWithMatch = !limitSrc;
- return constraints;
+ return !limitSrc;
}
BSONObjSet getOutputSorts() final {
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index fb7bd68ba39..53fbd33d083 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -46,10 +46,8 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.canSwapWithMatch = true;
- return constraints;
+ bool canSwapWithMatch() const final {
+ return true;
}
GetDepsReturn getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 31cd015ea97..63dae04e325 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -32,8 +32,6 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
-#include <algorithm>
-
#include "mongo/base/error_codes.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
@@ -136,16 +134,14 @@ Status Pipeline::validatePipeline() const {
// {aggregate: 1} is only valid for collectionless sources, and vice-versa.
const auto firstStage = _sources.front().get();
- if (nss.isCollectionlessAggregateNS() &&
- !firstStage->constraints().isIndependentOfAnyCollection) {
+ if (nss.isCollectionlessAggregateNS() && !firstStage->isCollectionlessInitialSource()) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "{aggregate: 1} is not valid for '"
<< firstStage->getSourceName()
<< "'; a collection is required."};
}
- if (!nss.isCollectionlessAggregateNS() &&
- firstStage->constraints().isIndependentOfAnyCollection) {
+ if (!nss.isCollectionlessAggregateNS() && firstStage->isCollectionlessInitialSource()) {
return {ErrorCodes::InvalidNamespace,
str::stream() << "'" << firstStage->getSourceName()
<< "' can only be run with {aggregate: 1}"};
@@ -158,21 +154,11 @@ Status Pipeline::validatePipeline() const {
Status Pipeline::validateFacetPipeline() const {
if (_sources.empty()) {
- return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"};
- }
- for (auto&& stage : _sources) {
- auto stageConstraints = stage->constraints();
- if (!stageConstraints.isAllowedInsideFacetStage) {
- return {ErrorCodes::BadValue,
- str::stream() << stage->getSourceName()
- << " is not allowed to be used within a $facet stage",
- 40550};
- }
- // We expect a stage within a $facet stage to have these properties.
- invariant(stageConstraints.requiresInputDocSource);
- invariant(!stageConstraints.isIndependentOfAnyCollection);
- invariant(stageConstraints.requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kNone);
+ return {ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty."};
+ } else if (_sources.front()->isInitialSource()) {
+ return {ErrorCodes::BadValue,
+ str::stream() << _sources.front()->getSourceName()
+ << " is not allowed to be used within a $facet stage."};
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -184,13 +170,10 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kFirst &&
- i != 0) {
+ if (stage->isInitialSource() && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
- << " is only valid as the first stage in a pipeline.",
- 40549};
+ << " is only valid as the first stage in a pipeline."};
}
auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
if (i != 0 && matchStage && matchStage->isTextQuery()) {
@@ -199,13 +182,8 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kLast &&
- i != _sources.size() - 1) {
- return {ErrorCodes::BadValue,
- str::stream() << stage->getSourceName()
- << " can only be the final stage in the pipeline",
- 40551};
+ if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) {
+ return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"};
}
++i;
}
@@ -395,9 +373,12 @@ BSONObj Pipeline::getInitialQuery() const {
}
bool Pipeline::needsPrimaryShardMerger() const {
- return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().mustRunOnPrimaryShardIfSharded;
- });
+ for (auto&& source : _sources) {
+ if (source->needsPrimaryShard()) {
+ return true;
+ }
+ }
+ return false;
}
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f1222797f11..697c639ccf7 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -454,7 +454,8 @@ void PipelineD::prepareCursorSource(Collection* collection,
}
}
- if (!sources.empty() && !sources.front()->constraints().requiresInputDocSource) {
+ // If the first stage of the pipeline is an initial source, we don't need an input cursor.
+ if (!sources.empty() && sources.front()->isInitialSource()) {
return;
}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 1f37b30fc90..8dbc02fc88a 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1349,10 +1349,8 @@ class DocumentSourceCollectionlessMock : public DocumentSourceMock {
public:
DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.isIndependentOfAnyCollection = true;
- return constraints;
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kCollectionlessInitialSource;
}
static boost::intrusive_ptr<DocumentSourceCollectionlessMock> create() {
@@ -1405,34 +1403,6 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) {
- const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")};
- auto ctx = getExpCtx();
- ctx->ns = NamespaceString("a.collection");
- ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
-}
-
-TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) {
- const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
- fromjson("{$changeNotification: {}}")};
- auto ctx = getExpCtx();
- ctx->ns = NamespaceString("a.collection");
- auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus();
- ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
- ASSERT_EQ(parseStatus.location(), 40549);
-}
-
-TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageInFacet) {
- const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
- fromjson("{$changeNotification: {}}")};
- auto ctx = getExpCtx();
- ctx->ns = NamespaceString("a.collection");
- auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus();
- ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
- ASSERT_EQ(parseStatus.location(), 40550);
- ASSERT(std::string::npos != parseStatus.reason().find("$changeNotification"));
-}
-
} // namespace Namespaces
namespace Dependencies {
@@ -1460,8 +1430,8 @@ class DocumentSourceDependencyDummy : public DocumentSourceMock {
public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
- StageConstraints constraints() const final {
- return StageConstraints{}; // Overrides DocumentSourceMock's required position.
+ InitialSourceType getInitialSourceType() const final {
+ return InitialSourceType::kNotInitialSource;
}
};
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index e21f8b2523c..ddee4d2960a 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -219,15 +219,20 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
mergeCtx->inRouter = true;
// explicitly *not* setting mergeCtx->tempDir
- auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
- pipeline->optimizePipeline();
+ // Parse and optimize the pipeline specification.
+ auto pipeline = Pipeline::parse(request.getPipeline(), mergeCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ pipeline.getValue()->optimizePipeline();
// If the first $match stage is an exact match on the shard key (with a simple collation or no
// string matching), we only have to send it to one shard, so send the command to that shard.
const bool singleShard = !namespaces.executionNss.isCollectionlessAggregateNS() && [&]() {
invariant(chunkMgr);
- BSONObj firstMatchQuery = pipeline->getInitialQuery();
+ BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery();
BSONObj shardKeyMatches = uassertStatusOK(
chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(opCtx, firstMatchQuery));
@@ -245,7 +250,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Don't need to split pipeline if the first $match is an exact match on shard key, unless
// there is a stage that needs to be run on the primary shard.
- const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger();
+ const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger();
const bool needSplit = !singleShard || needPrimaryShardMerger;
// Split the pipeline into pieces for mongod(s) and this mongos. It is illegal to use 'pipeline'
@@ -253,10 +258,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard;
if (needSplit) {
- pipelineForTargetedShards = pipeline->splitForSharded();
- pipelineForMergingShard = std::move(pipeline);
+ pipelineForTargetedShards = pipeline.getValue()->splitForSharded();
+ pipelineForMergingShard = std::move(pipeline.getValue());
} else {
- pipelineForTargetedShards = std::move(pipeline);
+ pipelineForTargetedShards = std::move(pipeline.getValue());
}
// Create the command for the shards. The 'fromRouter' field means produce output to be