From 9062f71621d053e32beec1061f708e3fd08b0158 Mon Sep 17 00:00:00 2001 From: Bernard Gorman Date: Mon, 7 Aug 2017 15:07:47 +0100 Subject: SERVER-22760 Sharded aggregation pipelines which involve taking a simple union should merge on mongos --- src/mongo/db/pipeline/SConscript | 1 + src/mongo/db/pipeline/document_source.h | 16 ++- .../db/pipeline/document_source_change_stream.cpp | 2 +- src/mongo/db/pipeline/document_source_coll_stats.h | 2 +- src/mongo/db/pipeline/document_source_current_op.h | 2 +- src/mongo/db/pipeline/document_source_cursor.h | 2 +- src/mongo/db/pipeline/document_source_facet.cpp | 4 +- .../db/pipeline/document_source_facet_test.cpp | 8 +- src/mongo/db/pipeline/document_source_geo_near.h | 2 +- .../db/pipeline/document_source_graph_lookup.h | 2 +- .../db/pipeline/document_source_index_stats.h | 2 +- ...document_source_internal_inhibit_optimization.h | 6 ++ .../document_source_internal_split_pipeline.cpp | 109 +++++++++++++++++++++ .../document_source_internal_split_pipeline.h | 81 +++++++++++++++ src/mongo/db/pipeline/document_source_limit.h | 6 ++ src/mongo/db/pipeline/document_source_lookup.h | 2 +- src/mongo/db/pipeline/document_source_match.h | 7 ++ .../db/pipeline/document_source_merge_cursors.cpp | 6 ++ .../db/pipeline/document_source_merge_cursors.h | 8 +- src/mongo/db/pipeline/document_source_mock.h | 2 +- src/mongo/db/pipeline/document_source_out.h | 4 +- src/mongo/db/pipeline/document_source_redact.h | 6 ++ src/mongo/db/pipeline/document_source_sample.cpp | 7 +- src/mongo/db/pipeline/document_source_sample.h | 6 ++ ...ocument_source_single_document_transformation.h | 1 + src/mongo/db/pipeline/document_source_skip.h | 7 ++ src/mongo/db/pipeline/document_source_sort.h | 4 + src/mongo/db/pipeline/document_source_unwind.h | 1 + src/mongo/db/pipeline/pipeline.cpp | 23 +++-- src/mongo/db/pipeline/pipeline.h | 5 + 30 files changed, 304 insertions(+), 30 deletions(-) create mode 100644 src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp create mode 100644 src/mongo/db/pipeline/document_source_internal_split_pipeline.h (limited to 'src/mongo/db/pipeline') diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 4ceb1616a36..1d622a2c8cf 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -242,6 +242,7 @@ docSourceEnv.Library( 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_internal_inhibit_optimization.cpp', + 'document_source_internal_split_pipeline.cpp', 'document_source_limit.cpp', 'document_source_match.cpp', 'document_source_merge_cursors.cpp', diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index efb695c8f59..8f1c2fef4a0 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -128,9 +128,18 @@ public: */ enum class PositionRequirement { kNone, kFirst, kLast }; + /** + * A HostTypeRequirement defines where this stage is permitted to be executed when the + * pipeline is run on a sharded cluster. + */ + enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS }; + // Set if this stage needs to be in a particular position of the pipeline. PositionRequirement requiredPosition = PositionRequirement::kNone; + // Set if this stage can only be executed on specific components of a sharded cluster. + HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard; + bool isAllowedInsideFacetStage = true; // True if this stage does not generate results itself, and instead pulls inputs from an @@ -147,12 +156,11 @@ public: // must also override getModifiedPaths() to provide information about which particular // $match predicates be swapped before itself. bool canSwapWithMatch = false; - - // True if this stage must run on the primary shard when the collection being aggregated is - // sharded. - bool mustRunOnPrimaryShardIfSharded = false; }; + using HostTypeRequirement = StageConstraints::HostTypeRequirement; + using PositionRequirement = StageConstraints::PositionRequirement; + /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a * (ReturnStatus, Document) pair, with the first entry being used to communicate information diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 40d9fe5555e..64f313649af 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -96,7 +96,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 6334bbe55c8..2903241d1a7 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -76,7 +76,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index cd86d027733..d5138e74d2a 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -80,7 +80,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; constraints.isIndependentOfAnyCollection = true; diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 57a41996af9..c15c0495ba7 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -53,7 +53,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index aba1f5f115c..e553ea2fa62 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -243,12 +243,12 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { for (auto&& facet : _facets) { for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) { + if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) { // Currently we don't split $facet to have a merger part and a shards part (see // SERVER-24154). This means that if any stage in any of the $facet pipelines // requires the primary shard, then the entire $facet must happen on the merger, and // the merger must be the primary shard. - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; } } } diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 60aefdead64..d8798a5c70e 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -609,7 +609,7 @@ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } @@ -633,7 +633,8 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT(facetStage->constraints().hostRequirement == + DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -652,7 +653,8 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr facets.emplace_back("second", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT(facetStage->constraints().hostRequirement == + DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 5df25d6c1c6..315dcea6dd4 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -49,7 +49,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 489a80ea7b9..53394d3d0ab 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -56,7 +56,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index dd62f5323bd..83546654361 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -71,7 +71,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index a750443a5b5..d1967b6625f 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -52,6 +52,12 @@ public: return kStageName.rawData(); } + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + GetNextResult getNext() final; private: diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp new file mode 100644 index 00000000000..a49514cf628 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_internal_split_pipeline.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(_internalSplitPipeline, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceInternalSplitPipeline::createFromBson); + +constexpr StringData DocumentSourceInternalSplitPipeline::kStageName; + +boost::intrusive_ptr DocumentSourceInternalSplitPipeline::createFromBson( + BSONElement elem, const boost::intrusive_ptr& expCtx) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "$_internalSplitPipeline must take a nested object but found: " + << elem, + elem.type() == BSONType::Object); + + auto specObj = elem.embeddedObject(); + + HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard; + + for (auto&& elt : specObj) { + if (elt.fieldNameStringData() == "mergeType"_sd) { + uassert(ErrorCodes::BadValue, + str::stream() << "'mergeType' must be a string value but found: " << elt.type(), + elt.type() == BSONType::String); + + auto mergeTypeString = elt.valueStringData(); + + if ("anyShard"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kAnyShard; + } else if ("primaryShard"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kPrimaryShard; + } else if ("mongos"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kAnyShardOrMongoS; + } else { + uasserted(ErrorCodes::BadValue, + str::stream() << "unrecognized field while parsing mergeType: '" + << elt.fieldNameStringData() + << "'"); + } + } else { + uasserted(ErrorCodes::BadValue, + str::stream() << "unrecognized field while parsing $_internalSplitPipeline: '" + << elt.fieldNameStringData() + << "'"); + } + } + + return new DocumentSourceInternalSplitPipeline(expCtx, mergeType); +} + +DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() { + pExpCtx->checkForInterrupt(); + return pSource->getNext(); +} + +Value DocumentSourceInternalSplitPipeline::serialize( + boost::optional explain) const { + std::string mergeTypeString; + + switch (_mergeType) { + case HostTypeRequirement::kAnyShardOrMongoS: + mergeTypeString = "mongos"; + break; + + case HostTypeRequirement::kPrimaryShard: + mergeTypeString = "primaryShard"; + break; + + default: + mergeTypeString = "anyShard"; + break; + } + + return Value(Document{{getSourceName(), Value{Document{{"mergeType", mergeTypeString}}}}}); +} + +} // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h new file mode 100644 index 00000000000..c18a6d301a6 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * An internal stage available for testing. Acts as a simple passthrough of intermediate results + * from the source stage, but forces the pipeline to split at the point where this stage appears + * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be + * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this + * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the + * pipeline will be sent to a random participating shard, subject to the requirements of any + * subsequent splittable stages in the pipeline. + */ +class DocumentSourceInternalSplitPipeline final : public DocumentSource, + public SplittableDocumentSource { +public: + static constexpr StringData kStageName = "$_internalSplitPipeline"_sd; + + static boost::intrusive_ptr createFromBson( + BSONElement, const boost::intrusive_ptr&); + + DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr& expCtx, + HostTypeRequirement mergeType) + : DocumentSource(expCtx), _mergeType(mergeType) {} + + const char* getSourceName() const final { + return kStageName.rawData(); + } + + boost::intrusive_ptr getShardSource() final { + return this; + } + + boost::intrusive_ptr getMergeSource() final { + return this; + } + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = _mergeType; + return constraints; + } + + GetNextResult getNext() final; + +private: + Value serialize(boost::optional explain = boost::none) const final; + HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; +}; + +} // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index c6660152d66..88acfa8b45a 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -42,6 +42,12 @@ public: : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. */ diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d5bab6fd9e7..a4cd8d35ed6 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -97,7 +97,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 2be99dd7f12..135dbc39e17 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -49,6 +49,13 @@ public: } const char* getSourceName() const override; + + StageConstraints constraints() const override { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + Value serialize( boost::optional explain = boost::none) const override; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 93cd6dd93e7..d53c2183443 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -176,6 +176,12 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return std::move(next); } +bool DocumentSourceMergeCursors::remotesExhausted() const { + return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) { + return cursorAndConn->cursor.isDead(); + }); +} + void DocumentSourceMergeCursors::doDispose() { for (auto&& cursorAndConn : _cursors) { // Note it is an error to call done() on a connection before consuming the reply from a diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 045ddad4836..e83316fa6ea 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -55,7 +55,8 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; @@ -68,6 +69,11 @@ public: std::vector cursorDescriptors, const boost::intrusive_ptr& pExpCtx); + /** + * Returns true if all remotes have reported that their cursors are closed. + */ + bool remotesExhausted() const; + /** Returns non-owning pointers to cursors managed by this stage. * Call this instead of getNext() if you want access to the raw streams. * This method should only be called at most once. diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 513994cd599..22f7a1aa24d 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -51,7 +51,7 @@ public: StageConstraints constraints() const override { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 9366d01a670..6756cd4df2a 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -46,9 +46,9 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = StageConstraints::PositionRequirement::kLast; + constraints.requiredPosition = PositionRequirement::kLast; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index a76ca9c7940..bedc434d61e 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -40,6 +40,12 @@ public: const char* getSourceName() const final; boost::intrusive_ptr optimize() final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact * stage. diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 5117e5c516a..c174d95d935 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -123,6 +123,11 @@ intrusive_ptr DocumentSourceSample::getShardSource() { intrusive_ptr DocumentSourceSample::getMergeSource() { // Just need to merge the pre-sorted documents by their random values. - return DocumentSourceSort::create(pExpCtx, randSortSpec, _size); + BSONObjBuilder randMergeSortSpec; + + randMergeSortSpec.appendElements(randSortSpec); + randMergeSortSpec.append("$mergePresorted", true); + + return DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size); } } // mongo diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index ec88c0737a8..662c6a9a49d 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -39,6 +39,12 @@ public: const char* getSourceName() const final; Value serialize(boost::optional explain = boost::none) const final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + GetDepsReturn getDependencies(DepsTracker* deps) const final { return SEE_NEXT; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index b50065bc303..188e9864310 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -103,6 +103,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 46cb6a6f3ab..a69f5e59eb5 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -37,6 +37,13 @@ public: // virtuals from DocumentSource GetNextResult getNext() final; const char* getSourceName() const final; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to move a subsequent $limit before the skip, potentially allowing for forther * optimizations earlier in the pipeline. diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 866a3fb955c..5731051cbd3 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -57,6 +57,10 @@ public: // Can't swap with a $match if a limit has been absorbed, since in general match can't swap // with limit. constraints.canSwapWithMatch = !limitSrc; + + // Can run on mongoS only if this stage is merging presorted streams. + constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS + : HostTypeRequirement::kAnyShard); return constraints; } diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index fb7bd68ba39..5bc9a91afdb 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -48,6 +48,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index e5cd62a4670..8b84fe36bff 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,9 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; +using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; + Pipeline::Pipeline(const intrusive_ptr& pTheCtx) : pCtx(pTheCtx) {} Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr& expCtx) @@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const { // We expect a stage within a $facet stage to have these properties. invariant(stageConstraints.requiresInputDocSource); invariant(!stageConstraints.isIndependentOfAnyCollection); - invariant(stageConstraints.requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kNone); + invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kFirst && - i != 0) { + if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline.", @@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kLast && + if (stage->constraints().requiredPosition == PositionRequirement::kLast && i != _sources.size() - 1) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() @@ -312,6 +311,8 @@ std::unique_ptr Pipeline::splitForSharded() { shardPipeline->_splitForSharded = true; _splitForMerge = true; + stitch(); + return shardPipeline; } @@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const { bool Pipeline::needsPrimaryShardMerger() const { return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().mustRunOnPrimaryShardIfSharded; + return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; + }); +} + +bool Pipeline::canRunOnMongos() const { + return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS; }); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 677cfe7b6d1..ed19d44ae2b 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -218,6 +218,11 @@ public: */ bool needsPrimaryShardMerger() const; + /** + * Returns whether or not every DocumentSource in the pipeline can run on mongoS. + */ + bool canRunOnMongos() const; + /** * Modifies the pipeline, optimizing it by combining and swapping stages. */ -- cgit v1.2.1