From 42c40db7ef341b3dbf2b975e61d87ce8000042a9 Mon Sep 17 00:00:00 2001 From: Arun Banala Date: Mon, 21 Oct 2019 10:06:08 +0000 Subject: SERVER-43706 M/R Agg: Support referencing 'scope' variables in translated aggregation pipeline for mongos --- .../suites/sharding_map_reduce_agg.yaml | 1 + jstests/sharding/map_reduce_scope.js | 42 ++++++++++++++++++++++ src/mongo/s/commands/cluster_map_reduce_agg.cpp | 18 ++++++---- 3 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 jstests/sharding/map_reduce_scope.js diff --git a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml index 175efe9f381..1f71cbb0279 100644 --- a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml +++ b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml @@ -11,6 +11,7 @@ selector: - jstests/sharding/localhostAuthBypass.js - jstests/sharding/map_reduce_invalid_output_collection.js - jstests/sharding/map_reduce_invalid_result_set.js + - jstests/sharding/map_reduce_scope.js - jstests/sharding/max_time_ms_sharded.js - jstests/sharding/mr_and_agg_versioning.js - jstests/sharding/mr_shard_version.js diff --git a/jstests/sharding/map_reduce_scope.js b/jstests/sharding/map_reduce_scope.js new file mode 100644 index 00000000000..8657a99afdb --- /dev/null +++ b/jstests/sharding/map_reduce_scope.js @@ -0,0 +1,42 @@ +/** + * Test to verify 'scope' parameter of mapReduce command. This test verfies that 'map', 'reduce' and + * 'finalize' functions can use 'scope' variable passed in the input. + */ +(function() { +"use strict"; + +const st = new ShardingTest({shards: 2}); +const dbName = jsTest.name(); +const coll = st.s.getDB(dbName).coll; +st.s.adminCommand({enableSharding: dbName}); +st.ensurePrimaryShard(dbName, st.shard0.shardName); + +function runTest(coll) { + const map = function() { + emit(xx.val, this.a); + }; + const reduce = function(key, values) { + return {reduce: Array.sum(values) + xx.val}; + }; + const finalize = function(key, values) { + values.finalize = xx.val + 1; + return values; + }; + const res = assert.commandWorked( + coll.mapReduce(map, reduce, {finalize: finalize, out: {inline: 1}, scope: {xx: {val: 9}}})); + assert.eq(9, res.results[0].value.reduce); + assert.eq(10, res.results[0].value.finalize); +} + +assert.commandWorked(coll.insert({a: -4})); +assert.commandWorked(coll.insert({a: 4})); + +// Run test when a single shard is targetted. +runTest(coll); + +// Run test when more than one shard is targetted. +st.shardColl("coll", {a: 1}, {a: 0}); +runTest(coll); + +st.stop(); +})(); diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 837de4feda2..6df1e77250c 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -77,7 +77,10 @@ auto makeExpressionContext(OperationContext* opCtx, parsedMr.getOutOptions().getCollectionName()}; resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector{}); } - + auto runtimeConstants = Variables::generateRuntimeConstants(opCtx); + if (parsedMr.getScope()) { + runtimeConstants.setJsScope(parsedMr.getScope()->getObj()); + } auto expCtx = make_intrusive( opCtx, boost::none, // explain @@ -87,7 +90,7 @@ auto makeExpressionContext(OperationContext* opCtx, parsedMr.getBypassDocumentValidation().get_value_or(false), nss, collationObj, - boost::none, // runtimeConstants + runtimeConstants, std::move(resolvedCollator), std::make_shared(), std::move(resolvedNamespaces), @@ -100,10 +103,13 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe MutableDocument translatedCmd; translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll()); - translatedCmd["pipeline"] = Value(pipeline->serialize()); - translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits::max()}}); - translatedCmd["allowDiskUse"] = Value(true); - translatedCmd["fromMongos"] = Value(true); + translatedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + translatedCmd[AggregationRequest::kCursorName] = + Value(Document{{"batchSize", std::numeric_limits::max()}}); + translatedCmd[AggregationRequest::kAllowDiskUseName] = Value(true); + translatedCmd[AggregationRequest::kFromMongosName] = Value(true); + translatedCmd[AggregationRequest::kRuntimeConstants] = + Value(pipeline->getContext()->getRuntimeConstants().toBSON()); // Append generic command options. for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) { -- cgit v1.2.1