summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> 2019-11-19 14:49:37 +0000
committer: evergreen <evergreen@mongodb.com> 2019-11-19 14:49:37 +0000
commit: 0bed872351365fa35107e0d9818d9066642a84a4 (patch)
tree: 069823b8122b461f50874b22fad3cab9982a3840
parent: 4722a18440d6645a24f83def678f7cf7a6a290fe (diff)
download: mongo-0bed872351365fa35107e0d9818d9066642a84a4.tar.gz
SERVER-44150 Enable MR tests for output mode 'reduce' with a non-trivial reduce function
-rw-r--r-- buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml | 4
-rw-r--r-- buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml | 4
-rw-r--r-- buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml | 4
-rw-r--r-- buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml | 4
-rw-r--r-- jstests/concurrency/fsm_workloads/map_reduce_reduce.js | 6
-rw-r--r-- jstests/multiVersion/map_reduce_multiversion_cluster.js | 42
-rw-r--r-- jstests/multiVersion/map_reduce_multiversion_repl_set.js | 13
-rw-r--r-- src/mongo/db/commands/mr_common.cpp | 33
8 files changed, 49 insertions, 61 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml
index 6df807aad19..81e56d57edc 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml
@@ -29,10 +29,6 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
- # This test is not compatible with the other mapReduce FSM tests since it requires the
- # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
-
exclude_with_any_tags:
- uses_transactions
- requires_replication
diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml
index 64df87a1722..10307b07be1 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml
@@ -44,10 +44,6 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
- # This test is not compatible with the other mapReduce FSM tests since it requires the
- # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
-
exclude_with_any_tags:
- requires_sharding
diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml
index 0f9fd10d2ba..d3a5cc6f0c5 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml
@@ -44,10 +44,6 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
- # This test is not compatible with the other mapReduce FSM tests since it requires the
- # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
-
exclude_with_any_tags:
- requires_sharding
diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml
index 5c47e68c1c0..61dfaea8645 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml
@@ -44,10 +44,6 @@ selector:
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
- # This test is not compatible with the other mapReduce FSM tests since it requires the
- # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
-
exclude_with_any_tags:
- requires_sharding
diff --git a/jstests/concurrency/fsm_workloads/map_reduce_reduce.js b/jstests/concurrency/fsm_workloads/map_reduce_reduce.js
index 348fe2259d1..173bca4d5d3 100644
--- a/jstests/concurrency/fsm_workloads/map_reduce_reduce.js
+++ b/jstests/concurrency/fsm_workloads/map_reduce_reduce.js
@@ -31,12 +31,6 @@ var $config = extendWorkload($config, function($config, $super) {
}
$config.states.init = function init(db, collName) {
- // TODO SERVER-44150: Cannot run MR with output 'reduce' in agg since the 'whenMatched'
- // pipeline will always run which can cause unexpected failures in the user-specified reduce
- // function.
- assert.commandWorked(
- db.adminCommand({setParameter: 1, internalQueryUseAggMapReduce: false}));
-
$super.states.init.apply(this, arguments);
this.outCollName = uniqueCollectionName(prefix, this.tid);
diff --git a/jstests/multiVersion/map_reduce_multiversion_cluster.js b/jstests/multiVersion/map_reduce_multiversion_cluster.js
index 79524c67067..e2020744610 100644
--- a/jstests/multiVersion/map_reduce_multiversion_cluster.js
+++ b/jstests/multiVersion/map_reduce_multiversion_cluster.js
@@ -68,10 +68,11 @@ function runValidMrTests(coll) {
}
function assertResultsValid(results, expectedCount) {
- assert.gt(results.length, 0);
- assert.lte(results.length, expectedCount);
+ assert.gt(results.length, 0, tojson(results));
+ assert.lte(results.length, expectedCount, tojson(results));
results.map(resultDoc => assert.eq(resultDoc.value.avgAge,
- resultDoc.value.total / resultDoc.value.count));
+ resultDoc.value.total / resultDoc.value.count,
+ tojson(results)));
}
// Inline output.
@@ -102,18 +103,14 @@ function runValidMrTests(coll) {
// Cache a sample result document to ensure that re-reducing actually occurs below.
let sampleDoc = mergeColl.findOne();
- // TODO SERVER-44150: Enable the following tests once the new implementation is able to
- // support re-reducing against an existing collection.
-
// Output mode "reduce" to an existing unsharded collection.
- // assert.commandWorked(coll.mapReduce(
- // map,
- // reduce,
- // {finalize: fin, out: {reduce: mergeColl.getName(), db:
- // mergeColl.getDB().getName()}}));
- // res = mergeColl.find().toArray();
- // assertResultsValid(res, states.length);
- // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
+ assert.commandWorked(coll.mapReduce(
+ map,
+ reduce,
+ {finalize: fin, out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName()}}));
+ res = mergeColl.find().toArray();
+ assertResultsValid(res, states.length);
+ assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
// Drop and recreate the target collection as sharded.
mergeColl.drop();
@@ -142,17 +139,14 @@ function runValidMrTests(coll) {
// Cache a sample result document to ensure that re-reducing actually occurs below.
sampleDoc = mergeColl.findOne({_id: {$not: {$in: ["AL", "PA"]}}});
- // TODO SERVER-44150: Enable the following tests once the new implementation is able to
- // support re-reducing against an existing collection.
-
// Output mode "reduce" to an existing sharded collection.
- // assert.commandWorked(coll.mapReduce(map, reduce, {
- // finalize: fin,
- // out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true}
- // }));
- // res = mergeColl.find().toArray();
- // assertResultsValid(res, states.length + 2);
- // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
+ assert.commandWorked(coll.mapReduce(map, reduce, {
+ finalize: fin,
+ out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true}
+ }));
+ res = mergeColl.find().toArray();
+ assertResultsValid(res, states.length + 2);
+ assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
}
// Test merge to a collection in the same database as the source collection.
diff --git a/jstests/multiVersion/map_reduce_multiversion_repl_set.js b/jstests/multiVersion/map_reduce_multiversion_repl_set.js
index 3ec026c7bbd..8853b9b5e9e 100644
--- a/jstests/multiVersion/map_reduce_multiversion_repl_set.js
+++ b/jstests/multiVersion/map_reduce_multiversion_repl_set.js
@@ -83,16 +83,13 @@ function runValidMrTests(db, coll) {
res = mergeColl.find().toArray();
assertResultsValid(res);
- // TODO SERVER-44150: Enable the following tests once the new implementation is able to
- // support re-reducing against an existing collection.
-
// Cache a sample result document to ensure that re-reducing actually occurs below.
- // const sampleDoc = mergeColl.findOne();
+ const sampleDoc = mergeColl.findOne();
- // // Output mode "reduce" to an existing collection.
- // assert.commandWorked(
- // coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}}));
- // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
+ // Output mode "reduce" to an existing collection.
+ assert.commandWorked(
+ coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}}));
+ assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
}
//
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index bad14132cc1..80d5f8e9ed0 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -176,18 +176,31 @@ auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, Namespace
auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
NamespaceString targetNss,
- std::string code) {
+ std::string reduceCode,
+ boost::optional<MapReduceJavascriptCode> finalizeCode) {
// Because of communication for sharding, $merge must hold on to a serializable BSON object
// at the moment so we reparse here. Note that the reduce function signature expects 2
// arguments, the first being the key and the second being the array of values to reduce.
auto reduceObj = BSON("args" << BSON_ARRAY("$_id" << BSON_ARRAY("$value"
<< "$$new.value"))
- << "eval" << code);
+ << "eval" << reduceCode);
- auto finalProjectSpec =
+ auto reduceSpec =
BSON(DocumentSourceProject::kStageName
<< BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
- auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
+ auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{reduceSpec});
+
+ // Build finalize $project stage if given.
+ if (finalizeCode) {
+ auto finalizeObj = BSON("args" << BSON_ARRAY("$_id"
+ << "$value")
+ << "eval" << finalizeCode->getCode());
+ auto finalizeSpec =
+ BSON(DocumentSourceProject::kStageName
+ << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << finalizeObj)));
+ pipelineSpec->emplace_back(std::move(finalizeSpec));
+ }
+
return DocumentSourceMerge::create(targetNss,
expCtx,
MergeWhenMatchedModeEnum::kPipeline,
@@ -201,14 +214,16 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
const OutputType outputType,
NamespaceString targetNss,
- std::string reduceCode) {
+ std::string reduceCode,
+ boost::optional<MapReduceJavascriptCode> finalizeCode) {
switch (outputType) {
case OutputType::Replace:
return boost::make_optional(translateOutReplace(expCtx, targetNss));
case OutputType::Merge:
return boost::make_optional(translateOutMerge(expCtx, targetNss));
case OutputType::Reduce:
- return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
+ return boost::make_optional(translateOutReduce(
+ expCtx, targetNss, std::move(reduceCode), std::move(finalizeCode)));
case OutputType::InMemory:;
}
return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
@@ -382,7 +397,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
parsedMr.getFinalize().map([&](auto&& finalize) {
return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
}),
- translateOut(expCtx, outType, std::move(outNss), parsedMr.getReduce().getCode())),
+ translateOut(expCtx,
+ outType,
+ std::move(outNss),
+ parsedMr.getReduce().getCode(),
+ parsedMr.getFinalize())),
expCtx));
pipeline->optimizePipeline();
return pipeline;