diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-26 11:46:58 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-27 22:12:14 -0400 |
commit | 55637833c707998f685f997d43624c52cde99b45 (patch) | |
tree | bbc00a719c14983e8984d1dbe8dbddd074e023a7 /src/mongo/db/pipeline | |
parent | 22c34669f744ea245c14a64c556d61f8932ceda9 (diff) | |
download | mongo-55637833c707998f685f997d43624c52cde99b45.tar.gz |
SERVER-30871 Permit blocking aggregation stages to run on mongoS if allowDiskUse is false
Diffstat (limited to 'src/mongo/db/pipeline')
48 files changed, 415 insertions, 174 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index eba4e2acc4b..934d9d29da2 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -132,20 +132,70 @@ public: * A HostTypeRequirement defines where this stage is permitted to be executed when the * pipeline is run on a sharded cluster. */ - enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS }; + enum class HostTypeRequirement { kNone, kPrimaryShard, kAnyShard }; - // Set if this stage needs to be in a particular position of the pipeline. - PositionRequirement requiredPosition = PositionRequirement::kNone; + /** + * A DiskUseRequirement indicates whether this stage writes to disk, or whether it may spill + * to disk if its memory usage exceeds a given threshold. Note that this only indicates the + * *ability* of the stage to spill; if 'allowDiskUse' is set to false, it will be prevented + * from doing so. + */ + enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; + + /** + * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. + */ + enum class FacetRequirement { kAllowed, kNotAllowed }; - // Set if this stage can only be executed on specific components of a sharded cluster. - HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard; + /** + * A StreamType defines whether this stage is streaming (can produce output based solely on + * the current input document) or blocking (must examine subsequent documents before + * producing an output document). + */ + enum class StreamType { kStreaming, kBlocking }; + + StageConstraints(StreamType streamType, + PositionRequirement requiredPosition, + HostTypeRequirement hostRequirement, + DiskUseRequirement diskRequirement, + FacetRequirement facetRequirement) + : requiredPosition(requiredPosition), + hostRequirement(hostRequirement), + diskRequirement(diskRequirement), + facetRequirement(facetRequirement), + streamType(streamType) { + // Stages which are allowed to run in $facet pipelines must not have any specific + // position requirements. + invariant(!isAllowedInsideFacetStage() || + requiredPosition == PositionRequirement::kNone); + } - bool isAllowedInsideFacetStage = true; + // Indicates whether this stage needs to be at a particular position in the pipeline. + const PositionRequirement requiredPosition; + + // Indicates whether this stage can only be executed on specific components of a sharded + // cluster. + const HostTypeRequirement hostRequirement; + + // Indicates whether this stage may write persistent data to disk, or may spill to temporary + // files if its memory usage becomes excessive. + const DiskUseRequirement diskRequirement; + + // Indicates whether this stage may run inside a $facet stage. + const FacetRequirement facetRequirement; + + // Indicates whether this is a streaming or blocking stage. + const StreamType streamType; // True if this stage does not generate results itself, and instead pulls inputs from an // input DocumentSource (via 'pSource'). bool requiresInputDocSource = true; + // True if this stage should be permitted to run in a $facet pipeline. + bool isAllowedInsideFacetStage() const { + return facetRequirement == FacetRequirement::kAllowed; + } + // True if this stage operates on a global or database level, like $currentOp. bool isIndependentOfAnyCollection = false; @@ -165,6 +215,9 @@ public: using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PositionRequirement = StageConstraints::PositionRequirement; + using DiskUseRequirement = StageConstraints::DiskUseRequirement; + using FacetRequirement = StageConstraints::FacetRequirement; + using StreamType = StageConstraints::StreamType; /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a @@ -260,9 +313,7 @@ public: * Returns a struct containing information about any special constraints imposed on using this * stage. */ - virtual StageConstraints constraints() const { - return StageConstraints{}; - } + virtual StageConstraints constraints() const = 0; /** * Informs the stage that it is no longer needed and can release its resources. After dispose() diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index e874fd5d426..a8123f5f2fc 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() { if (!_sorter) { SortOptions opts; opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) { + if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 2a40b092c1f..60c5ed02501 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -48,6 +48,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; + } + /** * The $bucketAuto stage must be run on the merging shard. */ diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index 08926254480..df19b4eb497 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -350,7 +350,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) { auto expCtx = getExpCtx(); unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; @@ -386,7 +386,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) { // Allow the $sort stage to spill to disk. unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; @@ -647,22 +647,22 @@ void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expC TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; expCtx->inMongos = false; assertCannotSpillToDisk(expCtx); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); } TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a38a1aef2ce..40ceaa5b1b7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -97,10 +97,11 @@ const char* DocumentSourceOplogMatch::getSourceName() const { } DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.isAllowedInsideFacetStage = false; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } /** @@ -142,6 +143,14 @@ public: return "$changeStream"; } + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { // This stage is created by the DocumentSourceChangeStream stage, so serializing it // here would result in it being created twice. diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 22441f2a03a..cfbef21088d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -60,6 +60,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( @@ -87,6 +95,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + /** * SplittableDocumentSource methods; this has to run on the merger, since the resume point could * be at any shard. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 2903241d1a7..0c319729328 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -75,10 +75,13 @@ public: const char* getSourceName() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index d5138e74d2a..6fa869712b1 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -79,11 +79,14 @@ public: const char* getSourceName() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.isIndependentOfAnyCollection = true; + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index f9c9345baf6..674bbaa85ee 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -52,8 +52,12 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index c7f82678eb1..6c58124387b 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -238,21 +238,29 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets. - - for (auto&& facet : _facets) { - for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) { - // Currently we don't split $facet to have a merger part and a shards part (see - // SERVER-24154). This means that if any stage in any of the $facet pipelines - // requires the primary shard, then the entire $facet must happen on the merger, and - // the merger must be the primary shard. - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - } - } - } - return constraints; + const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { + const auto sources = facet.pipeline->getSources(); + return std::any_of(sources.begin(), sources.end(), [&](const auto source) { + return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData; + }); + }); + + // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). + // This means that if any stage in any of the $facet pipelines requires the primary shard, then + // the entire $facet must happen on the merger, and the merger must be the primary shard. + const bool needsPrimaryShard = + std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { + const auto sources = facet.pipeline->getSources(); + return std::any_of(sources.begin(), sources.end(), [&](const auto source) { + return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; + }); + }); + + return {StreamType::kBlocking, + PositionRequirement::kNone, + needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard, + mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index be05268fb7b..61d112906c7 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -186,9 +186,11 @@ public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} StageConstraints constraints() const override { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = true; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } DocumentSource::GetNextResult getNext() final { @@ -625,9 +627,11 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index c971509e373..f7b7a5ca741 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -61,7 +61,7 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() { if (!resultsIterator->more()) return GetNextResult::makeEOF(); - // each result from the geoNear command is wrapped in a wrapper object with "obj", + // Each result from the geoNear command is wrapped in a wrapper object with "obj", // "dis" and maybe "loc" fields. We want to take the object from "obj" and inject the // other fields into it. Document result(resultsIterator->next().embeddedObject()); @@ -70,6 +70,11 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() { if (includeLocs) output.setNestedField(*includeLocs, result["loc"]); + // In a cluster, $geoNear output will be merged via $sort, so add the sort key. + if (pExpCtx->needsMerge) { + output.setSortKeyMetaField(BSON("" << result["dis"])); + } + return output.freeze(); } @@ -89,12 +94,13 @@ Pipeline::SourceContainer::iterator DocumentSourceGeoNear::doOptimizeAt( } // This command is sent as-is to the shards. -// On router this becomes a sort by distance (nearest-first) with limit. intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getShardSource() { return this; } +// On mongoS this becomes a merge sort by distance (nearest-first) with limit. intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getMergeSource() { - return DocumentSourceSort::create(pExpCtx, BSON(distanceField->fullPath() << 1), limit); + return DocumentSourceSort::create( + pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true), limit); } Value DocumentSourceGeoNear::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 315dcea6dd4..23e7c4e901c 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -48,10 +48,13 @@ public: Pipeline::SourceContainer* container) final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 53394d3d0ab..afd633d46bc 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -54,9 +54,13 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + constraints.canSwapWithMatch = true; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index ba2c48680f2..41b2546e331 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), - _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {} + _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { _accumulatedFields.push_back(accumulationStatement); @@ -485,7 +485,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { uassert(16945, "Exceeded memory limit for $group, but didn't allow external sort." " Pass allowDiskUse:true to opt in.", - _extSortAllowed); + _allowDiskUse); _sortedFiles.push_back(spill()); _memoryUsageBytes = 0; } @@ -531,7 +531,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // In debug mode, spill every time we have a duplicate id to stress merge logic. if (!inserted && // is a dup !pExpCtx->inMongos && // can't spill to disk in mongos - !_extSortAllowed && // don't change behavior when testing external sort + !_allowDiskUse && // don't change behavior when testing external sort _sortedFiles.size() < 20) { // don't open too many FDs _sortedFiles.push_back(spill()); diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index cf108c15f24..057eb58164a 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -69,6 +69,14 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; + } + /** * Add an accumulator, which will become a field in each Document that results from grouping. */ @@ -176,7 +184,7 @@ private: // Only used when '_spilled' is true. std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - const bool _extSortAllowed; + const bool _allowDiskUse; std::pair<Value, Value> _firstPartOfNextGroup; // Only used when '_sorted' is true. diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 5cc5ba93be4..8e3fc90521b 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -103,7 +103,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { // Allow the $group stage to spill to disk. TempDir tempDir("DocumentSourceGroupTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 83546654361..c5d4d7c2762 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -70,10 +70,13 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index d1967b6625f..839eebecc3f 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -53,9 +53,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } GetNextResult getNext() final; diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp index a49514cf628..276cf3005d8 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp @@ -47,7 +47,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create auto specObj = elem.embeddedObject(); - HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard; + HostTypeRequirement mergeType = HostTypeRequirement::kNone; for (auto&& elt : specObj) { if (elt.fieldNameStringData() == "mergeType"_sd) { @@ -62,7 +62,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create } else if ("primaryShard"_sd == mergeTypeString) { mergeType = HostTypeRequirement::kPrimaryShard; } else if ("mongos"_sd == mergeTypeString) { - mergeType = HostTypeRequirement::kAnyShardOrMongoS; + mergeType = HostTypeRequirement::kNone; } else { uasserted(ErrorCodes::BadValue, str::stream() << "unrecognized field while parsing mergeType: '" @@ -90,8 +90,8 @@ Value DocumentSourceInternalSplitPipeline::serialize( std::string mergeTypeString; switch (_mergeType) { - case HostTypeRequirement::kAnyShardOrMongoS: - mergeTypeString = "mongos"; + case HostTypeRequirement::kAnyShard: + mergeTypeString = "anyShard"; break; case HostTypeRequirement::kPrimaryShard: @@ -99,7 +99,7 @@ Value DocumentSourceInternalSplitPipeline::serialize( break; default: - mergeTypeString = "anyShard"; + mergeTypeString = "mongos"; break; } diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index f9ac84c555f..811de4b7e23 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -36,10 +36,10 @@ namespace mongo { * An internal stage available for testing. Acts as a simple passthrough of intermediate results * from the source stage, but forces the pipeline to split at the point where this stage appears * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be - * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this - * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the - * pipeline will be sent to a random participating shard, subject to the requirements of any - * subsequent splittable stages in the pipeline. + * one of 'primaryShard', 'anyShard' or 'mongos' to control where the merge may occur. Omitting this + * parameter or specifying 'mongos' produces the default merging behaviour; the merge half of the + * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a + * random participating shard otherwise. */ class DocumentSourceInternalSplitPipeline final : public DocumentSource, public SplittableDocumentSource { @@ -67,9 +67,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = _mergeType; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + _mergeType, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } GetNextResult getNext() final; @@ -80,7 +82,7 @@ private: : DocumentSource(expCtx), _mergeType(mergeType) {} Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; + HostTypeRequirement _mergeType = HostTypeRequirement::kNone; }; } // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 7bf47638b21..b74a053028f 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -48,6 +48,14 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; + } + GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); @@ -58,12 +66,6 @@ public: : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; - } - /** * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. */ diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 23e7ce97592..01321d596a6 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -94,13 +94,15 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.isIndependentOfAnyCollection = true; constraints.allowedToForwardFromMongos = false; + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index 47c08af9d73..4d050239e4c 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -86,11 +86,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = true; - constraints.isAllowedInsideFacetStage = false; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 21f3151f59c..3a8de609c9f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -118,6 +118,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState), _variablesParseState.defineVariable(varName)); } + + initializeIntrospectionPipeline(); } std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse( @@ -659,16 +661,14 @@ void DocumentSourceLookUp::serializeToArray( DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { if (wasConstructedWithPipelineSyntax()) { - // Copy all 'let' variables into the foreign pipeline's expression context. - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - - auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + // We will use the introspection pipeline which we prebuilt during construction. + invariant(_parsedIntrospectionPipeline); DepsTracker subDeps(deps->getMetadataAvailable()); // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables // declared by this $lookup and variables declared externally. - for (auto&& source : pipeline->getSources()) { + for (auto&& source : _parsedIntrospectionPipeline->getSources()) { source->getDependencies(&subDeps); } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 9727d0e92db..7cf136b0cf9 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,9 +96,22 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; + const bool mayUseDisk = wasConstructedWithPipelineSyntax() && + std::any_of(_parsedIntrospectionPipeline->getSources().begin(), + _parsedIntrospectionPipeline->getSources().end(), + [](const auto& source) { + return source->constraints().diskRequirement == + DiskUseRequirement::kWritesTmpData; + }); + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + mayUseDisk ? DiskUseRequirement::kWritesTmpData + : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + constraints.canSwapWithMatch = true; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } @@ -243,6 +256,16 @@ private: void resolveLetVariables(const Document& localDoc, Variables* variables); /** + * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup + * pipelines will be built recursively. + */ + void initializeIntrospectionPipeline() { + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + _parsedIntrospectionPipeline = + uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + } + + /** * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a * cursor and/or cache source as appropriate. */ @@ -296,6 +319,9 @@ private: // The aggregation pipeline defined with the user request, prior to optimization and view // resolution. std::vector<BSONObj> _userPipeline; + // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective + // functions. If sub-$lookup stages are present, their pipelines are constructed recursively. + std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline; std::vector<LetVariable> _letVariables; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index c51653ddb8d..02cfa7435d4 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -62,9 +62,13 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.canSwapWithMatch = true; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index dcbc2b55814..03ea80c5d7d 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -162,7 +162,7 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) { << "pipeline" << BSON_ARRAY(BSON("$project" << BSON("hasX" << "$$var1")) - << BSON("$match" << BSON("$hasX" << true))) + << BSON("$match" << BSON("hasX" << true))) << "as" << "as")) .firstElement(), @@ -448,9 +448,9 @@ public: Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { - if (pipeline->popFrontStageWithName("$match") || - pipeline->popFrontStageWithName("$sort") || - pipeline->popFrontStageWithName("$project")) { + if (pipeline->popFrontWithCriteria("$match") || + pipeline->popFrontWithCriteria("$sort") || + pipeline->popFrontWithCriteria("$project")) { continue; } break; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 6b2fc653d43..26d912928eb 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -51,9 +51,11 @@ public: const char* getSourceName() const override; StageConstraints constraints() const override { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } Value serialize( diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 63521e2c742..2e334d5e0ce 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -59,11 +59,13 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShard; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 22f7a1aa24d..6cc7f5aed69 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -50,10 +50,13 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; StageConstraints constraints() const override { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 6756cd4df2a..ec8c188c0b7 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -45,11 +45,11 @@ public: GetDepsReturn getDependencies(DepsTracker* deps) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = PositionRequirement::kLast; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kLast, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kWritesPersistentData, + FacetRequirement::kNotAllowed}; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index bedc434d61e..db1698fe776 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -41,9 +41,11 @@ public: boost::intrusive_ptr<DocumentSource> optimize() final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 07a85c77e33..d5a86d9c008 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -44,9 +44,11 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; } GetDepsReturn getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 19b7106b03d..8d10664f6ff 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -44,6 +44,14 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, long long size, diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index af96ae8e2ab..1d2474235f2 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -50,13 +50,14 @@ public: } StageConstraints constraints() const { - StageConstraints constraints; - - if (_cache->isServing()) { - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - } - + StageConstraints constraints(StreamType::kStreaming, + _cache->isServing() ? PositionRequirement::kFirst + : PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + + constraints.requiresInputDocSource = (_cache->isBuilding()); return constraints; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 188e9864310..bdf6eac8d05 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -102,8 +102,11 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index fc87d7e1eaa..4e10f9ac852 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -50,18 +50,20 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; + } + GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); } - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; - } - /** * Attempts to move a subsequent $limit before the skip, potentially allowing for forther * optimizations earlier in the pipeline. diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 97923ff8118..e32c7d418a6 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const { opts.limit = limitSrc->getLimit(); opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) { + if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; } diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 2494400eaae..c4ecc799a5f 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -63,14 +63,15 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - // Can't swap with a $match if a limit has been absorbed, since in general match can't swap - // with limit. + StageConstraints constraints( + _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed); + + // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit. constraints.canSwapWithMatch = !limitSrc; - - // Can run on mongoS only if this stage is merging presorted streams. - constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS - : HostTypeRequirement::kAnyShard); return constraints; } @@ -104,6 +105,13 @@ public: uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); /** + * Returns true if this $sort stage is merging presorted streams. + */ + bool mergingPresorted() const { + return _mergingPresorted; + } + + /** * Returns -1 for no limit. */ long long getLimit() const; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 25362138a9c..2bc39427a37 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -402,7 +402,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled) // Allow the $sort stage to spill to disk. unittest::TempDir tempDir("DocumentSourceSortTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); @@ -436,7 +436,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled) TEST_F(DocumentSourceSortExecutionTest, ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); @@ -451,7 +451,7 @@ TEST_F(DocumentSourceSortExecutionTest, TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes); diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index 826967a5c48..e20232457a1 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -53,6 +53,14 @@ public: size_t facetId, const boost::intrusive_ptr<TeeBuffer>& bufferSource); + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; + } + GetNextResult getNext() final; /** diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 5bc9a91afdb..c95a979dc56 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -47,8 +47,12 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 8f15fb5a3fe..db80c5f7234 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -46,7 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx, : explain(request.getExplain()), fromMongos(request.isFromMongos()), needsMerge(request.needsMerge()), - extSortAllowed(request.shouldAllowDiskUse()), + allowDiskUse(request.shouldAllowDiskUse()), bypassDocumentValidation(request.shouldBypassDocumentValidation()), from34Mongos(request.isFrom34Mongos()), ns(request.getNamespaceString()), @@ -84,7 +84,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns, expCtx->fromMongos = fromMongos; expCtx->from34Mongos = from34Mongos; expCtx->inMongos = inMongos; - expCtx->extSortAllowed = extSortAllowed; + expCtx->allowDiskUse = allowDiskUse; expCtx->bypassDocumentValidation = bypassDocumentValidation; expCtx->tempDir = tempDir; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 4fe6bc8e541..d580644eeba 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -117,7 +117,7 @@ public: bool fromMongos = false; bool needsMerge = false; bool inMongos = false; - bool extSortAllowed = false; + bool allowDiskUse = false; bool bypassDocumentValidation = false; // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 34e04912e49..631228a8ddb 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -64,6 +64,9 @@ namespace dps = ::mongo::dotted_path_support; using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; +using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement; +using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement; +using StreamType = DocumentSource::StageConstraints::StreamType; Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {} @@ -169,15 +172,14 @@ void Pipeline::validateFacetPipeline() const { } for (auto&& stage : _sources) { auto stageConstraints = stage->constraints(); - if (!stageConstraints.isAllowedInsideFacetStage) { + if (!stageConstraints.isAllowedInsideFacetStage()) { uasserted(40600, str::stream() << stage->getSourceName() << " is not allowed to be used within a $facet stage"); } // We expect a stage within a $facet stage to have these properties. - invariant(stageConstraints.requiresInputDocSource); - invariant(!stageConstraints.isIndependentOfAnyCollection); invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); + invariant(!stageConstraints.isIndependentOfAnyCollection); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -434,8 +436,18 @@ bool Pipeline::needsPrimaryShardMerger() const { } bool Pipeline::canRunOnMongos() const { - return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS; + return std::all_of(_sources.begin(), _sources.end(), [&](const auto& stage) { + auto constraints = stage->constraints(); + const bool doesNotNeedShard = (constraints.hostRequirement == HostTypeRequirement::kNone); + const bool doesNotNeedDisk = + (constraints.diskRequirement == DiskUseRequirement::kNoDiskUse || + (constraints.diskRequirement == DiskUseRequirement::kWritesTmpData && + !pCtx->allowDiskUse)); + const bool doesNotBlockOrBlockingIsPermitted = + (constraints.streamType == StreamType::kStreaming || + !internalQueryProhibitBlockingMergeOnMongoS.load()); + + return doesNotNeedShard && doesNotNeedDisk && doesNotBlockOrBlockingIsPermitted; }); } @@ -579,11 +591,17 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva return deps; } -boost::intrusive_ptr<DocumentSource> Pipeline::popFrontStageWithName(StringData targetStageName) { +boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria( + StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) { if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) { return nullptr; } auto targetStage = _sources.front(); + + if (predicate && !predicate(targetStage.get())) { + return nullptr; + } + _sources.pop_front(); stitch(); return targetStage; diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 0aa142c36c8..29321861ce8 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -38,6 +38,8 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" +#include "mongo/db/query/query_knobs.h" +#include "mongo/stdx/functional.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/timer.h" @@ -281,10 +283,13 @@ public: } /** - * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns - * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'. + * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the + * given 'predicate' function, if present, returns 'true' when called with a pointer to the + * stage. Returns nullptr if there is no first stage which meets these criteria. */ - boost::intrusive_ptr<DocumentSource> popFrontStageWithName(StringData targetStageName); + boost::intrusive_ptr<DocumentSource> popFrontWithCriteria( + StringData targetStageName, + stdx::function<bool(const DocumentSource* const)> predicate = nullptr); /** * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index e84f30dca74..6c2c3fbba73 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -1791,7 +1791,7 @@ public: DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {} StageConstraints constraints() const final { - StageConstraints constraints; + auto constraints = DocumentSourceMock::constraints(); constraints.isIndependentOfAnyCollection = true; return constraints; } @@ -1906,7 +1906,12 @@ public: DocumentSourceDependencyDummy() : DocumentSourceMock({}) {} StageConstraints constraints() const final { - return StageConstraints{}; // Overrides DocumentSourceMock's required position. + // Overrides DocumentSourceMock's required position. + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } }; |