summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2020-02-06 13:28:27 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-18 18:57:37 +0000
commitad8f814bcac53e273acb563e2f96264a9bb973ab (patch)
tree37f540b605578b5b9efda7e40663ef5372172ba6 /src/mongo/db
parent46ee9603414d47d18c850e5eccd18834cd7f9669 (diff)
downloadmongo-ad8f814bcac53e273acb563e2f96264a9bb973ab.tar.gz
SERVER-45620 Have $unionWith introspect the pipeline to decide on constraints
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp52
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_mock.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h8
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.h14
-rw-r--r--src/mongo/db/pipeline/document_source_union_with_test.cpp113
-rw-r--r--src/mongo/db/pipeline/stage_constraints.cpp37
-rw-r--r--src/mongo/db/pipeline/stage_constraints.h89
9 files changed, 219 insertions, 137 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0d8a9ecd390..5ad3adefc5b 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -252,7 +252,6 @@ pipelineEnv.Library(
'pipeline.cpp',
'semantic_analysis.cpp',
'sequential_document_cache.cpp',
- 'stage_constraints.cpp',
'tee_buffer.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 88f926b9b41..fcbfc566c0c 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -104,31 +104,6 @@ vector<pair<string, vector<BSONObj>>> extractRawPipelines(const BSONElement& ele
return rawFacetPipelines;
}
-StageConstraints::LookupRequirement computeLookupRequirement(
- const std::vector<DocumentSourceFacet::FacetPipeline>& facets) {
- for (auto&& facet : facets) {
- const auto& sources = facet.pipeline->getSources();
- for (auto&& src : sources) {
- if (!src->constraints().isAllowedInLookupPipeline()) {
- return StageConstraints::LookupRequirement::kNotAllowed;
- }
- }
- }
- return StageConstraints::LookupRequirement::kAllowed;
-}
-
-StageConstraints::UnionRequirement computeUnionRequirement(
- const std::vector<DocumentSourceFacet::FacetPipeline>& facets) {
- for (auto&& facet : facets) {
- for (auto&& src : facet.pipeline->getSources()) {
- if (!src->constraints().isAllowedInUnionPipeline()) {
- return StageConstraints::UnionRequirement::kNotAllowed;
- }
- }
- }
- return StageConstraints::UnionRequirement::kAllowed;
-}
-
} // namespace
std::unique_ptr<DocumentSourceFacet::LiteParsed> DocumentSourceFacet::LiteParsed::parse(
@@ -251,22 +226,21 @@ StageConstraints DocumentSourceFacet::constraints(Pipeline::SplitState) const {
}
}
- // Resolve the disk use and transaction requirement of this $facet by iterating through the
- // children in its facets.
- auto diskAndTxnReq = StageConstraints::kDefaultDiskUseAndTransactionRequirement;
+ // Resolve the disk use, lookup, and transaction requirement of this $facet by iterating through
+ // the children in its facets.
+ StageConstraints constraints(StreamType::kBlocking,
+ PositionRequirement::kNone,
+ host,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ StageConstraints::TransactionRequirement::kAllowed,
+ StageConstraints::LookupRequirement::kAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
for (const auto& facet : _facets) {
- diskAndTxnReq = StageConstraints::resolveDiskUseAndTransactionRequirement(
- facet.pipeline->getSources(), diskAndTxnReq);
+ constraints =
+ StageConstraints::getStrictestConstraints(facet.pipeline->getSources(), constraints);
}
-
- return {StreamType::kBlocking,
- PositionRequirement::kNone,
- host,
- std::get<StageConstraints::DiskUseRequirement>(diskAndTxnReq),
- FacetRequirement::kNotAllowed,
- std::get<StageConstraints::TransactionRequirement>(diskAndTxnReq),
- computeLookupRequirement(_facets),
- computeUnionRequirement(_facets)};
+ return constraints;
}
bool DocumentSourceFacet::usedDisk() {
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index f998c3d66a8..4e30b9fc5dc 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -176,19 +176,6 @@ const char* DocumentSourceLookUp::getSourceName() const {
}
StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
- // By default, $lookup is allowed in a transaction and does not use disk.
- auto diskRequirement = DiskUseRequirement::kNoDiskUse;
- auto txnRequirement = TransactionRequirement::kAllowed;
-
- // However, if $lookup is specified with a pipeline, it inherits the strictest disk use and
- // transaction requirement from the children in its pipeline.
- if (wasConstructedWithPipelineSyntax()) {
- const auto resolvedRequirements = StageConstraints::resolveDiskUseAndTransactionRequirement(
- _resolvedIntrospectionPipeline->getSources());
- diskRequirement = resolvedRequirements.first;
- txnRequirement = resolvedRequirements.second;
- }
-
// If executing on mongos and the foreign collection is sharded, then this stage can run on
// mongos or any shard.
HostTypeRequirement hostRequirement =
@@ -203,17 +190,26 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
hostRequirement = HostTypeRequirement::kPrimaryShard;
}
+ // By default, $lookup is allowed in a transaction and does not use disk.
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
hostRequirement,
- diskRequirement,
+ DiskUseRequirement::kNoDiskUse,
FacetRequirement::kAllowed,
- txnRequirement,
+ TransactionRequirement::kAllowed,
LookupRequirement::kAllowed,
UnionRequirement::kAllowed);
+ // However, if $lookup is specified with a pipeline, it inherits the strictest disk use, facet,
+ // transaction, and lookup requirements from the children in its pipeline.
+ if (wasConstructedWithPipelineSyntax()) {
+ constraints = StageConstraints::getStrictestConstraints(
+ _resolvedIntrospectionPipeline->getSources(), constraints);
+ }
+
constraints.canSwapWithMatch = true;
constraints.canSwapWithLimitAndSample = !_unwindSrc;
+
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 815cacb24ac..70b7dbf4e14 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -41,7 +41,21 @@ using boost::intrusive_ptr;
using std::deque;
DocumentSourceMock::DocumentSourceMock(deque<GetNextResult> results)
- : DocumentSourceQueue(std::move(results), new ExpressionContextForTest()) {}
+ : DocumentSourceMock(std::move(results), new ExpressionContextForTest()) {}
+
+DocumentSourceMock::DocumentSourceMock(std::deque<GetNextResult> results,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceQueue(std::move(results), expCtx),
+ mockConstraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed) {
+ mockConstraints.requiresInputDocSource = false;
+}
const char* DocumentSourceMock::getSourceName() const {
return "mock";
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index ca882575585..b5b18124bc3 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -32,6 +32,7 @@
#include <deque>
#include "mongo/db/pipeline/document_source_queue.h"
+#include "mongo/db/pipeline/stage_constraints.h"
namespace mongo {
@@ -57,6 +58,7 @@ public:
using DocumentSourceQueue::DocumentSourceQueue;
DocumentSourceMock(std::deque<GetNextResult>);
+ DocumentSourceMock(std::deque<GetNextResult>, const boost::intrusive_ptr<ExpressionContext>&);
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override {
@@ -79,6 +81,10 @@ public:
return this;
}
+ StageConstraints constraints(Pipeline::SplitState pipeState) const override {
+ return mockConstraints;
+ }
+
/**
* This stage does not modify anything.
*/
@@ -93,6 +99,8 @@ public:
bool isDisposed = false;
bool isDetachedFromOpCtx = false;
bool isOptimized = false;
+ StageConstraints mockConstraints;
+
protected:
GetNextResult doGetNext() override {
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index beb69c7e9e5..a5a4e14fc48 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -34,6 +34,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+#include "mongo/db/pipeline/stage_constraints.h"
namespace mongo {
@@ -88,7 +89,7 @@ public:
}
StageConstraints constraints(Pipeline::SplitState) const final {
- auto constraints = StageConstraints(
+ StageConstraints unionConstraints(
StreamType::kStreaming,
PositionRequirement::kNone,
HostTypeRequirement::kAnyShard,
@@ -100,12 +101,19 @@ public:
LookupRequirement::kAllowed,
UnionRequirement::kAllowed);
+ if (_pipeline) {
+ // The constraints of the sub-pipeline determine the constraints of the $unionWith
+ // stage. We want to forward the strictest requirements of the stages in the
+ // sub-pipeline.
+ unionConstraints = StageConstraints::getStrictestConstraints(_pipeline->getSources(),
+ unionConstraints);
+ }
// DocumentSourceUnionWith cannot directly swap with match but it contains custom logic in
// the doOptimizeAt() member function to allow itself to duplicate any match ahead in the
// current pipeline and place one copy inside its sub-pipeline and one copy behind in the
// current pipeline.
- constraints.canSwapWithMatch = false;
- return constraints;
+ unionConstraints.canSwapWithMatch = false;
+ return unionConstraints;
}
DepsTracker::State getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
index 9309b24ab7b..eaebccf21a7 100644
--- a/src/mongo/db/pipeline/document_source_union_with_test.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_facet.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_replace_root.h"
@@ -454,5 +455,117 @@ TEST_F(DocumentSourceUnionWithTest, RejectUnionWhenDepthLimitIsExceeded) {
AssertionException,
ErrorCodes::MaxSubPipelineDepthExceeded);
}
+
+TEST_F(DocumentSourceUnionWithTest, ConstraintsWithoutPipelineAreCorrect) {
+ auto emptyUnion = DocumentSourceUnionWith(
+ getExpCtx(),
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ StageConstraints defaultConstraints(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ StageConstraints::FacetRequirement::kAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ ASSERT_TRUE(emptyUnion.constraints(Pipeline::SplitState::kUnsplit) == defaultConstraints);
+}
+
+TEST_F(DocumentSourceUnionWithTest, ConstraintsWithMixedSubPipelineAreCorrect) {
+ const auto mock = DocumentSourceMock::createForTest();
+ StageConstraints stricterConstraint(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ StageConstraints::FacetRequirement::kNotAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ mock->mockConstraints = stricterConstraint;
+ auto unionWithOne = DocumentSourceUnionWith(
+ getExpCtx(),
+ uassertStatusOK(
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{mock}, getExpCtx())));
+ ASSERT_TRUE(unionWithOne.constraints(Pipeline::SplitState::kUnsplit) == stricterConstraint);
+}
+
+TEST_F(DocumentSourceUnionWithTest, ConstraintsWithStrictSubPipelineAreCorrect) {
+ const auto mockOne = DocumentSourceMock::createForTest();
+ StageConstraints constraintTmpDataFacetLookupNotAllowed(
+ StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kWritesTmpData,
+ StageConstraints::FacetRequirement::kNotAllowed,
+ StageConstraints::TransactionRequirement::kAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ mockOne->mockConstraints = constraintTmpDataFacetLookupNotAllowed;
+ const auto mockTwo = DocumentSourceMock::createForTest();
+ StageConstraints constraintPermissive(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kNone,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ StageConstraints::FacetRequirement::kAllowed,
+ StageConstraints::TransactionRequirement::kAllowed,
+ StageConstraints::LookupRequirement::kAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ mockTwo->mockConstraints = constraintPermissive;
+ const auto mockThree = DocumentSourceMock::createForTest();
+ StageConstraints constraintPersistentDataTransactionLookupNotAllowed(
+ StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kNone,
+ StageConstraints::DiskUseRequirement::kWritesPersistentData,
+ StageConstraints::FacetRequirement::kAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ mockThree->mockConstraints = constraintPersistentDataTransactionLookupNotAllowed;
+ auto unionStage = DocumentSourceUnionWith(
+ getExpCtx(),
+ uassertStatusOK(Pipeline::create(
+ std::list<boost::intrusive_ptr<DocumentSource>>{mockOne, mockTwo, mockThree},
+ getExpCtx())));
+ StageConstraints strict(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kWritesPersistentData,
+ StageConstraints::FacetRequirement::kNotAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ ASSERT_TRUE(unionStage.constraints(Pipeline::SplitState::kUnsplit) == strict);
+}
+TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInherited) {
+ const auto mock = DocumentSourceMock::createForTest();
+ StageConstraints strictConstraint(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ StageConstraints::FacetRequirement::kAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ mock->mockConstraints = strictConstraint;
+ auto facetPipeline = uassertStatusOK(Pipeline::createFacetPipeline({mock}, getExpCtx()));
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("pipeline", std::move(facetPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), getExpCtx());
+ auto unionStage = DocumentSourceUnionWith(
+ getExpCtx(),
+ uassertStatusOK(Pipeline::create(
+ std::list<boost::intrusive_ptr<DocumentSource>>{facetStage}, getExpCtx())));
+ StageConstraints expectedConstraints(StageConstraints::StreamType::kStreaming,
+ StageConstraints::PositionRequirement::kNone,
+ StageConstraints::HostTypeRequirement::kAnyShard,
+ StageConstraints::DiskUseRequirement::kNoDiskUse,
+ StageConstraints::FacetRequirement::kNotAllowed,
+ StageConstraints::TransactionRequirement::kNotAllowed,
+ StageConstraints::LookupRequirement::kNotAllowed,
+ StageConstraints::UnionRequirement::kAllowed);
+ ASSERT_TRUE(unionStage.constraints(Pipeline::SplitState::kUnsplit) == expectedConstraints);
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/stage_constraints.cpp b/src/mongo/db/pipeline/stage_constraints.cpp
deleted file mode 100644
index d488b7f9954..00000000000
--- a/src/mongo/db/pipeline/stage_constraints.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/pipeline/stage_constraints.h"
-
-namespace mongo {
-constexpr StageConstraints::DiskUseAndTransactionRequirement
- StageConstraints::kDefaultDiskUseAndTransactionRequirement;
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/stage_constraints.h b/src/mongo/db/pipeline/stage_constraints.h
index b4fee766281..d0e0c4ef2b8 100644
--- a/src/mongo/db/pipeline/stage_constraints.h
+++ b/src/mongo/db/pipeline/stage_constraints.h
@@ -111,41 +111,33 @@ struct StageConstraints {
*/
enum class UnionRequirement { kNotAllowed, kAllowed };
- using DiskUseAndTransactionRequirement = std::pair<DiskUseRequirement, TransactionRequirement>;
-
/**
- * By default, a stage is assumed to use no disk and be allowed to run in a transaction.
- */
- static constexpr auto kDefaultDiskUseAndTransactionRequirement =
- std::make_pair(DiskUseRequirement::kNoDiskUse, TransactionRequirement::kAllowed);
-
- /**
- * Given a 'pipeline' of DocumentSources, resolves the container's disk use requirement and
- * transaction requirement:
- *
- * - Returns the "strictest" DiskUseRequirement reported by the stages in 'pipeline',
- * where the strictness order is kNone < kWritesTmpData < kWritesPersistentData. For example,
- * in a pipeline where all three DiskUseRequirements are present, the return value will be
- * DiskUseRequirement::kWritesPersistentData.
- *
- * - Returns TransactionRequirement::kAllowed if and only if every DocumentSource in
- * 'pipeline' is allowed in a transaction.
+ * Returns the StageConstraints representing the strictest constraint of each type from the
+ * given pipeline. Does not compare StreamType and PositionRequirement because those values
+ * don't make sense as a property of a pipeline. Also does not compare HostTypeRequirement as
+ * there is no strict ordering of possible values. Those three values in the returned
+ * StageConstraints will always be the same as those passed in `defaultReqs`.
*/
template <typename DocumentSourceContainer>
- static DiskUseAndTransactionRequirement resolveDiskUseAndTransactionRequirement(
- const DocumentSourceContainer& pipeline,
- DiskUseAndTransactionRequirement defaultReqs = kDefaultDiskUseAndTransactionRequirement) {
- return std::accumulate(
- pipeline.begin(),
- pipeline.end(),
- defaultReqs,
- [](const DiskUseAndTransactionRequirement& constraints, const auto& stage) {
- const auto stageConstraints = stage->constraints();
- const auto diskUse = std::max(constraints.first, stageConstraints.diskRequirement);
- const auto txnReq =
- std::min(constraints.second, stageConstraints.transactionRequirement);
- return std::make_pair(diskUse, txnReq);
- });
+ static StageConstraints getStrictestConstraints(const DocumentSourceContainer& pipeline,
+ const StageConstraints& defaultReqs) {
+ auto newReqs = defaultReqs;
+ for (const auto& stage : pipeline) {
+ const auto stageConstraints = stage->constraints();
+ // Skip PositionRequirement, StreamType, and HostTypeRequirement, as it doesn't make
+ // sense to compare those values.
+ newReqs.diskRequirement =
+ std::max(newReqs.diskRequirement, stageConstraints.diskRequirement);
+ newReqs.facetRequirement =
+ std::max(newReqs.facetRequirement, stageConstraints.facetRequirement);
+ newReqs.transactionRequirement =
+ std::min(newReqs.transactionRequirement, stageConstraints.transactionRequirement);
+ newReqs.lookupRequirement =
+ std::min(newReqs.lookupRequirement, stageConstraints.lookupRequirement);
+ newReqs.unionRequirement =
+ std::min(newReqs.unionRequirement, stageConstraints.unionRequirement);
+ }
+ return newReqs;
}
StageConstraints(
@@ -278,35 +270,35 @@ struct StageConstraints {
}
// Indicates whether this stage needs to be at a particular position in the pipeline.
- const PositionRequirement requiredPosition;
+ PositionRequirement requiredPosition;
// Indicates whether this stage can only be executed on specific components of a sharded
// cluster.
- const HostTypeRequirement hostRequirement;
+ HostTypeRequirement hostRequirement;
// Indicates whether this stage may write persistent data to disk, or may spill to temporary
// files if its memory usage becomes excessive.
- const DiskUseRequirement diskRequirement;
+ DiskUseRequirement diskRequirement;
// Indicates whether this stage is itself a $changeStream stage, or if not whether it may
// exist in a pipeline which begins with $changeStream.
- const ChangeStreamRequirement changeStreamRequirement;
+ ChangeStreamRequirement changeStreamRequirement;
// Indicates whether this stage may run inside a $facet stage.
- const FacetRequirement facetRequirement;
+ FacetRequirement facetRequirement;
// Indicates whether this stage is legal when the readConcern level is "snapshot" or the
// aggregate is running inside of a multi-document transaction.
- const TransactionRequirement transactionRequirement;
+ TransactionRequirement transactionRequirement;
// Indicates whether this stage is allowed in a $lookup subpipeline.
- const LookupRequirement lookupRequirement;
+ LookupRequirement lookupRequirement;
// Indicates whether this stage is allowed in a $unionWith subpipeline.
- const UnionRequirement unionRequirement;
+ UnionRequirement unionRequirement;
// Indicates whether this is a streaming or blocking stage.
- const StreamType streamType;
+ StreamType streamType;
// True if this stage does not generate results itself, and instead pulls inputs from an
// input DocumentSource (via 'pSource').
@@ -332,5 +324,20 @@ struct StageConstraints {
// Indicates that a stage is allowed within a pipeline-stlye update.
bool isAllowedWithinUpdatePipeline = false;
+
+ bool operator==(const StageConstraints& other) const {
+ return requiredPosition == other.requiredPosition &&
+ hostRequirement == other.hostRequirement && diskRequirement == other.diskRequirement &&
+ changeStreamRequirement == other.changeStreamRequirement &&
+ facetRequirement == other.facetRequirement &&
+ transactionRequirement == other.transactionRequirement &&
+ lookupRequirement == other.lookupRequirement && streamType == other.streamType &&
+ requiresInputDocSource == other.requiresInputDocSource &&
+ isIndependentOfAnyCollection == other.isIndependentOfAnyCollection &&
+ canSwapWithMatch == other.canSwapWithMatch &&
+ canSwapWithLimitAndSample == other.canSwapWithLimitAndSample &&
+ isAllowedWithinUpdatePipeline == other.isAllowedWithinUpdatePipeline &&
+ unionRequirement == other.unionRequirement;
+ }
};
} // namespace mongo