summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-08-07 15:07:47 +0100
committerBernard Gorman <bernard.gorman@gmail.com>2017-08-08 19:04:30 +0100
commit9062f71621d053e32beec1061f708e3fd08b0158 (patch)
tree82fd151004fc4192126d71628e765e9257128feb /src/mongo/db/pipeline
parent1dabf89e81b06b4d93a447e1fa5f6742b2f7afa1 (diff)
downloadmongo-9062f71621d053e32beec1061f708e3fd08b0158.tar.gz
SERVER-22760 Sharded aggregation pipelines which involve taking a simple union should merge on mongos
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source.h16
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h2
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h2
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h2
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h2
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h2
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h2
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h6
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp109
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h81
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h6
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h2
-rw-r--r--src/mongo/db/pipeline/document_source_match.h7
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h8
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h2
-rw-r--r--src/mongo/db/pipeline/document_source_out.h4
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h6
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h6
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h1
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h7
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h4
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h1
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp23
-rw-r--r--src/mongo/db/pipeline/pipeline.h5
30 files changed, 304 insertions, 30 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 4ceb1616a36..1d622a2c8cf 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -242,6 +242,7 @@ docSourceEnv.Library(
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_internal_inhibit_optimization.cpp',
+ 'document_source_internal_split_pipeline.cpp',
'document_source_limit.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index efb695c8f59..8f1c2fef4a0 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -128,9 +128,18 @@ public:
*/
enum class PositionRequirement { kNone, kFirst, kLast };
+ /**
+ * A HostTypeRequirement defines where this stage is permitted to be executed when the
+ * pipeline is run on a sharded cluster.
+ */
+ enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS };
+
// Set if this stage needs to be in a particular position of the pipeline.
PositionRequirement requiredPosition = PositionRequirement::kNone;
+ // Set if this stage can only be executed on specific components of a sharded cluster.
+ HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard;
+
bool isAllowedInsideFacetStage = true;
// True if this stage does not generate results itself, and instead pulls inputs from an
@@ -147,12 +156,11 @@ public:
// must also override getModifiedPaths() to provide information about which particular
// $match predicates be swapped before itself.
bool canSwapWithMatch = false;
-
- // True if this stage must run on the primary shard when the collection being aggregated is
- // sharded.
- bool mustRunOnPrimaryShardIfSharded = false;
};
+ using HostTypeRequirement = StageConstraints::HostTypeRequirement;
+ using PositionRequirement = StageConstraints::PositionRequirement;
+
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
* (ReturnStatus, Document) pair, with the first entry being used to communicate information
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 40d9fe5555e..64f313649af 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -96,7 +96,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 6334bbe55c8..2903241d1a7 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -76,7 +76,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index cd86d027733..d5138e74d2a 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -80,7 +80,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
constraints.isIndependentOfAnyCollection = true;
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 57a41996af9..c15c0495ba7 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -53,7 +53,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index aba1f5f115c..e553ea2fa62 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -243,12 +243,12 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
for (auto&& facet : _facets) {
for (auto&& nestedStage : facet.pipeline->getSources()) {
- if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) {
+ if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) {
// Currently we don't split $facet to have a merger part and a shards part (see
// SERVER-24154). This means that if any stage in any of the $facet pipelines
// requires the primary shard, then the entire $facet must happen on the merger, and
// the merger must be the primary shard.
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
}
}
}
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 60aefdead64..d8798a5c70e 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -609,7 +609,7 @@ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
@@ -633,7 +633,8 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
facets.emplace_back("needsPrimaryShard", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT(facetStage->constraints().hostRequirement ==
+ DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard);
}
TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
@@ -652,7 +653,8 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
facets.emplace_back("second", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT(facetStage->constraints().hostRequirement ==
+ DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard);
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 5df25d6c1c6..315dcea6dd4 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -49,7 +49,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 489a80ea7b9..53394d3d0ab 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -56,7 +56,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index dd62f5323bd..83546654361 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -71,7 +71,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index a750443a5b5..d1967b6625f 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -52,6 +52,12 @@ public:
return kStageName.rawData();
}
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
GetNextResult getNext() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
new file mode 100644
index 00000000000..a49514cf628
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_split_pipeline.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalSplitPipeline,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalSplitPipeline::createFromBson);
+
+constexpr StringData DocumentSourceInternalSplitPipeline::kStageName;
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "$_internalSplitPipeline must take a nested object but found: "
+ << elem,
+ elem.type() == BSONType::Object);
+
+ auto specObj = elem.embeddedObject();
+
+ HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard;
+
+ for (auto&& elt : specObj) {
+ if (elt.fieldNameStringData() == "mergeType"_sd) {
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "'mergeType' must be a string value but found: " << elt.type(),
+ elt.type() == BSONType::String);
+
+ auto mergeTypeString = elt.valueStringData();
+
+ if ("anyShard"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kAnyShard;
+ } else if ("primaryShard"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kPrimaryShard;
+ } else if ("mongos"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kAnyShardOrMongoS;
+ } else {
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "unrecognized field while parsing mergeType: '"
+ << elt.fieldNameStringData()
+ << "'");
+ }
+ } else {
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "unrecognized field while parsing $_internalSplitPipeline: '"
+ << elt.fieldNameStringData()
+ << "'");
+ }
+ }
+
+ return new DocumentSourceInternalSplitPipeline(expCtx, mergeType);
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() {
+ pExpCtx->checkForInterrupt();
+ return pSource->getNext();
+}
+
+Value DocumentSourceInternalSplitPipeline::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ std::string mergeTypeString;
+
+ switch (_mergeType) {
+ case HostTypeRequirement::kAnyShardOrMongoS:
+ mergeTypeString = "mongos";
+ break;
+
+ case HostTypeRequirement::kPrimaryShard:
+ mergeTypeString = "primaryShard";
+ break;
+
+ default:
+ mergeTypeString = "anyShard";
+ break;
+ }
+
+ return Value(Document{{getSourceName(), Value{Document{{"mergeType", mergeTypeString}}}}});
+}
+
+} // namesace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
new file mode 100644
index 00000000000..c18a6d301a6
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * An internal stage available for testing. Acts as a simple passthrough of intermediate results
+ * from the source stage, but forces the pipeline to split at the point where this stage appears
+ * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be
+ * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this
+ * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the
+ * pipeline will be sent to a random participating shard, subject to the requirements of any
+ * subsequent splittable stages in the pipeline.
+ */
+class DocumentSourceInternalSplitPipeline final : public DocumentSource,
+ public SplittableDocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalSplitPipeline"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
+
+ DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ HostTypeRequirement mergeType)
+ : DocumentSource(expCtx), _mergeType(mergeType) {}
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = _mergeType;
+ return constraints;
+ }
+
+ GetNextResult getNext() final;
+
+private:
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
+};
+
+} // namesace mongo
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index c6660152d66..88acfa8b45a 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -42,6 +42,12 @@ public:
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
*/
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d5bab6fd9e7..a4cd8d35ed6 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -97,7 +97,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 2be99dd7f12..135dbc39e17 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -49,6 +49,13 @@ public:
}
const char* getSourceName() const override;
+
+ StageConstraints constraints() const override {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 93cd6dd93e7..d53c2183443 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -176,6 +176,12 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
return std::move(next);
}
+bool DocumentSourceMergeCursors::remotesExhausted() const {
+ return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) {
+ return cursorAndConn->cursor.isDead();
+ });
+}
+
void DocumentSourceMergeCursors::doDispose() {
for (auto&& cursorAndConn : _cursors) {
// Note it is an error to call done() on a connection before consuming the reply from a
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 045ddad4836..e83316fa6ea 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -55,7 +55,8 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
@@ -68,6 +69,11 @@ public:
std::vector<CursorDescriptor> cursorDescriptors,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Returns true if all remotes have reported that their cursors are closed.
+ */
+ bool remotesExhausted() const;
+
/** Returns non-owning pointers to cursors managed by this stage.
* Call this instead of getNext() if you want access to the raw streams.
* This method should only be called at most once.
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 513994cd599..22f7a1aa24d 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -51,7 +51,7 @@ public:
StageConstraints constraints() const override {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 9366d01a670..6756cd4df2a 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -46,9 +46,9 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
constraints.isAllowedInsideFacetStage = false;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kLast;
+ constraints.requiredPosition = PositionRequirement::kLast;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index a76ca9c7940..bedc434d61e 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -40,6 +40,12 @@ public:
const char* getSourceName() const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
* stage.
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 5117e5c516a..c174d95d935 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -123,6 +123,11 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() {
intrusive_ptr<DocumentSource> DocumentSourceSample::getMergeSource() {
// Just need to merge the pre-sorted documents by their random values.
- return DocumentSourceSort::create(pExpCtx, randSortSpec, _size);
+ BSONObjBuilder randMergeSortSpec;
+
+ randMergeSortSpec.appendElements(randSortSpec);
+ randMergeSortSpec.append("$mergePresorted", true);
+
+ return DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size);
}
} // mongo
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index ec88c0737a8..662c6a9a49d 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -39,6 +39,12 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
GetDepsReturn getDependencies(DepsTracker* deps) const final {
return SEE_NEXT;
}
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index b50065bc303..188e9864310 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -103,6 +103,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 46cb6a6f3ab..a69f5e59eb5 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -37,6 +37,13 @@ public:
// virtuals from DocumentSource
GetNextResult getNext() final;
const char* getSourceName() const final;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to move a subsequent $limit before the skip, potentially allowing for forther
* optimizations earlier in the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 866a3fb955c..5731051cbd3 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -57,6 +57,10 @@ public:
// Can't swap with a $match if a limit has been absorbed, since in general match can't swap
// with limit.
constraints.canSwapWithMatch = !limitSrc;
+
+ // Can run on mongoS only if this stage is merging presorted streams.
+ constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS
+ : HostTypeRequirement::kAnyShard);
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index fb7bd68ba39..5bc9a91afdb 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -48,6 +48,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index e5cd62a4670..8b84fe36bff 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,9 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
+using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
@@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const {
// We expect a stage within a $facet stage to have these properties.
invariant(stageConstraints.requiresInputDocSource);
invariant(!stageConstraints.isIndependentOfAnyCollection);
- invariant(stageConstraints.requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kNone);
+ invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kFirst &&
- i != 0) {
+ if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline.",
@@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kLast &&
+ if (stage->constraints().requiredPosition == PositionRequirement::kLast &&
i != _sources.size() - 1) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
@@ -312,6 +311,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
shardPipeline->_splitForSharded = true;
_splitForMerge = true;
+ stitch();
+
return shardPipeline;
}
@@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const {
bool Pipeline::needsPrimaryShardMerger() const {
return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().mustRunOnPrimaryShardIfSharded;
+ return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
+ });
+}
+
+bool Pipeline::canRunOnMongos() const {
+ return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
});
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 677cfe7b6d1..ed19d44ae2b 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -219,6 +219,11 @@ public:
bool needsPrimaryShardMerger() const;
/**
+ * Returns whether or not every DocumentSource in the pipeline can run on mongoS.
+ */
+ bool canRunOnMongos() const;
+
+ /**
* Modifies the pipeline, optimizing it by combining and swapping stages.
*/
void optimizePipeline();