summary refs log tree commit diff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
author Bernard Gorman <bernard.gorman@gmail.com> 2017-09-26 11:46:58 -0400
committer Bernard Gorman <bernard.gorman@gmail.com> 2017-09-27 22:12:14 -0400
commit 55637833c707998f685f997d43624c52cde99b45 (patch)
tree bbc00a719c14983e8984d1dbe8dbddd074e023a7 /src/mongo/db/pipeline
parent 22c34669f744ea245c14a64c556d61f8932ceda9 (diff)
download mongo-55637833c707998f685f997d43624c52cde99b45.tar.gz
SERVER-30871 Permit blocking aggregation stages to run on mongoS if allowDiskUse is false
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- src/mongo/db/pipeline/document_source.h 69
-rw-r--r-- src/mongo/db/pipeline/document_source_bucket_auto.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source_bucket_auto.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_bucket_auto_test.cpp 12
-rw-r--r-- src/mongo/db/pipeline/document_source_change_stream.cpp 17
-rw-r--r-- src/mongo/db/pipeline/document_source_check_resume_token.h 16
-rw-r--r-- src/mongo/db/pipeline/document_source_coll_stats.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_current_op.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_facet.cpp 38
-rw-r--r-- src/mongo/db/pipeline/document_source_facet_test.cpp 16
-rw-r--r-- src/mongo/db/pipeline/document_source_geo_near.cpp 12
-rw-r--r-- src/mongo/db/pipeline/document_source_geo_near.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_graph_lookup.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_group.cpp 6
-rw-r--r-- src/mongo/db/pipeline/document_source_group.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_group_test.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source_index_stats.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp 10
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_split_pipeline.h 18
-rw-r--r-- src/mongo/db/pipeline/document_source_limit.h 14
-rw-r--r-- src/mongo/db/pipeline/document_source_list_local_sessions.h 12
-rw-r--r-- src/mongo/db/pipeline/document_source_list_sessions.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.cpp 10
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.h 30
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup_change_post_image.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup_test.cpp 8
-rw-r--r-- src/mongo/db/pipeline/document_source_match.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_merge_cursors.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_mock.h 9
-rw-r--r-- src/mongo/db/pipeline/document_source_out.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_redact.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_sample.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_sample_from_random_cursor.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_sequential_document_cache.h 15
-rw-r--r-- src/mongo/db/pipeline/document_source_single_document_transformation.h 7
-rw-r--r-- src/mongo/db/pipeline/document_source_skip.h 14
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.cpp 2
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.h 22
-rw-r--r-- src/mongo/db/pipeline/document_source_sort_test.cpp 6
-rw-r--r-- src/mongo/db/pipeline/document_source_tee_consumer.h 8
-rw-r--r-- src/mongo/db/pipeline/document_source_unwind.h 8
-rw-r--r-- src/mongo/db/pipeline/expression_context.cpp 4
-rw-r--r-- src/mongo/db/pipeline/expression_context.h 2
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 30
-rw-r--r-- src/mongo/db/pipeline/pipeline.h 11
-rw-r--r-- src/mongo/db/pipeline/pipeline_test.cpp 9
48 files changed, 415 insertions, 174 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index eba4e2acc4b..934d9d29da2 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -132,20 +132,70 @@ public:
* A HostTypeRequirement defines where this stage is permitted to be executed when the
* pipeline is run on a sharded cluster.
*/
- enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS };
+ enum class HostTypeRequirement { kNone, kPrimaryShard, kAnyShard };
- // Set if this stage needs to be in a particular position of the pipeline.
- PositionRequirement requiredPosition = PositionRequirement::kNone;
+ /**
+ * A DiskUseRequirement indicates whether this stage writes to disk, or whether it may spill
+ * to disk if its memory usage exceeds a given threshold. Note that this only indicates the
+ * *ability* of the stage to spill; if 'allowDiskUse' is set to false, it will be prevented
+ * from doing so.
+ */
+ enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
+
+ /**
+ * A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
+ */
+ enum class FacetRequirement { kAllowed, kNotAllowed };
- // Set if this stage can only be executed on specific components of a sharded cluster.
- HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard;
+ /**
+ * A StreamType defines whether this stage is streaming (can produce output based solely on
+ * the current input document) or blocking (must examine subsequent documents before
+ * producing an output document).
+ */
+ enum class StreamType { kStreaming, kBlocking };
+
+ StageConstraints(StreamType streamType,
+ PositionRequirement requiredPosition,
+ HostTypeRequirement hostRequirement,
+ DiskUseRequirement diskRequirement,
+ FacetRequirement facetRequirement)
+ : requiredPosition(requiredPosition),
+ hostRequirement(hostRequirement),
+ diskRequirement(diskRequirement),
+ facetRequirement(facetRequirement),
+ streamType(streamType) {
+ // Stages which are allowed to run in $facet pipelines must not have any specific
+ // position requirements.
+ invariant(!isAllowedInsideFacetStage() ||
+ requiredPosition == PositionRequirement::kNone);
+ }
- bool isAllowedInsideFacetStage = true;
+ // Indicates whether this stage needs to be at a particular position in the pipeline.
+ const PositionRequirement requiredPosition;
+
+ // Indicates whether this stage can only be executed on specific components of a sharded
+ // cluster.
+ const HostTypeRequirement hostRequirement;
+
+ // Indicates whether this stage may write persistent data to disk, or may spill to temporary
+ // files if its memory usage becomes excessive.
+ const DiskUseRequirement diskRequirement;
+
+ // Indicates whether this stage may run inside a $facet stage.
+ const FacetRequirement facetRequirement;
+
+ // Indicates whether this is a streaming or blocking stage.
+ const StreamType streamType;
// True if this stage does not generate results itself, and instead pulls inputs from an
// input DocumentSource (via 'pSource').
bool requiresInputDocSource = true;
+ // True if this stage should be permitted to run in a $facet pipeline.
+ bool isAllowedInsideFacetStage() const {
+ return facetRequirement == FacetRequirement::kAllowed;
+ }
+
// True if this stage operates on a global or database level, like $currentOp.
bool isIndependentOfAnyCollection = false;
@@ -165,6 +215,9 @@ public:
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PositionRequirement = StageConstraints::PositionRequirement;
+ using DiskUseRequirement = StageConstraints::DiskUseRequirement;
+ using FacetRequirement = StageConstraints::FacetRequirement;
+ using StreamType = StageConstraints::StreamType;
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
@@ -260,9 +313,7 @@ public:
* Returns a struct containing information about any special constraints imposed on using this
* stage.
*/
- virtual StageConstraints constraints() const {
- return StageConstraints{};
- }
+ virtual StageConstraints constraints() const = 0;
/**
* Informs the stage that it is no longer needed and can release its resources. After dispose()
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index e874fd5d426..a8123f5f2fc 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
if (!_sorter) {
SortOptions opts;
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
+ if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 2a40b092c1f..60c5ed02501 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -48,6 +48,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
+ }
+
/**
* The $bucketAuto stage must be run on the merging shard.
*/
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 08926254480..df19b4eb497 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -350,7 +350,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) {
auto expCtx = getExpCtx();
unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
@@ -386,7 +386,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) {
// Allow the $bucketAuto stage to spill to disk.
unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
@@ -647,22 +647,22 @@ void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expC
TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
expCtx->inMongos = false;
assertCannotSpillToDisk(expCtx);
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
}
TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a38a1aef2ce..40ceaa5b1b7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -97,10 +97,11 @@ const char* DocumentSourceOplogMatch::getSourceName() const {
}
DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
/**
@@ -142,6 +143,14 @@ public:
return "$changeStream";
}
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
// This stage is created by the DocumentSourceChangeStream stage, so serializing it
// here would result in it being created twice.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 22441f2a03a..cfbef21088d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -60,6 +60,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
@@ -87,6 +95,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
/**
* SplittableDocumentSource methods; this has to run on the merger, since the resume point could
* be at any shard.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 2903241d1a7..0c319729328 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -75,10 +75,13 @@ public:
const char* getSourceName() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index d5138e74d2a..6fa869712b1 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -79,11 +79,14 @@ public:
const char* getSourceName() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.isIndependentOfAnyCollection = true;
+ constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index f9c9345baf6..674bbaa85ee 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -52,8 +52,12 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index c7f82678eb1..6c58124387b 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -238,21 +238,29 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx)
}
DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets.
-
- for (auto&& facet : _facets) {
- for (auto&& nestedStage : facet.pipeline->getSources()) {
- if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) {
- // Currently we don't split $facet to have a merger part and a shards part (see
- // SERVER-24154). This means that if any stage in any of the $facet pipelines
- // requires the primary shard, then the entire $facet must happen on the merger, and
- // the merger must be the primary shard.
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- }
- }
- }
- return constraints;
+ const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
+ const auto sources = facet.pipeline->getSources();
+ return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
+ return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData;
+ });
+ });
+
+ // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
+ // This means that if any stage in any of the $facet pipelines requires the primary shard, then
+ // the entire $facet must happen on the merger, and the merger must be the primary shard.
+ const bool needsPrimaryShard =
+ std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
+ const auto sources = facet.pipeline->getSources();
+ return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
+ return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
+ });
+ });
+
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index be05268fb7b..61d112906c7 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -186,9 +186,11 @@ public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = true;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
DocumentSource::GetNextResult getNext() final {
@@ -625,9 +627,11 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index c971509e373..f7b7a5ca741 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -61,7 +61,7 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() {
if (!resultsIterator->more())
return GetNextResult::makeEOF();
- // each result from the geoNear command is wrapped in a wrapper object with "obj",
+ // Each result from the geoNear command is wrapped in a wrapper object with "obj",
// "dis" and maybe "loc" fields. We want to take the object from "obj" and inject the
// other fields into it.
Document result(resultsIterator->next().embeddedObject());
@@ -70,6 +70,11 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() {
if (includeLocs)
output.setNestedField(*includeLocs, result["loc"]);
+ // In a cluster, $geoNear output will be merged via $sort, so add the sort key.
+ if (pExpCtx->needsMerge) {
+ output.setSortKeyMetaField(BSON("" << result["dis"]));
+ }
+
return output.freeze();
}
@@ -89,12 +94,13 @@ Pipeline::SourceContainer::iterator DocumentSourceGeoNear::doOptimizeAt(
}
// This command is sent as-is to the shards.
-// On router this becomes a sort by distance (nearest-first) with limit.
intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getShardSource() {
return this;
}
+// On mongoS this becomes a merge sort by distance (nearest-first) with limit.
intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getMergeSource() {
- return DocumentSourceSort::create(pExpCtx, BSON(distanceField->fullPath() << 1), limit);
+ return DocumentSourceSort::create(
+ pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true), limit);
}
Value DocumentSourceGeoNear::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 315dcea6dd4..23e7c4e901c 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -48,10 +48,13 @@ public:
Pipeline::SourceContainer* container) final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 53394d3d0ab..afd633d46bc 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -54,9 +54,13 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index ba2c48680f2..41b2546e331 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
- _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {}
+ _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
_accumulatedFields.push_back(accumulationStatement);
@@ -485,7 +485,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
uassert(16945,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
- _extSortAllowed);
+ _allowDiskUse);
_sortedFiles.push_back(spill());
_memoryUsageBytes = 0;
}
@@ -531,7 +531,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
if (!inserted && // is a dup
!pExpCtx->inMongos && // can't spill to disk in mongos
- !_extSortAllowed && // don't change behavior when testing external sort
+ !_allowDiskUse && // don't change behavior when testing external sort
_sortedFiles.size() < 20) { // don't open too many FDs
_sortedFiles.push_back(spill());
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index cf108c15f24..057eb58164a 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -69,6 +69,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
+ }
+
/**
* Add an accumulator, which will become a field in each Document that results from grouping.
*/
@@ -176,7 +184,7 @@ private:
// Only used when '_spilled' is true.
std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
- const bool _extSortAllowed;
+ const bool _allowDiskUse;
std::pair<Value, Value> _firstPartOfNextGroup;
// Only used when '_sorted' is true.
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 5cc5ba93be4..8e3fc90521b 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -103,7 +103,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
// Allow the $group stage to spill to disk.
TempDir tempDir("DocumentSourceGroupTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 83546654361..c5d4d7c2762 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -70,10 +70,13 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index d1967b6625f..839eebecc3f 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -53,9 +53,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
index a49514cf628..276cf3005d8 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
@@ -47,7 +47,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create
auto specObj = elem.embeddedObject();
- HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard;
+ HostTypeRequirement mergeType = HostTypeRequirement::kNone;
for (auto&& elt : specObj) {
if (elt.fieldNameStringData() == "mergeType"_sd) {
@@ -62,7 +62,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create
} else if ("primaryShard"_sd == mergeTypeString) {
mergeType = HostTypeRequirement::kPrimaryShard;
} else if ("mongos"_sd == mergeTypeString) {
- mergeType = HostTypeRequirement::kAnyShardOrMongoS;
+ mergeType = HostTypeRequirement::kNone;
} else {
uasserted(ErrorCodes::BadValue,
str::stream() << "unrecognized field while parsing mergeType: '"
@@ -90,8 +90,8 @@ Value DocumentSourceInternalSplitPipeline::serialize(
std::string mergeTypeString;
switch (_mergeType) {
- case HostTypeRequirement::kAnyShardOrMongoS:
- mergeTypeString = "mongos";
+ case HostTypeRequirement::kAnyShard:
+ mergeTypeString = "anyShard";
break;
case HostTypeRequirement::kPrimaryShard:
@@ -99,7 +99,7 @@ Value DocumentSourceInternalSplitPipeline::serialize(
break;
default:
- mergeTypeString = "anyShard";
+ mergeTypeString = "mongos";
break;
}
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index f9ac84c555f..811de4b7e23 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -36,10 +36,10 @@ namespace mongo {
* An internal stage available for testing. Acts as a simple passthrough of intermediate results
* from the source stage, but forces the pipeline to split at the point where this stage appears
* (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be
- * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this
- * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the
- * pipeline will be sent to a random participating shard, subject to the requirements of any
- * subsequent splittable stages in the pipeline.
+ * one of 'primaryShard', 'anyShard' or 'mongos' to control where the merge may occur. Omitting this
+ * parameter or specifying 'mongos' produces the default merging behaviour; the merge half of the
+ * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a
+ * random participating shard otherwise.
*/
class DocumentSourceInternalSplitPipeline final : public DocumentSource,
public SplittableDocumentSource {
@@ -67,9 +67,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = _mergeType;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ _mergeType,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
GetNextResult getNext() final;
@@ -80,7 +82,7 @@ private:
: DocumentSource(expCtx), _mergeType(mergeType) {}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
+ HostTypeRequirement _mergeType = HostTypeRequirement::kNone;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 7bf47638b21..b74a053028f 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -48,6 +48,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
@@ -58,12 +66,6 @@ public:
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
- }
-
/**
* Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
*/
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 23e7ce97592..01321d596a6 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -94,13 +94,15 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.isIndependentOfAnyCollection = true;
constraints.allowedToForwardFromMongos = false;
+ constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index 47c08af9d73..4d050239e4c 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -86,11 +86,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = true;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 21f3151f59c..3a8de609c9f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -118,6 +118,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
_variablesParseState.defineVariable(varName));
}
+
+ initializeIntrospectionPipeline();
}
std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
@@ -659,16 +661,14 @@ void DocumentSourceLookUp::serializeToArray(
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
if (wasConstructedWithPipelineSyntax()) {
- // Copy all 'let' variables into the foreign pipeline's expression context.
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
-
- auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+ // We will use the introspection pipeline which we prebuilt during construction.
+ invariant(_parsedIntrospectionPipeline);
DepsTracker subDeps(deps->getMetadataAvailable());
// Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
// declared by this $lookup and variables declared externally.
- for (auto&& source : pipeline->getSources()) {
+ for (auto&& source : _parsedIntrospectionPipeline->getSources()) {
source->getDependencies(&subDeps);
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 9727d0e92db..7cf136b0cf9 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -96,9 +96,22 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
+ const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
+ std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
+ _parsedIntrospectionPipeline->getSources().end(),
+ [](const auto& source) {
+ return source->constraints().diskRequirement ==
+ DiskUseRequirement::kWritesTmpData;
+ });
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData
+ : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
@@ -243,6 +256,16 @@ private:
void resolveLetVariables(const Document& localDoc, Variables* variables);
/**
+ * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
+ * pipelines will be built recursively.
+ */
+ void initializeIntrospectionPipeline() {
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+ _parsedIntrospectionPipeline =
+ uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+ }
+
+ /**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
* cursor and/or cache source as appropriate.
*/
@@ -296,6 +319,9 @@ private:
// The aggregation pipeline defined with the user request, prior to optimization and view
// resolution.
std::vector<BSONObj> _userPipeline;
+ // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
+ // functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline;
std::vector<LetVariable> _letVariables;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index c51653ddb8d..02cfa7435d4 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -62,9 +62,13 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index dcbc2b55814..03ea80c5d7d 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -162,7 +162,7 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) {
<< "pipeline"
<< BSON_ARRAY(BSON("$project" << BSON("hasX"
<< "$$var1"))
- << BSON("$match" << BSON("$hasX" << true)))
+ << BSON("$match" << BSON("hasX" << true)))
<< "as"
<< "as"))
.firstElement(),
@@ -448,9 +448,9 @@ public:
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
- if (pipeline->popFrontStageWithName("$match") ||
- pipeline->popFrontStageWithName("$sort") ||
- pipeline->popFrontStageWithName("$project")) {
+ if (pipeline->popFrontWithCriteria("$match") ||
+ pipeline->popFrontWithCriteria("$sort") ||
+ pipeline->popFrontWithCriteria("$project")) {
continue;
}
break;
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 6b2fc653d43..26d912928eb 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -51,9 +51,11 @@ public:
const char* getSourceName() const override;
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
Value serialize(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 63521e2c742..2e334d5e0ce 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -59,11 +59,13 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShard;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 22f7a1aa24d..6cc7f5aed69 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -50,10 +50,13 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 6756cd4df2a..ec8c188c0b7 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -45,11 +45,11 @@ public:
GetDepsReturn getDependencies(DepsTracker* deps) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- constraints.isAllowedInsideFacetStage = false;
- constraints.requiredPosition = PositionRequirement::kLast;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kLast,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kWritesPersistentData,
+ FacetRequirement::kNotAllowed};
}
// Virtuals for SplittableDocumentSource
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index bedc434d61e..db1698fe776 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -41,9 +41,11 @@ public:
boost::intrusive_ptr<DocumentSource> optimize() final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 07a85c77e33..d5a86d9c008 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -44,9 +44,11 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
}
GetDepsReturn getDependencies(DepsTracker* deps) const final {
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index 19b7106b03d..8d10664f6ff 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -44,6 +44,14 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long size,
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index af96ae8e2ab..1d2474235f2 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -50,13 +50,14 @@ public:
}
StageConstraints constraints() const {
- StageConstraints constraints;
-
- if (_cache->isServing()) {
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- }
-
+ StageConstraints constraints(StreamType::kStreaming,
+ _cache->isServing() ? PositionRequirement::kFirst
+ : PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
+ constraints.requiresInputDocSource = (_cache->isBuilding());
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 188e9864310..bdf6eac8d05 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -102,8 +102,11 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index fc87d7e1eaa..4e10f9ac852 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -50,18 +50,20 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
- }
-
/**
* Attempts to move a subsequent $limit before the skip, potentially allowing for further
* optimizations earlier in the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 97923ff8118..e32c7d418a6 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
opts.limit = limitSrc->getLimit();
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
+ if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 2494400eaae..c4ecc799a5f 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -63,14 +63,15 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- // Can't swap with a $match if a limit has been absorbed, since in general match can't swap
- // with limit.
+ StageConstraints constraints(
+ _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed);
+
+ // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
constraints.canSwapWithMatch = !limitSrc;
-
- // Can run on mongoS only if this stage is merging presorted streams.
- constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS
- : HostTypeRequirement::kAnyShard);
return constraints;
}
@@ -104,6 +105,13 @@ public:
uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
/**
+ * Returns true if this $sort stage is merging presorted streams.
+ */
+ bool mergingPresorted() const {
+ return _mergingPresorted;
+ }
+
+ /**
* Returns -1 for no limit.
*/
long long getLimit() const;
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 25362138a9c..2bc39427a37 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -402,7 +402,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
// Allow the $sort stage to spill to disk.
unittest::TempDir tempDir("DocumentSourceSortTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -436,7 +436,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
TEST_F(DocumentSourceSortExecutionTest,
ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -451,7 +451,7 @@ TEST_F(DocumentSourceSortExecutionTest,
TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index 826967a5c48..e20232457a1 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -53,6 +53,14 @@ public:
size_t facetId,
const boost::intrusive_ptr<TeeBuffer>& bufferSource);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 5bc9a91afdb..c95a979dc56 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -47,8 +47,12 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 8f15fb5a3fe..db80c5f7234 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -46,7 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
: explain(request.getExplain()),
fromMongos(request.isFromMongos()),
needsMerge(request.needsMerge()),
- extSortAllowed(request.shouldAllowDiskUse()),
+ allowDiskUse(request.shouldAllowDiskUse()),
bypassDocumentValidation(request.shouldBypassDocumentValidation()),
from34Mongos(request.isFrom34Mongos()),
ns(request.getNamespaceString()),
@@ -84,7 +84,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns,
expCtx->fromMongos = fromMongos;
expCtx->from34Mongos = from34Mongos;
expCtx->inMongos = inMongos;
- expCtx->extSortAllowed = extSortAllowed;
+ expCtx->allowDiskUse = allowDiskUse;
expCtx->bypassDocumentValidation = bypassDocumentValidation;
expCtx->tempDir = tempDir;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 4fe6bc8e541..d580644eeba 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -117,7 +117,7 @@ public:
bool fromMongos = false;
bool needsMerge = false;
bool inMongos = false;
- bool extSortAllowed = false;
+ bool allowDiskUse = false;
bool bypassDocumentValidation = false;
// We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 34e04912e49..631228a8ddb 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -64,6 +64,9 @@ namespace dps = ::mongo::dotted_path_support;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
+using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
+using StreamType = DocumentSource::StageConstraints::StreamType;
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
@@ -169,15 +172,14 @@ void Pipeline::validateFacetPipeline() const {
}
for (auto&& stage : _sources) {
auto stageConstraints = stage->constraints();
- if (!stageConstraints.isAllowedInsideFacetStage) {
+ if (!stageConstraints.isAllowedInsideFacetStage()) {
uasserted(40600,
str::stream() << stage->getSourceName()
<< " is not allowed to be used within a $facet stage");
}
// We expect a stage within a $facet stage to have these properties.
- invariant(stageConstraints.requiresInputDocSource);
- invariant(!stageConstraints.isIndependentOfAnyCollection);
invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
+ invariant(!stageConstraints.isIndependentOfAnyCollection);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -434,8 +436,18 @@ bool Pipeline::needsPrimaryShardMerger() const {
}
bool Pipeline::canRunOnMongos() const {
- return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
+ return std::all_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
+ auto constraints = stage->constraints();
+ const bool doesNotNeedShard = (constraints.hostRequirement == HostTypeRequirement::kNone);
+ const bool doesNotNeedDisk =
+ (constraints.diskRequirement == DiskUseRequirement::kNoDiskUse ||
+ (constraints.diskRequirement == DiskUseRequirement::kWritesTmpData &&
+ !pCtx->allowDiskUse));
+ const bool doesNotBlockOrBlockingIsPermitted =
+ (constraints.streamType == StreamType::kStreaming ||
+ !internalQueryProhibitBlockingMergeOnMongoS.load());
+
+ return doesNotNeedShard && doesNotNeedDisk && doesNotBlockOrBlockingIsPermitted;
});
}
@@ -579,11 +591,17 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
return deps;
}
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontStageWithName(StringData targetStageName) {
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+ StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
}
auto targetStage = _sources.front();
+
+ if (predicate && !predicate(targetStage.get())) {
+ return nullptr;
+ }
+
_sources.pop_front();
stitch();
return targetStage;
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 0aa142c36c8..29321861ce8 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -38,6 +38,8 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/query/query_knobs.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/timer.h"
@@ -281,10 +283,13 @@ public:
}
/**
- * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns
- * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'.
+ * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
+ * given 'predicate' function, if present, returns 'true' when called with a pointer to the
+ * stage. Returns nullptr if there is no first stage which meets these criteria.
*/
- boost::intrusive_ptr<DocumentSource> popFrontStageWithName(StringData targetStageName);
+ boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
+ StringData targetStageName,
+ stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
/**
* PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index e84f30dca74..6c2c3fbba73 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1791,7 +1791,7 @@ public:
DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
StageConstraints constraints() const final {
- StageConstraints constraints;
+ auto constraints = DocumentSourceMock::constraints();
constraints.isIndependentOfAnyCollection = true;
return constraints;
}
@@ -1906,7 +1906,12 @@ public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
StageConstraints constraints() const final {
- return StageConstraints{}; // Overrides DocumentSourceMock's required position.
+ // Overrides DocumentSourceMock's required position.
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
};