summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Banala <arun.banala@10gen.com>2019-10-21 10:06:08 +0000
committerevergreen <evergreen@mongodb.com>2019-10-21 10:06:08 +0000
commit42c40db7ef341b3dbf2b975e61d87ce8000042a9 (patch)
tree715990636b41e35724937d2ad694ccc43205247b
parentebbafd030d6109aac9ee214fd584468e4c912472 (diff)
downloadmongo-42c40db7ef341b3dbf2b975e61d87ce8000042a9.tar.gz
SERVER-43706 M/R Agg: Support referencing 'scope' variables in translated aggregation pipeline for mongos
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml1
-rw-r--r--jstests/sharding/map_reduce_scope.js42
-rw-r--r--src/mongo/s/commands/cluster_map_reduce_agg.cpp18
3 files changed, 55 insertions, 6 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
index 175efe9f381..1f71cbb0279 100644
--- a/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
+++ b/buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml
@@ -11,6 +11,7 @@ selector:
- jstests/sharding/localhostAuthBypass.js
- jstests/sharding/map_reduce_invalid_output_collection.js
- jstests/sharding/map_reduce_invalid_result_set.js
+ - jstests/sharding/map_reduce_scope.js
- jstests/sharding/max_time_ms_sharded.js
- jstests/sharding/mr_and_agg_versioning.js
- jstests/sharding/mr_shard_version.js
diff --git a/jstests/sharding/map_reduce_scope.js b/jstests/sharding/map_reduce_scope.js
new file mode 100644
index 00000000000..8657a99afdb
--- /dev/null
+++ b/jstests/sharding/map_reduce_scope.js
@@ -0,0 +1,42 @@
+/**
+ * Test to verify 'scope' parameter of mapReduce command. This test verfies that 'map', 'reduce' and
+ * 'finalize' functions can use 'scope' variable passed in the input.
+ */
+(function() {
+"use strict";
+
+const st = new ShardingTest({shards: 2});
+const dbName = jsTest.name();
+const coll = st.s.getDB(dbName).coll;
+st.s.adminCommand({enableSharding: dbName});
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+function runTest(coll) {
+ const map = function() {
+ emit(xx.val, this.a);
+ };
+ const reduce = function(key, values) {
+ return {reduce: Array.sum(values) + xx.val};
+ };
+ const finalize = function(key, values) {
+ values.finalize = xx.val + 1;
+ return values;
+ };
+ const res = assert.commandWorked(
+ coll.mapReduce(map, reduce, {finalize: finalize, out: {inline: 1}, scope: {xx: {val: 9}}}));
+ assert.eq(9, res.results[0].value.reduce);
+ assert.eq(10, res.results[0].value.finalize);
+}
+
+assert.commandWorked(coll.insert({a: -4}));
+assert.commandWorked(coll.insert({a: 4}));
+
+// Run test when a single shard is targetted.
+runTest(coll);
+
+// Run test when more than one shard is targetted.
+st.shardColl("coll", {a: 1}, {a: 0});
+runTest(coll);
+
+st.stop();
+})();
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 837de4feda2..6df1e77250c 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -77,7 +77,10 @@ auto makeExpressionContext(OperationContext* opCtx,
parsedMr.getOutOptions().getCollectionName()};
resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector<BSONObj>{});
}
-
+ auto runtimeConstants = Variables::generateRuntimeConstants(opCtx);
+ if (parsedMr.getScope()) {
+ runtimeConstants.setJsScope(parsedMr.getScope()->getObj());
+ }
auto expCtx = make_intrusive<ExpressionContext>(
opCtx,
boost::none, // explain
@@ -87,7 +90,7 @@ auto makeExpressionContext(OperationContext* opCtx,
parsedMr.getBypassDocumentValidation().get_value_or(false),
nss,
collationObj,
- boost::none, // runtimeConstants
+ runtimeConstants,
std::move(resolvedCollator),
std::make_shared<MongoSInterface>(),
std::move(resolvedNamespaces),
@@ -100,10 +103,13 @@ Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipe
MutableDocument translatedCmd;
translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll());
- translatedCmd["pipeline"] = Value(pipeline->serialize());
- translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits<long long>::max()}});
- translatedCmd["allowDiskUse"] = Value(true);
- translatedCmd["fromMongos"] = Value(true);
+ translatedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ translatedCmd[AggregationRequest::kCursorName] =
+ Value(Document{{"batchSize", std::numeric_limits<long long>::max()}});
+ translatedCmd[AggregationRequest::kAllowDiskUseName] = Value(true);
+ translatedCmd[AggregationRequest::kFromMongosName] = Value(true);
+ translatedCmd[AggregationRequest::kRuntimeConstants] =
+ Value(pipeline->getContext()->getRuntimeConstants().toBSON());
// Append generic command options.
for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) {