diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index e5cd62a4670..8b84fe36bff 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,9 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; +using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; + Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {} Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) @@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const { // We expect a stage within a $facet stage to have these properties. invariant(stageConstraints.requiresInputDocSource); invariant(!stageConstraints.isIndependentOfAnyCollection); - invariant(stageConstraints.requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kNone); + invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kFirst && - i != 0) { + if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline.", @@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kLast && + if (stage->constraints().requiredPosition == PositionRequirement::kLast && i != _sources.size() - 1) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() @@ -312,6 +311,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { shardPipeline->_splitForSharded = true; _splitForMerge = true; + stitch(); + return shardPipeline; } @@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const { bool Pipeline::needsPrimaryShardMerger() const { return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().mustRunOnPrimaryShardIfSharded; + return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; + }); +} + +bool Pipeline::canRunOnMongos() const { + return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS; }); } |