summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicholas Zolnierz <nicholas.zolnierz@mongodb.com>2019-10-08 22:18:54 +0000
committerevergreen <evergreen@mongodb.com>2019-10-08 22:18:54 +0000
commit13c2e614e05cb58753ee3a89a0fa9b14d0837a6d (patch)
tree9f9ba57524e99c069a4a5e0db82f6490858e7cac
parent40fb24a73ce0c8c1092dda3f2631648990f4587a (diff)
downloadmongo-13c2e614e05cb58753ee3a89a0fa9b14d0837a6d.tar.gz
SERVER-42942 M/R Agg: Implement translation for cluster mapReduce command
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml12
-rw-r--r--jstests/aggregation/sharded_agg_cleanup_on_error.js8
-rw-r--r--jstests/sharding/change_streams_establishment_finds_new_shards.js6
-rw-r--r--src/mongo/db/commands/SConscript6
-rw-r--r--src/mongo/db/commands/map_reduce_agg.cpp169
-rw-r--r--src/mongo/db/commands/map_reduce_agg.h3
-rw-r--r--src/mongo/db/commands/map_reduce_agg_test.cpp17
-rw-r--r--src/mongo/db/commands/map_reduce_command_base.h4
-rw-r--r--src/mongo/db/commands/mr.cpp19
-rw-r--r--src/mongo/db/commands/mr.h2
-rw-r--r--src/mongo/db/commands/mr_common.cpp197
-rw-r--r--src/mongo/db/commands/mr_common.h21
-rw-r--r--src/mongo/db/commands/mr_test.cpp69
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp1
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.cpp7
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h5
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h28
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp1012
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h118
-rw-r--r--src/mongo/s/commands/cluster_map_reduce.cpp37
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp197
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_cmd.cpp37
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp856
-rw-r--r--src/mongo/s/query/cluster_aggregate.h9
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp6
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h6
27 files changed, 1520 insertions, 1333 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 81a68edf47b..61d8bfde017 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -49,6 +49,7 @@ selector:
- jstests/sharding/track_unsharded_collections_rename_collection.js
- jstests/sharding/banned_txn_databases_sharded.js
- jstests/sharding/split_large_key.js
+ - jstests/sharding/change_streams_establishment_finds_new_shards.js
# Enable if SERVER-41813 is backported or 4.4 becomes last-stable
- jstests/sharding/invalid_system_views_sharded_collection.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
index f126871e84d..4de6fde506a 100644
--- a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
+++ b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
@@ -4,8 +4,16 @@ test_kind: js_test
selector:
roots:
- # Placeholder test to avoid failing in resmoke.
- - jstests/sharding/accurate_count_with_predicate.js
+ - jstests/sharding/auth.js
+ - jstests/sharding/authCommands.js
+ - jstests/sharding/authmr.js
+ - jstests/sharding/causal_consistency_shell_support.js
+ - jstests/sharding/localhostAuthBypass.js
+ - jstests/sharding/max_time_ms_sharded.js
+ - jstests/sharding/mr_and_agg_versioning.js
+ - jstests/sharding/mr_shard_version.js
+ - jstests/sharding/query_config.js
+ - jstests/sharding/shard_targeting.js
executor:
config:
diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js
index 5fba3e477a9..fb646819d06 100644
--- a/jstests/aggregation/sharded_agg_cleanup_on_error.js
+++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js
@@ -88,7 +88,7 @@ try {
try {
// Enable the failpoint to fail on establishing a merging shard cursor.
assert.commandWorked(mongosDB.adminCommand({
- configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor",
+ configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor",
mode: "alwaysOn"
}));
@@ -103,13 +103,13 @@ try {
} finally {
assert.commandWorked(mongosDB.adminCommand(
- {configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor", mode: "off"}));
+ {configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor", mode: "off"}));
}
// Test that aggregations involving $exchange correctly clean up the producer cursors.
try {
assert.commandWorked(mongosDB.adminCommand({
- configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline",
+ configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline",
mode: "alwaysOn"
}));
@@ -133,7 +133,7 @@ try {
} finally {
assert.commandWorked(mongosDB.adminCommand({
- configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline",
+ configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline",
mode: "off"
}));
}
diff --git a/jstests/sharding/change_streams_establishment_finds_new_shards.js b/jstests/sharding/change_streams_establishment_finds_new_shards.js
index 8f2393f99ee..6a232856bb7 100644
--- a/jstests/sharding/change_streams_establishment_finds_new_shards.js
+++ b/jstests/sharding/change_streams_establishment_finds_new_shards.js
@@ -40,13 +40,13 @@ assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middl
// Enable the failpoint.
assert.commandWorked(mongos.adminCommand(
- {configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"}));
+ {configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"}));
// While opening the cursor, wait for the failpoint and add the new shard.
const awaitNewShard = startParallelShell(`
load("jstests/libs/check_log.js");
checkLog.contains(db,
- "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled");
+ "shardedAggregateHangBeforeEstablishingShardCursors fail point enabled");
assert.commandWorked(
db.adminCommand({addShard: "${newShard.getURL()}", name: "${newShard.name}"}));
// Migrate the [10, MaxKey] chunk to "newShard".
@@ -56,7 +56,7 @@ const awaitNewShard = startParallelShell(`
_waitForDelete: true}));
assert.commandWorked(
db.adminCommand(
- {configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors",
+ {configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors",
mode: "off"}));`,
mongos.port);
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index fce6f57b45e..300595e2a4f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -157,6 +157,7 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/logical_session_id',
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/session_catalog',
@@ -508,10 +509,10 @@ env.Library(
'map_reduce_agg.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/db/commands/servers',
+ '$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/pipeline/mongo_process_interface',
- '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
'map_reduce_parser'
]
@@ -541,6 +542,7 @@ env.CppUnitTest(
"map_reduce_parse_test.cpp",
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'map_reduce_agg',
]
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp
index aaffa84dc19..f1c94a9c339 100644
--- a/src/mongo/db/commands/map_reduce_agg.cpp
+++ b/src/mongo/db/commands/map_reduce_agg.cpp
@@ -41,155 +41,18 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/map_reduce_agg.h"
#include "mongo/db/commands/map_reduce_javascript_code.h"
+#include "mongo/db/commands/mr_common.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/document_source_group.h"
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_match.h"
-#include "mongo/db/pipeline/document_source_merge.h"
-#include "mongo/db/pipeline/document_source_out.h"
-#include "mongo/db/pipeline/document_source_project.h"
-#include "mongo/db/pipeline/document_source_single_document_transformation.h"
-#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
-#include "mongo/db/pipeline/expression_javascript.h"
-#include "mongo/db/pipeline/parsed_aggregation_projection_node.h"
-#include "mongo/db/pipeline/parsed_inclusion_projection.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/query/map_reduce_output_format.h"
-#include "mongo/db/query/util/make_data_structure.h"
-#include "mongo/util/intrusive_counter.h"
namespace mongo::map_reduce_agg {
namespace {
-using namespace std::string_literals;
-
-auto translateSort(boost::intrusive_ptr<ExpressionContext> expCtx,
- const BSONObj& sort,
- const boost::optional<std::int64_t>& limit) {
- return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1));
-}
-
-auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
- auto emitExpression = ExpressionInternalJsEmit::create(
- expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code);
- auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
- ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
- node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression));
- auto inclusion = std::unique_ptr<TransformerInterface>{
- std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
- expCtx,
- ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
- std::move(node))};
- return make_intrusive<DocumentSourceSingleDocumentTransformation>(
- expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
-}
-
-auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
- auto accumulatorArguments = ExpressionObject::create(
- expCtx,
- make_vector<std::pair<std::string, boost::intrusive_ptr<Expression>>>(
- std::pair{"data"s,
- ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)},
- std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})}));
- auto jsReduce = AccumulationStatement{
- "value",
- std::move(accumulatorArguments),
- AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)};
- auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
- return DocumentSourceGroup::create(expCtx,
- std::move(groupExpr),
- make_vector<AccumulationStatement>(std::move(jsReduce)),
- boost::none);
-}
-
-auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
- auto jsExpression = ExpressionInternalJs::create(
- expCtx,
- ExpressionArray::create(
- expCtx,
- make_vector<boost::intrusive_ptr<Expression>>(
- ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState),
- ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))),
- code);
- auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
- ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
- node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression));
- auto inclusion = std::unique_ptr<TransformerInterface>{
- std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
- expCtx,
- ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
- std::move(node))};
- return make_intrusive<DocumentSourceSingleDocumentTransformation>(
- expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
-}
-
-auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
- const StringData inputDatabase,
- NamespaceString targetNss) {
- uassert(31278,
- "MapReduce must output to the database belonging to its input collection - Input: "s +
- inputDatabase + "Output: " + targetNss.db(),
- inputDatabase == targetNss.db());
- return DocumentSourceOut::create(std::move(targetNss), expCtx);
-}
-
-auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
- return DocumentSourceMerge::create(targetNss,
- expCtx,
- MergeWhenMatchedModeEnum::kReplace,
- MergeWhenNotMatchedModeEnum::kInsert,
- boost::none, // Let variables
- boost::none, // pipeline
- std::set<FieldPath>{FieldPath("_id"s)},
- boost::none); // targetCollectionVersion
-}
-
-auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
- NamespaceString targetNss,
- std::string code) {
- // Because of communication for sharding, $merge must hold on to a serializable BSON object
- // at the moment so we reparse here.
- auto reduceObj = BSON("args" << BSON_ARRAY("$value"
- << "$$new.value")
- << "eval" << code);
-
- auto finalProjectSpec =
- BSON(DocumentSourceProject::kStageName
- << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
- auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
- return DocumentSourceMerge::create(targetNss,
- expCtx,
- MergeWhenMatchedModeEnum::kPipeline,
- MergeWhenNotMatchedModeEnum::kInsert,
- boost::none, // Let variables
- pipelineSpec,
- std::set<FieldPath>{FieldPath("_id"s)},
- boost::none); // targetCollectionVersion
-}
-
-auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
- const OutputType outputType,
- const StringData inputDatabase,
- NamespaceString targetNss,
- std::string reduceCode) {
- switch (outputType) {
- case OutputType::Replace:
- return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss));
- case OutputType::Merge:
- return boost::make_optional(translateOutMerge(expCtx, targetNss));
- case OutputType::Reduce:
- return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
- case OutputType::InMemory:;
- }
- return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
-}
-
auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
// AutoGetCollectionForReadCommand will throw if the sharding version for this connection is
// out of date.
@@ -247,7 +110,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
auto expCtx = makeExpressionContext(opCtx, parsedMr);
auto runnablePipeline = [&]() {
- auto pipeline = translateFromMR(parsedMr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx);
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
expCtx, pipeline.release());
}();
@@ -274,32 +137,4 @@ bool runAggregationMapReduce(OperationContext* opCtx,
return true;
}
-std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
- MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
-
- // TODO: It would be good to figure out what kind of errors this would produce in the Status.
- // It would be better not to produce something incomprehensible out of an internal translation.
- return uassertStatusOK(Pipeline::create(
- makeFlattenedList<boost::intrusive_ptr<DocumentSource>>(
- parsedMr.getQuery().map(
- [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }),
- parsedMr.getSort().map(
- [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }),
- translateMap(expCtx, parsedMr.getMap().getCode()),
- DocumentSourceUnwind::create(expCtx, "emits", false, boost::none),
- translateReduce(expCtx, parsedMr.getReduce().getCode()),
- parsedMr.getFinalize().map([&](auto&& finalize) {
- return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
- }),
- translateOut(expCtx,
- parsedMr.getOutOptions().getOutputType(),
- parsedMr.getNamespace().db(),
- NamespaceString{parsedMr.getOutOptions().getDatabaseName()
- ? *parsedMr.getOutOptions().getDatabaseName()
- : parsedMr.getNamespace().db(),
- parsedMr.getOutOptions().getCollectionName()},
- parsedMr.getReduce().getCode())),
- expCtx));
-}
-
} // namespace mongo::map_reduce_agg
diff --git a/src/mongo/db/commands/map_reduce_agg.h b/src/mongo/db/commands/map_reduce_agg.h
index 573119c0393..7c5972a7de9 100644
--- a/src/mongo/db/commands/map_reduce_agg.h
+++ b/src/mongo/db/commands/map_reduce_agg.h
@@ -47,7 +47,4 @@ bool runAggregationMapReduce(OperationContext* opCtx,
std::string& errmsg,
BSONObjBuilder& result);
-std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
- MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx);
-
} // namespace mongo::map_reduce_agg
diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp
index 17b5f2520ba..3e78b1cb4c4 100644
--- a/src/mongo/db/commands/map_reduce_agg_test.cpp
+++ b/src/mongo/db/commands/map_reduce_agg_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/commands/map_reduce_agg.h"
+#include "mongo/db/commands/mr_common.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_merge.h"
@@ -65,7 +66,7 @@ TEST(MapReduceAggTest, testBasicTranslate) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::none, "", OutputType::InMemory, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(3u, sources.size());
auto iter = sources.begin();
@@ -82,7 +83,7 @@ TEST(MapReduceAggTest, testSortWithoutLimit) {
MapReduceOutOptions{boost::none, "", OutputType::InMemory, false}};
mr.setSort(BSON("foo" << 1));
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(4u, sources.size());
auto iter = sources.begin();
@@ -103,7 +104,7 @@ TEST(MapReduceAggTest, testSortWithLimit) {
mr.setSort(BSON("foo" << 1));
mr.setLimit(23);
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(4u, sources.size());
auto iter = sources.begin();
@@ -127,7 +128,7 @@ TEST(MapReduceAggTest, testFeatureLadenTranslate) {
<< "fooval"));
mr.setFinalize(boost::make_optional(MapReduceJavascriptCode{finalizeJavascript.toString()}));
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(7u, sources.size());
auto iter = sources.begin();
@@ -148,7 +149,7 @@ TEST(MapReduceAggTest, testOutMergeTranslate) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Merge, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(sources.size(), 4u);
auto iter = sources.begin();
@@ -168,7 +169,7 @@ TEST(MapReduceAggTest, testOutReduceTranslate) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Reduce, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(sources.size(), 4u);
auto iter = sources.begin();
@@ -190,7 +191,7 @@ TEST(MapReduceAggTest, testOutDifferentDBFails) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db2"s), "coll2", OutputType::Replace, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- ASSERT_THROWS_CODE(translateFromMR(mr, expCtx), AssertionException, 31278);
+ ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), AssertionException, 31278);
}
TEST(MapReduceAggTest, testOutSameCollection) {
@@ -201,7 +202,7 @@ TEST(MapReduceAggTest, testOutSameCollection) {
MapReduceJavascriptCode{reduceJavascript.toString()},
MapReduceOutOptions{boost::make_optional("db"s), "coll", OutputType::Replace, false}};
boost::intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest(nss));
- auto pipeline = translateFromMR(mr, expCtx);
+ auto pipeline = map_reduce_common::translateFromMR(mr, expCtx);
auto& sources = pipeline->getSources();
ASSERT_EQ(sources.size(), 4u);
auto iter = sources.begin();
diff --git a/src/mongo/db/commands/map_reduce_command_base.h b/src/mongo/db/commands/map_reduce_command_base.h
index 8b16b29f21f..4de4676e02d 100644
--- a/src/mongo/db/commands/map_reduce_command_base.h
+++ b/src/mongo/db/commands/map_reduce_command_base.h
@@ -45,7 +45,7 @@ public:
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return mr::mrSupportsWriteConcern(cmd);
+ return map_reduce_common::mrSupportsWriteConcern(cmd);
}
bool allowsAfterClusterTime(const BSONObj& cmd) const override {
@@ -62,7 +62,7 @@ public:
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) const {
- mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
+ map_reduce_common::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out);
}
bool errmsgRun(OperationContext* opCtx,
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index b9d90555d4e..55e6586784d 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/map_reduce_gen.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
@@ -425,17 +426,17 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo());
- outputOptions = mr::parseOutputOptions(dbname, cmdObj);
+ outputOptions = map_reduce_common::parseOutputOptions(dbname, cmdObj);
shardedFirstPass = false;
if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) {
massert(16054,
"shardedFirstPass should only use replace outType",
- outputOptions.outType == mr::OutputType::kReplace);
+ outputOptions.outType == OutputType::Replace);
shardedFirstPass = true;
}
- if (outputOptions.outType != mr::OutputType::kInMemory) {
+ if (outputOptions.outType != OutputType::InMemory) {
// Create names for the temp collection and the incremental collection. The incremental
// collection goes in the "local" database, so that it doesn't get replicated.
const std::string& outDBName = outputOptions.outDB.empty() ? dbname : outputOptions.outDB;
@@ -739,7 +740,7 @@ void State::appendResults(BSONObjBuilder& final) {
* This may involve replacing, merging or reducing.
*/
long long State::postProcessCollection(OperationContext* opCtx, CurOp* curOp) {
- if (_onDisk == false || _config.outputOptions.outType == mr::OutputType::kInMemory)
+ if (_onDisk == false || _config.outputOptions.outType == OutputType::InMemory)
return numInMemKeys();
bool holdingGlobalLock = false;
@@ -763,7 +764,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
if (_config.outputOptions.finalNamespace == _config.tempNamespace)
return collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock);
- if (_config.outputOptions.outType == mr::OutputType::kReplace ||
+ if (_config.outputOptions.outType == OutputType::Replace ||
collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) {
// This must be global because we may write across different databases.
Lock::GlobalWrite lock(opCtx);
@@ -780,7 +781,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
}
_db.dropCollection(_config.tempNamespace.ns());
- } else if (_config.outputOptions.outType == mr::OutputType::kMerge) {
+ } else if (_config.outputOptions.outType == OutputType::Merge) {
// merge: upsert new docs into old collection
const auto count = collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock);
@@ -799,7 +800,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx,
}
_db.dropCollection(_config.tempNamespace.ns());
pm.finished();
- } else if (_config.outputOptions.outType == mr::OutputType::kReduce) {
+ } else if (_config.outputOptions.outType == OutputType::Reduce) {
// reduce: apply reduce op on new result and existing one
BSONList values;
@@ -926,7 +927,7 @@ State::State(OperationContext* opCtx, const Config& c)
_dupCount(0),
_numEmits(0) {
_temp.reset(new InMemory());
- _onDisk = _config.outputOptions.outType != mr::OutputType::kInMemory;
+ _onDisk = _config.outputOptions.outType != OutputType::InMemory;
}
bool State::sourceExists() {
@@ -1747,7 +1748,7 @@ bool runMapReduceShardedFinish(OperationContext* opCtx,
std::vector<Chunk> chunks;
- if (config.outputOptions.outType != mr::OutputType::kInMemory) {
+ if (config.outputOptions.outType != OutputType::InMemory) {
auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
opCtx, config.outputOptions.finalNamespace);
uassertStatusOK(outRoutingInfoStatus.getStatus());
diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h
index d5e32d1f9b5..6ea15c30fee 100644
--- a/src/mongo/db/commands/mr.h
+++ b/src/mongo/db/commands/mr.h
@@ -224,7 +224,7 @@ public:
NamespaceString incLong;
NamespaceString tempNamespace;
- mr::OutputOptions outputOptions;
+ map_reduce_common::OutputOptions outputOptions;
// max number of keys allowed in JS map before switching mode
long jsMaxKeys;
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index a13b323ea23..ea0dea5e218 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -40,15 +40,152 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge.h"
+#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/expression_javascript.h"
+#include "mongo/db/pipeline/parsed_aggregation_projection_node.h"
+#include "mongo/db/pipeline/parsed_inclusion_projection.h"
+#include "mongo/db/query/util/make_data_structure.h"
+#include "mongo/util/intrusive_counter.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
-namespace mongo {
-
-namespace mr {
+namespace mongo::map_reduce_common {
namespace {
Rarely nonAtomicDeprecationSampler; // Used to occasionally log deprecation messages.
+
+using namespace std::string_literals;
+
+auto translateSort(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const BSONObj& sort,
+ const boost::optional<std::int64_t>& limit) {
+ return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1));
+}
+
+auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto emitExpression = ExpressionInternalJsEmit::create(
+ expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code);
+ auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
+ node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression));
+ auto inclusion = std::unique_ptr<TransformerInterface>{
+ std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
+ expCtx,
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
+ std::move(node))};
+ return make_intrusive<DocumentSourceSingleDocumentTransformation>(
+ expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
+}
+
+auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto accumulatorArguments = ExpressionObject::create(
+ expCtx,
+ make_vector<std::pair<std::string, boost::intrusive_ptr<Expression>>>(
+ std::pair{"data"s,
+ ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)},
+ std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})}));
+ auto jsReduce = AccumulationStatement{
+ "value",
+ std::move(accumulatorArguments),
+ AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)};
+ auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
+ return DocumentSourceGroup::create(expCtx,
+ std::move(groupExpr),
+ make_vector<AccumulationStatement>(std::move(jsReduce)),
+ boost::none);
+}
+
+auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto jsExpression = ExpressionInternalJs::create(
+ expCtx,
+ ExpressionArray::create(
+ expCtx,
+ make_vector<boost::intrusive_ptr<Expression>>(
+ ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState),
+ ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))),
+ code);
+ auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
+ node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression));
+ auto inclusion = std::unique_ptr<TransformerInterface>{
+ std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
+ expCtx,
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
+ std::move(node))};
+ return make_intrusive<DocumentSourceSingleDocumentTransformation>(
+ expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
+}
+
+auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const StringData inputDatabase,
+ NamespaceString targetNss) {
+ uassert(31278,
+ "MapReduce must output to the database belonging to its input collection - Input: "s +
+ inputDatabase + " Output: " + targetNss.db(),
+ inputDatabase == targetNss.db());
+ return DocumentSourceOut::create(std::move(targetNss), expCtx);
+}
+
+auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
+ return DocumentSourceMerge::create(targetNss,
+ expCtx,
+ MergeWhenMatchedModeEnum::kReplace,
+ MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, // Let variables
+ boost::none, // pipeline
+ std::set<FieldPath>{FieldPath("_id"s)},
+ boost::none); // targetCollectionVersion
+}
+
+auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
+ NamespaceString targetNss,
+ std::string code) {
+ // Because of communication for sharding, $merge must hold on to a serializable BSON object
+ // at the moment so we reparse here.
+ auto reduceObj = BSON("args" << BSON_ARRAY("$value"
+ << "$$new.value")
+ << "eval" << code);
+
+ auto finalProjectSpec =
+ BSON(DocumentSourceProject::kStageName
+ << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
+ auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
+ return DocumentSourceMerge::create(targetNss,
+ expCtx,
+ MergeWhenMatchedModeEnum::kPipeline,
+ MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, // Let variables
+ pipelineSpec,
+ std::set<FieldPath>{FieldPath("_id"s)},
+ boost::none); // targetCollectionVersion
+}
+
+auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const OutputType outputType,
+ const StringData inputDatabase,
+ NamespaceString targetNss,
+ std::string reduceCode) {
+ switch (outputType) {
+ case OutputType::Replace:
+ return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss));
+ case OutputType::Merge:
+ return boost::make_optional(translateOutMerge(expCtx, targetNss));
+ case OutputType::Reduce:
+ return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
+ case OutputType::InMemory:;
+ }
+ return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
+}
+
} // namespace
OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj) {
@@ -57,24 +194,24 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
outputOptions.outNonAtomic = false;
if (cmdObj["out"].type() == String) {
outputOptions.collectionName = cmdObj["out"].String();
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
} else if (cmdObj["out"].type() == Object) {
BSONObj o = cmdObj["out"].embeddedObject();
if (o.hasElement("normal")) {
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
outputOptions.collectionName = o["normal"].String();
} else if (o.hasElement("replace")) {
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
outputOptions.collectionName = o["replace"].String();
} else if (o.hasElement("merge")) {
- outputOptions.outType = OutputType::kMerge;
+ outputOptions.outType = OutputType::Merge;
outputOptions.collectionName = o["merge"].String();
} else if (o.hasElement("reduce")) {
- outputOptions.outType = OutputType::kReduce;
+ outputOptions.outType = OutputType::Reduce;
outputOptions.collectionName = o["reduce"].String();
} else if (o.hasElement("inline")) {
- outputOptions.outType = OutputType::kInMemory;
+ outputOptions.outType = OutputType::InMemory;
uassert(ErrorCodes::InvalidOptions,
"cannot specify 'sharded' in combination with 'inline'",
!o.hasElement("sharded"));
@@ -96,8 +233,8 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
if (outputOptions.outNonAtomic) {
uassert(15895,
"nonAtomic option cannot be used with this output type",
- (outputOptions.outType == OutputType::kReduce ||
- outputOptions.outType == OutputType::kMerge));
+ (outputOptions.outType == OutputType::Reduce ||
+ outputOptions.outType == OutputType::Merge));
} else if (nonAtomicDeprecationSampler.tick()) {
warning() << "Setting out.nonAtomic to false in MapReduce is deprecated.";
}
@@ -106,7 +243,7 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
uasserted(13606, "'out' has to be a string or an object");
}
- if (outputOptions.outType != OutputType::kInMemory) {
+ if (outputOptions.outType != OutputType::InMemory) {
const StringData outDb(outputOptions.outDB.empty() ? dbname : outputOptions.outDB);
const NamespaceString nss(outDb, outputOptions.collectionName);
uassert(ErrorCodes::InvalidNamespace,
@@ -130,10 +267,10 @@ void addPrivilegesRequiredForMapReduce(const BasicCommand* commandTemplate,
inputResource.isExactNamespacePattern());
out->push_back(Privilege(inputResource, ActionType::find));
- if (outputOptions.outType != OutputType::kInMemory) {
+ if (outputOptions.outType != OutputType::InMemory) {
ActionSet outputActions;
outputActions.addAction(ActionType::insert);
- if (outputOptions.outType == OutputType::kReplace) {
+ if (outputOptions.outType == OutputType::Replace) {
outputActions.addAction(ActionType::remove);
} else {
outputActions.addAction(ActionType::update);
@@ -163,5 +300,33 @@ bool mrSupportsWriteConcern(const BSONObj& cmd) {
return true;
}
}
-} // namespace mr
-} // namespace mongo
+
+std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
+ MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
+
+ // TODO: It would be good to figure out what kind of errors this would produce in the Status.
+ // It would be better not to produce something incomprehensible out of an internal translation.
+ return uassertStatusOK(Pipeline::create(
+ makeFlattenedList<boost::intrusive_ptr<DocumentSource>>(
+ parsedMr.getQuery().map(
+ [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }),
+ parsedMr.getSort().map(
+ [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }),
+ translateMap(expCtx, parsedMr.getMap().getCode()),
+ DocumentSourceUnwind::create(expCtx, "emits", false, boost::none),
+ translateReduce(expCtx, parsedMr.getReduce().getCode()),
+ parsedMr.getFinalize().map([&](auto&& finalize) {
+ return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
+ }),
+ translateOut(expCtx,
+ parsedMr.getOutOptions().getOutputType(),
+ parsedMr.getNamespace().db(),
+ NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()},
+ parsedMr.getReduce().getCode())),
+ expCtx));
+}
+
+} // namespace mongo::map_reduce_common
diff --git a/src/mongo/db/commands/mr_common.h b/src/mongo/db/commands/mr_common.h
index aad419b94bd..c3e80c11874 100644
--- a/src/mongo/db/commands/mr_common.h
+++ b/src/mongo/db/commands/mr_common.h
@@ -33,19 +33,11 @@
#include <vector>
#include "mongo/db/commands.h"
+#include "mongo/db/commands/map_reduce_gen.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
-namespace mongo {
-
-namespace mr {
-
-enum class OutputType {
- kReplace, // Atomically replace the collection.
- kMerge, // Merge keys, override dups.
- kReduce, // Merge keys, reduce dups.
- kInMemory // Only store in memory, limited in size.
-};
+namespace mongo::map_reduce_common {
struct OutputOptions {
std::string outDB;
@@ -68,6 +60,11 @@ void addPrivilegesRequiredForMapReduce(const BasicCommand* commandTemplate,
*/
bool mrSupportsWriteConcern(const BSONObj& cmd);
+/**
+ * Accepts a parsed mapReduce command and returns the equivalent aggregation pipeline. Note that the
+ * returned pipeline does *not* contain a $cursor stage and thus is not runnable.
+ */
+std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
+ MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx);
-} // namespace mr
-} // namespace mongo
+} // namespace mongo::map_reduce_common
diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp
index 53b06339448..304edd39f83 100644
--- a/src/mongo/db/commands/mr_test.cpp
+++ b/src/mongo/db/commands/mr_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/map_reduce_gen.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/json.h"
#include "mongo/db/op_observer_noop.h"
@@ -78,17 +79,17 @@ void _compareOutputOptionField(const std::string& dbname,
}
/**
- * Returns string representation of mr::Config::OutputType
+ * Returns string representation of OutputType
*/
-std::string _getOutTypeString(mr::OutputType outType) {
+std::string _getOutTypeString(OutputType outType) {
switch (outType) {
- case mr::OutputType::kReplace:
+ case OutputType::Replace:
return "REPLACE";
- case mr::OutputType::kMerge:
+ case OutputType::Merge:
return "MERGE";
- case mr::OutputType::kReduce:
+ case OutputType::Reduce:
return "REDUCE";
- case mr::OutputType::kInMemory:
+ case OutputType::InMemory:
return "INMEMORY";
}
MONGO_UNREACHABLE;
@@ -103,9 +104,10 @@ void _testConfigParseOutputOptions(const std::string& dbname,
const std::string& expectedCollectionName,
const std::string& expectedFinalNamespace,
bool expectedOutNonAtomic,
- mr::OutputType expectedOutType) {
+ OutputType expectedOutType) {
const BSONObj cmdObj = fromjson(cmdObjStr);
- mr::OutputOptions outputOptions = mr::parseOutputOptions(dbname, cmdObj);
+ map_reduce_common::OutputOptions outputOptions =
+ map_reduce_common::parseOutputOptions(dbname, cmdObj);
_compareOutputOptionField(dbname, cmdObjStr, "outDb", outputOptions.outDB, expectedOutDb);
_compareOutputOptionField(
dbname, cmdObjStr, "collectionName", outputOptions.collectionName, expectedCollectionName);
@@ -124,31 +126,34 @@ void _testConfigParseOutputOptions(const std::string& dbname,
}
/**
- * Tests for mr::parseOutputOptions.
+ * Tests for map_reduce_common::parseOutputOptions.
*/
TEST(ConfigOutputOptionsTest, parseOutputOptions) {
// Missing 'out' field.
- ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{}")), AssertionException);
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions("mydb", fromjson("{}")),
+ AssertionException);
// 'out' must be either string or object.
- ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{out: 99}")), AssertionException);
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions("mydb", fromjson("{out: 99}")),
+ AssertionException);
// 'out.nonAtomic' is not supported with normal, replace or inline.
- ASSERT_THROWS(
- mr::parseOutputOptions("mydb", fromjson("{out: {normal: 'mycoll', nonAtomic: true}}")),
- AssertionException);
- ASSERT_THROWS(
- mr::parseOutputOptions("mydb", fromjson("{out: {replace: 'mycoll', nonAtomic: true}}")),
- AssertionException);
- ASSERT_THROWS(
- mr::parseOutputOptions("mydb", fromjson("{out: {inline: 'mycoll', nonAtomic: true}}")),
- AssertionException);
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions(
+ "mydb", fromjson("{out: {normal: 'mycoll', nonAtomic: true}}")),
+ AssertionException);
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions(
+ "mydb", fromjson("{out: {replace: 'mycoll', nonAtomic: true}}")),
+ AssertionException);
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions(
+ "mydb", fromjson("{out: {inline: 'mycoll', nonAtomic: true}}")),
+ AssertionException);
// Unknown output specifer.
- ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{out: {no_such_out_type: 'mycoll'}}")),
+ ASSERT_THROWS(map_reduce_common::parseOutputOptions(
+ "mydb", fromjson("{out: {no_such_out_type: 'mycoll'}}")),
AssertionException);
// 'out' is string.
_testConfigParseOutputOptions(
- "mydb", "{out: 'mycoll'}", "", "mycoll", "mydb.mycoll", false, mr::OutputType::kReplace);
+ "mydb", "{out: 'mycoll'}", "", "mycoll", "mydb.mycoll", false, OutputType::Replace);
// 'out' is object.
_testConfigParseOutputOptions("mydb",
"{out: {normal: 'mycoll'}}",
@@ -156,7 +161,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) {
"mycoll",
"mydb.mycoll",
false,
- mr::OutputType::kReplace);
+ OutputType::Replace);
// 'out.db' overrides dbname parameter
_testConfigParseOutputOptions("mydb1",
"{out: {replace: 'mycoll', db: 'mydb2'}}",
@@ -164,7 +169,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) {
"mycoll",
"mydb2.mycoll",
false,
- mr::OutputType::kReplace);
+ OutputType::Replace);
// 'out.nonAtomic' is supported with merge and reduce.
_testConfigParseOutputOptions("mydb",
"{out: {merge: 'mycoll', nonAtomic: true}}",
@@ -172,14 +177,14 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) {
"mycoll",
"mydb.mycoll",
true,
- mr::OutputType::kMerge);
+ OutputType::Merge);
_testConfigParseOutputOptions("mydb",
"{out: {reduce: 'mycoll', nonAtomic: true}}",
"",
"mycoll",
"mydb.mycoll",
true,
- mr::OutputType::kReduce);
+ OutputType::Reduce);
// inline
_testConfigParseOutputOptions("mydb1",
"{out: {inline: 'mycoll', db: 'mydb2'}}",
@@ -187,7 +192,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) {
"",
"",
false,
- mr::OutputType::kInMemory);
+ OutputType::InMemory);
// Order should not matter in fields of 'out' object.
_testConfigParseOutputOptions("mydb1",
@@ -196,35 +201,35 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) {
"mycoll",
"mydb2.mycoll",
false,
- mr::OutputType::kReplace);
+ OutputType::Replace);
_testConfigParseOutputOptions("mydb1",
"{out: {db: 'mydb2', replace: 'mycoll'}}",
"mydb2",
"mycoll",
"mydb2.mycoll",
false,
- mr::OutputType::kReplace);
+ OutputType::Replace);
_testConfigParseOutputOptions("mydb1",
"{out: {nonAtomic: true, merge: 'mycoll'}}",
"",
"mycoll",
"mydb1.mycoll",
true,
- mr::OutputType::kMerge);
+ OutputType::Merge);
_testConfigParseOutputOptions("mydb1",
"{out: {nonAtomic: true, reduce: 'mycoll'}}",
"",
"mycoll",
"mydb1.mycoll",
true,
- mr::OutputType::kReduce);
+ OutputType::Reduce);
_testConfigParseOutputOptions("mydb1",
"{out: {db: 'mydb2', inline: 'mycoll'}}",
"mydb2",
"",
"",
false,
- mr::OutputType::kInMemory);
+ OutputType::InMemory);
}
TEST(ConfigTest, ParseCollation) {
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index ec5c6af9a27..626867a499e 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -283,7 +283,6 @@ NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSO
}
Document AggregationRequest::serializeToCommandObj() const {
- MutableDocument serialized;
return Document{
{kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))},
{kPipelineName, _pipeline},
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
index 0b9909c5ba4..6af52389e6a 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp
@@ -73,7 +73,7 @@ void LiteParsedPipeline::assertSupportsMultiDocumentTransaction(
}
}
-bool LiteParsedPipeline::verifyIsSupported(
+void LiteParsedPipeline::verifyIsSupported(
OperationContext* opCtx,
const std::function<bool(OperationContext*, const NamespaceString&)> isSharded,
const boost::optional<ExplainOptions::Verbosity> explain,
@@ -85,14 +85,11 @@ bool LiteParsedPipeline::verifyIsSupported(
// Verify litePipe can be run at the given read concern.
assertSupportsReadConcern(opCtx, explain, enableMajorityReadConcern);
// Verify that no involved namespace is sharded unless allowed by the pipeline.
- auto sharded = false;
for (const auto& nss : getInvolvedNamespaces()) {
- sharded = isSharded(opCtx, nss);
uassert(28769,
str::stream() << nss.ns() << " cannot be sharded",
- allowShardedForeignCollection(nss) || !sharded);
+ allowShardedForeignCollection(nss) || !isSharded(opCtx, nss));
}
- return sharded;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index f5578873621..21e309dbaba 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -140,10 +140,9 @@ public:
/**
* Perform checks that verify that the LitePipe is valid. Note that this function must be called
* before forwarding an aggregation command on an unsharded collection, in order to verify that
- * the involved namespaces are allowed to be sharded. Returns true if any involved namespace is
- * sharded.
+ * the involved namespaces are allowed to be sharded.
*/
- bool verifyIsSupported(
+ void verifyIsSupported(
OperationContext* opCtx,
const std::function<bool(OperationContext*, const NamespaceString&)> isSharded,
const boost::optional<ExplainOptions::Verbosity> explain,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 33e133adff2..fc6457fcc4b 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -45,34 +45,6 @@ namespace mongo {
*/
class MongoSInterface : public MongoProcessCommon {
public:
- static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
- Pipeline* pipeline,
- BSONObj collationObj);
-
- /**
- * Appends information to the command sent to the shards which should be appended both if this
- * is a passthrough sent to a single shard and if this is a split pipeline.
- */
- static BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
- const AggregationRequest& request,
- BSONObj collationObj);
-
- static BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const cluster_aggregation_planner::SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- bool needsMerge);
-
- static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
- OperationContext* opCtx, const NamespaceString& execNss);
-
MongoSInterface() = default;
virtual ~MongoSInterface() = default;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 3ef81811d71..a469b6d9807 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -32,20 +32,56 @@
#include "sharded_agg_helpers.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connpool.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/query/find_common.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/multi_statement_transaction_requests_sender.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs_gen.h"
#include "mongo/s/query/document_source_merge_cursors.h"
+#include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
-namespace mongo {
-namespace sharded_agg_helpers {
+namespace mongo::sharded_agg_helpers {
-MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+using SplitPipeline = cluster_aggregation_planner::SplitPipeline;
+
+MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor);
+MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline);
+
+namespace {
+
+bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) {
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || hasChangeStream;
+}
+
+Status appendCursorResponseToCommandResult(const ShardId& shardId,
+ const BSONObj cursorResponse,
+ BSONObjBuilder* result) {
+ // If a write error was encountered, append it to the output buffer first.
+ if (auto wcErrorElem = cursorResponse["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
+ }
+
+ // Pass the results from the remote shard into our command response.
+ result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(cursorResponse));
+ return getStatusFromCommandResult(result->asTempObj());
+}
/**
* Given a document representing an aggregation command such as
@@ -73,24 +109,18 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
return explainCommandBuilder.freeze();
}
-BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<RuntimeConstants>& constants,
- Pipeline* pipeline,
- BSONObj collationObj) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- if (pipeline) {
- targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (!opCtx->getWriteConcern().usedDefault) {
+ return Shard::RetryPolicy::kNotIdempotent;
}
-
- return genericTransformForShards(
- std::move(targetedCmd), opCtx, request, constants, collationObj);
+ return Shard::RetryPolicy::kIdempotent;
}
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
- const AggregationRequest& request,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity,
const boost::optional<RuntimeConstants>& constants,
BSONObj collationObj) {
if (constants) {
@@ -100,7 +130,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
- if (auto explainVerbosity = request.getExplain()) {
+ if (explainVerbosity) {
cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
}
@@ -120,6 +150,93 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), false);
}
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool hasChangeStream,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const std::set<ShardId>& shardIds,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ const bool mustRunOnAll = mustRunOnAllShards(nss, hasChangeStream);
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
+ if (mustRunOnAll) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (const auto& shardId : shardIds) {
+ requests.emplace_back(shardId, cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (const auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(shardId, std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj;
+ requests.emplace_back(routingInfo->db().primaryId(),
+ appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db()));
+ }
+
+ if (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) {
+ log() << "shardedAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) {
+ sleepsecs(1);
+ }
+ }
+
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ getDesiredRetryPolicy(opCtx));
+}
+
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())};
+ }
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+ShardId pickMergingShard(OperationContext* opCtx,
+ bool needsPrimaryShardMerge,
+ const std::vector<ShardId>& targetedShards,
+ ShardId primaryShard) {
+ auto& prng = opCtx->getClient()->getPrng();
+ // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging
+ // command on random shard, unless the pipeline dictates that it needs to be run on the primary
+ // shard for the database.
+ return needsPrimaryShardMerge ? primaryShard
+ : targetedShards[prng.nextInt32(targetedShards.size())];
+}
+
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss) {
// First, verify that there are shards present in the cluster. If not, then we return the
@@ -139,32 +256,80 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte
return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
}
-Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
- // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
- // pipeline does not support writeConcern.
- if (req.getWriteConcern()) {
- return Shard::RetryPolicy::kNotIdempotent;
+Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults,
+ const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
+ BSONObjBuilder* result) {
+ if (dispatchResults.splitPipeline) {
+ auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get();
+ const char* mergeType = [&]() {
+ if (mergePipeline->canRunOnMongos()) {
+ return "mongos";
+ } else if (dispatchResults.exchangeSpec) {
+ return "exchange";
+ } else if (mergePipeline->needsPrimaryShardMerger()) {
+ return "primaryShard";
+ } else {
+ return "anyShard";
+ }
+ }();
+
+ *result << "mergeType" << mergeType;
+
+ MutableDocument pipelinesDoc;
+ pipelinesDoc.addField("shardsPart",
+ Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps(
+ *mergeCtx->explain)));
+ if (dispatchResults.exchangeSpec) {
+ BSONObjBuilder bob;
+ dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob);
+ bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards);
+ pipelinesDoc.addField("exchange", Value(bob.obj()));
+ }
+ pipelinesDoc.addField("mergerPart",
+ Value(mergePipeline->writeExplainOps(*mergeCtx->explain)));
+
+ *result << "splitPipeline" << pipelinesDoc.freeze();
+ } else {
+ *result << "splitPipeline" << BSONNULL;
}
- return Shard::RetryPolicy::kIdempotent;
-}
-bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
- // The following aggregations must be routed to all shards:
- // - Any collectionless aggregation, such as non-localOps $currentOp.
- // - Any aggregation which begins with a $changeStream stage.
- return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+ BSONObjBuilder shardExplains(result->subobjStart("shards"));
+ for (const auto& shardResult : dispatchResults.remoteExplainOutput) {
+ invariant(shardResult.shardHostAndPort);
+
+ uassertStatusOK(shardResult.swResponse.getStatus());
+ uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data));
+
+ auto shardId = shardResult.shardId.toString();
+ const auto& data = shardResult.swResponse.getValue().data;
+ BSONObjBuilder explain(shardExplains.subobjStart(shardId));
+ explain << "host" << shardResult.shardHostAndPort->toString();
+ if (auto stagesElement = data["stages"]) {
+ explain << "stages" << stagesElement;
+ } else {
+ auto queryPlannerElement = data["queryPlanner"];
+ uassert(51157,
+ str::stream() << "Malformed explain response received from shard " << shardId
+ << ": " << data.toString(),
+ queryPlannerElement);
+ explain << "queryPlanner" << queryPlannerElement;
+ if (auto executionStatsElement = data["executionStats"]) {
+ explain << "executionStats" << executionStatsElement;
+ }
+ }
+ }
+
+ return Status::OK();
}
+
BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Document serializedCommand,
const cluster_aggregation_planner::SplitPipeline& splitPipeline,
- const BSONObj collationObj,
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- const boost::optional<RuntimeConstants>& constants,
bool needsMerge) {
// Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
+ MutableDocument targetedCmd(serializedCommand);
// If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
// has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
// have detected a logged in user and appended that user name to the $listSessions spec to
@@ -193,8 +358,143 @@ BSONObj createCommandForTargetedShards(
targetedCmd[AggregationRequest::kExchangeName] =
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+ return genericTransformForShards(std::move(targetedCmd),
+ expCtx->opCtx,
+ expCtx->explain,
+ expCtx->getRuntimeConstants(),
+ expCtx->collation);
+}
+
+sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ Document serializedCommand,
+ sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) {
+ auto opCtx = expCtx->opCtx;
+
+ if (MONGO_unlikely(shardedAggregateFailToDispatchExchangeConsumerPipeline.shouldFail())) {
+ log() << "shardedAggregateFailToDispatchExchangeConsumerPipeline fail point enabled.";
+ uasserted(ErrorCodes::FailPointEnabled,
+ "Asserting on exhange consumer pipeline dispatch due to failpoint.");
+ }
+
+ // For all consumers construct a request with appropriate cursor ids and send to shards.
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+ auto numConsumers = shardDispatchResults->exchangeSpec->consumerShards.size();
+ std::vector<SplitPipeline> consumerPipelines;
+ for (size_t idx = 0; idx < numConsumers; ++idx) {
+ // Pick this consumer's cursors from producers.
+ std::vector<OwnedRemoteCursor> producers;
+ for (size_t p = 0; p < shardDispatchResults->numProducers; ++p) {
+ producers.emplace_back(
+ std::move(shardDispatchResults->remoteCursors[p * numConsumers + idx]));
+ }
+
+ // Create a pipeline for a consumer and add the merging stage.
+ auto consumerPipeline = uassertStatusOK(Pipeline::create(
+ shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx));
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ consumerPipeline.get(),
+ BSONObj(),
+ std::move(producers),
+ {},
+ shardDispatchResults->splitPipeline->shardCursorsSortSpec,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ false);
+
+ consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
+
+ auto consumerCmdObj = createCommandForTargetedShards(
+ expCtx, serializedCommand, consumerPipelines.back(), boost::none, false);
+
+ requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
+ consumerCmdObj);
+ }
+ auto cursors = establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ executionNss,
+ ReadPreferenceSetting::get(opCtx),
+ requests,
+ false /* do not allow partial results */);
+
+ // Convert remote cursors into a vector of "owned" cursors.
+ std::vector<OwnedRemoteCursor> ownedCursors;
+ for (auto&& cursor : cursors) {
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ }
+
+ // The merging pipeline is just a union of the results from each of the shards involved on the
+ // consumer side of the exchange.
+ auto mergePipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge);
+
+ SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
+
+ // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
+ // responsible for its own producer cursors.
+ for (const auto& pipeline : consumerPipelines) {
+ const auto& mergeCursors =
+ static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
+ mergeCursors->dismissCursorOwnership();
+ }
+ return sharded_agg_helpers::DispatchShardPipelineResults{false,
+ std::move(ownedCursors),
+ {} /*TODO SERVER-36279*/,
+ std::move(splitPipeline),
+ nullptr,
+ BSONObj(),
+ numConsumers};
+}
+
+BSONObj createCommandForMergingShard(Document serializedCommand,
+ const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
+ const ShardId& shardId,
+ bool mergingShardContributesData,
+ const Pipeline* pipelineForMerging) {
+ MutableDocument mergeCmd(serializedCommand);
+
+ mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
+ mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
+
+ mergeCmd[AggregationRequest::kRuntimeConstants] =
+ Value(mergeCtx->getRuntimeConstants().toBSON());
+
+ // If the user didn't specify a collation already, make sure there's a collation attached to
+ // the merge command, since the merging shard may not have the collection metadata.
+ if (mergeCmd.peek()["collation"].missing()) {
+ mergeCmd["collation"] = mergeCtx->getCollator()
+ ? Value(mergeCtx->getCollator()->getSpec().toBSON())
+ : Value(Document{CollationSpec::kSimpleSpec});
+ }
+
+ const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx);
+ if (txnRouter && mergingShardContributesData) {
+ // Don't include a readConcern since we can only include read concerns on the _first_
+ // command sent to a participant per transaction. Assuming the merging shard is a
+ // participant, it will already have received another 'aggregate' command earlier which
+ // contained a readConcern.
+ mergeCmd.remove("readConcern");
+ }
+
+ return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), false);
+}
+
+BSONObj createPassthroughCommandForShard(
+ OperationContext* opCtx,
+ Document serializedCommand,
+ boost::optional<ExplainOptions::Verbosity> explainVerbosity,
+ const boost::optional<RuntimeConstants>& constants,
+ Pipeline* pipeline,
+ BSONObj collationObj) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(serializedCommand);
+ if (pipeline) {
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+
return genericTransformForShards(
- std::move(targetedCmd), opCtx, request, constants, collationObj);
+ std::move(targetedCmd), opCtx, explainVerbosity, constants, collationObj);
}
/**
@@ -203,12 +503,11 @@ BSONObj createCommandForTargetedShards(
* shard version is encountered, refreshes the routing table and tries again.
*/
DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& litePipe,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj) {
+ Document serializedCommand,
+ bool hasChangeStream,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ auto expCtx = pipeline->getContext();
+
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
@@ -226,12 +525,11 @@ DispatchShardPipelineResults dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, expCtx->ns);
// If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
// Otherwise, uassert on all exceptions here.
- if (!(litePipe.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ if (!(hasChangeStream && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
uassertStatusOK(executionNsRoutingInfoStatus);
}
@@ -240,9 +538,9 @@ DispatchShardPipelineResults dispatchShardPipeline(
: boost::optional<CachedCollectionRoutingInfo>{};
// Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe);
+ const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream);
std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, expCtx->collation);
// Don't need to split the pipeline if we are only targeting a single shard, unless:
// - There is a stage that needs to be run on the primary shard and the single target shard
@@ -268,16 +566,14 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Generate the command object for the targeted shards.
BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(opCtx,
- aggRequest,
- litePipe,
- *splitPipeline,
- collationObj,
- exchangeSpec,
- expCtx->getRuntimeConstants(),
- true)
- : createPassthroughCommandForShard(
- opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj);
+ ? createCommandForTargetedShards(
+ expCtx, serializedCommand, *splitPipeline, exchangeSpec, true)
+ : createPassthroughCommandForShard(expCtx->opCtx,
+ serializedCommand,
+ expCtx->explain,
+ expCtx->getRuntimeConstants(),
+ pipeline.get(),
+ expCtx->collation);
// In order for a $changeStream to work reliably, we need the shard registry to be at least as
// current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
@@ -286,14 +582,14 @@ DispatchShardPipelineResults dispatchShardPipeline(
// may not have been forced to split if there was only one shard in the cluster when the command
// began execution. If a shard was added since the earlier targeting logic ran, then refreshing
// here may cause us to illegally target an unsplit pipeline to more than one shard.
- if (litePipe.hasChangeStream()) {
+ if (hasChangeStream) {
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
shardRegistry->reload(opCtx);
}
// Rebuild the set of shards as the shard registry might have changed.
shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, expCtx->collation);
}
// If there were no shards when we began execution, we wouldn't have run this aggregation in the
@@ -310,7 +606,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// should not participate in the shard version protocol.
shardResults =
scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
+ expCtx->ns.db(),
targetedCommand,
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent);
@@ -320,23 +616,22 @@ DispatchShardPipelineResults dispatchShardPipeline(
invariant(executionNsRoutingInfo);
shardResults =
scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db(),
- executionNss,
+ expCtx->ns.db(),
+ expCtx->ns,
*executionNsRoutingInfo,
targetedCommand,
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent,
shardQuery,
- aggRequest.getCollation());
+ expCtx->collation);
}
} else {
cursors = establishShardCursors(opCtx,
- executionNss,
- litePipe,
+ expCtx->ns,
+ hasChangeStream,
executionNsRoutingInfo,
shardIds,
targetedCommand,
- aggRequest,
ReadPreferenceSetting::get(opCtx));
invariant(cursors.size() % shardIds.size() == 0,
str::stream() << "Number of cursors (" << cursors.size()
@@ -346,7 +641,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Convert remote cursors into a vector of "owned" cursors.
std::vector<OwnedRemoteCursor> ownedCursors;
for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns));
}
// Record the number of shards involved in the aggregation. If we are required to merge on
@@ -366,82 +661,269 @@ DispatchShardPipelineResults dispatchShardPipeline(
exchangeSpec};
}
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (mustRunOnAllShards) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- return {shardIds.begin(), shardIds.end()};
+AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj mergeCmdObj,
+ const ShardId& mergingShardId) {
+ if (MONGO_unlikely(shardedAggregateFailToEstablishMergingShardCursor.shouldFail())) {
+ log() << "shardedAggregateFailToEstablishMergingShardCursor fail point enabled.";
+ uasserted(ErrorCodes::FailPointEnabled,
+ "Asserting on establishing merging shard cursor due to failpoint.");
}
- // If we don't need to run on all shards, then we should always have a valid routing table.
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss.db().toString(),
+ {{mergingShardId, mergeCmdObj}},
+ ReadPreferenceSetting::get(opCtx),
+ getDesiredRetryPolicy(opCtx));
+ const auto response = ars.next();
+ invariant(ars.done());
+ return response;
+}
+
+Status dispatchMergingPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ Document serializedCommand,
+ long long batchSize,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults,
+ BSONObjBuilder* result,
+ const PrivilegeVector& privileges,
+ bool hasChangeStream) {
+ // We should never be in a situation where we call this function on a non-merge pipeline.
+ invariant(shardDispatchResults.splitPipeline);
+ auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
+ invariant(mergePipeline);
+ auto* opCtx = expCtx->opCtx;
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardDispatchResults.splitPipeline->shardCursorsSortSpec,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ hasChangeStream);
+
+ // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
+ // then ignore the internalQueryProhibitMergingOnMongoS parameter.
+ if (mergePipeline->requiredToRunOnMongos() ||
+ (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) {
+ return runPipelineOnMongoS(namespaces,
+ batchSize,
+ std::move(shardDispatchResults.splitPipeline->mergePipeline),
+ result,
+ privileges);
+ }
+
+ // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we
+ // therefore must have a valid routing table.
invariant(routingInfo);
- return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+ const ShardId mergingShardId = pickMergingShard(opCtx,
+ shardDispatchResults.needsPrimaryShardMerge,
+ targetedShards,
+ routingInfo->db().primaryId());
+ const bool mergingShardContributesData =
+ std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) !=
+ targetedShards.end();
+
+ auto mergeCmdObj = createCommandForMergingShard(
+ serializedCommand, expCtx, mergingShardId, mergingShardContributesData, mergePipeline);
+
+ LOG(1) << "Dispatching merge pipeline " << redact(mergeCmdObj) << " to designated shard";
+
+ // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
+ auto mergeResponse =
+ establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
+ uassertStatusOK(mergeResponse.swResponse);
+
+ auto mergeCursorResponse = uassertStatusOK(
+ storePossibleCursor(opCtx,
+ mergingShardId,
+ *mergeResponse.shardHostAndPort,
+ mergeResponse.swResponse.getValue().data,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ privileges,
+ expCtx->tailableMode));
+
+ // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
+ // ownership in the current merging pipeline such that when it goes out of scope it does not
+ // attempt to kill the cursors.
+ auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
+ mergeCursors->dismissCursorOwnership();
+
+ return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
}
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const std::set<ShardId>& shardIds,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+BSONObj establishMergingMongosCursor(OperationContext* opCtx,
+ long long batchSize,
+ const NamespaceString& requestedNss,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ const PrivilegeVector& privileges) {
- const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
- std::vector<std::pair<ShardId, BSONObj>> requests;
+ ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo || mustRunOnAll);
+ params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
+ params.tailableMode = pipelineForMerging->getContext()->tailableMode;
+ // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
+ // size we pass here is used for getMores, so do not specify a batch size if the initial request
+ // had a batch size of 0.
+ params.batchSize = batchSize == 0 ? boost::none : boost::make_optional(batchSize);
+ params.lsid = opCtx->getLogicalSessionId();
+ params.txnNumber = opCtx->getTxnNumber();
+ params.originatingPrivileges = privileges;
- if (mustRunOnAll) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
+ if (TransactionRouter::get(opCtx)) {
+ params.isAutoCommit = false;
+ }
+
+ auto ccc = cluster_aggregation_planner::buildClusterCursor(
+ opCtx, std::move(pipelineForMerging), std::move(params));
+
+ auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+
+ rpc::OpMsgReplyBuilder replyBuilder;
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+
+ CursorResponseBuilder responseBuilder(&replyBuilder, options);
+ bool stashedResult = false;
+
+ for (long long objCount = 0; objCount < batchSize; ++objCount) {
+ ClusterQueryResult next;
+ try {
+ next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
+ } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
+ // This exception is thrown when a $changeStream stage encounters an event
+ // that invalidates the cursor. We should close the cursor and return without
+ // error.
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
}
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+
+ // Check whether we have exhausted the pipeline's results.
+ if (next.isEOF()) {
+ // We reached end-of-stream. If the cursor is not tailable, then we mark it as
+ // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
+ // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
+ // hope of returning data and thus we need to close the mongos cursor as well.
+ if (!ccc->isTailable() || ccc->remotesExhausted()) {
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ }
+ break;
}
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj;
- requests.emplace_back(routingInfo->db().primaryId(),
- appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db()));
- }
- if (MONGO_unlikely(clusterAggregateHangBeforeEstablishingShardCursors.shouldFail())) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_unlikely(clusterAggregateHangBeforeEstablishingShardCursors.shouldFail())) {
- sleepsecs(1);
+ // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
+ // to be returned on the next getMore.
+ auto nextObj = *next.getResult();
+
+ if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
+ ccc->queueResult(nextObj);
+ stashedResult = true;
+ break;
}
+
+ // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
+ responseBuilder.append(nextObj);
}
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- getDesiredRetryPolicy(request));
+ // For empty batches, or in the case where the final result was added to the batch rather than
+ // being stashed, we update the PBRT here to ensure that it is the most recent available.
+ if (!stashedResult) {
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
+ }
+
+ ccc->detachFromOperationContext();
+
+ int nShards = ccc->getNumRemotes();
+ CursorId clusterCursorId = 0;
+
+ if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
+ auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames();
+ clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
+ opCtx,
+ ccc.releaseCursor(),
+ requestedNss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal,
+ authUsers));
+ }
+
+ // Fill out the aggregation metrics in CurOp.
+ if (clusterCursorId > 0) {
+ CurOp::get(opCtx)->debug().cursorid = clusterCursorId;
+ }
+ CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards);
+ CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0);
+ CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
+
+ responseBuilder.done(clusterCursorId, requestedNss.ns());
+
+ auto bodyBuilder = replyBuilder.getBodyBuilder();
+ CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
+ bodyBuilder.doneFast();
+
+ return replyBuilder.releaseBody();
}
+/**
+ * Returns the output of the listCollections command filtered to the namespace 'nss'.
+ */
+BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) {
+ ScopedDbConnection conn(primaryShard->getConnString());
+ std::list<BSONObj> all =
+ conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ if (all.empty()) {
+ // Collection does not exist, return an empty object.
+ return BSONObj();
+ }
+ return all.front();
+}
+
+
+/**
+ * Returns the collection default collation or the simple collator if there is no default. If the
+ * collection does not exist, then returns an empty BSON Object.
+ */
+BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) {
+ if (collectionInfo.isEmpty()) {
+ // Collection does not exist, return an empty object.
+ return BSONObj();
+ }
+
+ BSONObj defaultCollation = CollationSpec::kSimpleSpec;
+ if (collectionInfo["options"].type() == BSONType::Object) {
+ BSONObj collectionOptions = collectionInfo["options"].Obj();
+ BSONElement collationElement;
+ auto status = bsonExtractTypedField(
+ collectionOptions, "collation", BSONType::Object, &collationElement);
+ if (status.isOK()) {
+ defaultCollation = collationElement.Obj().getOwned();
+ uassert(ErrorCodes::BadValue,
+ "Default collation in collection metadata cannot be empty.",
+ !defaultCollation.isEmpty());
+ } else if (status != ErrorCodes::NoSuchKey) {
+ uassertStatusOK(status);
+ }
+ }
+ return defaultCollation;
+}
+
+} // namespace
+
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
@@ -466,8 +948,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
AggregationRequest aggRequest(expCtx->ns, rawStages);
LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto hasChangeStream = liteParsedPipeline.hasChangeStream();
auto shardDispatchResults = dispatchShardPipeline(
- expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation);
+ aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline));
std::vector<ShardId> targetedShards;
targetedShards.reserve(shardDispatchResults.remoteCursors.size());
@@ -488,14 +971,281 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
cluster_aggregation_planner::addMergeCursorsSource(
mergePipeline.get(),
- liteParsedPipeline,
shardDispatchResults.commandForTargetedShards,
std::move(shardDispatchResults.remoteCursors),
targetedShards,
shardCursorsSortSpec,
- Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ hasChangeStream);
return mergePipeline;
}
-} // namespace sharded_agg_helpers
-} // namespace mongo
+
+StatusWith<AggregationTargeter> AggregationTargeter::make(
+ OperationContext* opCtx,
+ const NamespaceString& executionNss,
+ const std::function<std::unique_ptr<Pipeline, PipelineDeleter>(
+ boost::optional<CachedCollectionRoutingInfo>)> buildPipelineFn,
+ stdx::unordered_set<NamespaceString> involvedNamespaces,
+ bool hasChangeStream,
+ bool allowedToPassthrough) {
+
+ // Check if any of the involved collections are sharded.
+ bool involvesShardedCollections = [&]() {
+ for (const auto& nss : involvedNamespaces) {
+ const auto resolvedNsRoutingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
+ if (resolvedNsRoutingInfo.cm()) {
+ return true;
+ }
+ }
+ return false;
+ }();
+
+ // Determine whether this aggregation must be dispatched to all shards in the cluster.
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, hasChangeStream);
+
+ // If the routing table is valid, we obtain a reference to it. If the table is not valid, then
+ // either the database does not exist, or there are no shards in the cluster. In the latter
+ // case, we always return an empty cursor. In the former case, if the requested aggregation is a
+ // $changeStream, we allow the operation to continue so that stream cursors can be established
+ // on the given namespace before the database or collection is actually created. If the database
+ // does not exist and this is not a $changeStream, then we return an empty cursor.
+ boost::optional<CachedCollectionRoutingInfo> routingInfo;
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+ if (executionNsRoutingInfoStatus.isOK()) {
+ routingInfo = std::move(executionNsRoutingInfoStatus.getValue());
+ } else if (!(hasChangeStream &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ return executionNsRoutingInfoStatus.getStatus();
+ }
+
+ // If we don't have a routing table, then this is a $changeStream which must run on all shards.
+ invariant(routingInfo || (mustRunOnAll && hasChangeStream));
+
+ // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
+ // met:
+ //
+ // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
+ // 2. Is allowed to be forwarded to shards. For example, $currentOp with localOps: true should
+ // run locally on mongos and cannot be forwarded to a shard.
+ // 3. Does not need to run on all shards. For example, a pipeline with a $changeStream or
+ // $currentOp.
+ // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions
+ // needs to include information about users that can only be deduced on mongos.
+ if (routingInfo && !routingInfo->cm() && !mustRunOnAll && allowedToPassthrough &&
+ !involvesShardedCollections) {
+ return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, routingInfo};
+ } else {
+ auto pipeline = buildPipelineFn(routingInfo);
+ auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired
+ : TargetingPolicy::kAnyShard;
+ return AggregationTargeter{policy, std::move(pipeline), routingInfo};
+ }
+}
+
+Status runPipelineOnPrimaryShard(OperationContext* opCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const CachedDatabaseInfo& dbInfo,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* out) {
+ // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
+ // explain if necessary, and rewrites the result into a format safe to forward to shards.
+ BSONObj cmdObj =
+ CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard(
+ opCtx, serializedCommand, explain, boost::none, nullptr, BSONObj()));
+
+ const auto shardId = dbInfo.primary()->getId();
+ const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId)
+ ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
+ : std::move(cmdObj);
+
+ MultiStatementTransactionRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ namespaces.executionNss.db().toString(),
+ {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}},
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ auto response = ars.next();
+ invariant(ars.done());
+
+ uassertStatusOK(response.swResponse);
+ auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
+
+ if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
+ uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
+ } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
+ uassertStatusOK(
+ commandStatus.withContext("command failed because can not establish a snapshot"));
+ }
+
+ BSONObj result;
+ if (explain) {
+ // If this was an explain, then we get back an explain result object rather than a cursor.
+ result = response.swResponse.getValue().data;
+ } else {
+ result = uassertStatusOK(
+ storePossibleCursor(opCtx,
+ shardId,
+ *response.shardHostAndPort,
+ response.swResponse.getValue().data,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ privileges,
+ TailableModeEnum::kNormal));
+ }
+
+ // First append the properly constructed writeConcernError. It will then be skipped
+ // in appendElementsUnique.
+ if (auto wcErrorElem = result["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
+ }
+
+ out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
+
+ return getStatusFromCommandResult(out->asTempObj());
+}
+
+Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,
+ long long batchSize,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObjBuilder* result,
+ const PrivilegeVector& privileges) {
+ auto expCtx = pipeline->getContext();
+
+ // We should never receive a pipeline which cannot run on mongoS.
+ invariant(!expCtx->explain);
+ invariant(pipeline->canRunOnMongos());
+
+ // Verify that the first stage can produce input for the remainder of the pipeline.
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Aggregation pipeline must be run on mongoS, but "
+ << pipeline->getSources().front()->getSourceName()
+ << " is not capable of producing input",
+ !pipeline->getSources().front()->constraints().requiresInputDocSource);
+
+ // Register the new mongoS cursor, and retrieve the initial batch of results.
+ auto cursorResponse = establishMergingMongosCursor(
+ expCtx->opCtx, batchSize, namespaces.requestedNss, std::move(pipeline), privileges);
+
+ // We don't need to storePossibleCursor or propagate writeConcern errors; a pipeline with
+ // writing stages like $out can never run on mongoS. Filter the command response and return
+ // immediately.
+ CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
+ return getStatusFromCommandResult(result->asTempObj());
+}
+
+Status dispatchPipelineAndMerge(OperationContext* opCtx,
+ AggregationTargeter targeter,
+ Document serializedCommand,
+ long long batchSize,
+ const ClusterAggregate::Namespaces& namespaces,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* result,
+ bool hasChangeStream) {
+ auto expCtx = targeter.pipeline->getContext();
+ // If not, split the pipeline as necessary and dispatch to the relevant shards.
+ auto shardDispatchResults =
+ dispatchShardPipeline(serializedCommand, hasChangeStream, std::move(targeter.pipeline));
+
+ // If the operation is an explain, then we verify that it succeeded on all targeted
+ // shards, write the results to the output builder, and return immediately.
+ if (expCtx->explain) {
+ return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
+ }
+
+ // If this isn't an explain, then we must have established cursors on at least one
+ // shard.
+ invariant(shardDispatchResults.remoteCursors.size() > 0);
+
+ // If we sent the entire pipeline to a single shard, store the remote cursor and return.
+ if (!shardDispatchResults.splitPipeline) {
+ invariant(shardDispatchResults.remoteCursors.size() == 1);
+ auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front());
+ const auto shardId = remoteCursor->getShardId().toString();
+ const auto reply = uassertStatusOK(storePossibleCursor(opCtx,
+ namespaces.requestedNss,
+ std::move(remoteCursor),
+ privileges,
+ expCtx->tailableMode));
+ return appendCursorResponseToCommandResult(shardId, reply, result);
+ }
+
+ // If we have the exchange spec then dispatch all consumers.
+ if (shardDispatchResults.exchangeSpec) {
+ shardDispatchResults = dispatchExchangeConsumerPipeline(
+ expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults);
+ }
+
+ // If we reach here, we have a merge pipeline to dispatch.
+ return dispatchMergingPipeline(expCtx,
+ namespaces,
+ serializedCommand,
+ batchSize,
+ targeter.routingInfo,
+ std::move(shardDispatchResults),
+ result,
+ privileges,
+ hasChangeStream);
+}
+
+std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const NamespaceString& nss,
+ const BSONObj& collation) {
+ const bool collectionIsSharded = (routingInfo && routingInfo->cm());
+ const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
+
+ // If this is a collectionless aggregation, we immediately return the user-
+ // defined collation if one exists, or an empty BSONObj otherwise. Collectionless aggregations
+ // generally run on the 'admin' database, the standard logic would attempt to resolve its
+ // non-existent UUID and collation by sending a specious 'listCollections' command to the config
+ // servers.
+ if (nss.isCollectionlessAggregateNS()) {
+ return {collation, boost::none};
+ }
+
+ // If the collection is unsharded, obtain collInfo from the primary shard.
+ const auto unshardedCollInfo = collectionIsNotSharded
+ ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss)
+ : BSONObj();
+
+ // Return the collection UUID if available, or boost::none otherwise.
+ const auto getUUID = [&]() -> auto {
+ if (collectionIsSharded) {
+ return routingInfo->cm()->getUUID();
+ } else {
+ return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"]
+ ? boost::optional<UUID>{uassertStatusOK(
+ UUID::parse(unshardedCollInfo["info"]["uuid"]))}
+ : boost::optional<UUID>{boost::none};
+ }
+ };
+
+ // If the collection exists, return its default collation, or the simple
+ // collation if no explicit default is present. If the collection does not
+ // exist, return an empty BSONObj.
+ const auto getCollation = [&]() -> auto {
+ if (!collectionIsSharded && !collectionIsNotSharded) {
+ return BSONObj();
+ }
+ if (collectionIsNotSharded) {
+ return getDefaultCollationForUnshardedCollection(unshardedCollInfo);
+ } else {
+ return routingInfo->cm()->getDefaultCollator()
+ ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON()
+ : CollationSpec::kSimpleSpec;
+ }
+ };
+
+ // If the user specified an explicit collation, we always adopt it. Otherwise,
+ // obtain the collection default or simple collation as appropriate, and return
+ // it along with the collection's UUID.
+ return {collation.isEmpty() ? getCollation() : collation, getUUID()};
+}
+
+} // namespace mongo::sharded_agg_helpers
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 15e0dd51c2e..b8c25a42510 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregate.h"
#include "mongo/s/query/cluster_aggregation_planner.h"
namespace mongo {
@@ -67,63 +68,78 @@ struct DispatchShardPipelineResults {
boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
};
-Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+/**
+ * This structure contains information for targeting an aggregation pipeline in a sharded cluster.
+ */
+struct AggregationTargeter {
+ /**
+ * Populates and returns targeting info for an aggregation pipeline on the given namespace
+ * 'executionNss'.
+ */
+ static StatusWith<AggregationTargeter> make(
+ OperationContext* opCtx,
+ const NamespaceString& executionNss,
+ const std::function<std::unique_ptr<Pipeline, PipelineDeleter>(
+ boost::optional<CachedCollectionRoutingInfo>)> buildPipelineFn,
+ stdx::unordered_set<NamespaceString> involvedNamespaces,
+ bool hasChangeStream,
+ bool allowedToPassthrough);
+
+ enum TargetingPolicy {
+ kPassthrough,
+ kMongosRequired,
+ kAnyShard,
+ } policy;
+
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ boost::optional<CachedCollectionRoutingInfo> routingInfo;
+};
+
+Status runPipelineOnPrimaryShard(OperationContext* opCtx,
+ const ClusterAggregate::Namespaces& namespaces,
+ const CachedDatabaseInfo& dbInfo,
+ boost::optional<ExplainOptions::Verbosity> explain,
+ Document serializedCommand,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* out);
-bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+/**
+ * Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a
+ * pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS.
+ */
+Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,
+ long long batchSize,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObjBuilder* result,
+ const PrivilegeVector& privileges);
-StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
- const NamespaceString& execNss);
+/**
+ * Dispatches the pipeline in 'targeter' to the shards that are involved, and merges the results if
+ * necessary on either mongos or a randomly designated shard.
+ */
+Status dispatchPipelineAndMerge(OperationContext* opCtx,
+ sharded_agg_helpers::AggregationTargeter targeter,
+ Document serializedCommand,
+ long long batchSize,
+ const ClusterAggregate::Namespaces& namespaces,
+ const PrivilegeVector& privileges,
+ BSONObjBuilder* result,
+ bool hasChangeStream);
/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the
- * pipeline that will need to be executed to merge the results from the remotes. If a stale shard
- * version is encountered, refreshes the routing table and tries again.
+ * Returns the "collation" and "uuid" for the collection given by "nss" with the following
+ * semantics:
+ * - The "collation" parameter will be set to the default collation for the collection or the
+ * simple collation if there is no default. If the collection does not exist or if the aggregate
+ * is on the collectionless namespace, this will be set to an empty object.
+ * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections
+ * output for unsharded collections. The UUID will remain unset if the aggregate is on the
+ * collectionless namespace.
*/
-DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj);
-
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation);
-
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
+std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const std::set<ShardId>& shardIds,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref);
-
-BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const cluster_aggregation_planner::SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- const boost::optional<RuntimeConstants>& constants,
- bool needsMerge);
-
-BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<RuntimeConstants>& constants,
- Pipeline* pipeline,
- BSONObj collationObj);
-
-BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<RuntimeConstants>& constants,
- BSONObj collationObj);
+ const BSONObj& collation);
/**
* For a sharded collection, establishes remote cursors on each shard that may have results, and
diff --git a/src/mongo/s/commands/cluster_map_reduce.cpp b/src/mongo/s/commands/cluster_map_reduce.cpp
index 59b6e1680ae..9ae34eca3c5 100644
--- a/src/mongo/s/commands/cluster_map_reduce.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce.cpp
@@ -201,6 +201,43 @@ BSONObj fixForShards(const BSONObj& orig,
return appendAllowImplicitCreate(b.obj(), false);
}
+/**
+ * Outline for sharded map reduce for sharded output, $out replace:
+ *
+ * ============= mongos =============
+ * 1. Send map reduce command to all relevant shards with some extra info like the value for
+ * the chunkSize and the name of the temporary output collection.
+ *
+ * ============= shard =============
+ * 2. Does normal map reduce.
+ *
+ * 3. Calls splitVector on itself against the output collection and puts the results into the
+ * response object.
+ *
+ * ============= mongos =============
+ * 4. If the output collection is *not* sharded, uses the information from splitVector to
+ * create a pre-split sharded collection.
+ *
+ * 5. Grabs the distributed lock for the final output collection.
+ *
+ * 6. Sends mapReduce.shardedfinish.
+ *
+ * ============= shard =============
+ * 7. Extracts the list of shards from the mapReduce.shardedfinish and performs a broadcast
+ * query against all of them to obtain all documents that this shard owns.
+ *
+ * 8. Performs the reduce operation against every document from step #7 and outputs them to
+ * another temporary collection. Also keeps track of the BSONObject size of every "reduced"
+ * document for each chunk range.
+ *
+ * 9. Atomically drops the old output collection and renames the temporary collection to the
+ * output collection.
+ *
+ * ============= mongos =============
+ * 10. Releases the distributed lock acquired at step #5.
+ *
+ * 11. Inspects the BSONObject size from step #8 and determines if it needs to split.
+ */
bool runMapReduce(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 9375d1c2176..2c2012c3af5 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -31,56 +31,185 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connpool.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/map_reduce_agg.h"
#include "mongo/db/commands/map_reduce_gen.h"
+#include "mongo/db/commands/mr_common.h"
+#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/map_reduce_output_format.h"
+#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_map_reduce_agg.h"
namespace mongo {
+namespace {
-// Exhaust the cursor from the aggregation response and extract results and statistics.
-std::vector<BSONObj> getAllAggregationResults(OperationContext* opCtx,
- const std::string& dbname,
- CursorResponse& response) {
- CursorId cursorId = response.getCursorId();
- auto fullBatch = response.releaseBatch();
- while (cursorId != 0) {
- GetMoreRequest request(
- response.getNSS(), cursorId, boost::none, boost::none, boost::none, boost::none);
- BSONObj getMoreResponse = CommandHelpers::runCommandDirectly(
- opCtx, OpMsgRequest::fromDBAndBody(dbname, request.toBSON()));
- auto getMoreCursorResponse = CursorResponse::parseFromBSONThrowing(getMoreResponse);
- auto nextBatch = getMoreCursorResponse.releaseBatch();
- fullBatch.insert(fullBatch.end(), nextBatch.begin(), nextBatch.end());
- cursorId = getMoreCursorResponse.getCursorId();
+auto makeExpressionContext(OperationContext* opCtx,
+ const MapReduce& parsedMr,
+ boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ // Populate the collection UUID and the appropriate collation to use.
+ auto nss = parsedMr.getNamespace();
+ auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID(
+ routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj()));
+
+ std::unique_ptr<CollatorInterface> resolvedCollator;
+ if (!collationObj.isEmpty()) {
+ // This will be null if attempting to build an interface for the simple collator.
+ resolvedCollator = uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj));
+ }
+
+ // Resolve involved namespaces.
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) {
+ auto outNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()};
+ resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector<BSONObj>{});
}
- return fullBatch;
+
+ auto expCtx = make_intrusive<ExpressionContext>(
+ opCtx,
+ boost::none, // explain
+ false, // fromMongos
+ false, // needsmerge
+ true, // allowDiskUse
+ parsedMr.getBypassDocumentValidation().get_value_or(false),
+ nss,
+ collationObj,
+ boost::none, // runtimeConstants
+ std::move(resolvedCollator),
+ std::make_shared<MongoSInterface>(),
+ std::move(resolvedNamespaces),
+ boost::none); // uuid
+ expCtx->inMongos = true;
+ return expCtx;
}
+Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipeline* pipeline) {
+ MutableDocument translatedCmd;
+
+ translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll());
+ translatedCmd["pipeline"] = Value(pipeline->serialize());
+ translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits<long long>::max()}});
+ translatedCmd["allowDiskUse"] = Value(true);
+ translatedCmd["fromMongos"] = Value(true);
+
+ // Append generic command options.
+ for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) {
+ translatedCmd[elem.fieldNameStringData()] = Value(elem);
+ }
+ return translatedCmd.freeze();
+}
+
+} // namespace
+
bool runAggregationMapReduce(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmd,
std::string& errmsg,
BSONObjBuilder& result) {
- // Pretend we have built the appropriate pipeline and aggregation request.
- auto mrRequest = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
- const BSONObj aggRequest =
- fromjson(str::stream() << "{aggregate: '" << mrRequest.getNamespace().coll()
- << "', pipeline: [ { $group: { _id: { user: \"$user\" },"
- << "count: { $sum: 1 } } } ], cursor: {}}");
- BSONObj aggResult = CommandHelpers::runCommandDirectly(
- opCtx, OpMsgRequest::fromDBAndBody(dbname, std::move(aggRequest)));
-
- bool inMemory = mrRequest.getOutOptions().getOutputType() == OutputType::InMemory;
- std::string outColl = mrRequest.getOutOptions().getCollectionName();
- // Either inline response specified or we have an output collection.
- invariant(inMemory ^ !outColl.empty());
-
- auto cursorResponse = CursorResponse::parseFromBSONThrowing(aggResult);
- auto completeBatch = getAllAggregationResults(opCtx, dbname, cursorResponse);
- [[maybe_unused]] CursorResponse completeCursor(
- cursorResponse.getNSS(), cursorResponse.getCursorId(), std::move(completeBatch));
+ auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
+ stdx::unordered_set<NamespaceString> involvedNamespaces{parsedMr.getNamespace()};
+ auto resolvedOutNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()};
+
+ if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) {
+ involvedNamespaces.insert(resolvedOutNss);
+ }
+
+ const auto pipelineBuilder = [&](boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ return map_reduce_common::translateFromMR(
+ parsedMr, makeExpressionContext(opCtx, parsedMr, routingInfo));
+ };
+
+ auto namespaces =
+ ClusterAggregate::Namespaces{parsedMr.getNamespace(), parsedMr.getNamespace()};
+
+ // Auth has already been checked for the original mapReduce command, no need to recheck here.
+ PrivilegeVector privileges;
+
+ // This holds the raw results from the aggregation, which will be reformatted to match the
+ // expected mapReduce output.
+ BSONObjBuilder tempResults;
+
+ auto targeter = uassertStatusOK(
+ sharded_agg_helpers::AggregationTargeter::make(opCtx,
+ parsedMr.getNamespace(),
+ pipelineBuilder,
+ involvedNamespaces,
+ false, // hasChangeStream
+ true)); // allowedToPassthrough
+ switch (targeter.policy) {
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: {
+ // For the passthrough case, the targeter will not build a pipeline since its not needed
+ // in the normal aggregation path. For this translation, though, we need to build the
+ // pipeline to serialize and send to the primary shard.
+ auto serialized =
+ serializeToCommand(cmd, parsedMr, pipelineBuilder(targeter.routingInfo).get());
+ uassertStatusOK(
+ sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx,
+ namespaces,
+ targeter.routingInfo->db(),
+ boost::none, // explain
+ std::move(serialized),
+ privileges,
+ &tempResults));
+ break;
+ }
+
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: {
+ // Pipelines generated from mapReduce should never be required to run on mongos.
+ uasserted(31291, "Internal error during mapReduce translation");
+ break;
+ }
+
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: {
+ auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get());
+ uassertStatusOK(
+ sharded_agg_helpers::dispatchPipelineAndMerge(opCtx,
+ std::move(targeter),
+ std::move(serialized),
+ std::numeric_limits<long long>::max(),
+ namespaces,
+ privileges,
+ &tempResults,
+ false)); // hasChangeStream
+ break;
+ }
+ }
+
+ auto aggResults = tempResults.done();
+ if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) {
+ auto exhaustedResults = [&]() {
+ BSONArrayBuilder bab;
+ for (auto&& elem : aggResults["cursor"]["firstBatch"].Obj())
+ bab.append(elem.embeddedObject());
+ return bab.arr();
+ }();
+ map_reduce_output_format::appendInlineResponse(std::move(exhaustedResults),
+ parsedMr.getVerbose().get_value_or(false),
+ true, // inMongos
+ &result);
+ } else {
+ map_reduce_output_format::appendOutResponse(
+ parsedMr.getOutOptions().getDatabaseName(),
+ parsedMr.getOutOptions().getCollectionName(),
+ boost::get_optional_value_or(parsedMr.getVerbose(), false),
+ true, // inMongos
+ &result);
+ }
return true;
}
diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
index 0c40f522e1b..c0abf8efd87 100644
--- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp
@@ -41,43 +41,6 @@
namespace mongo {
namespace {
-/**
- * Outline for sharded map reduce for sharded output, $out replace:
- *
- * ============= mongos =============
- * 1. Send map reduce command to all relevant shards with some extra info like the value for
- * the chunkSize and the name of the temporary output collection.
- *
- * ============= shard =============
- * 2. Does normal map reduce.
- *
- * 3. Calls splitVector on itself against the output collection and puts the results into the
- * response object.
- *
- * ============= mongos =============
- * 4. If the output collection is *not* sharded, uses the information from splitVector to
- * create a pre-split sharded collection.
- *
- * 5. Grabs the distributed lock for the final output collection.
- *
- * 6. Sends mapReduce.shardedfinish.
- *
- * ============= shard =============
- * 7. Extracts the list of shards from the mapReduce.shardedfinish and performs a broadcast
- * query against all of them to obtain all documents that this shard owns.
- *
- * 8. Performs the reduce operation against every document from step #7 and outputs them to
- * another temporary collection. Also keeps track of the BSONObject size of every "reduced"
- * document for each chunk range.
- *
- * 9. Atomically drops the old output collection and renames the temporary collection to the
- * output collection.
- *
- * ============= mongos =============
- * 10. Releases the distributed lock acquired at step #5.
- *
- * 11. Inspects the BSONObject size from step #8 and determines if it needs to split.
- */
class ClusterMapReduceCommand : public MapReduceCommandBase {
public:
ClusterMapReduceCommand() = default;
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 953f4c48d05..9de374b9202 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -35,7 +35,6 @@
#include <boost/intrusive_ptr.hpp>
-#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
@@ -75,503 +74,34 @@
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction_router.h"
-#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_utils.h"
namespace mongo {
-using SplitPipeline = cluster_aggregation_planner::SplitPipeline;
-
-MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor);
-MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline);
-
constexpr unsigned ClusterAggregate::kMaxViewRetries;
namespace {
-Status appendCursorResponseToCommandResult(const ShardId& shardId,
- const BSONObj cursorResponse,
- BSONObjBuilder* result) {
- // If a write error was encountered, append it to the output buffer first.
- if (auto wcErrorElem = cursorResponse["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
- }
-
- // Pass the results from the remote shard into our command response.
- result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(cursorResponse));
- return getStatusFromCommandResult(result->asTempObj());
-}
-
-BSONObj createCommandForMergingShard(const AggregationRequest& request,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const ShardId& shardId,
- bool mergingShardContributesData,
- const Pipeline* pipelineForMerging) {
- MutableDocument mergeCmd(request.serializeToCommandObj());
-
- mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
- mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
-
- mergeCmd[AggregationRequest::kRuntimeConstants] =
- Value(mergeCtx->getRuntimeConstants().toBSON());
-
- // If the user didn't specify a collation already, make sure there's a collation attached to
- // the merge command, since the merging shard may not have the collection metadata.
- if (mergeCmd.peek()["collation"].missing()) {
- mergeCmd["collation"] = mergeCtx->getCollator()
- ? Value(mergeCtx->getCollator()->getSpec().toBSON())
- : Value(Document{CollationSpec::kSimpleSpec});
- }
-
- const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx);
- if (txnRouter && mergingShardContributesData) {
- // Don't include a readConcern since we can only include read concerns on the _first_
- // command sent to a participant per transaction. Assuming the merging shard is a
- // participant, it will already have received another 'aggregate' command earlier which
- // contained a readConcern.
-
- mergeCmd.remove("readConcern");
- }
-
- return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), false);
-}
-
-sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- BSONObj collationObj,
- sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) {
- invariant(!litePipe.hasChangeStream());
- auto opCtx = expCtx->opCtx;
-
- if (MONGO_unlikely(clusterAggregateFailToDispatchExchangeConsumerPipeline.shouldFail())) {
- log() << "clusterAggregateFailToDispatchExchangeConsumerPipeline fail point enabled.";
- uasserted(ErrorCodes::FailPointEnabled,
- "Asserting on exhange consumer pipeline dispatch due to failpoint.");
- }
-
- // For all consumers construct a request with appropriate cursor ids and send to shards.
- std::vector<std::pair<ShardId, BSONObj>> requests;
- auto numConsumers = shardDispatchResults->exchangeSpec->consumerShards.size();
- std::vector<SplitPipeline> consumerPipelines;
- for (size_t idx = 0; idx < numConsumers; ++idx) {
- // Pick this consumer's cursors from producers.
- std::vector<OwnedRemoteCursor> producers;
- for (size_t p = 0; p < shardDispatchResults->numProducers; ++p) {
- producers.emplace_back(
- std::move(shardDispatchResults->remoteCursors[p * numConsumers + idx]));
- }
-
- // Create a pipeline for a consumer and add the merging stage.
- auto consumerPipeline = uassertStatusOK(Pipeline::create(
- shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx));
-
- cluster_aggregation_planner::addMergeCursorsSource(
- consumerPipeline.get(),
- litePipe,
- BSONObj(),
- std::move(producers),
- {},
- shardDispatchResults->splitPipeline->shardCursorsSortSpec,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
-
- consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
-
- auto consumerCmdObj =
- sharded_agg_helpers::createCommandForTargetedShards(opCtx,
- request,
- litePipe,
- consumerPipelines.back(),
- collationObj,
- boost::none,
- expCtx->getRuntimeConstants(),
- false);
-
- requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
- consumerCmdObj);
- }
- auto cursors = establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- executionNss,
- ReadPreferenceSetting::get(opCtx),
- requests,
- false /* do not allow partial results */);
-
- // Convert remote cursors into a vector of "owned" cursors.
- std::vector<OwnedRemoteCursor> ownedCursors;
- for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
- }
-
- // The merging pipeline is just a union of the results from each of the shards involved on the
- // consumer side of the exchange.
- auto mergePipeline = uassertStatusOK(Pipeline::create({}, expCtx));
- mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge);
-
- SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none};
-
- // Relinquish ownership of the local consumer pipelines' cursors as each shard is now
- // responsible for its own producer cursors.
- for (const auto& pipeline : consumerPipelines) {
- const auto& mergeCursors =
- static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
- mergeCursors->dismissCursorOwnership();
- }
- return sharded_agg_helpers::DispatchShardPipelineResults{false,
- std::move(ownedCursors),
- {} /*TODO SERVER-36279*/,
- std::move(splitPipeline),
- nullptr,
- BSONObj(),
- numConsumers};
-}
-
-Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults,
- const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- BSONObjBuilder* result) {
- if (dispatchResults.splitPipeline) {
- auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get();
- const char* mergeType = [&]() {
- if (mergePipeline->canRunOnMongos()) {
- return "mongos";
- } else if (dispatchResults.exchangeSpec) {
- return "exchange";
- } else if (mergePipeline->needsPrimaryShardMerger()) {
- return "primaryShard";
- } else {
- return "anyShard";
- }
- }();
-
- *result << "mergeType" << mergeType;
-
- MutableDocument pipelinesDoc;
- pipelinesDoc.addField("shardsPart",
- Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps(
- *mergeCtx->explain)));
- if (dispatchResults.exchangeSpec) {
- BSONObjBuilder bob;
- dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob);
- bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards);
- pipelinesDoc.addField("exchange", Value(bob.obj()));
- }
- pipelinesDoc.addField("mergerPart",
- Value(mergePipeline->writeExplainOps(*mergeCtx->explain)));
-
- *result << "splitPipeline" << pipelinesDoc.freeze();
- } else {
- *result << "splitPipeline" << BSONNULL;
- }
-
- BSONObjBuilder shardExplains(result->subobjStart("shards"));
- for (const auto& shardResult : dispatchResults.remoteExplainOutput) {
- invariant(shardResult.shardHostAndPort);
-
- uassertStatusOK(shardResult.swResponse.getStatus());
- uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data));
-
- auto shardId = shardResult.shardId.toString();
- const auto& data = shardResult.swResponse.getValue().data;
- BSONObjBuilder explain(shardExplains.subobjStart(shardId));
- explain << "host" << shardResult.shardHostAndPort->toString();
- if (auto stagesElement = data["stages"]) {
- explain << "stages" << stagesElement;
- } else {
- auto queryPlannerElement = data["queryPlanner"];
- uassert(51157,
- str::stream() << "Malformed explain response received from shard " << shardId
- << ": " << data.toString(),
- queryPlannerElement);
- explain << "queryPlanner" << queryPlannerElement;
- if (auto executionStatsElement = data["executionStats"]) {
- explain << "executionStats" << executionStatsElement;
- }
- }
- }
-
- return Status::OK();
-}
-
-AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCtx,
- const NamespaceString& nss,
- const AggregationRequest& request,
- const BSONObj mergeCmdObj,
- const ShardId& mergingShardId) {
- if (MONGO_unlikely(clusterAggregateFailToEstablishMergingShardCursor.shouldFail())) {
- log() << "clusterAggregateFailToEstablishMergingShardCursor fail point enabled.";
- uasserted(ErrorCodes::FailPointEnabled,
- "Asserting on establishing merging shard cursor due to failpoint.");
- }
-
- MultiStatementTransactionRequestsSender ars(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss.db().toString(),
- {{mergingShardId, mergeCmdObj}},
- ReadPreferenceSetting::get(opCtx),
- sharded_agg_helpers::getDesiredRetryPolicy(request));
- const auto response = ars.next();
- invariant(ars.done());
- return response;
-}
-
-BSONObj establishMergingMongosCursor(OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- const PrivilegeVector& privileges) {
-
- ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
-
- params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
- params.tailableMode = pipelineForMerging->getContext()->tailableMode;
- // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
- // size we pass here is used for getMores, so do not specify a batch size if the initial request
- // had a batch size of 0.
- params.batchSize = request.getBatchSize() == 0
- ? boost::none
- : boost::optional<long long>(request.getBatchSize());
- params.lsid = opCtx->getLogicalSessionId();
- params.txnNumber = opCtx->getTxnNumber();
- params.originatingPrivileges = privileges;
-
- if (TransactionRouter::get(opCtx)) {
- params.isAutoCommit = false;
- }
-
- auto ccc = cluster_aggregation_planner::buildClusterCursor(
- opCtx, std::move(pipelineForMerging), std::move(params));
-
- auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
-
- rpc::OpMsgReplyBuilder replyBuilder;
- CursorResponseBuilder::Options options;
- options.isInitialResponse = true;
-
- CursorResponseBuilder responseBuilder(&replyBuilder, options);
- bool stashedResult = false;
-
- for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
- ClusterQueryResult next;
- try {
- next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
- } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
- // This exception is thrown when a $changeStream stage encounters an event
- // that invalidates the cursor. We should close the cursor and return without
- // error.
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- break;
- }
-
- // Check whether we have exhausted the pipeline's results.
- if (next.isEOF()) {
- // We reached end-of-stream. If the cursor is not tailable, then we mark it as
- // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
- // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
- // hope of returning data and thus we need to close the mongos cursor as well.
- if (!ccc->isTailable() || ccc->remotesExhausted()) {
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- }
- break;
- }
-
- // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
- // to be returned on the next getMore.
- auto nextObj = *next.getResult();
-
- if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
- ccc->queueResult(nextObj);
- stashedResult = true;
- break;
- }
-
- // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
- responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
- responseBuilder.append(nextObj);
- }
-
- // For empty batches, or in the case where the final result was added to the batch rather than
- // being stashed, we update the PBRT here to ensure that it is the most recent available.
- if (!stashedResult) {
- responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
- }
-
- ccc->detachFromOperationContext();
-
- int nShards = ccc->getNumRemotes();
- CursorId clusterCursorId = 0;
-
- if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
- auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames();
- clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
- opCtx,
- ccc.releaseCursor(),
- requestedNss,
- ClusterCursorManager::CursorType::MultiTarget,
- ClusterCursorManager::CursorLifetime::Mortal,
- authUsers));
- }
-
- // Fill out the aggregation metrics in CurOp.
- if (clusterCursorId > 0) {
- CurOp::get(opCtx)->debug().cursorid = clusterCursorId;
- }
- CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards);
- CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0);
- CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
-
- responseBuilder.done(clusterCursorId, requestedNss.ns());
-
- auto bodyBuilder = replyBuilder.getBodyBuilder();
- CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true);
- bodyBuilder.doneFast();
-
- return replyBuilder.releaseBody();
-}
-
-/**
- * Returns the output of the listCollections command filtered to the namespace 'nss'.
- */
-BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) {
- ScopedDbConnection conn(primaryShard->getConnString());
- std::list<BSONObj> all =
- conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- if (all.empty()) {
- // Collection does not exist, return an empty object.
- return BSONObj();
- }
- return all.front();
-}
-
-/**
- * Returns the collection default collation or the simple collator if there is no default. If the
- * collection does not exist, then returns an empty BSON Object.
- */
-BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) {
- if (collectionInfo.isEmpty()) {
- // Collection does not exist, return an empty object.
- return BSONObj();
- }
-
- BSONObj defaultCollation = CollationSpec::kSimpleSpec;
- if (collectionInfo["options"].type() == BSONType::Object) {
- BSONObj collectionOptions = collectionInfo["options"].Obj();
- BSONElement collationElement;
- auto status = bsonExtractTypedField(
- collectionOptions, "collation", BSONType::Object, &collationElement);
- if (status.isOK()) {
- defaultCollation = collationElement.Obj().getOwned();
- uassert(ErrorCodes::BadValue,
- "Default collation in collection metadata cannot be empty.",
- !defaultCollation.isEmpty());
- } else if (status != ErrorCodes::NoSuchKey) {
- uassertStatusOK(status);
- }
- }
- return defaultCollation;
-}
-
-/**
- * Populates the "collation" and "uuid" parameters with the following semantics:
- * - The "collation" parameter will be set to the default collation for the collection or the
- * simple collation if there is no default. If the collection does not exist or if the aggregate
- * is on the collectionless namespace, this will be set to an empty object.
- * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections
- * output for unsharded collections. The UUID will remain unset if the aggregate is on the
- * collectionless namespace.
- */
-std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const NamespaceString& nss,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe) {
- const bool collectionIsSharded = (routingInfo && routingInfo->cm());
- const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
-
- // If this is a change stream or a collectionless aggregation, we immediately return the user-
- // defined collation if one exists, or an empty BSONObj otherwise. Change streams never inherit
- // the collection's default collation, and since collectionless aggregations generally run on
- // the 'admin' database, the standard logic would attempt to resolve its non-existent UUID and
- // collation by sending a specious 'listCollections' command to the config servers.
- if (litePipe.hasChangeStream() || nss.isCollectionlessAggregateNS()) {
- return {request.getCollation(), boost::none};
- }
-
- // If the collection is unsharded, obtain collInfo from the primary shard.
- const auto unshardedCollInfo = collectionIsNotSharded
- ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss)
- : BSONObj();
-
- // Return the collection UUID if available, or boost::none otherwise.
- const auto getUUID = [&]() -> auto {
- if (collectionIsSharded) {
- return routingInfo->cm()->getUUID();
- } else {
- return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"]
- ? boost::optional<UUID>{uassertStatusOK(
- UUID::parse(unshardedCollInfo["info"]["uuid"]))}
- : boost::optional<UUID>{boost::none};
- }
- };
-
- // If the collection exists, return its default collation, or the simple
- // collation if no explicit default is present. If the collection does not
- // exist, return an empty BSONObj.
- const auto getCollation = [&]() -> auto {
- if (!collectionIsSharded && !collectionIsNotSharded) {
- return BSONObj();
- }
- if (collectionIsNotSharded) {
- return getDefaultCollationForUnshardedCollection(unshardedCollInfo);
- } else {
- return routingInfo->cm()->getDefaultCollator()
- ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON()
- : CollationSpec::kSimpleSpec;
- }
- };
-
- // If the user specified an explicit collation, we always adopt it. Otherwise,
- // obtain the collection default or simple collation as appropriate, and return
- // it along with the collection's UUID.
- return {request.getCollation().isEmpty() ? getCollation() : request.getCollation(), getUUID()};
-}
-
-ShardId pickMergingShard(OperationContext* opCtx,
- bool needsPrimaryShardMerge,
- const std::vector<ShardId>& targetedShards,
- ShardId primaryShard) {
- auto& prng = opCtx->getClient()->getPrng();
- // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging
- // command on random shard, unless the pipeline dictates that it needs to be run on the primary
- // shard for the database.
- return needsPrimaryShardMerge ? primaryShard
- : targetedShards[prng.nextInt32(targetedShards.size())];
-}
-
// "Resolve" involved namespaces into a map. We won't try to execute anything on a mongos, but we
// still have to populate this map so that any $lookups, etc. will be able to have a resolved view
// definition. It's okay that this is incorrect, we will repopulate the real namespace map on the
// mongod. Note that this function must be called before forwarding an aggregation command on an
// unsharded collection, in order to verify that the involved namespaces are allowed to be sharded.
-auto resolveInvolvedNamespaces(OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
+auto resolveInvolvedNamespaces(stdx::unordered_set<NamespaceString> involvedNamespaces) {
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- for (auto&& nss : litePipe.getInvolvedNamespaces()) {
+ for (auto&& nss : involvedNamespaces) {
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
return resolvedNamespaces;
}
// Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate
-// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally
-// extracts the UUID from the collection info if present.
+// collator, creates a MongoProcessInterface for use by the pipeline's stages, and sets the
+// collection UUID if provided.
boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
OperationContext* opCtx,
const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
BSONObj collationObj,
boost::optional<UUID> uuid,
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) {
@@ -592,127 +122,13 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
std::move(resolvedNamespaces),
uuid);
+ // Keep the backing collation object on the context up to date with the resolved collator.
+ mergeCtx->collation = collationObj;
+
mergeCtx->inMongos = true;
return mergeCtx;
}
-// Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a
-// pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS.
-Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ClusterAggregate::Namespaces& namespaces,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObjBuilder* result,
- const PrivilegeVector& privileges) {
- // We should never receive a pipeline which cannot run on mongoS.
- invariant(!expCtx->explain);
- invariant(pipeline->canRunOnMongos());
-
- const auto& requestedNss = namespaces.requestedNss;
- const auto opCtx = expCtx->opCtx;
-
- // Verify that the first stage can produce input for the remainder of the pipeline.
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Aggregation pipeline must be run on mongoS, but "
- << pipeline->getSources().front()->getSourceName()
- << " is not capable of producing input",
- !pipeline->getSources().front()->constraints().requiresInputDocSource);
-
- // Register the new mongoS cursor, and retrieve the initial batch of results.
- auto cursorResponse = establishMergingMongosCursor(
- opCtx, request, requestedNss, litePipe, std::move(pipeline), privileges);
-
- // We don't need to storePossibleCursor or propagate writeConcern errors; a pipeline with
- // writing stages like $out can never run on mongoS. Filter the command response and return
- // immediately.
- CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
- return getStatusFromCommandResult(result->asTempObj());
-}
-
-Status dispatchMergingPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const ClusterAggregate::Namespaces& namespaces,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults,
- BSONObjBuilder* result,
- const PrivilegeVector& privileges) {
- // We should never be in a situation where we call this function on a non-merge pipeline.
- invariant(shardDispatchResults.splitPipeline);
- auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get();
- invariant(mergePipeline);
- auto* opCtx = expCtx->opCtx;
-
- std::vector<ShardId> targetedShards;
- targetedShards.reserve(shardDispatchResults.remoteCursors.size());
- for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
- targetedShards.emplace_back(remoteCursor->getShardId().toString());
- }
-
- cluster_aggregation_planner::addMergeCursorsSource(
- mergePipeline,
- litePipe,
- shardDispatchResults.commandForTargetedShards,
- std::move(shardDispatchResults.remoteCursors),
- targetedShards,
- shardDispatchResults.splitPipeline->shardCursorsSortSpec,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
-
- // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
- // then ignore the internalQueryProhibitMergingOnMongoS parameter.
- if (mergePipeline->requiredToRunOnMongos() ||
- (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) {
- return runPipelineOnMongoS(expCtx,
- namespaces,
- request,
- litePipe,
- std::move(shardDispatchResults.splitPipeline->mergePipeline),
- result,
- privileges);
- }
-
- // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we
- // therefore must have a valid routing table.
- invariant(routingInfo);
-
- const ShardId mergingShardId = pickMergingShard(opCtx,
- shardDispatchResults.needsPrimaryShardMerge,
- targetedShards,
- routingInfo->db().primaryId());
- const bool mergingShardContributesData =
- std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) !=
- targetedShards.end();
-
- auto mergeCmdObj = createCommandForMergingShard(
- request, expCtx, mergingShardId, mergingShardContributesData, mergePipeline);
-
- // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
- auto mergeResponse = establishMergingShardCursor(
- opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId);
- uassertStatusOK(mergeResponse.swResponse);
-
- auto mergeCursorResponse = uassertStatusOK(
- storePossibleCursor(opCtx,
- mergingShardId,
- *mergeResponse.shardHostAndPort,
- mergeResponse.swResponse.getValue().data,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- privileges,
- expCtx->tailableMode));
-
- // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the
- // ownership in the current merging pipeline such that when it goes out of scope it does not
- // attempt to kill the cursors.
- auto mergeCursors = static_cast<DocumentSourceMergeCursors*>(mergePipeline->peekFront());
- mergeCursors->dismissCursorOwnership();
-
- return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
-}
-
void appendEmptyResultSetWithStatus(OperationContext* opCtx,
const NamespaceString& nss,
Status status,
@@ -740,199 +156,105 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
<< ", " << AggregationRequest::kFromMongosName
<< "] cannot be set to 'true' when sent to mongos",
!request.needsMerge() && !request.isFromMongos());
- auto executionNsRoutingInfoStatus =
- sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
- boost::optional<CachedCollectionRoutingInfo> routingInfo;
- LiteParsedPipeline litePipe(request);
+
const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool {
const auto resolvedNsRoutingInfo =
uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
return resolvedNsRoutingInfo.cm().get();
};
- const bool involvesShardedCollections = litePipe.verifyIsSupported(
- opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
- // If the routing table is valid, we obtain a reference to it. If the table is not valid, then
- // either the database does not exist, or there are no shards in the cluster. In the latter
- // case, we always return an empty cursor. In the former case, if the requested aggregation is a
- // $changeStream, we allow the operation to continue so that stream cursors can be established
- // on the given namespace before the database or collection is actually created. If the database
- // does not exist and this is not a $changeStream, then we return an empty cursor.
- if (executionNsRoutingInfoStatus.isOK()) {
- routingInfo = std::move(executionNsRoutingInfoStatus.getValue());
- } else if (!(litePipe.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- appendEmptyResultSetWithStatus(
- opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result);
- return Status::OK();
- }
-
- // Determine whether this aggregation must be dispatched to all shards in the cluster.
- const bool mustRunOnAll =
- sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe);
-
- // If we don't have a routing table, then this is a $changeStream which must run on all shards.
- invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
-
- auto resolvedNamespaces = resolveInvolvedNamespaces(opCtx, litePipe);
-
- // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
- // met:
- //
- // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
- // 2. Is allowed to be forwarded to shards.
- // 3. Does not need to run on all shards.
- // 4. Doesn't need transformation via DocumentSource::serialize().
- if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
- litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
- const auto primaryShardId = routingInfo->db().primary()->getId();
- return aggPassthrough(
- opCtx, namespaces, routingInfo->db(), request, litePipe, privileges, result);
- }
-
- // Populate the collection UUID and the appropriate collation to use.
- auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe);
- BSONObj collationObj = collInfo.first;
- boost::optional<UUID> uuid = collInfo.second;
-
- // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
- // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
- // pipeline's stages.
- auto expCtx = makeExpressionContext(
- opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));
-
- // Parse and optimize the full pipeline.
- auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
- pipeline->optimizePipeline();
-
- // Check whether the entire pipeline must be run on mongoS.
- if (pipeline->requiredToRunOnMongos()) {
- // If this is an explain write the explain output and return.
- if (expCtx->explain) {
- *result << "splitPipeline" << BSONNULL << "mongos"
- << Document{{"host", getHostNameCachedAndPort()},
- {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
- return Status::OK();
- }
-
- return runPipelineOnMongoS(
- expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges);
- }
+ LiteParsedPipeline litePipe(request);
+ litePipe.verifyIsSupported(
+ opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
+ auto hasChangeStream = litePipe.hasChangeStream();
+ auto involvedNamespaces = litePipe.getInvolvedNamespaces();
+
+ const auto pipelineBuilder = [&](boost::optional<CachedCollectionRoutingInfo> routingInfo) {
+ // Populate the collection UUID and the appropriate collation to use.
+ auto [collationObj, uuid] = [&]() -> std::pair<BSONObj, boost::optional<UUID>> {
+ // If this is a change stream, take the user-defined collation if one exists, or an
+ // empty BSONObj otherwise. Change streams never inherit the collection's default
+ // collation, and since collectionless aggregations generally run on the 'admin'
+ // database, the standard logic would attempt to resolve its non-existent UUID and
+ // collation by sending a specious 'listCollections' command to the config servers.
+ if (hasChangeStream) {
+ return {request.getCollation(), boost::none};
+ }
- // If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
- expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
+ return sharded_agg_helpers::getCollationAndUUID(
+ routingInfo, namespaces.executionNss, request.getCollation());
+ }();
- // If the operation is an explain, then we verify that it succeeded on all targeted shards,
- // write the results to the output builder, and return immediately.
- if (expCtx->explain) {
- return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
- }
+ // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
+ // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by
+ // the pipeline's stages.
+ auto expCtx = makeExpressionContext(
+ opCtx, request, collationObj, uuid, resolveInvolvedNamespaces(involvedNamespaces));
- // If this isn't an explain, then we must have established cursors on at least one shard.
- invariant(shardDispatchResults.remoteCursors.size() > 0);
-
- // If we sent the entire pipeline to a single shard, store the remote cursor and return.
- if (!shardDispatchResults.splitPipeline) {
- invariant(shardDispatchResults.remoteCursors.size() == 1);
- auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front());
- const auto shardId = remoteCursor->getShardId().toString();
- const auto reply = uassertStatusOK(storePossibleCursor(opCtx,
- namespaces.requestedNss,
- std::move(remoteCursor),
- privileges,
- expCtx->tailableMode));
- return appendCursorResponseToCommandResult(shardId, reply, result);
- }
+ // Parse and optimize the full pipeline.
+ auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
+ pipeline->optimizePipeline();
+ return pipeline;
+ };
- // If we have the exchange spec then dispatch all consumers.
- if (shardDispatchResults.exchangeSpec) {
- shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
- namespaces.executionNss,
- request,
- litePipe,
- collationObj,
- &shardDispatchResults);
+ auto targetingStatus =
+ sharded_agg_helpers::AggregationTargeter::make(opCtx,
+ namespaces.executionNss,
+ pipelineBuilder,
+ involvedNamespaces,
+ hasChangeStream,
+ litePipe.allowedToPassthroughFromMongos());
+ if (!targetingStatus.isOK()) {
+ appendEmptyResultSetWithStatus(
+ opCtx, namespaces.requestedNss, targetingStatus.getStatus(), result);
+ return Status::OK();
}
- // If we reach here, we have a merge pipeline to dispatch.
- return dispatchMergingPipeline(expCtx,
- namespaces,
- request,
- litePipe,
- routingInfo,
- std::move(shardDispatchResults),
- result,
- privileges);
-}
+ auto targeter = std::move(targetingStatus.getValue());
+ switch (targeter.policy) {
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: {
+ // A pipeline with $changeStream should never be allowed to passthrough.
+ invariant(!hasChangeStream);
+ return sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx,
+ namespaces,
+ targeter.routingInfo->db(),
+ request.getExplain(),
+ request.serializeToCommandObj(),
+ privileges,
+ result);
+ }
-Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
- const Namespaces& namespaces,
- const CachedDatabaseInfo& dbInfo,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- const PrivilegeVector& privileges,
- BSONObjBuilder* out) {
- // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
- // explain if necessary, and rewrites the result into a format safe to forward to shards.
- BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- sharded_agg_helpers::createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, nullptr, BSONObj()));
-
- const auto shardId = dbInfo.primary()->getId();
- const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId)
- ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
- : std::move(cmdObj);
-
- MultiStatementTransactionRequestsSender ars(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- namespaces.executionNss.db().toString(),
- {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}},
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- auto response = ars.next();
- invariant(ars.done());
-
- uassertStatusOK(response.swResponse);
- auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
-
- if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
- uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
- } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
- uassertStatusOK(
- commandStatus.withContext("command failed because can not establish a snapshot"));
- }
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: {
+ auto expCtx = targeter.pipeline->getContext();
+ // If this is an explain write the explain output and return.
+ if (expCtx->explain) {
+ *result << "splitPipeline" << BSONNULL << "mongos"
+ << Document{
+ {"host", getHostNameCachedAndPort()},
+ {"stages", targeter.pipeline->writeExplainOps(*expCtx->explain)}};
+ return Status::OK();
+ }
- BSONObj result;
- if (aggRequest.getExplain()) {
- // If this was an explain, then we get back an explain result object rather than a cursor.
- result = response.swResponse.getValue().data;
- } else {
- auto tailMode = liteParsedPipeline.hasChangeStream()
- ? TailableModeEnum::kTailableAndAwaitData
- : TailableModeEnum::kNormal;
- result = uassertStatusOK(
- storePossibleCursor(opCtx,
- shardId,
- *response.shardHostAndPort,
- response.swResponse.getValue().data,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- privileges,
- tailMode));
- }
+ return sharded_agg_helpers::runPipelineOnMongoS(namespaces,
+ request.getBatchSize(),
+ std::move(targeter.pipeline),
+ result,
+ privileges);
+ }
- // First append the properly constructed writeConcernError. It will then be skipped
- // in appendElementsUnique.
- if (auto wcErrorElem = result["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
+ case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: {
+ return sharded_agg_helpers::dispatchPipelineAndMerge(opCtx,
+ std::move(targeter),
+ request.serializeToCommandObj(),
+ request.getBatchSize(),
+ namespaces,
+ privileges,
+ result,
+ hasChangeStream);
+ }
}
- out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
-
- return getStatusFromCommandResult(out->asTempObj());
+ MONGO_UNREACHABLE;
}
Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 630f4d987b1..9d3c0a90eba 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -104,15 +104,6 @@ public:
const PrivilegeVector& privileges,
BSONObjBuilder* result,
unsigned numberRetries = 0);
-
-private:
- static Status aggPassthrough(OperationContext*,
- const Namespaces&,
- const CachedDatabaseInfo&,
- const AggregationRequest&,
- const LiteParsedPipeline&,
- const PrivilegeVector& privileges,
- BSONObjBuilder* result);
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index bf8665894b8..d3c899755cb 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -479,12 +479,12 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
}
void addMergeCursorsSource(Pipeline* mergePipeline,
- const LiteParsedPipeline& liteParsedPipeline,
BSONObj cmdSentToShards,
std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- std::shared_ptr<executor::TaskExecutor> executor) {
+ std::shared_ptr<executor::TaskExecutor> executor,
+ bool hasChangeStream) {
auto* opCtx = mergePipeline->getContext()->opCtx;
AsyncResultsMergerParams armParams;
armParams.setSort(shardCursorsSortSpec);
@@ -524,7 +524,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
auto mergeCursorsStage = DocumentSourceMergeCursors::create(
std::move(executor), std::move(armParams), mergePipeline->getContext());
- if (liteParsedPipeline.hasChangeStream()) {
+ if (hasChangeStream) {
mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
mergePipeline->getContext(),
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 75baded9e31..3b8b70fcd1d 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -81,12 +81,12 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
* front of 'mergePipeline'.
*/
void addMergeCursorsSource(Pipeline* mergePipeline,
- const LiteParsedPipeline&,
BSONObj cmdSentToShards,
- std::vector<OwnedRemoteCursor> remoteCursors,
+ std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- std::shared_ptr<executor::TaskExecutor> executor);
+ std::shared_ptr<executor::TaskExecutor> executor,
+ bool hasChangeStream);
/**
* Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of