From 13c2e614e05cb58753ee3a89a0fa9b14d0837a6d Mon Sep 17 00:00:00 2001 From: Nicholas Zolnierz Date: Tue, 8 Oct 2019 22:18:54 +0000 Subject: SERVER-42942 M/R Agg: Implement translation for cluster mapReduce command --- ...harding_last_stable_mongos_and_mixed_shards.yml | 1 + .../suites/sharding_map_reduce_agg.yaml | 12 +- .../aggregation/sharded_agg_cleanup_on_error.js | 8 +- ...hange_streams_establishment_finds_new_shards.js | 6 +- src/mongo/db/commands/SConscript | 6 +- src/mongo/db/commands/map_reduce_agg.cpp | 169 +--- src/mongo/db/commands/map_reduce_agg.h | 3 - src/mongo/db/commands/map_reduce_agg_test.cpp | 17 +- src/mongo/db/commands/map_reduce_command_base.h | 4 +- src/mongo/db/commands/mr.cpp | 19 +- src/mongo/db/commands/mr.h | 2 +- src/mongo/db/commands/mr_common.cpp | 197 +++- src/mongo/db/commands/mr_common.h | 21 +- src/mongo/db/commands/mr_test.cpp | 69 +- src/mongo/db/pipeline/aggregation_request.cpp | 1 - src/mongo/db/pipeline/lite_parsed_pipeline.cpp | 7 +- src/mongo/db/pipeline/lite_parsed_pipeline.h | 5 +- src/mongo/db/pipeline/mongos_process_interface.h | 28 - src/mongo/db/pipeline/sharded_agg_helpers.cpp | 1012 +++++++++++++++++--- src/mongo/db/pipeline/sharded_agg_helpers.h | 118 ++- src/mongo/s/commands/cluster_map_reduce.cpp | 37 + src/mongo/s/commands/cluster_map_reduce_agg.cpp | 197 +++- src/mongo/s/commands/cluster_map_reduce_cmd.cpp | 37 - src/mongo/s/query/cluster_aggregate.cpp | 856 ++--------------- src/mongo/s/query/cluster_aggregate.h | 9 - src/mongo/s/query/cluster_aggregation_planner.cpp | 6 +- src/mongo/s/query/cluster_aggregation_planner.h | 6 +- 27 files changed, 1520 insertions(+), 1333 deletions(-) diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 81a68edf47b..61d8bfde017 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -49,6 +49,7 @@ selector: - jstests/sharding/track_unsharded_collections_rename_collection.js - jstests/sharding/banned_txn_databases_sharded.js - jstests/sharding/split_large_key.js + - jstests/sharding/change_streams_establishment_finds_new_shards.js # Enable if SERVER-41813 is backported or 4.4 becomes last-stable - jstests/sharding/invalid_system_views_sharded_collection.js diff --git a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml index f126871e84d..4de6fde506a 100644 --- a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml +++ b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml @@ -4,8 +4,16 @@ test_kind: js_test selector: roots: - # Placeholder test to avoid failing in resmoke. - - jstests/sharding/accurate_count_with_predicate.js + - jstests/sharding/auth.js + - jstests/sharding/authCommands.js + - jstests/sharding/authmr.js + - jstests/sharding/causal_consistency_shell_support.js + - jstests/sharding/localhostAuthBypass.js + - jstests/sharding/max_time_ms_sharded.js + - jstests/sharding/mr_and_agg_versioning.js + - jstests/sharding/mr_shard_version.js + - jstests/sharding/query_config.js + - jstests/sharding/shard_targeting.js executor: config: diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js index 5fba3e477a9..fb646819d06 100644 --- a/jstests/aggregation/sharded_agg_cleanup_on_error.js +++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js @@ -88,7 +88,7 @@ try { try { // Enable the failpoint to fail on establishing a merging shard cursor. assert.commandWorked(mongosDB.adminCommand({ - configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor", + configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor", mode: "alwaysOn" })); @@ -103,13 +103,13 @@ try { } finally { assert.commandWorked(mongosDB.adminCommand( - {configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor", mode: "off"})); + {configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor", mode: "off"})); } // Test that aggregations involving $exchange correctly clean up the producer cursors. try { assert.commandWorked(mongosDB.adminCommand({ - configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline", + configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline", mode: "alwaysOn" })); @@ -133,7 +133,7 @@ try { } finally { assert.commandWorked(mongosDB.adminCommand({ - configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline", + configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline", mode: "off" })); } diff --git a/jstests/sharding/change_streams_establishment_finds_new_shards.js b/jstests/sharding/change_streams_establishment_finds_new_shards.js index 8f2393f99ee..6a232856bb7 100644 --- a/jstests/sharding/change_streams_establishment_finds_new_shards.js +++ b/jstests/sharding/change_streams_establishment_finds_new_shards.js @@ -40,13 +40,13 @@ assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middl // Enable the failpoint. assert.commandWorked(mongos.adminCommand( - {configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"})); + {configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"})); // While opening the cursor, wait for the failpoint and add the new shard. const awaitNewShard = startParallelShell(` load("jstests/libs/check_log.js"); checkLog.contains(db, - "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled"); + "shardedAggregateHangBeforeEstablishingShardCursors fail point enabled"); assert.commandWorked( db.adminCommand({addShard: "${newShard.getURL()}", name: "${newShard.name}"})); // Migrate the [10, MaxKey] chunk to "newShard". @@ -56,7 +56,7 @@ const awaitNewShard = startParallelShell(` _waitForDelete: true})); assert.commandWorked( db.adminCommand( - {configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors", + {configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors", mode: "off"}));`, mongos.port); diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index fce6f57b45e..300595e2a4f 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -157,6 +157,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/logical_session_id', + '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/session_catalog', @@ -508,10 +509,10 @@ env.Library( 'map_reduce_agg.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/db/commands/servers', + '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/pipeline/mongo_process_interface', - '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/query/map_reduce_output_format', 'map_reduce_parser' ] @@ -541,6 +542,7 @@ env.CppUnitTest( "map_reduce_parse_test.cpp", ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/query/query_test_service_context', 'map_reduce_agg', ] diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp index aaffa84dc19..f1c94a9c339 100644 --- a/src/mongo/db/commands/map_reduce_agg.cpp +++ b/src/mongo/db/commands/map_reduce_agg.cpp @@ -41,155 +41,18 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/map_reduce_agg.h" #include "mongo/db/commands/map_reduce_javascript_code.h" +#include "mongo/db/commands/mr_common.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_group.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_merge.h" -#include "mongo/db/pipeline/document_source_out.h" -#include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_single_document_transformation.h" -#include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" -#include "mongo/db/pipeline/expression_javascript.h" -#include "mongo/db/pipeline/parsed_aggregation_projection_node.h" -#include "mongo/db/pipeline/parsed_inclusion_projection.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/map_reduce_output_format.h" -#include "mongo/db/query/util/make_data_structure.h" -#include "mongo/util/intrusive_counter.h" namespace mongo::map_reduce_agg { namespace { -using namespace std::string_literals; - -auto translateSort(boost::intrusive_ptr expCtx, - const BSONObj& sort, - const boost::optional& limit) { - return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1)); -} - -auto translateMap(boost::intrusive_ptr expCtx, std::string code) { - auto emitExpression = ExpressionInternalJsEmit::create( - expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code); - auto node = std::make_unique( - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); - node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression)); - auto inclusion = std::unique_ptr{ - std::make_unique( - expCtx, - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, - std::move(node))}; - return make_intrusive( - expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); -} - -auto translateReduce(boost::intrusive_ptr expCtx, std::string code) { - auto accumulatorArguments = ExpressionObject::create( - expCtx, - make_vector>>( - std::pair{"data"s, - ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)}, - std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})})); - auto jsReduce = AccumulationStatement{ - "value", - std::move(accumulatorArguments), - AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)}; - auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); - return DocumentSourceGroup::create(expCtx, - std::move(groupExpr), - make_vector(std::move(jsReduce)), - boost::none); -} - -auto translateFinalize(boost::intrusive_ptr expCtx, std::string code) { - auto jsExpression = ExpressionInternalJs::create( - expCtx, - ExpressionArray::create( - expCtx, - make_vector>( - ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState), - ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))), - code); - auto node = std::make_unique( - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); - node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression)); - auto inclusion = std::unique_ptr{ - std::make_unique( - expCtx, - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, - std::move(node))}; - return make_intrusive( - expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); -} - -auto translateOutReplace(boost::intrusive_ptr expCtx, - const StringData inputDatabase, - NamespaceString targetNss) { - uassert(31278, - "MapReduce must output to the database belonging to its input collection - Input: "s + - inputDatabase + "Output: " + targetNss.db(), - inputDatabase == targetNss.db()); - return DocumentSourceOut::create(std::move(targetNss), expCtx); -} - -auto translateOutMerge(boost::intrusive_ptr expCtx, NamespaceString targetNss) { - return DocumentSourceMerge::create(targetNss, - expCtx, - MergeWhenMatchedModeEnum::kReplace, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, // Let variables - boost::none, // pipeline - std::set{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion -} - -auto translateOutReduce(boost::intrusive_ptr expCtx, - NamespaceString targetNss, - std::string code) { - // Because of communication for sharding, $merge must hold on to a serializable BSON object - // at the moment so we reparse here. - auto reduceObj = BSON("args" << BSON_ARRAY("$value" - << "$$new.value") - << "eval" << code); - - auto finalProjectSpec = - BSON(DocumentSourceProject::kStageName - << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj))); - auto pipelineSpec = boost::make_optional(std::vector{finalProjectSpec}); - return DocumentSourceMerge::create(targetNss, - expCtx, - MergeWhenMatchedModeEnum::kPipeline, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, // Let variables - pipelineSpec, - std::set{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion -} - -auto translateOut(boost::intrusive_ptr expCtx, - const OutputType outputType, - const StringData inputDatabase, - NamespaceString targetNss, - std::string reduceCode) { - switch (outputType) { - case OutputType::Replace: - return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss)); - case OutputType::Merge: - return boost::make_optional(translateOutMerge(expCtx, targetNss)); - case OutputType::Reduce: - return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode)); - case OutputType::InMemory:; - } - return boost::optional>{}; -} - auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) { // AutoGetCollectionForReadCommand will throw if the sharding version for this connection is // out of date. @@ -247,7 +110,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); auto expCtx = makeExpressionContext(opCtx, parsedMr); auto runnablePipeline = [&]() { - auto pipeline = translateFromMR(parsedMr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx); return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( expCtx, pipeline.release()); }(); @@ -274,32 +137,4 @@ bool runAggregationMapReduce(OperationContext* opCtx, return true; } -std::unique_ptr translateFromMR( - MapReduce parsedMr, boost::intrusive_ptr expCtx) { - - // TODO: It would be good to figure out what kind of errors this would produce in the Status. - // It would be better not to produce something incomprehensible out of an internal translation. - return uassertStatusOK(Pipeline::create( - makeFlattenedList>( - parsedMr.getQuery().map( - [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }), - parsedMr.getSort().map( - [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }), - translateMap(expCtx, parsedMr.getMap().getCode()), - DocumentSourceUnwind::create(expCtx, "emits", false, boost::none), - translateReduce(expCtx, parsedMr.getReduce().getCode()), - parsedMr.getFinalize().map([&](auto&& finalize) { - return translateFinalize(expCtx, parsedMr.getFinalize()->getCode()); - }), - translateOut(expCtx, - parsedMr.getOutOptions().getOutputType(), - parsedMr.getNamespace().db(), - NamespaceString{parsedMr.getOutOptions().getDatabaseName() - ? *parsedMr.getOutOptions().getDatabaseName() - : parsedMr.getNamespace().db(), - parsedMr.getOutOptions().getCollectionName()}, - parsedMr.getReduce().getCode())), - expCtx)); -} - } // namespace mongo::map_reduce_agg diff --git a/src/mongo/db/commands/map_reduce_agg.h b/src/mongo/db/commands/map_reduce_agg.h index 573119c0393..7c5972a7de9 100644 --- a/src/mongo/db/commands/map_reduce_agg.h +++ b/src/mongo/db/commands/map_reduce_agg.h @@ -47,7 +47,4 @@ bool runAggregationMapReduce(OperationContext* opCtx, std::string& errmsg, BSONObjBuilder& result); -std::unique_ptr translateFromMR( - MapReduce parsedMr, boost::intrusive_ptr expCtx); - } // namespace mongo::map_reduce_agg diff --git a/src/mongo/db/commands/map_reduce_agg_test.cpp b/src/mongo/db/commands/map_reduce_agg_test.cpp index 17b5f2520ba..3e78b1cb4c4 100644 --- a/src/mongo/db/commands/map_reduce_agg_test.cpp +++ b/src/mongo/db/commands/map_reduce_agg_test.cpp @@ -35,6 +35,7 @@ #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/commands/map_reduce_agg.h" +#include "mongo/db/commands/mr_common.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_merge.h" @@ -65,7 +66,7 @@ TEST(MapReduceAggTest, testBasicTranslate) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::none, "", OutputType::InMemory, false}}; boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(3u, sources.size()); auto iter = sources.begin(); @@ -82,7 +83,7 @@ TEST(MapReduceAggTest, testSortWithoutLimit) { MapReduceOutOptions{boost::none, "", OutputType::InMemory, false}}; mr.setSort(BSON("foo" << 1)); boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(4u, sources.size()); auto iter = sources.begin(); @@ -103,7 +104,7 @@ TEST(MapReduceAggTest, testSortWithLimit) { mr.setSort(BSON("foo" << 1)); mr.setLimit(23); boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(4u, sources.size()); auto iter = sources.begin(); @@ -127,7 +128,7 @@ TEST(MapReduceAggTest, testFeatureLadenTranslate) { << "fooval")); mr.setFinalize(boost::make_optional(MapReduceJavascriptCode{finalizeJavascript.toString()})); boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(7u, sources.size()); auto iter = sources.begin(); @@ -148,7 +149,7 @@ TEST(MapReduceAggTest, testOutMergeTranslate) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Merge, false}}; boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(sources.size(), 4u); auto iter = sources.begin(); @@ -168,7 +169,7 @@ TEST(MapReduceAggTest, testOutReduceTranslate) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::make_optional("db"s), "coll2", OutputType::Reduce, false}}; boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(sources.size(), 4u); auto iter = sources.begin(); @@ -190,7 +191,7 @@ TEST(MapReduceAggTest, testOutDifferentDBFails) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::make_optional("db2"s), "coll2", OutputType::Replace, false}}; boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - ASSERT_THROWS_CODE(translateFromMR(mr, expCtx), AssertionException, 31278); + ASSERT_THROWS_CODE(map_reduce_common::translateFromMR(mr, expCtx), AssertionException, 31278); } TEST(MapReduceAggTest, testOutSameCollection) { @@ -201,7 +202,7 @@ TEST(MapReduceAggTest, testOutSameCollection) { MapReduceJavascriptCode{reduceJavascript.toString()}, MapReduceOutOptions{boost::make_optional("db"s), "coll", OutputType::Replace, false}}; boost::intrusive_ptr expCtx(new ExpressionContextForTest(nss)); - auto pipeline = translateFromMR(mr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(mr, expCtx); auto& sources = pipeline->getSources(); ASSERT_EQ(sources.size(), 4u); auto iter = sources.begin(); diff --git a/src/mongo/db/commands/map_reduce_command_base.h b/src/mongo/db/commands/map_reduce_command_base.h index 8b16b29f21f..4de4676e02d 100644 --- a/src/mongo/db/commands/map_reduce_command_base.h +++ b/src/mongo/db/commands/map_reduce_command_base.h @@ -45,7 +45,7 @@ public: } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return mr::mrSupportsWriteConcern(cmd); + return map_reduce_common::mrSupportsWriteConcern(cmd); } bool allowsAfterClusterTime(const BSONObj& cmd) const override { @@ -62,7 +62,7 @@ public: virtual void addRequiredPrivileges(const std::string& dbname, const BSONObj& cmdObj, std::vector* out) const { - mr::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); + map_reduce_common::addPrivilegesRequiredForMapReduce(this, dbname, cmdObj, out); } bool errmsgRun(OperationContext* opCtx, diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index b9d90555d4e..55e6586784d 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -46,6 +46,7 @@ #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/map_reduce_gen.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" @@ -425,17 +426,17 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) { uassert(13602, "outType is no longer a valid option", cmdObj["outType"].eoo()); - outputOptions = mr::parseOutputOptions(dbname, cmdObj); + outputOptions = map_reduce_common::parseOutputOptions(dbname, cmdObj); shardedFirstPass = false; if (cmdObj.hasField("shardedFirstPass") && cmdObj["shardedFirstPass"].trueValue()) { massert(16054, "shardedFirstPass should only use replace outType", - outputOptions.outType == mr::OutputType::kReplace); + outputOptions.outType == OutputType::Replace); shardedFirstPass = true; } - if (outputOptions.outType != mr::OutputType::kInMemory) { + if (outputOptions.outType != OutputType::InMemory) { // Create names for the temp collection and the incremental collection. The incremental // collection goes in the "local" database, so that it doesn't get replicated. const std::string& outDBName = outputOptions.outDB.empty() ? dbname : outputOptions.outDB; @@ -739,7 +740,7 @@ void State::appendResults(BSONObjBuilder& final) { * This may involve replacing, merging or reducing. */ long long State::postProcessCollection(OperationContext* opCtx, CurOp* curOp) { - if (_onDisk == false || _config.outputOptions.outType == mr::OutputType::kInMemory) + if (_onDisk == false || _config.outputOptions.outType == OutputType::InMemory) return numInMemKeys(); bool holdingGlobalLock = false; @@ -763,7 +764,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, if (_config.outputOptions.finalNamespace == _config.tempNamespace) return collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock); - if (_config.outputOptions.outType == mr::OutputType::kReplace || + if (_config.outputOptions.outType == OutputType::Replace || collectionCount(opCtx, _config.outputOptions.finalNamespace, callerHoldsGlobalLock) == 0) { // This must be global because we may write across different databases. Lock::GlobalWrite lock(opCtx); @@ -780,7 +781,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, } _db.dropCollection(_config.tempNamespace.ns()); - } else if (_config.outputOptions.outType == mr::OutputType::kMerge) { + } else if (_config.outputOptions.outType == OutputType::Merge) { // merge: upsert new docs into old collection const auto count = collectionCount(opCtx, _config.tempNamespace, callerHoldsGlobalLock); @@ -799,7 +800,7 @@ long long State::postProcessCollectionNonAtomic(OperationContext* opCtx, } _db.dropCollection(_config.tempNamespace.ns()); pm.finished(); - } else if (_config.outputOptions.outType == mr::OutputType::kReduce) { + } else if (_config.outputOptions.outType == OutputType::Reduce) { // reduce: apply reduce op on new result and existing one BSONList values; @@ -926,7 +927,7 @@ State::State(OperationContext* opCtx, const Config& c) _dupCount(0), _numEmits(0) { _temp.reset(new InMemory()); - _onDisk = _config.outputOptions.outType != mr::OutputType::kInMemory; + _onDisk = _config.outputOptions.outType != OutputType::InMemory; } bool State::sourceExists() { @@ -1747,7 +1748,7 @@ bool runMapReduceShardedFinish(OperationContext* opCtx, std::vector chunks; - if (config.outputOptions.outType != mr::OutputType::kInMemory) { + if (config.outputOptions.outType != OutputType::InMemory) { auto outRoutingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, config.outputOptions.finalNamespace); uassertStatusOK(outRoutingInfoStatus.getStatus()); diff --git a/src/mongo/db/commands/mr.h b/src/mongo/db/commands/mr.h index d5e32d1f9b5..6ea15c30fee 100644 --- a/src/mongo/db/commands/mr.h +++ b/src/mongo/db/commands/mr.h @@ -224,7 +224,7 @@ public: NamespaceString incLong; NamespaceString tempNamespace; - mr::OutputOptions outputOptions; + map_reduce_common::OutputOptions outputOptions; // max number of keys allowed in JS map before switching mode long jsMaxKeys; diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index a13b323ea23..ea0dea5e218 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -40,15 +40,152 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_merge.h" +#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_single_document_transformation.h" +#include "mongo/db/pipeline/document_source_sort.h" +#include "mongo/db/pipeline/document_source_unwind.h" +#include "mongo/db/pipeline/expression_javascript.h" +#include "mongo/db/pipeline/parsed_aggregation_projection_node.h" +#include "mongo/db/pipeline/parsed_inclusion_projection.h" +#include "mongo/db/query/util/make_data_structure.h" +#include "mongo/util/intrusive_counter.h" #include "mongo/util/log.h" #include "mongo/util/str.h" -namespace mongo { - -namespace mr { +namespace mongo::map_reduce_common { namespace { Rarely nonAtomicDeprecationSampler; // Used to occasionally log deprecation messages. + +using namespace std::string_literals; + +auto translateSort(boost::intrusive_ptr expCtx, + const BSONObj& sort, + const boost::optional& limit) { + return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1)); +} + +auto translateMap(boost::intrusive_ptr expCtx, std::string code) { + auto emitExpression = ExpressionInternalJsEmit::create( + expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code); + auto node = std::make_unique( + ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); + node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression)); + auto inclusion = std::unique_ptr{ + std::make_unique( + expCtx, + ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, + std::move(node))}; + return make_intrusive( + expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); +} + +auto translateReduce(boost::intrusive_ptr expCtx, std::string code) { + auto accumulatorArguments = ExpressionObject::create( + expCtx, + make_vector>>( + std::pair{"data"s, + ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)}, + std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})})); + auto jsReduce = AccumulationStatement{ + "value", + std::move(accumulatorArguments), + AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)}; + auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); + return DocumentSourceGroup::create(expCtx, + std::move(groupExpr), + make_vector(std::move(jsReduce)), + boost::none); +} + +auto translateFinalize(boost::intrusive_ptr expCtx, std::string code) { + auto jsExpression = ExpressionInternalJs::create( + expCtx, + ExpressionArray::create( + expCtx, + make_vector>( + ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState), + ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))), + code); + auto node = std::make_unique( + ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); + node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression)); + auto inclusion = std::unique_ptr{ + std::make_unique( + expCtx, + ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, + std::move(node))}; + return make_intrusive( + expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); +} + +auto translateOutReplace(boost::intrusive_ptr expCtx, + const StringData inputDatabase, + NamespaceString targetNss) { + uassert(31278, + "MapReduce must output to the database belonging to its input collection - Input: "s + + inputDatabase + " Output: " + targetNss.db(), + inputDatabase == targetNss.db()); + return DocumentSourceOut::create(std::move(targetNss), expCtx); +} + +auto translateOutMerge(boost::intrusive_ptr expCtx, NamespaceString targetNss) { + return DocumentSourceMerge::create(targetNss, + expCtx, + MergeWhenMatchedModeEnum::kReplace, + MergeWhenNotMatchedModeEnum::kInsert, + boost::none, // Let variables + boost::none, // pipeline + std::set{FieldPath("_id"s)}, + boost::none); // targetCollectionVersion +} + +auto translateOutReduce(boost::intrusive_ptr expCtx, + NamespaceString targetNss, + std::string code) { + // Because of communication for sharding, $merge must hold on to a serializable BSON object + // at the moment so we reparse here. + auto reduceObj = BSON("args" << BSON_ARRAY("$value" + << "$$new.value") + << "eval" << code); + + auto finalProjectSpec = + BSON(DocumentSourceProject::kStageName + << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj))); + auto pipelineSpec = boost::make_optional(std::vector{finalProjectSpec}); + return DocumentSourceMerge::create(targetNss, + expCtx, + MergeWhenMatchedModeEnum::kPipeline, + MergeWhenNotMatchedModeEnum::kInsert, + boost::none, // Let variables + pipelineSpec, + std::set{FieldPath("_id"s)}, + boost::none); // targetCollectionVersion +} + +auto translateOut(boost::intrusive_ptr expCtx, + const OutputType outputType, + const StringData inputDatabase, + NamespaceString targetNss, + std::string reduceCode) { + switch (outputType) { + case OutputType::Replace: + return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss)); + case OutputType::Merge: + return boost::make_optional(translateOutMerge(expCtx, targetNss)); + case OutputType::Reduce: + return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode)); + case OutputType::InMemory:; + } + return boost::optional>{}; +} + } // namespace OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj) { @@ -57,24 +194,24 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb outputOptions.outNonAtomic = false; if (cmdObj["out"].type() == String) { outputOptions.collectionName = cmdObj["out"].String(); - outputOptions.outType = OutputType::kReplace; + outputOptions.outType = OutputType::Replace; } else if (cmdObj["out"].type() == Object) { BSONObj o = cmdObj["out"].embeddedObject(); if (o.hasElement("normal")) { - outputOptions.outType = OutputType::kReplace; + outputOptions.outType = OutputType::Replace; outputOptions.collectionName = o["normal"].String(); } else if (o.hasElement("replace")) { - outputOptions.outType = OutputType::kReplace; + outputOptions.outType = OutputType::Replace; outputOptions.collectionName = o["replace"].String(); } else if (o.hasElement("merge")) { - outputOptions.outType = OutputType::kMerge; + outputOptions.outType = OutputType::Merge; outputOptions.collectionName = o["merge"].String(); } else if (o.hasElement("reduce")) { - outputOptions.outType = OutputType::kReduce; + outputOptions.outType = OutputType::Reduce; outputOptions.collectionName = o["reduce"].String(); } else if (o.hasElement("inline")) { - outputOptions.outType = OutputType::kInMemory; + outputOptions.outType = OutputType::InMemory; uassert(ErrorCodes::InvalidOptions, "cannot specify 'sharded' in combination with 'inline'", !o.hasElement("sharded")); @@ -96,8 +233,8 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb if (outputOptions.outNonAtomic) { uassert(15895, "nonAtomic option cannot be used with this output type", - (outputOptions.outType == OutputType::kReduce || - outputOptions.outType == OutputType::kMerge)); + (outputOptions.outType == OutputType::Reduce || + outputOptions.outType == OutputType::Merge)); } else if (nonAtomicDeprecationSampler.tick()) { warning() << "Setting out.nonAtomic to false in MapReduce is deprecated."; } @@ -106,7 +243,7 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb uasserted(13606, "'out' has to be a string or an object"); } - if (outputOptions.outType != OutputType::kInMemory) { + if (outputOptions.outType != OutputType::InMemory) { const StringData outDb(outputOptions.outDB.empty() ? dbname : outputOptions.outDB); const NamespaceString nss(outDb, outputOptions.collectionName); uassert(ErrorCodes::InvalidNamespace, @@ -130,10 +267,10 @@ void addPrivilegesRequiredForMapReduce(const BasicCommand* commandTemplate, inputResource.isExactNamespacePattern()); out->push_back(Privilege(inputResource, ActionType::find)); - if (outputOptions.outType != OutputType::kInMemory) { + if (outputOptions.outType != OutputType::InMemory) { ActionSet outputActions; outputActions.addAction(ActionType::insert); - if (outputOptions.outType == OutputType::kReplace) { + if (outputOptions.outType == OutputType::Replace) { outputActions.addAction(ActionType::remove); } else { outputActions.addAction(ActionType::update); @@ -163,5 +300,33 @@ bool mrSupportsWriteConcern(const BSONObj& cmd) { return true; } } -} // namespace mr -} // namespace mongo + +std::unique_ptr translateFromMR( + MapReduce parsedMr, boost::intrusive_ptr expCtx) { + + // TODO: It would be good to figure out what kind of errors this would produce in the Status. + // It would be better not to produce something incomprehensible out of an internal translation. + return uassertStatusOK(Pipeline::create( + makeFlattenedList>( + parsedMr.getQuery().map( + [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }), + parsedMr.getSort().map( + [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }), + translateMap(expCtx, parsedMr.getMap().getCode()), + DocumentSourceUnwind::create(expCtx, "emits", false, boost::none), + translateReduce(expCtx, parsedMr.getReduce().getCode()), + parsedMr.getFinalize().map([&](auto&& finalize) { + return translateFinalize(expCtx, parsedMr.getFinalize()->getCode()); + }), + translateOut(expCtx, + parsedMr.getOutOptions().getOutputType(), + parsedMr.getNamespace().db(), + NamespaceString{parsedMr.getOutOptions().getDatabaseName() + ? *parsedMr.getOutOptions().getDatabaseName() + : parsedMr.getNamespace().db(), + parsedMr.getOutOptions().getCollectionName()}, + parsedMr.getReduce().getCode())), + expCtx)); +} + +} // namespace mongo::map_reduce_common diff --git a/src/mongo/db/commands/mr_common.h b/src/mongo/db/commands/mr_common.h index aad419b94bd..c3e80c11874 100644 --- a/src/mongo/db/commands/mr_common.h +++ b/src/mongo/db/commands/mr_common.h @@ -33,19 +33,11 @@ #include #include "mongo/db/commands.h" +#include "mongo/db/commands/map_reduce_gen.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -namespace mongo { - -namespace mr { - -enum class OutputType { - kReplace, // Atomically replace the collection. - kMerge, // Merge keys, override dups. - kReduce, // Merge keys, reduce dups. - kInMemory // Only store in memory, limited in size. -}; +namespace mongo::map_reduce_common { struct OutputOptions { std::string outDB; @@ -68,6 +60,11 @@ void addPrivilegesRequiredForMapReduce(const BasicCommand* commandTemplate, */ bool mrSupportsWriteConcern(const BSONObj& cmd); +/** + * Accepts a parsed mapReduce command and returns the equivalent aggregation pipeline. Note that the + * returned pipeline does *not* contain a $cursor stage and thus is not runnable. + */ +std::unique_ptr translateFromMR( + MapReduce parsedMr, boost::intrusive_ptr expCtx); -} // namespace mr -} // namespace mongo +} // namespace mongo::map_reduce_common diff --git a/src/mongo/db/commands/mr_test.cpp b/src/mongo/db/commands/mr_test.cpp index 53b06339448..304edd39f83 100644 --- a/src/mongo/db/commands/mr_test.cpp +++ b/src/mongo/db/commands/mr_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/map_reduce_gen.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/json.h" #include "mongo/db/op_observer_noop.h" @@ -78,17 +79,17 @@ void _compareOutputOptionField(const std::string& dbname, } /** - * Returns string representation of mr::Config::OutputType + * Returns string representation of OutputType */ -std::string _getOutTypeString(mr::OutputType outType) { +std::string _getOutTypeString(OutputType outType) { switch (outType) { - case mr::OutputType::kReplace: + case OutputType::Replace: return "REPLACE"; - case mr::OutputType::kMerge: + case OutputType::Merge: return "MERGE"; - case mr::OutputType::kReduce: + case OutputType::Reduce: return "REDUCE"; - case mr::OutputType::kInMemory: + case OutputType::InMemory: return "INMEMORY"; } MONGO_UNREACHABLE; @@ -103,9 +104,10 @@ void _testConfigParseOutputOptions(const std::string& dbname, const std::string& expectedCollectionName, const std::string& expectedFinalNamespace, bool expectedOutNonAtomic, - mr::OutputType expectedOutType) { + OutputType expectedOutType) { const BSONObj cmdObj = fromjson(cmdObjStr); - mr::OutputOptions outputOptions = mr::parseOutputOptions(dbname, cmdObj); + map_reduce_common::OutputOptions outputOptions = + map_reduce_common::parseOutputOptions(dbname, cmdObj); _compareOutputOptionField(dbname, cmdObjStr, "outDb", outputOptions.outDB, expectedOutDb); _compareOutputOptionField( dbname, cmdObjStr, "collectionName", outputOptions.collectionName, expectedCollectionName); @@ -124,31 +126,34 @@ void _testConfigParseOutputOptions(const std::string& dbname, } /** - * Tests for mr::parseOutputOptions. + * Tests for map_reduce_common::parseOutputOptions. */ TEST(ConfigOutputOptionsTest, parseOutputOptions) { // Missing 'out' field. - ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{}")), AssertionException); + ASSERT_THROWS(map_reduce_common::parseOutputOptions("mydb", fromjson("{}")), + AssertionException); // 'out' must be either string or object. - ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{out: 99}")), AssertionException); + ASSERT_THROWS(map_reduce_common::parseOutputOptions("mydb", fromjson("{out: 99}")), + AssertionException); // 'out.nonAtomic' is not supported with normal, replace or inline. - ASSERT_THROWS( - mr::parseOutputOptions("mydb", fromjson("{out: {normal: 'mycoll', nonAtomic: true}}")), - AssertionException); - ASSERT_THROWS( - mr::parseOutputOptions("mydb", fromjson("{out: {replace: 'mycoll', nonAtomic: true}}")), - AssertionException); - ASSERT_THROWS( - mr::parseOutputOptions("mydb", fromjson("{out: {inline: 'mycoll', nonAtomic: true}}")), - AssertionException); + ASSERT_THROWS(map_reduce_common::parseOutputOptions( + "mydb", fromjson("{out: {normal: 'mycoll', nonAtomic: true}}")), + AssertionException); + ASSERT_THROWS(map_reduce_common::parseOutputOptions( + "mydb", fromjson("{out: {replace: 'mycoll', nonAtomic: true}}")), + AssertionException); + ASSERT_THROWS(map_reduce_common::parseOutputOptions( + "mydb", fromjson("{out: {inline: 'mycoll', nonAtomic: true}}")), + AssertionException); // Unknown output specifer. - ASSERT_THROWS(mr::parseOutputOptions("mydb", fromjson("{out: {no_such_out_type: 'mycoll'}}")), + ASSERT_THROWS(map_reduce_common::parseOutputOptions( + "mydb", fromjson("{out: {no_such_out_type: 'mycoll'}}")), AssertionException); // 'out' is string. _testConfigParseOutputOptions( - "mydb", "{out: 'mycoll'}", "", "mycoll", "mydb.mycoll", false, mr::OutputType::kReplace); + "mydb", "{out: 'mycoll'}", "", "mycoll", "mydb.mycoll", false, OutputType::Replace); // 'out' is object. _testConfigParseOutputOptions("mydb", "{out: {normal: 'mycoll'}}", @@ -156,7 +161,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) { "mycoll", "mydb.mycoll", false, - mr::OutputType::kReplace); + OutputType::Replace); // 'out.db' overrides dbname parameter _testConfigParseOutputOptions("mydb1", "{out: {replace: 'mycoll', db: 'mydb2'}}", @@ -164,7 +169,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) { "mycoll", "mydb2.mycoll", false, - mr::OutputType::kReplace); + OutputType::Replace); // 'out.nonAtomic' is supported with merge and reduce. _testConfigParseOutputOptions("mydb", "{out: {merge: 'mycoll', nonAtomic: true}}", @@ -172,14 +177,14 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) { "mycoll", "mydb.mycoll", true, - mr::OutputType::kMerge); + OutputType::Merge); _testConfigParseOutputOptions("mydb", "{out: {reduce: 'mycoll', nonAtomic: true}}", "", "mycoll", "mydb.mycoll", true, - mr::OutputType::kReduce); + OutputType::Reduce); // inline _testConfigParseOutputOptions("mydb1", "{out: {inline: 'mycoll', db: 'mydb2'}}", @@ -187,7 +192,7 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) { "", "", false, - mr::OutputType::kInMemory); + OutputType::InMemory); // Order should not matter in fields of 'out' object. _testConfigParseOutputOptions("mydb1", @@ -196,35 +201,35 @@ TEST(ConfigOutputOptionsTest, parseOutputOptions) { "mycoll", "mydb2.mycoll", false, - mr::OutputType::kReplace); + OutputType::Replace); _testConfigParseOutputOptions("mydb1", "{out: {db: 'mydb2', replace: 'mycoll'}}", "mydb2", "mycoll", "mydb2.mycoll", false, - mr::OutputType::kReplace); + OutputType::Replace); _testConfigParseOutputOptions("mydb1", "{out: {nonAtomic: true, merge: 'mycoll'}}", "", "mycoll", "mydb1.mycoll", true, - mr::OutputType::kMerge); + OutputType::Merge); _testConfigParseOutputOptions("mydb1", "{out: {nonAtomic: true, reduce: 'mycoll'}}", "", "mycoll", "mydb1.mycoll", true, - mr::OutputType::kReduce); + OutputType::Reduce); _testConfigParseOutputOptions("mydb1", "{out: {db: 'mydb2', inline: 'mycoll'}}", "mydb2", "", "", false, - mr::OutputType::kInMemory); + OutputType::InMemory); } TEST(ConfigTest, ParseCollation) { diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index ec5c6af9a27..626867a499e 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -283,7 +283,6 @@ NamespaceString AggregationRequest::parseNs(const std::string& dbname, const BSO } Document AggregationRequest::serializeToCommandObj() const { - MutableDocument serialized; return Document{ {kCommandName, (_nss.isCollectionlessAggregateNS() ? Value(1) : Value(_nss.coll()))}, {kPipelineName, _pipeline}, diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp index 0b9909c5ba4..6af52389e6a 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.cpp +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.cpp @@ -73,7 +73,7 @@ void LiteParsedPipeline::assertSupportsMultiDocumentTransaction( } } -bool LiteParsedPipeline::verifyIsSupported( +void LiteParsedPipeline::verifyIsSupported( OperationContext* opCtx, const std::function isSharded, const boost::optional explain, @@ -85,14 +85,11 @@ bool LiteParsedPipeline::verifyIsSupported( // Verify litePipe can be run at the given read concern. assertSupportsReadConcern(opCtx, explain, enableMajorityReadConcern); // Verify that no involved namespace is sharded unless allowed by the pipeline. - auto sharded = false; for (const auto& nss : getInvolvedNamespaces()) { - sharded = isSharded(opCtx, nss); uassert(28769, str::stream() << nss.ns() << " cannot be sharded", - allowShardedForeignCollection(nss) || !sharded); + allowShardedForeignCollection(nss) || !isSharded(opCtx, nss)); } - return sharded; } } // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index f5578873621..21e309dbaba 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -140,10 +140,9 @@ public: /** * Perform checks that verify that the LitePipe is valid. Note that this function must be called * before forwarding an aggregation command on an unsharded collection, in order to verify that - * the involved namespaces are allowed to be sharded. Returns true if any involved namespace is - * sharded. + * the involved namespaces are allowed to be sharded. */ - bool verifyIsSupported( + void verifyIsSupported( OperationContext* opCtx, const std::function isSharded, const boost::optional explain, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 33e133adff2..fc6457fcc4b 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -45,34 +45,6 @@ namespace mongo { */ class MongoSInterface : public MongoProcessCommon { public: - static BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional& shardId, - Pipeline* pipeline, - BSONObj collationObj); - - /** - * Appends information to the command sent to the shards which should be appended both if this - * is a passthrough sent to a single shard and if this is a split pipeline. - */ - static BSONObj genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const boost::optional& shardId, - const AggregationRequest& request, - BSONObj collationObj); - - static BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const cluster_aggregation_planner::SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional exchangeSpec, - bool needsMerge); - - static StatusWith getExecutionNsRoutingInfo( - OperationContext* opCtx, const NamespaceString& execNss); - MongoSInterface() = default; virtual ~MongoSInterface() = default; diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 3ef81811d71..a469b6d9807 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -32,20 +32,56 @@ #include "sharded_agg_helpers.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connpool.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/query/find_common.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/query/cluster_aggregation_planner.h" +#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs_gen.h" #include "mongo/s/query/document_source_merge_cursors.h" +#include "mongo/s/query/store_possible_cursor.h" +#include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" -namespace mongo { -namespace sharded_agg_helpers { +namespace mongo::sharded_agg_helpers { -MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); +using SplitPipeline = cluster_aggregation_planner::SplitPipeline; + +MONGO_FAIL_POINT_DEFINE(shardedAggregateHangBeforeEstablishingShardCursors); +MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToEstablishMergingShardCursor); +MONGO_FAIL_POINT_DEFINE(shardedAggregateFailToDispatchExchangeConsumerPipeline); + +namespace { + +bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) { + // The following aggregations must be routed to all shards: + // - Any collectionless aggregation, such as non-localOps $currentOp. + // - Any aggregation which begins with a $changeStream stage. + return nss.isCollectionlessAggregateNS() || hasChangeStream; +} + +Status appendCursorResponseToCommandResult(const ShardId& shardId, + const BSONObj cursorResponse, + BSONObjBuilder* result) { + // If a write error was encountered, append it to the output buffer first. + if (auto wcErrorElem = cursorResponse["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); + } + + // Pass the results from the remote shard into our command response. + result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(cursorResponse)); + return getStatusFromCommandResult(result->asTempObj()); +} /** * Given a document representing an aggregation command such as @@ -73,24 +109,18 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v return explainCommandBuilder.freeze(); } -BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional& constants, - Pipeline* pipeline, - BSONObj collationObj) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - if (pipeline) { - targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); +Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) { + // The idempotent retry policy will retry even for writeConcern failures, so only set it if the + // pipeline does not support writeConcern. + if (!opCtx->getWriteConcern().usedDefault) { + return Shard::RetryPolicy::kNotIdempotent; } - - return genericTransformForShards( - std::move(targetedCmd), opCtx, request, constants, collationObj); + return Shard::RetryPolicy::kIdempotent; } BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, - const AggregationRequest& request, + boost::optional explainVerbosity, const boost::optional& constants, BSONObj collationObj) { if (constants) { @@ -100,7 +130,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, cmdForShards[AggregationRequest::kFromMongosName] = Value(true); // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. - if (auto explainVerbosity = request.getExplain()) { + if (explainVerbosity) { cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); } @@ -120,6 +150,93 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), false); } +std::vector establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + bool hasChangeStream, + boost::optional& routingInfo, + const std::set& shardIds, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + const bool mustRunOnAll = mustRunOnAllShards(nss, hasChangeStream); + std::vector> requests; + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo || mustRunOnAll); + + if (mustRunOnAll) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (const auto& shardId : shardIds) { + requests.emplace_back(shardId, cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (const auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(shardId, std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + // Don't append shard version info when contacting the config servers. + const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj; + requests.emplace_back(routingInfo->db().primaryId(), + appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db())); + } + + if (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) { + log() << "shardedAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " + "until fail point is disabled."; + while (MONGO_unlikely(shardedAggregateHangBeforeEstablishingShardCursors.shouldFail())) { + sleepsecs(1); + } + } + + return establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + getDesiredRetryPolicy(opCtx)); +} + +std::set getTargetedShards(OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (mustRunOnAllShards) { + // The pipeline begins with a stage which must be run on all shards. + std::vector shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + return {std::make_move_iterator(shardIds.begin()), std::make_move_iterator(shardIds.end())}; + } + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo); + + return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); +} + +ShardId pickMergingShard(OperationContext* opCtx, + bool needsPrimaryShardMerge, + const std::vector& targetedShards, + ShardId primaryShard) { + auto& prng = opCtx->getClient()->getPrng(); + // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging + // command on random shard, unless the pipeline dictates that it needs to be run on the primary + // shard for the database. + return needsPrimaryShardMerge ? primaryShard + : targetedShards[prng.nextInt32(targetedShards.size())]; +} + StatusWith getExecutionNsRoutingInfo(OperationContext* opCtx, const NamespaceString& execNss) { // First, verify that there are shards present in the cluster. If not, then we return the @@ -139,32 +256,80 @@ StatusWith getExecutionNsRoutingInfo(OperationConte return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); } -Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) { - // The idempotent retry policy will retry even for writeConcern failures, so only set it if the - // pipeline does not support writeConcern. - if (req.getWriteConcern()) { - return Shard::RetryPolicy::kNotIdempotent; +Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults, + const boost::intrusive_ptr& mergeCtx, + BSONObjBuilder* result) { + if (dispatchResults.splitPipeline) { + auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); + const char* mergeType = [&]() { + if (mergePipeline->canRunOnMongos()) { + return "mongos"; + } else if (dispatchResults.exchangeSpec) { + return "exchange"; + } else if (mergePipeline->needsPrimaryShardMerger()) { + return "primaryShard"; + } else { + return "anyShard"; + } + }(); + + *result << "mergeType" << mergeType; + + MutableDocument pipelinesDoc; + pipelinesDoc.addField("shardsPart", + Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( + *mergeCtx->explain))); + if (dispatchResults.exchangeSpec) { + BSONObjBuilder bob; + dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob); + bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards); + pipelinesDoc.addField("exchange", Value(bob.obj())); + } + pipelinesDoc.addField("mergerPart", + Value(mergePipeline->writeExplainOps(*mergeCtx->explain))); + + *result << "splitPipeline" << pipelinesDoc.freeze(); + } else { + *result << "splitPipeline" << BSONNULL; } - return Shard::RetryPolicy::kIdempotent; -} -bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) { - // The following aggregations must be routed to all shards: - // - Any collectionless aggregation, such as non-localOps $currentOp. - // - Any aggregation which begins with a $changeStream stage. - return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); + BSONObjBuilder shardExplains(result->subobjStart("shards")); + for (const auto& shardResult : dispatchResults.remoteExplainOutput) { + invariant(shardResult.shardHostAndPort); + + uassertStatusOK(shardResult.swResponse.getStatus()); + uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data)); + + auto shardId = shardResult.shardId.toString(); + const auto& data = shardResult.swResponse.getValue().data; + BSONObjBuilder explain(shardExplains.subobjStart(shardId)); + explain << "host" << shardResult.shardHostAndPort->toString(); + if (auto stagesElement = data["stages"]) { + explain << "stages" << stagesElement; + } else { + auto queryPlannerElement = data["queryPlanner"]; + uassert(51157, + str::stream() << "Malformed explain response received from shard " << shardId + << ": " << data.toString(), + queryPlannerElement); + explain << "queryPlanner" << queryPlannerElement; + if (auto executionStatsElement = data["executionStats"]) { + explain << "executionStats" << executionStatsElement; + } + } + } + + return Status::OK(); } + BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, + const boost::intrusive_ptr& expCtx, + Document serializedCommand, const cluster_aggregation_planner::SplitPipeline& splitPipeline, - const BSONObj collationObj, const boost::optional exchangeSpec, - const boost::optional& constants, bool needsMerge) { // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); + MutableDocument targetedCmd(serializedCommand); // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may // have detected a logged in user and appended that user name to the $listSessions spec to @@ -193,8 +358,143 @@ BSONObj createCommandForTargetedShards( targetedCmd[AggregationRequest::kExchangeName] = exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); + return genericTransformForShards(std::move(targetedCmd), + expCtx->opCtx, + expCtx->explain, + expCtx->getRuntimeConstants(), + expCtx->collation); +} + +sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( + const boost::intrusive_ptr& expCtx, + const NamespaceString& executionNss, + Document serializedCommand, + sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) { + auto opCtx = expCtx->opCtx; + + if (MONGO_unlikely(shardedAggregateFailToDispatchExchangeConsumerPipeline.shouldFail())) { + log() << "shardedAggregateFailToDispatchExchangeConsumerPipeline fail point enabled."; + uasserted(ErrorCodes::FailPointEnabled, + "Asserting on exhange consumer pipeline dispatch due to failpoint."); + } + + // For all consumers construct a request with appropriate cursor ids and send to shards. + std::vector> requests; + auto numConsumers = shardDispatchResults->exchangeSpec->consumerShards.size(); + std::vector consumerPipelines; + for (size_t idx = 0; idx < numConsumers; ++idx) { + // Pick this consumer's cursors from producers. + std::vector producers; + for (size_t p = 0; p < shardDispatchResults->numProducers; ++p) { + producers.emplace_back( + std::move(shardDispatchResults->remoteCursors[p * numConsumers + idx])); + } + + // Create a pipeline for a consumer and add the merging stage. + auto consumerPipeline = uassertStatusOK(Pipeline::create( + shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx)); + + cluster_aggregation_planner::addMergeCursorsSource( + consumerPipeline.get(), + BSONObj(), + std::move(producers), + {}, + shardDispatchResults->splitPipeline->shardCursorsSortSpec, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + false); + + consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); + + auto consumerCmdObj = createCommandForTargetedShards( + expCtx, serializedCommand, consumerPipelines.back(), boost::none, false); + + requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], + consumerCmdObj); + } + auto cursors = establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + executionNss, + ReadPreferenceSetting::get(opCtx), + requests, + false /* do not allow partial results */); + + // Convert remote cursors into a vector of "owned" cursors. + std::vector ownedCursors; + for (auto&& cursor : cursors) { + ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); + } + + // The merging pipeline is just a union of the results from each of the shards involved on the + // consumer side of the exchange. + auto mergePipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge); + + SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none}; + + // Relinquish ownership of the local consumer pipelines' cursors as each shard is now + // responsible for its own producer cursors. + for (const auto& pipeline : consumerPipelines) { + const auto& mergeCursors = + static_cast(pipeline.shardsPipeline->peekFront()); + mergeCursors->dismissCursorOwnership(); + } + return sharded_agg_helpers::DispatchShardPipelineResults{false, + std::move(ownedCursors), + {} /*TODO SERVER-36279*/, + std::move(splitPipeline), + nullptr, + BSONObj(), + numConsumers}; +} + +BSONObj createCommandForMergingShard(Document serializedCommand, + const boost::intrusive_ptr& mergeCtx, + const ShardId& shardId, + bool mergingShardContributesData, + const Pipeline* pipelineForMerging) { + MutableDocument mergeCmd(serializedCommand); + + mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); + mergeCmd[AggregationRequest::kFromMongosName] = Value(true); + + mergeCmd[AggregationRequest::kRuntimeConstants] = + Value(mergeCtx->getRuntimeConstants().toBSON()); + + // If the user didn't specify a collation already, make sure there's a collation attached to + // the merge command, since the merging shard may not have the collection metadata. + if (mergeCmd.peek()["collation"].missing()) { + mergeCmd["collation"] = mergeCtx->getCollator() + ? Value(mergeCtx->getCollator()->getSpec().toBSON()) + : Value(Document{CollationSpec::kSimpleSpec}); + } + + const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx); + if (txnRouter && mergingShardContributesData) { + // Don't include a readConcern since we can only include read concerns on the _first_ + // command sent to a participant per transaction. Assuming the merging shard is a + // participant, it will already have received another 'aggregate' command earlier which + // contained a readConcern. + mergeCmd.remove("readConcern"); + } + + return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), false); +} + +BSONObj createPassthroughCommandForShard( + OperationContext* opCtx, + Document serializedCommand, + boost::optional explainVerbosity, + const boost::optional& constants, + Pipeline* pipeline, + BSONObj collationObj) { + // Create the command for the shards. + MutableDocument targetedCmd(serializedCommand); + if (pipeline) { + targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + } + return genericTransformForShards( - std::move(targetedCmd), opCtx, request, constants, collationObj); + std::move(targetedCmd), opCtx, explainVerbosity, constants, collationObj); } /** @@ -203,12 +503,11 @@ BSONObj createCommandForTargetedShards( * shard version is encountered, refreshes the routing table and tries again. */ DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& litePipe, - std::unique_ptr pipeline, - BSONObj collationObj) { + Document serializedCommand, + bool hasChangeStream, + std::unique_ptr pipeline) { + auto expCtx = pipeline->getContext(); + // The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. @@ -226,12 +525,11 @@ DispatchShardPipelineResults dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, expCtx->ns); // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. // Otherwise, uassert on all exceptions here. - if (!(litePipe.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + if (!(hasChangeStream && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { uassertStatusOK(executionNsRoutingInfoStatus); } @@ -240,9 +538,9 @@ DispatchShardPipelineResults dispatchShardPipeline( : boost::optional{}; // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(executionNss, litePipe); + const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream); std::set shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, expCtx->collation); // Don't need to split the pipeline if we are only targeting a single shard, unless: // - There is a stage that needs to be run on the primary shard and the single target shard @@ -268,16 +566,14 @@ DispatchShardPipelineResults dispatchShardPipeline( // Generate the command object for the targeted shards. BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards(opCtx, - aggRequest, - litePipe, - *splitPipeline, - collationObj, - exchangeSpec, - expCtx->getRuntimeConstants(), - true) - : createPassthroughCommandForShard( - opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj); + ? createCommandForTargetedShards( + expCtx, serializedCommand, *splitPipeline, exchangeSpec, true) + : createPassthroughCommandForShard(expCtx->opCtx, + serializedCommand, + expCtx->explain, + expCtx->getRuntimeConstants(), + pipeline.get(), + expCtx->collation); // In order for a $changeStream to work reliably, we need the shard registry to be at least as // current as the logical time at which the pipeline was serialized to 'targetedCommand' above. @@ -286,14 +582,14 @@ DispatchShardPipelineResults dispatchShardPipeline( // may not have been forced to split if there was only one shard in the cluster when the command // began execution. If a shard was added since the earlier targeting logic ran, then refreshing // here may cause us to illegally target an unsplit pipeline to more than one shard. - if (litePipe.hasChangeStream()) { + if (hasChangeStream) { auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); if (!shardRegistry->reload(opCtx)) { shardRegistry->reload(opCtx); } // Rebuild the set of shards as the shard registry might have changed. shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, expCtx->collation); } // If there were no shards when we began execution, we wouldn't have run this aggregation in the @@ -310,7 +606,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // should not participate in the shard version protocol. shardResults = scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db(), + expCtx->ns.db(), targetedCommand, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); @@ -320,23 +616,22 @@ DispatchShardPipelineResults dispatchShardPipeline( invariant(executionNsRoutingInfo); shardResults = scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db(), - executionNss, + expCtx->ns.db(), + expCtx->ns, *executionNsRoutingInfo, targetedCommand, ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, shardQuery, - aggRequest.getCollation()); + expCtx->collation); } } else { cursors = establishShardCursors(opCtx, - executionNss, - litePipe, + expCtx->ns, + hasChangeStream, executionNsRoutingInfo, shardIds, targetedCommand, - aggRequest, ReadPreferenceSetting::get(opCtx)); invariant(cursors.size() % shardIds.size() == 0, str::stream() << "Number of cursors (" << cursors.size() @@ -346,7 +641,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // Convert remote cursors into a vector of "owned" cursors. std::vector ownedCursors; for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); + ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns)); } // Record the number of shards involved in the aggregation. If we are required to merge on @@ -366,82 +661,269 @@ DispatchShardPipelineResults dispatchShardPipeline( exchangeSpec}; } -std::set getTargetedShards(OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (mustRunOnAllShards) { - // The pipeline begins with a stage which must be run on all shards. - std::vector shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - return {shardIds.begin(), shardIds.end()}; +AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj mergeCmdObj, + const ShardId& mergingShardId) { + if (MONGO_unlikely(shardedAggregateFailToEstablishMergingShardCursor.shouldFail())) { + log() << "shardedAggregateFailToEstablishMergingShardCursor fail point enabled."; + uasserted(ErrorCodes::FailPointEnabled, + "Asserting on establishing merging shard cursor due to failpoint."); } - // If we don't need to run on all shards, then we should always have a valid routing table. + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss.db().toString(), + {{mergingShardId, mergeCmdObj}}, + ReadPreferenceSetting::get(opCtx), + getDesiredRetryPolicy(opCtx)); + const auto response = ars.next(); + invariant(ars.done()); + return response; +} + +Status dispatchMergingPipeline( + const boost::intrusive_ptr& expCtx, + const ClusterAggregate::Namespaces& namespaces, + Document serializedCommand, + long long batchSize, + const boost::optional& routingInfo, + sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults, + BSONObjBuilder* result, + const PrivilegeVector& privileges, + bool hasChangeStream) { + // We should never be in a situation where we call this function on a non-merge pipeline. + invariant(shardDispatchResults.splitPipeline); + auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); + invariant(mergePipeline); + auto* opCtx = expCtx->opCtx; + + std::vector targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor->getShardId().toString()); + } + + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardDispatchResults.splitPipeline->shardCursorsSortSpec, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + hasChangeStream); + + // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, + // then ignore the internalQueryProhibitMergingOnMongoS parameter. + if (mergePipeline->requiredToRunOnMongos() || + (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) { + return runPipelineOnMongoS(namespaces, + batchSize, + std::move(shardDispatchResults.splitPipeline->mergePipeline), + result, + privileges); + } + + // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we + // therefore must have a valid routing table. invariant(routingInfo); - return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); + const ShardId mergingShardId = pickMergingShard(opCtx, + shardDispatchResults.needsPrimaryShardMerge, + targetedShards, + routingInfo->db().primaryId()); + const bool mergingShardContributesData = + std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != + targetedShards.end(); + + auto mergeCmdObj = createCommandForMergingShard( + serializedCommand, expCtx, mergingShardId, mergingShardContributesData, mergePipeline); + + LOG(1) << "Dispatching merge pipeline " << redact(mergeCmdObj) << " to designated shard"; + + // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. + auto mergeResponse = + establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId); + uassertStatusOK(mergeResponse.swResponse); + + auto mergeCursorResponse = uassertStatusOK( + storePossibleCursor(opCtx, + mergingShardId, + *mergeResponse.shardHostAndPort, + mergeResponse.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + expCtx->tailableMode)); + + // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the + // ownership in the current merging pipeline such that when it goes out of scope it does not + // attempt to kill the cursors. + auto mergeCursors = static_cast(mergePipeline->peekFront()); + mergeCursors->dismissCursorOwnership(); + + return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result); } -std::vector establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional& routingInfo, - const std::set& shardIds, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; +BSONObj establishMergingMongosCursor(OperationContext* opCtx, + long long batchSize, + const NamespaceString& requestedNss, + std::unique_ptr pipelineForMerging, + const PrivilegeVector& privileges) { - const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); - std::vector> requests; + ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); + params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); + params.tailableMode = pipelineForMerging->getContext()->tailableMode; + // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch + // size we pass here is used for getMores, so do not specify a batch size if the initial request + // had a batch size of 0. + params.batchSize = batchSize == 0 ? boost::none : boost::make_optional(batchSize); + params.lsid = opCtx->getLogicalSessionId(); + params.txnNumber = opCtx->getTxnNumber(); + params.originatingPrivileges = privileges; - if (mustRunOnAll) { - // The pipeline contains a stage which must be run on all shards. Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); + if (TransactionRouter::get(opCtx)) { + params.isAutoCommit = false; + } + + auto ccc = cluster_aggregation_planner::buildClusterCursor( + opCtx, std::move(pipelineForMerging), std::move(params)); + + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + + rpc::OpMsgReplyBuilder replyBuilder; + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + + CursorResponseBuilder responseBuilder(&replyBuilder, options); + bool stashedResult = false; + + for (long long objCount = 0; objCount < batchSize; ++objCount) { + ClusterQueryResult next; + try { + next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); + } catch (const ExceptionFor&) { + // This exception is thrown when a $changeStream stage encounters an event + // that invalidates the cursor. We should close the cursor and return without + // error. + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + + // Check whether we have exhausted the pipeline's results. + if (next.isEOF()) { + // We reached end-of-stream. If the cursor is not tailable, then we mark it as + // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when + // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no + // hope of returning data and thus we need to close the mongos cursor as well. + if (!ccc->isTailable() || ccc->remotesExhausted()) { + cursorState = ClusterCursorManager::CursorState::Exhausted; + } + break; } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - const auto cmdObjWithShardVersion = !routingInfo->db().primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj; - requests.emplace_back(routingInfo->db().primaryId(), - appendDbVersionIfPresent(cmdObjWithShardVersion, routingInfo->db())); - } - if (MONGO_unlikely(clusterAggregateHangBeforeEstablishingShardCursors.shouldFail())) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " - "until fail point is disabled."; - while (MONGO_unlikely(clusterAggregateHangBeforeEstablishingShardCursors.shouldFail())) { - sleepsecs(1); + // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor + // to be returned on the next getMore. + auto nextObj = *next.getResult(); + + if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { + ccc->queueResult(nextObj); + stashedResult = true; + break; } + + // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); + responseBuilder.append(nextObj); } - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - getDesiredRetryPolicy(request)); + // For empty batches, or in the case where the final result was added to the batch rather than + // being stashed, we update the PBRT here to ensure that it is the most recent available. + if (!stashedResult) { + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); + } + + ccc->detachFromOperationContext(); + + int nShards = ccc->getNumRemotes(); + CursorId clusterCursorId = 0; + + if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { + auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(); + clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( + opCtx, + ccc.releaseCursor(), + requestedNss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal, + authUsers)); + } + + // Fill out the aggregation metrics in CurOp. + if (clusterCursorId > 0) { + CurOp::get(opCtx)->debug().cursorid = clusterCursorId; + } + CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards); + CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0); + CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs(); + + responseBuilder.done(clusterCursorId, requestedNss.ns()); + + auto bodyBuilder = replyBuilder.getBodyBuilder(); + CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); + bodyBuilder.doneFast(); + + return replyBuilder.releaseBody(); } +/** + * Returns the output of the listCollections command filtered to the namespace 'nss'. + */ +BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) { + ScopedDbConnection conn(primaryShard->getConnString()); + std::list all = + conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + if (all.empty()) { + // Collection does not exist, return an empty object. + return BSONObj(); + } + return all.front(); +} + + +/** + * Returns the collection default collation or the simple collator if there is no default. If the + * collection does not exist, then returns an empty BSON Object. + */ +BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) { + if (collectionInfo.isEmpty()) { + // Collection does not exist, return an empty object. + return BSONObj(); + } + + BSONObj defaultCollation = CollationSpec::kSimpleSpec; + if (collectionInfo["options"].type() == BSONType::Object) { + BSONObj collectionOptions = collectionInfo["options"].Obj(); + BSONElement collationElement; + auto status = bsonExtractTypedField( + collectionOptions, "collation", BSONType::Object, &collationElement); + if (status.isOK()) { + defaultCollation = collationElement.Obj().getOwned(); + uassert(ErrorCodes::BadValue, + "Default collation in collection metadata cannot be empty.", + !defaultCollation.isEmpty()); + } else if (status != ErrorCodes::NoSuchKey) { + uassertStatusOK(status); + } + } + return defaultCollation; +} + +} // namespace + std::unique_ptr targetShardsAndAddMergeCursors( const boost::intrusive_ptr& expCtx, Pipeline* ownedPipeline) { std::unique_ptr pipeline(ownedPipeline, @@ -466,8 +948,9 @@ std::unique_ptr targetShardsAndAddMergeCursors( AggregationRequest aggRequest(expCtx->ns, rawStages); LiteParsedPipeline liteParsedPipeline(aggRequest); + auto hasChangeStream = liteParsedPipeline.hasChangeStream(); auto shardDispatchResults = dispatchShardPipeline( - expCtx, expCtx->ns, aggRequest, liteParsedPipeline, std::move(pipeline), expCtx->collation); + aggRequest.serializeToCommandObj(), hasChangeStream, std::move(pipeline)); std::vector targetedShards; targetedShards.reserve(shardDispatchResults.remoteCursors.size()); @@ -488,14 +971,281 @@ std::unique_ptr targetShardsAndAddMergeCursors( cluster_aggregation_planner::addMergeCursorsSource( mergePipeline.get(), - liteParsedPipeline, shardDispatchResults.commandForTargetedShards, std::move(shardDispatchResults.remoteCursors), targetedShards, shardCursorsSortSpec, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor()); + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + hasChangeStream); return mergePipeline; } -} // namespace sharded_agg_helpers -} // namespace mongo + +StatusWith AggregationTargeter::make( + OperationContext* opCtx, + const NamespaceString& executionNss, + const std::function( + boost::optional)> buildPipelineFn, + stdx::unordered_set involvedNamespaces, + bool hasChangeStream, + bool allowedToPassthrough) { + + // Check if any of the involved collections are sharded. + bool involvesShardedCollections = [&]() { + for (const auto& nss : involvedNamespaces) { + const auto resolvedNsRoutingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + if (resolvedNsRoutingInfo.cm()) { + return true; + } + } + return false; + }(); + + // Determine whether this aggregation must be dispatched to all shards in the cluster. + const bool mustRunOnAll = mustRunOnAllShards(executionNss, hasChangeStream); + + // If the routing table is valid, we obtain a reference to it. If the table is not valid, then + // either the database does not exist, or there are no shards in the cluster. In the latter + // case, we always return an empty cursor. In the former case, if the requested aggregation is a + // $changeStream, we allow the operation to continue so that stream cursors can be established + // on the given namespace before the database or collection is actually created. If the database + // does not exist and this is not a $changeStream, then we return an empty cursor. + boost::optional routingInfo; + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + if (executionNsRoutingInfoStatus.isOK()) { + routingInfo = std::move(executionNsRoutingInfoStatus.getValue()); + } else if (!(hasChangeStream && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + return executionNsRoutingInfoStatus.getStatus(); + } + + // If we don't have a routing table, then this is a $changeStream which must run on all shards. + invariant(routingInfo || (mustRunOnAll && hasChangeStream)); + + // A pipeline is allowed to passthrough to the primary shard iff the following conditions are + // met: + // + // 1. The namespace of the aggregate and any other involved namespaces are unsharded. + // 2. Is allowed to be forwarded to shards. For example, $currentOp with localOps: true should + // run locally on mongos and cannot be forwarded to a shard. + // 3. Does not need to run on all shards. For example, a pipeline with a $changeStream or + // $currentOp. + // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions + // needs to include information about users that can only be deduced on mongos. + if (routingInfo && !routingInfo->cm() && !mustRunOnAll && allowedToPassthrough && + !involvesShardedCollections) { + return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, routingInfo}; + } else { + auto pipeline = buildPipelineFn(routingInfo); + auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired + : TargetingPolicy::kAnyShard; + return AggregationTargeter{policy, std::move(pipeline), routingInfo}; + } +} + +Status runPipelineOnPrimaryShard(OperationContext* opCtx, + const ClusterAggregate::Namespaces& namespaces, + const CachedDatabaseInfo& dbInfo, + boost::optional explain, + Document serializedCommand, + const PrivilegeVector& privileges, + BSONObjBuilder* out) { + // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an + // explain if necessary, and rewrites the result into a format safe to forward to shards. + BSONObj cmdObj = + CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard( + opCtx, serializedCommand, explain, boost::none, nullptr, BSONObj())); + + const auto shardId = dbInfo.primary()->getId(); + const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) + ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) + : std::move(cmdObj); + + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + namespaces.executionNss.db().toString(), + {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}}, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + auto response = ars.next(); + invariant(ars.done()); + + uassertStatusOK(response.swResponse); + auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); + + if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { + uassertStatusOK(commandStatus.withContext("command failed because of stale config")); + } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { + uassertStatusOK( + commandStatus.withContext("command failed because can not establish a snapshot")); + } + + BSONObj result; + if (explain) { + // If this was an explain, then we get back an explain result object rather than a cursor. + result = response.swResponse.getValue().data; + } else { + result = uassertStatusOK( + storePossibleCursor(opCtx, + shardId, + *response.shardHostAndPort, + response.swResponse.getValue().data, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + privileges, + TailableModeEnum::kNormal)); + } + + // First append the properly constructed writeConcernError. It will then be skipped + // in appendElementsUnique. + if (auto wcErrorElem = result["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); + } + + out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); + + return getStatusFromCommandResult(out->asTempObj()); +} + +Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, + long long batchSize, + std::unique_ptr pipeline, + BSONObjBuilder* result, + const PrivilegeVector& privileges) { + auto expCtx = pipeline->getContext(); + + // We should never receive a pipeline which cannot run on mongoS. + invariant(!expCtx->explain); + invariant(pipeline->canRunOnMongos()); + + // Verify that the first stage can produce input for the remainder of the pipeline. + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Aggregation pipeline must be run on mongoS, but " + << pipeline->getSources().front()->getSourceName() + << " is not capable of producing input", + !pipeline->getSources().front()->constraints().requiresInputDocSource); + + // Register the new mongoS cursor, and retrieve the initial batch of results. + auto cursorResponse = establishMergingMongosCursor( + expCtx->opCtx, batchSize, namespaces.requestedNss, std::move(pipeline), privileges); + + // We don't need to storePossibleCursor or propagate writeConcern errors; a pipeline with + // writing stages like $out can never run on mongoS. Filter the command response and return + // immediately. + CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); + return getStatusFromCommandResult(result->asTempObj()); +} + +Status dispatchPipelineAndMerge(OperationContext* opCtx, + AggregationTargeter targeter, + Document serializedCommand, + long long batchSize, + const ClusterAggregate::Namespaces& namespaces, + const PrivilegeVector& privileges, + BSONObjBuilder* result, + bool hasChangeStream) { + auto expCtx = targeter.pipeline->getContext(); + // If not, split the pipeline as necessary and dispatch to the relevant shards. + auto shardDispatchResults = + dispatchShardPipeline(serializedCommand, hasChangeStream, std::move(targeter.pipeline)); + + // If the operation is an explain, then we verify that it succeeded on all targeted + // shards, write the results to the output builder, and return immediately. + if (expCtx->explain) { + return appendExplainResults(std::move(shardDispatchResults), expCtx, result); + } + + // If this isn't an explain, then we must have established cursors on at least one + // shard. + invariant(shardDispatchResults.remoteCursors.size() > 0); + + // If we sent the entire pipeline to a single shard, store the remote cursor and return. + if (!shardDispatchResults.splitPipeline) { + invariant(shardDispatchResults.remoteCursors.size() == 1); + auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front()); + const auto shardId = remoteCursor->getShardId().toString(); + const auto reply = uassertStatusOK(storePossibleCursor(opCtx, + namespaces.requestedNss, + std::move(remoteCursor), + privileges, + expCtx->tailableMode)); + return appendCursorResponseToCommandResult(shardId, reply, result); + } + + // If we have the exchange spec then dispatch all consumers. + if (shardDispatchResults.exchangeSpec) { + shardDispatchResults = dispatchExchangeConsumerPipeline( + expCtx, namespaces.executionNss, serializedCommand, &shardDispatchResults); + } + + // If we reach here, we have a merge pipeline to dispatch. + return dispatchMergingPipeline(expCtx, + namespaces, + serializedCommand, + batchSize, + targeter.routingInfo, + std::move(shardDispatchResults), + result, + privileges, + hasChangeStream); +} + +std::pair> getCollationAndUUID( + const boost::optional& routingInfo, + const NamespaceString& nss, + const BSONObj& collation) { + const bool collectionIsSharded = (routingInfo && routingInfo->cm()); + const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); + + // If this is a collectionless aggregation, we immediately return the user- + // defined collation if one exists, or an empty BSONObj otherwise. Collectionless aggregations + // generally run on the 'admin' database, the standard logic would attempt to resolve its + // non-existent UUID and collation by sending a specious 'listCollections' command to the config + // servers. + if (nss.isCollectionlessAggregateNS()) { + return {collation, boost::none}; + } + + // If the collection is unsharded, obtain collInfo from the primary shard. + const auto unshardedCollInfo = collectionIsNotSharded + ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss) + : BSONObj(); + + // Return the collection UUID if available, or boost::none otherwise. + const auto getUUID = [&]() -> auto { + if (collectionIsSharded) { + return routingInfo->cm()->getUUID(); + } else { + return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"] + ? boost::optional{uassertStatusOK( + UUID::parse(unshardedCollInfo["info"]["uuid"]))} + : boost::optional{boost::none}; + } + }; + + // If the collection exists, return its default collation, or the simple + // collation if no explicit default is present. If the collection does not + // exist, return an empty BSONObj. + const auto getCollation = [&]() -> auto { + if (!collectionIsSharded && !collectionIsNotSharded) { + return BSONObj(); + } + if (collectionIsNotSharded) { + return getDefaultCollationForUnshardedCollection(unshardedCollInfo); + } else { + return routingInfo->cm()->getDefaultCollator() + ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON() + : CollationSpec::kSimpleSpec; + } + }; + + // If the user specified an explicit collation, we always adopt it. Otherwise, + // obtain the collection default or simple collation as appropriate, and return + // it along with the collection's UUID. + return {collation.isEmpty() ? getCollation() : collation, getUUID()}; +} + +} // namespace mongo::sharded_agg_helpers diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 15e0dd51c2e..b8c25a42510 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/query/cluster_aggregate.h" #include "mongo/s/query/cluster_aggregation_planner.h" namespace mongo { @@ -67,63 +68,78 @@ struct DispatchShardPipelineResults { boost::optional exchangeSpec; }; -Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req); +/** + * This structure contains information for targeting an aggregation pipeline in a sharded cluster. + */ +struct AggregationTargeter { + /** + * Populates and returns targeting info for an aggregation pipeline on the given namespace + * 'executionNss'. + */ + static StatusWith make( + OperationContext* opCtx, + const NamespaceString& executionNss, + const std::function( + boost::optional)> buildPipelineFn, + stdx::unordered_set involvedNamespaces, + bool hasChangeStream, + bool allowedToPassthrough); + + enum TargetingPolicy { + kPassthrough, + kMongosRequired, + kAnyShard, + } policy; + + std::unique_ptr pipeline; + boost::optional routingInfo; +}; + +Status runPipelineOnPrimaryShard(OperationContext* opCtx, + const ClusterAggregate::Namespaces& namespaces, + const CachedDatabaseInfo& dbInfo, + boost::optional explain, + Document serializedCommand, + const PrivilegeVector& privileges, + BSONObjBuilder* out); -bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe); +/** + * Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a + * pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS. + */ +Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, + long long batchSize, + std::unique_ptr pipeline, + BSONObjBuilder* result, + const PrivilegeVector& privileges); -StatusWith getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss); +/** + * Dispatches the pipeline in 'targeter' to the shards that are involved, and merges the results if + * necessary on either mongos or a randomly designated shard. + */ +Status dispatchPipelineAndMerge(OperationContext* opCtx, + sharded_agg_helpers::AggregationTargeter targeter, + Document serializedCommand, + long long batchSize, + const ClusterAggregate::Namespaces& namespaces, + const PrivilegeVector& privileges, + BSONObjBuilder* result, + bool hasChangeStream); /** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and the - * pipeline that will need to be executed to merge the results from the remotes. If a stale shard - * version is encountered, refreshes the routing table and tries again. + * Returns the "collation" and "uuid" for the collection given by "nss" with the following + * semantics: + * - The "collation" parameter will be set to the default collation for the collection or the + * simple collation if there is no default. If the collection does not exist or if the aggregate + * is on the collectionless namespace, this will be set to an empty object. + * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections + * output for unsharded collections. The UUID will remain unset if the aggregate is on the + * collectionless namespace. */ -DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr pipeline, - BSONObj collationObj); - -std::set getTargetedShards(OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional& routingInfo, - const BSONObj shardQuery, - const BSONObj collation); - -std::vector establishShardCursors( - OperationContext* opCtx, +std::pair> getCollationAndUUID( + const boost::optional& routingInfo, const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional& routingInfo, - const std::set& shardIds, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref); - -BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const cluster_aggregation_planner::SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional exchangeSpec, - const boost::optional& constants, - bool needsMerge); - -BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional& constants, - Pipeline* pipeline, - BSONObj collationObj); - -BSONObj genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional& constants, - BSONObj collationObj); + const BSONObj& collation); /** * For a sharded collection, establishes remote cursors on each shard that may have results, and diff --git a/src/mongo/s/commands/cluster_map_reduce.cpp b/src/mongo/s/commands/cluster_map_reduce.cpp index 59b6e1680ae..9ae34eca3c5 100644 --- a/src/mongo/s/commands/cluster_map_reduce.cpp +++ b/src/mongo/s/commands/cluster_map_reduce.cpp @@ -201,6 +201,43 @@ BSONObj fixForShards(const BSONObj& orig, return appendAllowImplicitCreate(b.obj(), false); } +/** + * Outline for sharded map reduce for sharded output, $out replace: + * + * ============= mongos ============= + * 1. Send map reduce command to all relevant shards with some extra info like the value for + * the chunkSize and the name of the temporary output collection. + * + * ============= shard ============= + * 2. Does normal map reduce. + * + * 3. Calls splitVector on itself against the output collection and puts the results into the + * response object. + * + * ============= mongos ============= + * 4. If the output collection is *not* sharded, uses the information from splitVector to + * create a pre-split sharded collection. + * + * 5. Grabs the distributed lock for the final output collection. + * + * 6. Sends mapReduce.shardedfinish. + * + * ============= shard ============= + * 7. Extracts the list of shards from the mapReduce.shardedfinish and performs a broadcast + * query against all of them to obtain all documents that this shard owns. + * + * 8. Performs the reduce operation against every document from step #7 and outputs them to + * another temporary collection. Also keeps track of the BSONObject size of every "reduced" + * document for each chunk range. + * + * 9. Atomically drops the old output collection and renames the temporary collection to the + * output collection. + * + * ============= mongos ============= + * 10. Releases the distributed lock acquired at step #5. + * + * 11. Inspects the BSONObject size from step #8 and determines if it needs to split. + */ bool runMapReduce(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 9375d1c2176..2c2012c3af5 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -31,56 +31,185 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connpool.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/map_reduce_agg.h" #include "mongo/db/commands/map_reduce_gen.h" +#include "mongo/db/commands/mr_common.h" +#include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/map_reduce_output_format.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/commands/cluster_map_reduce_agg.h" namespace mongo { +namespace { -// Exhaust the cursor from the aggregation response and extract results and statistics. -std::vector getAllAggregationResults(OperationContext* opCtx, - const std::string& dbname, - CursorResponse& response) { - CursorId cursorId = response.getCursorId(); - auto fullBatch = response.releaseBatch(); - while (cursorId != 0) { - GetMoreRequest request( - response.getNSS(), cursorId, boost::none, boost::none, boost::none, boost::none); - BSONObj getMoreResponse = CommandHelpers::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbname, request.toBSON())); - auto getMoreCursorResponse = CursorResponse::parseFromBSONThrowing(getMoreResponse); - auto nextBatch = getMoreCursorResponse.releaseBatch(); - fullBatch.insert(fullBatch.end(), nextBatch.begin(), nextBatch.end()); - cursorId = getMoreCursorResponse.getCursorId(); +auto makeExpressionContext(OperationContext* opCtx, + const MapReduce& parsedMr, + boost::optional routingInfo) { + // Populate the collection UUID and the appropriate collation to use. + auto nss = parsedMr.getNamespace(); + auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID( + routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj())); + + std::unique_ptr resolvedCollator; + if (!collationObj.isEmpty()) { + // This will be null if attempting to build an interface for the simple collator. + resolvedCollator = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj)); + } + + // Resolve involved namespaces. + StringMap resolvedNamespaces; + resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector{}); + if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) { + auto outNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName() + ? *parsedMr.getOutOptions().getDatabaseName() + : parsedMr.getNamespace().db(), + parsedMr.getOutOptions().getCollectionName()}; + resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector{}); } - return fullBatch; + + auto expCtx = make_intrusive( + opCtx, + boost::none, // explain + false, // fromMongos + false, // needsmerge + true, // allowDiskUse + parsedMr.getBypassDocumentValidation().get_value_or(false), + nss, + collationObj, + boost::none, // runtimeConstants + std::move(resolvedCollator), + std::make_shared(), + std::move(resolvedNamespaces), + boost::none); // uuid + expCtx->inMongos = true; + return expCtx; } +Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipeline* pipeline) { + MutableDocument translatedCmd; + + translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll()); + translatedCmd["pipeline"] = Value(pipeline->serialize()); + translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits::max()}}); + translatedCmd["allowDiskUse"] = Value(true); + translatedCmd["fromMongos"] = Value(true); + + // Append generic command options. + for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) { + translatedCmd[elem.fieldNameStringData()] = Value(elem); + } + return translatedCmd.freeze(); +} + +} // namespace + bool runAggregationMapReduce(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmd, std::string& errmsg, BSONObjBuilder& result) { - // Pretend we have built the appropriate pipeline and aggregation request. - auto mrRequest = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); - const BSONObj aggRequest = - fromjson(str::stream() << "{aggregate: '" << mrRequest.getNamespace().coll() - << "', pipeline: [ { $group: { _id: { user: \"$user\" }," - << "count: { $sum: 1 } } } ], cursor: {}}"); - BSONObj aggResult = CommandHelpers::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbname, std::move(aggRequest))); - - bool inMemory = mrRequest.getOutOptions().getOutputType() == OutputType::InMemory; - std::string outColl = mrRequest.getOutOptions().getCollectionName(); - // Either inline response specified or we have an output collection. - invariant(inMemory ^ !outColl.empty()); - - auto cursorResponse = CursorResponse::parseFromBSONThrowing(aggResult); - auto completeBatch = getAllAggregationResults(opCtx, dbname, cursorResponse); - [[maybe_unused]] CursorResponse completeCursor( - cursorResponse.getNSS(), cursorResponse.getCursorId(), std::move(completeBatch)); + auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); + stdx::unordered_set involvedNamespaces{parsedMr.getNamespace()}; + auto resolvedOutNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName() + ? *parsedMr.getOutOptions().getDatabaseName() + : parsedMr.getNamespace().db(), + parsedMr.getOutOptions().getCollectionName()}; + + if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) { + involvedNamespaces.insert(resolvedOutNss); + } + + const auto pipelineBuilder = [&](boost::optional routingInfo) { + return map_reduce_common::translateFromMR( + parsedMr, makeExpressionContext(opCtx, parsedMr, routingInfo)); + }; + + auto namespaces = + ClusterAggregate::Namespaces{parsedMr.getNamespace(), parsedMr.getNamespace()}; + + // Auth has already been checked for the original mapReduce command, no need to recheck here. + PrivilegeVector privileges; + + // This holds the raw results from the aggregation, which will be reformatted to match the + // expected mapReduce output. + BSONObjBuilder tempResults; + + auto targeter = uassertStatusOK( + sharded_agg_helpers::AggregationTargeter::make(opCtx, + parsedMr.getNamespace(), + pipelineBuilder, + involvedNamespaces, + false, // hasChangeStream + true)); // allowedToPassthrough + switch (targeter.policy) { + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: { + // For the passthrough case, the targeter will not build a pipeline since its not needed + // in the normal aggregation path. For this translation, though, we need to build the + // pipeline to serialize and send to the primary shard. + auto serialized = + serializeToCommand(cmd, parsedMr, pipelineBuilder(targeter.routingInfo).get()); + uassertStatusOK( + sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx, + namespaces, + targeter.routingInfo->db(), + boost::none, // explain + std::move(serialized), + privileges, + &tempResults)); + break; + } + + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: { + // Pipelines generated from mapReduce should never be required to run on mongos. + uasserted(31291, "Internal error during mapReduce translation"); + break; + } + + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: { + auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get()); + uassertStatusOK( + sharded_agg_helpers::dispatchPipelineAndMerge(opCtx, + std::move(targeter), + std::move(serialized), + std::numeric_limits::max(), + namespaces, + privileges, + &tempResults, + false)); // hasChangeStream + break; + } + } + + auto aggResults = tempResults.done(); + if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + auto exhaustedResults = [&]() { + BSONArrayBuilder bab; + for (auto&& elem : aggResults["cursor"]["firstBatch"].Obj()) + bab.append(elem.embeddedObject()); + return bab.arr(); + }(); + map_reduce_output_format::appendInlineResponse(std::move(exhaustedResults), + parsedMr.getVerbose().get_value_or(false), + true, // inMongos + &result); + } else { + map_reduce_output_format::appendOutResponse( + parsedMr.getOutOptions().getDatabaseName(), + parsedMr.getOutOptions().getCollectionName(), + boost::get_optional_value_or(parsedMr.getVerbose(), false), + true, // inMongos + &result); + } return true; } diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index 0c40f522e1b..c0abf8efd87 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -41,43 +41,6 @@ namespace mongo { namespace { -/** - * Outline for sharded map reduce for sharded output, $out replace: - * - * ============= mongos ============= - * 1. Send map reduce command to all relevant shards with some extra info like the value for - * the chunkSize and the name of the temporary output collection. - * - * ============= shard ============= - * 2. Does normal map reduce. - * - * 3. Calls splitVector on itself against the output collection and puts the results into the - * response object. - * - * ============= mongos ============= - * 4. If the output collection is *not* sharded, uses the information from splitVector to - * create a pre-split sharded collection. - * - * 5. Grabs the distributed lock for the final output collection. - * - * 6. Sends mapReduce.shardedfinish. - * - * ============= shard ============= - * 7. Extracts the list of shards from the mapReduce.shardedfinish and performs a broadcast - * query against all of them to obtain all documents that this shard owns. - * - * 8. Performs the reduce operation against every document from step #7 and outputs them to - * another temporary collection. Also keeps track of the BSONObject size of every "reduced" - * document for each chunk range. - * - * 9. Atomically drops the old output collection and renames the temporary collection to the - * output collection. - * - * ============= mongos ============= - * 10. Releases the distributed lock acquired at step #5. - * - * 11. Inspects the BSONObject size from step #8 and determines if it needs to split. - */ class ClusterMapReduceCommand : public MapReduceCommandBase { public: ClusterMapReduceCommand() = default; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 953f4c48d05..9de374b9202 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -35,7 +35,6 @@ #include -#include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" @@ -75,503 +74,34 @@ #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" -#include "mongo/util/fail_point.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_utils.h" namespace mongo { -using SplitPipeline = cluster_aggregation_planner::SplitPipeline; - -MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor); -MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline); - constexpr unsigned ClusterAggregate::kMaxViewRetries; namespace { -Status appendCursorResponseToCommandResult(const ShardId& shardId, - const BSONObj cursorResponse, - BSONObjBuilder* result) { - // If a write error was encountered, append it to the output buffer first. - if (auto wcErrorElem = cursorResponse["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); - } - - // Pass the results from the remote shard into our command response. - result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(cursorResponse)); - return getStatusFromCommandResult(result->asTempObj()); -} - -BSONObj createCommandForMergingShard(const AggregationRequest& request, - const boost::intrusive_ptr& mergeCtx, - const ShardId& shardId, - bool mergingShardContributesData, - const Pipeline* pipelineForMerging) { - MutableDocument mergeCmd(request.serializeToCommandObj()); - - mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); - mergeCmd[AggregationRequest::kFromMongosName] = Value(true); - - mergeCmd[AggregationRequest::kRuntimeConstants] = - Value(mergeCtx->getRuntimeConstants().toBSON()); - - // If the user didn't specify a collation already, make sure there's a collation attached to - // the merge command, since the merging shard may not have the collection metadata. - if (mergeCmd.peek()["collation"].missing()) { - mergeCmd["collation"] = mergeCtx->getCollator() - ? Value(mergeCtx->getCollator()->getSpec().toBSON()) - : Value(Document{CollationSpec::kSimpleSpec}); - } - - const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx); - if (txnRouter && mergingShardContributesData) { - // Don't include a readConcern since we can only include read concerns on the _first_ - // command sent to a participant per transaction. Assuming the merging shard is a - // participant, it will already have received another 'aggregate' command earlier which - // contained a readConcern. - - mergeCmd.remove("readConcern"); - } - - return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), false); -} - -sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( - const boost::intrusive_ptr& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - BSONObj collationObj, - sharded_agg_helpers::DispatchShardPipelineResults* shardDispatchResults) { - invariant(!litePipe.hasChangeStream()); - auto opCtx = expCtx->opCtx; - - if (MONGO_unlikely(clusterAggregateFailToDispatchExchangeConsumerPipeline.shouldFail())) { - log() << "clusterAggregateFailToDispatchExchangeConsumerPipeline fail point enabled."; - uasserted(ErrorCodes::FailPointEnabled, - "Asserting on exhange consumer pipeline dispatch due to failpoint."); - } - - // For all consumers construct a request with appropriate cursor ids and send to shards. - std::vector> requests; - auto numConsumers = shardDispatchResults->exchangeSpec->consumerShards.size(); - std::vector consumerPipelines; - for (size_t idx = 0; idx < numConsumers; ++idx) { - // Pick this consumer's cursors from producers. - std::vector producers; - for (size_t p = 0; p < shardDispatchResults->numProducers; ++p) { - producers.emplace_back( - std::move(shardDispatchResults->remoteCursors[p * numConsumers + idx])); - } - - // Create a pipeline for a consumer and add the merging stage. - auto consumerPipeline = uassertStatusOK(Pipeline::create( - shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx)); - - cluster_aggregation_planner::addMergeCursorsSource( - consumerPipeline.get(), - litePipe, - BSONObj(), - std::move(producers), - {}, - shardDispatchResults->splitPipeline->shardCursorsSortSpec, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); - - consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - - auto consumerCmdObj = - sharded_agg_helpers::createCommandForTargetedShards(opCtx, - request, - litePipe, - consumerPipelines.back(), - collationObj, - boost::none, - expCtx->getRuntimeConstants(), - false); - - requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], - consumerCmdObj); - } - auto cursors = establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - executionNss, - ReadPreferenceSetting::get(opCtx), - requests, - false /* do not allow partial results */); - - // Convert remote cursors into a vector of "owned" cursors. - std::vector ownedCursors; - for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); - } - - // The merging pipeline is just a union of the results from each of the shards involved on the - // consumer side of the exchange. - auto mergePipeline = uassertStatusOK(Pipeline::create({}, expCtx)); - mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge); - - SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none}; - - // Relinquish ownership of the local consumer pipelines' cursors as each shard is now - // responsible for its own producer cursors. - for (const auto& pipeline : consumerPipelines) { - const auto& mergeCursors = - static_cast(pipeline.shardsPipeline->peekFront()); - mergeCursors->dismissCursorOwnership(); - } - return sharded_agg_helpers::DispatchShardPipelineResults{false, - std::move(ownedCursors), - {} /*TODO SERVER-36279*/, - std::move(splitPipeline), - nullptr, - BSONObj(), - numConsumers}; -} - -Status appendExplainResults(sharded_agg_helpers::DispatchShardPipelineResults&& dispatchResults, - const boost::intrusive_ptr& mergeCtx, - BSONObjBuilder* result) { - if (dispatchResults.splitPipeline) { - auto* mergePipeline = dispatchResults.splitPipeline->mergePipeline.get(); - const char* mergeType = [&]() { - if (mergePipeline->canRunOnMongos()) { - return "mongos"; - } else if (dispatchResults.exchangeSpec) { - return "exchange"; - } else if (mergePipeline->needsPrimaryShardMerger()) { - return "primaryShard"; - } else { - return "anyShard"; - } - }(); - - *result << "mergeType" << mergeType; - - MutableDocument pipelinesDoc; - pipelinesDoc.addField("shardsPart", - Value(dispatchResults.splitPipeline->shardsPipeline->writeExplainOps( - *mergeCtx->explain))); - if (dispatchResults.exchangeSpec) { - BSONObjBuilder bob; - dispatchResults.exchangeSpec->exchangeSpec.serialize(&bob); - bob.append("consumerShards", dispatchResults.exchangeSpec->consumerShards); - pipelinesDoc.addField("exchange", Value(bob.obj())); - } - pipelinesDoc.addField("mergerPart", - Value(mergePipeline->writeExplainOps(*mergeCtx->explain))); - - *result << "splitPipeline" << pipelinesDoc.freeze(); - } else { - *result << "splitPipeline" << BSONNULL; - } - - BSONObjBuilder shardExplains(result->subobjStart("shards")); - for (const auto& shardResult : dispatchResults.remoteExplainOutput) { - invariant(shardResult.shardHostAndPort); - - uassertStatusOK(shardResult.swResponse.getStatus()); - uassertStatusOK(getStatusFromCommandResult(shardResult.swResponse.getValue().data)); - - auto shardId = shardResult.shardId.toString(); - const auto& data = shardResult.swResponse.getValue().data; - BSONObjBuilder explain(shardExplains.subobjStart(shardId)); - explain << "host" << shardResult.shardHostAndPort->toString(); - if (auto stagesElement = data["stages"]) { - explain << "stages" << stagesElement; - } else { - auto queryPlannerElement = data["queryPlanner"]; - uassert(51157, - str::stream() << "Malformed explain response received from shard " << shardId - << ": " << data.toString(), - queryPlannerElement); - explain << "queryPlanner" << queryPlannerElement; - if (auto executionStatsElement = data["executionStats"]) { - explain << "executionStats" << executionStatsElement; - } - } - } - - return Status::OK(); -} - -AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCtx, - const NamespaceString& nss, - const AggregationRequest& request, - const BSONObj mergeCmdObj, - const ShardId& mergingShardId) { - if (MONGO_unlikely(clusterAggregateFailToEstablishMergingShardCursor.shouldFail())) { - log() << "clusterAggregateFailToEstablishMergingShardCursor fail point enabled."; - uasserted(ErrorCodes::FailPointEnabled, - "Asserting on establishing merging shard cursor due to failpoint."); - } - - MultiStatementTransactionRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss.db().toString(), - {{mergingShardId, mergeCmdObj}}, - ReadPreferenceSetting::get(opCtx), - sharded_agg_helpers::getDesiredRetryPolicy(request)); - const auto response = ars.next(); - invariant(ars.done()); - return response; -} - -BSONObj establishMergingMongosCursor(OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr pipelineForMerging, - const PrivilegeVector& privileges) { - - ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); - - params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); - params.tailableMode = pipelineForMerging->getContext()->tailableMode; - // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch - // size we pass here is used for getMores, so do not specify a batch size if the initial request - // had a batch size of 0. - params.batchSize = request.getBatchSize() == 0 - ? boost::none - : boost::optional(request.getBatchSize()); - params.lsid = opCtx->getLogicalSessionId(); - params.txnNumber = opCtx->getTxnNumber(); - params.originatingPrivileges = privileges; - - if (TransactionRouter::get(opCtx)) { - params.isAutoCommit = false; - } - - auto ccc = cluster_aggregation_planner::buildClusterCursor( - opCtx, std::move(pipelineForMerging), std::move(params)); - - auto cursorState = ClusterCursorManager::CursorState::NotExhausted; - - rpc::OpMsgReplyBuilder replyBuilder; - CursorResponseBuilder::Options options; - options.isInitialResponse = true; - - CursorResponseBuilder responseBuilder(&replyBuilder, options); - bool stashedResult = false; - - for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { - ClusterQueryResult next; - try { - next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); - } catch (const ExceptionFor&) { - // This exception is thrown when a $changeStream stage encounters an event - // that invalidates the cursor. We should close the cursor and return without - // error. - cursorState = ClusterCursorManager::CursorState::Exhausted; - break; - } - - // Check whether we have exhausted the pipeline's results. - if (next.isEOF()) { - // We reached end-of-stream. If the cursor is not tailable, then we mark it as - // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when - // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no - // hope of returning data and thus we need to close the mongos cursor as well. - if (!ccc->isTailable() || ccc->remotesExhausted()) { - cursorState = ClusterCursorManager::CursorState::Exhausted; - } - break; - } - - // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor - // to be returned on the next getMore. - auto nextObj = *next.getResult(); - - if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { - ccc->queueResult(nextObj); - stashedResult = true; - break; - } - - // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. - responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); - responseBuilder.append(nextObj); - } - - // For empty batches, or in the case where the final result was added to the batch rather than - // being stashed, we update the PBRT here to ensure that it is the most recent available. - if (!stashedResult) { - responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); - } - - ccc->detachFromOperationContext(); - - int nShards = ccc->getNumRemotes(); - CursorId clusterCursorId = 0; - - if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { - auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(); - clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( - opCtx, - ccc.releaseCursor(), - requestedNss, - ClusterCursorManager::CursorType::MultiTarget, - ClusterCursorManager::CursorLifetime::Mortal, - authUsers)); - } - - // Fill out the aggregation metrics in CurOp. - if (clusterCursorId > 0) { - CurOp::get(opCtx)->debug().cursorid = clusterCursorId; - } - CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards); - CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0); - CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs(); - - responseBuilder.done(clusterCursorId, requestedNss.ns()); - - auto bodyBuilder = replyBuilder.getBodyBuilder(); - CommandHelpers::appendSimpleCommandStatus(bodyBuilder, true); - bodyBuilder.doneFast(); - - return replyBuilder.releaseBody(); -} - -/** - * Returns the output of the listCollections command filtered to the namespace 'nss'. - */ -BSONObj getUnshardedCollInfo(const Shard* primaryShard, const NamespaceString& nss) { - ScopedDbConnection conn(primaryShard->getConnString()); - std::list all = - conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - if (all.empty()) { - // Collection does not exist, return an empty object. - return BSONObj(); - } - return all.front(); -} - -/** - * Returns the collection default collation or the simple collator if there is no default. If the - * collection does not exist, then returns an empty BSON Object. - */ -BSONObj getDefaultCollationForUnshardedCollection(const BSONObj collectionInfo) { - if (collectionInfo.isEmpty()) { - // Collection does not exist, return an empty object. - return BSONObj(); - } - - BSONObj defaultCollation = CollationSpec::kSimpleSpec; - if (collectionInfo["options"].type() == BSONType::Object) { - BSONObj collectionOptions = collectionInfo["options"].Obj(); - BSONElement collationElement; - auto status = bsonExtractTypedField( - collectionOptions, "collation", BSONType::Object, &collationElement); - if (status.isOK()) { - defaultCollation = collationElement.Obj().getOwned(); - uassert(ErrorCodes::BadValue, - "Default collation in collection metadata cannot be empty.", - !defaultCollation.isEmpty()); - } else if (status != ErrorCodes::NoSuchKey) { - uassertStatusOK(status); - } - } - return defaultCollation; -} - -/** - * Populates the "collation" and "uuid" parameters with the following semantics: - * - The "collation" parameter will be set to the default collation for the collection or the - * simple collation if there is no default. If the collection does not exist or if the aggregate - * is on the collectionless namespace, this will be set to an empty object. - * - The "uuid" is retrieved from the chunk manager for sharded collections or the listCollections - * output for unsharded collections. The UUID will remain unset if the aggregate is on the - * collectionless namespace. - */ -std::pair> getCollationAndUUID( - const boost::optional& routingInfo, - const NamespaceString& nss, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe) { - const bool collectionIsSharded = (routingInfo && routingInfo->cm()); - const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm()); - - // If this is a change stream or a collectionless aggregation, we immediately return the user- - // defined collation if one exists, or an empty BSONObj otherwise. Change streams never inherit - // the collection's default collation, and since collectionless aggregations generally run on - // the 'admin' database, the standard logic would attempt to resolve its non-existent UUID and - // collation by sending a specious 'listCollections' command to the config servers. - if (litePipe.hasChangeStream() || nss.isCollectionlessAggregateNS()) { - return {request.getCollation(), boost::none}; - } - - // If the collection is unsharded, obtain collInfo from the primary shard. - const auto unshardedCollInfo = collectionIsNotSharded - ? getUnshardedCollInfo(routingInfo->db().primary().get(), nss) - : BSONObj(); - - // Return the collection UUID if available, or boost::none otherwise. - const auto getUUID = [&]() -> auto { - if (collectionIsSharded) { - return routingInfo->cm()->getUUID(); - } else { - return unshardedCollInfo["info"] && unshardedCollInfo["info"]["uuid"] - ? boost::optional{uassertStatusOK( - UUID::parse(unshardedCollInfo["info"]["uuid"]))} - : boost::optional{boost::none}; - } - }; - - // If the collection exists, return its default collation, or the simple - // collation if no explicit default is present. If the collection does not - // exist, return an empty BSONObj. - const auto getCollation = [&]() -> auto { - if (!collectionIsSharded && !collectionIsNotSharded) { - return BSONObj(); - } - if (collectionIsNotSharded) { - return getDefaultCollationForUnshardedCollection(unshardedCollInfo); - } else { - return routingInfo->cm()->getDefaultCollator() - ? routingInfo->cm()->getDefaultCollator()->getSpec().toBSON() - : CollationSpec::kSimpleSpec; - } - }; - - // If the user specified an explicit collation, we always adopt it. Otherwise, - // obtain the collection default or simple collation as appropriate, and return - // it along with the collection's UUID. - return {request.getCollation().isEmpty() ? getCollation() : request.getCollation(), getUUID()}; -} - -ShardId pickMergingShard(OperationContext* opCtx, - bool needsPrimaryShardMerge, - const std::vector& targetedShards, - ShardId primaryShard) { - auto& prng = opCtx->getClient()->getPrng(); - // If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging - // command on random shard, unless the pipeline dictates that it needs to be run on the primary - // shard for the database. - return needsPrimaryShardMerge ? primaryShard - : targetedShards[prng.nextInt32(targetedShards.size())]; -} - // "Resolve" involved namespaces into a map. We won't try to execute anything on a mongos, but we // still have to populate this map so that any $lookups, etc. will be able to have a resolved view // definition. It's okay that this is incorrect, we will repopulate the real namespace map on the // mongod. Note that this function must be called before forwarding an aggregation command on an // unsharded collection, in order to verify that the involved namespaces are allowed to be sharded. -auto resolveInvolvedNamespaces(OperationContext* opCtx, const LiteParsedPipeline& litePipe) { +auto resolveInvolvedNamespaces(stdx::unordered_set involvedNamespaces) { StringMap resolvedNamespaces; - for (auto&& nss : litePipe.getInvolvedNamespaces()) { + for (auto&& nss : involvedNamespaces) { resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector{}); } return resolvedNamespaces; } // Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate -// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally -// extracts the UUID from the collection info if present. +// collator, creates a MongoProcessInterface for use by the pipeline's stages, and sets the +// collection UUID if provided. boost::intrusive_ptr makeExpressionContext( OperationContext* opCtx, const AggregationRequest& request, - const LiteParsedPipeline& litePipe, BSONObj collationObj, boost::optional uuid, StringMap resolvedNamespaces) { @@ -592,127 +122,13 @@ boost::intrusive_ptr makeExpressionContext( std::move(resolvedNamespaces), uuid); + // Keep the backing collation object on the context up to date with the resolved collator. + mergeCtx->collation = collationObj; + mergeCtx->inMongos = true; return mergeCtx; } -// Runs a pipeline on mongoS, having first validated that it is eligible to do so. This can be a -// pipeline which is split for merging, or an intact pipeline which must run entirely on mongoS. -Status runPipelineOnMongoS(const boost::intrusive_ptr& expCtx, - const ClusterAggregate::Namespaces& namespaces, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - std::unique_ptr pipeline, - BSONObjBuilder* result, - const PrivilegeVector& privileges) { - // We should never receive a pipeline which cannot run on mongoS. - invariant(!expCtx->explain); - invariant(pipeline->canRunOnMongos()); - - const auto& requestedNss = namespaces.requestedNss; - const auto opCtx = expCtx->opCtx; - - // Verify that the first stage can produce input for the remainder of the pipeline. - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Aggregation pipeline must be run on mongoS, but " - << pipeline->getSources().front()->getSourceName() - << " is not capable of producing input", - !pipeline->getSources().front()->constraints().requiresInputDocSource); - - // Register the new mongoS cursor, and retrieve the initial batch of results. - auto cursorResponse = establishMergingMongosCursor( - opCtx, request, requestedNss, litePipe, std::move(pipeline), privileges); - - // We don't need to storePossibleCursor or propagate writeConcern errors; a pipeline with - // writing stages like $out can never run on mongoS. Filter the command response and return - // immediately. - CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result); - return getStatusFromCommandResult(result->asTempObj()); -} - -Status dispatchMergingPipeline( - const boost::intrusive_ptr& expCtx, - const ClusterAggregate::Namespaces& namespaces, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - const boost::optional& routingInfo, - sharded_agg_helpers::DispatchShardPipelineResults&& shardDispatchResults, - BSONObjBuilder* result, - const PrivilegeVector& privileges) { - // We should never be in a situation where we call this function on a non-merge pipeline. - invariant(shardDispatchResults.splitPipeline); - auto* mergePipeline = shardDispatchResults.splitPipeline->mergePipeline.get(); - invariant(mergePipeline); - auto* opCtx = expCtx->opCtx; - - std::vector targetedShards; - targetedShards.reserve(shardDispatchResults.remoteCursors.size()); - for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { - targetedShards.emplace_back(remoteCursor->getShardId().toString()); - } - - cluster_aggregation_planner::addMergeCursorsSource( - mergePipeline, - litePipe, - shardDispatchResults.commandForTargetedShards, - std::move(shardDispatchResults.remoteCursors), - targetedShards, - shardDispatchResults.splitPipeline->shardCursorsSortSpec, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); - - // First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS, - // then ignore the internalQueryProhibitMergingOnMongoS parameter. - if (mergePipeline->requiredToRunOnMongos() || - (!internalQueryProhibitMergingOnMongoS.load() && mergePipeline->canRunOnMongos())) { - return runPipelineOnMongoS(expCtx, - namespaces, - request, - litePipe, - std::move(shardDispatchResults.splitPipeline->mergePipeline), - result, - privileges); - } - - // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we - // therefore must have a valid routing table. - invariant(routingInfo); - - const ShardId mergingShardId = pickMergingShard(opCtx, - shardDispatchResults.needsPrimaryShardMerge, - targetedShards, - routingInfo->db().primaryId()); - const bool mergingShardContributesData = - std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != - targetedShards.end(); - - auto mergeCmdObj = createCommandForMergingShard( - request, expCtx, mergingShardId, mergingShardContributesData, mergePipeline); - - // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. - auto mergeResponse = establishMergingShardCursor( - opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId); - uassertStatusOK(mergeResponse.swResponse); - - auto mergeCursorResponse = uassertStatusOK( - storePossibleCursor(opCtx, - mergingShardId, - *mergeResponse.shardHostAndPort, - mergeResponse.swResponse.getValue().data, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - privileges, - expCtx->tailableMode)); - - // Ownership for the shard cursors has been transferred to the merging shard. Dismiss the - // ownership in the current merging pipeline such that when it goes out of scope it does not - // attempt to kill the cursors. - auto mergeCursors = static_cast(mergePipeline->peekFront()); - mergeCursors->dismissCursorOwnership(); - - return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result); -} - void appendEmptyResultSetWithStatus(OperationContext* opCtx, const NamespaceString& nss, Status status, @@ -740,199 +156,105 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, << ", " << AggregationRequest::kFromMongosName << "] cannot be set to 'true' when sent to mongos", !request.needsMerge() && !request.isFromMongos()); - auto executionNsRoutingInfoStatus = - sharded_agg_helpers::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); - boost::optional routingInfo; - LiteParsedPipeline litePipe(request); + const auto isSharded = [](OperationContext* opCtx, const NamespaceString& nss) -> bool { const auto resolvedNsRoutingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); return resolvedNsRoutingInfo.cm().get(); }; - const bool involvesShardedCollections = litePipe.verifyIsSupported( - opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern); - // If the routing table is valid, we obtain a reference to it. If the table is not valid, then - // either the database does not exist, or there are no shards in the cluster. In the latter - // case, we always return an empty cursor. In the former case, if the requested aggregation is a - // $changeStream, we allow the operation to continue so that stream cursors can be established - // on the given namespace before the database or collection is actually created. If the database - // does not exist and this is not a $changeStream, then we return an empty cursor. - if (executionNsRoutingInfoStatus.isOK()) { - routingInfo = std::move(executionNsRoutingInfoStatus.getValue()); - } else if (!(litePipe.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { - appendEmptyResultSetWithStatus( - opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result); - return Status::OK(); - } - - // Determine whether this aggregation must be dispatched to all shards in the cluster. - const bool mustRunOnAll = - sharded_agg_helpers::mustRunOnAllShards(namespaces.executionNss, litePipe); - - // If we don't have a routing table, then this is a $changeStream which must run on all shards. - invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); - - auto resolvedNamespaces = resolveInvolvedNamespaces(opCtx, litePipe); - - // A pipeline is allowed to passthrough to the primary shard iff the following conditions are - // met: - // - // 1. The namespace of the aggregate and any other involved namespaces are unsharded. - // 2. Is allowed to be forwarded to shards. - // 3. Does not need to run on all shards. - // 4. Doesn't need transformation via DocumentSource::serialize(). - if (routingInfo && !routingInfo->cm() && !mustRunOnAll && - litePipe.allowedToPassthroughFromMongos() && !involvesShardedCollections) { - const auto primaryShardId = routingInfo->db().primary()->getId(); - return aggPassthrough( - opCtx, namespaces, routingInfo->db(), request, litePipe, privileges, result); - } - - // Populate the collection UUID and the appropriate collation to use. - auto collInfo = getCollationAndUUID(routingInfo, namespaces.executionNss, request, litePipe); - BSONObj collationObj = collInfo.first; - boost::optional uuid = collInfo.second; - - // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, - // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the - // pipeline's stages. - auto expCtx = makeExpressionContext( - opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces)); - - // Parse and optimize the full pipeline. - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); - pipeline->optimizePipeline(); - - // Check whether the entire pipeline must be run on mongoS. - if (pipeline->requiredToRunOnMongos()) { - // If this is an explain write the explain output and return. - if (expCtx->explain) { - *result << "splitPipeline" << BSONNULL << "mongos" - << Document{{"host", getHostNameCachedAndPort()}, - {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; - return Status::OK(); - } - - return runPipelineOnMongoS( - expCtx, namespaces, request, litePipe, std::move(pipeline), result, privileges); - } + LiteParsedPipeline litePipe(request); + litePipe.verifyIsSupported( + opCtx, isSharded, request.getExplain(), serverGlobalParams.enableMajorityReadConcern); + auto hasChangeStream = litePipe.hasChangeStream(); + auto involvedNamespaces = litePipe.getInvolvedNamespaces(); + + const auto pipelineBuilder = [&](boost::optional routingInfo) { + // Populate the collection UUID and the appropriate collation to use. + auto [collationObj, uuid] = [&]() -> std::pair> { + // If this is a change stream, take the user-defined collation if one exists, or an + // empty BSONObj otherwise. Change streams never inherit the collection's default + // collation, and since collectionless aggregations generally run on the 'admin' + // database, the standard logic would attempt to resolve its non-existent UUID and + // collation by sending a specious 'listCollections' command to the config servers. + if (hasChangeStream) { + return {request.getCollation(), boost::none}; + } - // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline( - expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); + return sharded_agg_helpers::getCollationAndUUID( + routingInfo, namespaces.executionNss, request.getCollation()); + }(); - // If the operation is an explain, then we verify that it succeeded on all targeted shards, - // write the results to the output builder, and return immediately. - if (expCtx->explain) { - return appendExplainResults(std::move(shardDispatchResults), expCtx, result); - } + // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, + // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by + // the pipeline's stages. + auto expCtx = makeExpressionContext( + opCtx, request, collationObj, uuid, resolveInvolvedNamespaces(involvedNamespaces)); - // If this isn't an explain, then we must have established cursors on at least one shard. - invariant(shardDispatchResults.remoteCursors.size() > 0); - - // If we sent the entire pipeline to a single shard, store the remote cursor and return. - if (!shardDispatchResults.splitPipeline) { - invariant(shardDispatchResults.remoteCursors.size() == 1); - auto&& remoteCursor = std::move(shardDispatchResults.remoteCursors.front()); - const auto shardId = remoteCursor->getShardId().toString(); - const auto reply = uassertStatusOK(storePossibleCursor(opCtx, - namespaces.requestedNss, - std::move(remoteCursor), - privileges, - expCtx->tailableMode)); - return appendCursorResponseToCommandResult(shardId, reply, result); - } + // Parse and optimize the full pipeline. + auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + pipeline->optimizePipeline(); + return pipeline; + }; - // If we have the exchange spec then dispatch all consumers. - if (shardDispatchResults.exchangeSpec) { - shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx, - namespaces.executionNss, - request, - litePipe, - collationObj, - &shardDispatchResults); + auto targetingStatus = + sharded_agg_helpers::AggregationTargeter::make(opCtx, + namespaces.executionNss, + pipelineBuilder, + involvedNamespaces, + hasChangeStream, + litePipe.allowedToPassthroughFromMongos()); + if (!targetingStatus.isOK()) { + appendEmptyResultSetWithStatus( + opCtx, namespaces.requestedNss, targetingStatus.getStatus(), result); + return Status::OK(); } - // If we reach here, we have a merge pipeline to dispatch. - return dispatchMergingPipeline(expCtx, - namespaces, - request, - litePipe, - routingInfo, - std::move(shardDispatchResults), - result, - privileges); -} + auto targeter = std::move(targetingStatus.getValue()); + switch (targeter.policy) { + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: { + // A pipeline with $changeStream should never be allowed to passthrough. + invariant(!hasChangeStream); + return sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx, + namespaces, + targeter.routingInfo->db(), + request.getExplain(), + request.serializeToCommandObj(), + privileges, + result); + } -Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, - const Namespaces& namespaces, - const CachedDatabaseInfo& dbInfo, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - const PrivilegeVector& privileges, - BSONObjBuilder* out) { - // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an - // explain if necessary, and rewrites the result into a format safe to forward to shards. - BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - sharded_agg_helpers::createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, nullptr, BSONObj())); - - const auto shardId = dbInfo.primary()->getId(); - const auto cmdObjWithShardVersion = (shardId != ShardRegistry::kConfigServerShardId) - ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) - : std::move(cmdObj); - - MultiStatementTransactionRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - namespaces.executionNss.db().toString(), - {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, dbInfo)}}, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - auto response = ars.next(); - invariant(ars.done()); - - uassertStatusOK(response.swResponse); - auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); - - if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { - uassertStatusOK(commandStatus.withContext("command failed because of stale config")); - } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { - uassertStatusOK( - commandStatus.withContext("command failed because can not establish a snapshot")); - } + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: { + auto expCtx = targeter.pipeline->getContext(); + // If this is an explain write the explain output and return. + if (expCtx->explain) { + *result << "splitPipeline" << BSONNULL << "mongos" + << Document{ + {"host", getHostNameCachedAndPort()}, + {"stages", targeter.pipeline->writeExplainOps(*expCtx->explain)}}; + return Status::OK(); + } - BSONObj result; - if (aggRequest.getExplain()) { - // If this was an explain, then we get back an explain result object rather than a cursor. - result = response.swResponse.getValue().data; - } else { - auto tailMode = liteParsedPipeline.hasChangeStream() - ? TailableModeEnum::kTailableAndAwaitData - : TailableModeEnum::kNormal; - result = uassertStatusOK( - storePossibleCursor(opCtx, - shardId, - *response.shardHostAndPort, - response.swResponse.getValue().data, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - privileges, - tailMode)); - } + return sharded_agg_helpers::runPipelineOnMongoS(namespaces, + request.getBatchSize(), + std::move(targeter.pipeline), + result, + privileges); + } - // First append the properly constructed writeConcernError. It will then be skipped - // in appendElementsUnique. - if (auto wcErrorElem = result["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: { + return sharded_agg_helpers::dispatchPipelineAndMerge(opCtx, + std::move(targeter), + request.serializeToCommandObj(), + request.getBatchSize(), + namespaces, + privileges, + result, + hasChangeStream); + } } - out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); - - return getStatusFromCommandResult(out->asTempObj()); + MONGO_UNREACHABLE; } Status ClusterAggregate::retryOnViewError(OperationContext* opCtx, diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 630f4d987b1..9d3c0a90eba 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -104,15 +104,6 @@ public: const PrivilegeVector& privileges, BSONObjBuilder* result, unsigned numberRetries = 0); - -private: - static Status aggPassthrough(OperationContext*, - const Namespaces&, - const CachedDatabaseInfo&, - const AggregationRequest&, - const LiteParsedPipeline&, - const PrivilegeVector& privileges, - BSONObjBuilder* result); }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index bf8665894b8..d3c899755cb 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -479,12 +479,12 @@ SplitPipeline splitPipeline(std::unique_ptr pipeline) } void addMergeCursorsSource(Pipeline* mergePipeline, - const LiteParsedPipeline& liteParsedPipeline, BSONObj cmdSentToShards, std::vector ownedCursors, const std::vector& targetedShards, boost::optional shardCursorsSortSpec, - std::shared_ptr executor) { + std::shared_ptr executor, + bool hasChangeStream) { auto* opCtx = mergePipeline->getContext()->opCtx; AsyncResultsMergerParams armParams; armParams.setSort(shardCursorsSortSpec); @@ -524,7 +524,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline, auto mergeCursorsStage = DocumentSourceMergeCursors::create( std::move(executor), std::move(armParams), mergePipeline->getContext()); - if (liteParsedPipeline.hasChangeStream()) { + if (hasChangeStream) { mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create( mergePipeline->getContext(), Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 75baded9e31..3b8b70fcd1d 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -81,12 +81,12 @@ SplitPipeline splitPipeline(std::unique_ptr pipeline) * front of 'mergePipeline'. */ void addMergeCursorsSource(Pipeline* mergePipeline, - const LiteParsedPipeline&, BSONObj cmdSentToShards, - std::vector remoteCursors, + std::vector ownedCursors, const std::vector& targetedShards, boost::optional shardCursorsSortSpec, - std::shared_ptr executor); + std::shared_ptr executor, + bool hasChangeStream); /** * Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of -- cgit v1.2.1