summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp23
1 files changed, 15 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index e5cd62a4670..8b84fe36bff 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,9 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
+using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
@@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const {
// We expect a stage within a $facet stage to have these properties.
invariant(stageConstraints.requiresInputDocSource);
invariant(!stageConstraints.isIndependentOfAnyCollection);
- invariant(stageConstraints.requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kNone);
+ invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kFirst &&
- i != 0) {
+ if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline.",
@@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kLast &&
+ if (stage->constraints().requiredPosition == PositionRequirement::kLast &&
i != _sources.size() - 1) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
@@ -312,6 +311,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
shardPipeline->_splitForSharded = true;
_splitForMerge = true;
+ stitch();
+
return shardPipeline;
}
@@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const {
bool Pipeline::needsPrimaryShardMerger() const {
return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().mustRunOnPrimaryShardIfSharded;
+ return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
+ });
+}
+
+bool Pipeline::canRunOnMongos() const {
+ return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
});
}