diff options
author | Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> | 2019-11-19 14:49:37 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-19 14:49:37 +0000 |
commit | 0bed872351365fa35107e0d9818d9066642a84a4 (patch) | |
tree | 069823b8122b461f50874b22fad3cab9982a3840 | |
parent | 4722a18440d6645a24f83def678f7cf7a6a290fe (diff) | |
download | mongo-0bed872351365fa35107e0d9818d9066642a84a4.tar.gz |
SERVER-44150 Enable MR tests for output mode 'reduce' with a non-trivial reduce function
8 files changed, 49 insertions, 61 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml index 6df807aad19..81e56d57edc 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous.yml @@ -29,10 +29,6 @@ selector: - jstests/concurrency/fsm_workloads/collmod_writeconflict.js - jstests/concurrency/fsm_workloads/reindex_writeconflict.js - # This test is not compatible with the other mapReduce FSM tests since it requires the - # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true. - - jstests/concurrency/fsm_workloads/map_reduce_reduce.js - exclude_with_any_tags: - uses_transactions - requires_replication diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml index 64df87a1722..10307b07be1 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication.yml @@ -44,10 +44,6 @@ selector: - jstests/concurrency/fsm_workloads/collmod_writeconflict.js - jstests/concurrency/fsm_workloads/reindex_writeconflict.js - # This test is not compatible with the other mapReduce FSM tests since it requires the - # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true. - - jstests/concurrency/fsm_workloads/map_reduce_reduce.js - exclude_with_any_tags: - requires_sharding diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml index 0f9fd10d2ba..d3a5cc6f0c5 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_cursor_sweeps.yml @@ -44,10 +44,6 @@ selector: - jstests/concurrency/fsm_workloads/collmod_writeconflict.js - jstests/concurrency/fsm_workloads/reindex_writeconflict.js - # This test is not compatible with the other mapReduce FSM tests since it requires the - # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true. - - jstests/concurrency/fsm_workloads/map_reduce_reduce.js - exclude_with_any_tags: - requires_sharding diff --git a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml index 5c47e68c1c0..61dfaea8645 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_simultaneous_replication_wiredtiger_eviction_debug.yml @@ -44,10 +44,6 @@ selector: - jstests/concurrency/fsm_workloads/collmod_writeconflict.js - jstests/concurrency/fsm_workloads/reindex_writeconflict.js - # This test is not compatible with the other mapReduce FSM tests since it requires the - # 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true. - - jstests/concurrency/fsm_workloads/map_reduce_reduce.js - exclude_with_any_tags: - requires_sharding diff --git a/jstests/concurrency/fsm_workloads/map_reduce_reduce.js b/jstests/concurrency/fsm_workloads/map_reduce_reduce.js index 348fe2259d1..173bca4d5d3 100644 --- a/jstests/concurrency/fsm_workloads/map_reduce_reduce.js +++ b/jstests/concurrency/fsm_workloads/map_reduce_reduce.js @@ -31,12 +31,6 @@ var $config = extendWorkload($config, function($config, $super) { } $config.states.init = function init(db, collName) { - // TODO SERVER-44150: Cannot run MR with output 'reduce' in agg since the 'whenMatched' - // pipeline will always run which can cause unexpected failures in the user-specified reduce - // function. - assert.commandWorked( - db.adminCommand({setParameter: 1, internalQueryUseAggMapReduce: false})); - $super.states.init.apply(this, arguments); this.outCollName = uniqueCollectionName(prefix, this.tid); diff --git a/jstests/multiVersion/map_reduce_multiversion_cluster.js b/jstests/multiVersion/map_reduce_multiversion_cluster.js index 79524c67067..e2020744610 100644 --- a/jstests/multiVersion/map_reduce_multiversion_cluster.js +++ b/jstests/multiVersion/map_reduce_multiversion_cluster.js @@ -68,10 +68,11 @@ function runValidMrTests(coll) { } function assertResultsValid(results, expectedCount) { - assert.gt(results.length, 0); - assert.lte(results.length, expectedCount); + assert.gt(results.length, 0, tojson(results)); + assert.lte(results.length, expectedCount, tojson(results)); results.map(resultDoc => assert.eq(resultDoc.value.avgAge, - resultDoc.value.total / resultDoc.value.count)); + resultDoc.value.total / resultDoc.value.count, + tojson(results))); } // Inline output. @@ -102,18 +103,14 @@ function runValidMrTests(coll) { // Cache a sample result document to ensure that re-reducing actually occurs below. let sampleDoc = mergeColl.findOne(); - // TODO SERVER-44150: Enable the following tests once the new implementation is able to - // support re-reducing against an existing collection. - // Output mode "reduce" to an existing unsharded collection. - // assert.commandWorked(coll.mapReduce( - // map, - // reduce, - // {finalize: fin, out: {reduce: mergeColl.getName(), db: - // mergeColl.getDB().getName()}})); - // res = mergeColl.find().toArray(); - // assertResultsValid(res, states.length); - // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); + assert.commandWorked(coll.mapReduce( + map, + reduce, + {finalize: fin, out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName()}})); + res = mergeColl.find().toArray(); + assertResultsValid(res, states.length); + assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); // Drop and recreate the target collection as sharded. mergeColl.drop(); @@ -142,17 +139,14 @@ function runValidMrTests(coll) { // Cache a sample result document to ensure that re-reducing actually occurs below. sampleDoc = mergeColl.findOne({_id: {$not: {$in: ["AL", "PA"]}}}); - // TODO SERVER-44150: Enable the following tests once the new implementation is able to - // support re-reducing against an existing collection. - // Output mode "reduce" to an existing sharded collection. - // assert.commandWorked(coll.mapReduce(map, reduce, { - // finalize: fin, - // out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true} - // })); - // res = mergeColl.find().toArray(); - // assertResultsValid(res, states.length + 2); - // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); + assert.commandWorked(coll.mapReduce(map, reduce, { + finalize: fin, + out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true} + })); + res = mergeColl.find().toArray(); + assertResultsValid(res, states.length + 2); + assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); } // Test merge to a collection in the same database as the source collection. diff --git a/jstests/multiVersion/map_reduce_multiversion_repl_set.js b/jstests/multiVersion/map_reduce_multiversion_repl_set.js index 3ec026c7bbd..8853b9b5e9e 100644 --- a/jstests/multiVersion/map_reduce_multiversion_repl_set.js +++ b/jstests/multiVersion/map_reduce_multiversion_repl_set.js @@ -83,16 +83,13 @@ function runValidMrTests(db, coll) { res = mergeColl.find().toArray(); assertResultsValid(res); - // TODO SERVER-44150: Enable the following tests once the new implementation is able to - // support re-reducing against an existing collection. - // Cache a sample result document to ensure that re-reducing actually occurs below. - // const sampleDoc = mergeColl.findOne(); + const sampleDoc = mergeColl.findOne(); - // // Output mode "reduce" to an existing collection. - // assert.commandWorked( - // coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}})); - // assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); + // Output mode "reduce" to an existing collection. + assert.commandWorked( + coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}})); + assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge); } // diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index bad14132cc1..80d5f8e9ed0 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -176,18 +176,31 @@ auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, Namespace auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss, - std::string code) { + std::string reduceCode, + boost::optional<MapReduceJavascriptCode> finalizeCode) { // Because of communication for sharding, $merge must hold on to a serializable BSON object // at the moment so we reparse here. Note that the reduce function signature expects 2 // arguments, the first being the key and the second being the array of values to reduce. auto reduceObj = BSON("args" << BSON_ARRAY("$_id" << BSON_ARRAY("$value" << "$$new.value")) - << "eval" << code); + << "eval" << reduceCode); - auto finalProjectSpec = + auto reduceSpec = BSON(DocumentSourceProject::kStageName << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj))); - auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec}); + auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{reduceSpec}); + + // Build finalize $project stage if given. + if (finalizeCode) { + auto finalizeObj = BSON("args" << BSON_ARRAY("$_id" + << "$value") + << "eval" << finalizeCode->getCode()); + auto finalizeSpec = + BSON(DocumentSourceProject::kStageName + << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << finalizeObj))); + pipelineSpec->emplace_back(std::move(finalizeSpec)); + } + return DocumentSourceMerge::create(targetNss, expCtx, MergeWhenMatchedModeEnum::kPipeline, @@ -201,14 +214,16 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx, auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx, const OutputType outputType, NamespaceString targetNss, - std::string reduceCode) { + std::string reduceCode, + boost::optional<MapReduceJavascriptCode> finalizeCode) { switch (outputType) { case OutputType::Replace: return boost::make_optional(translateOutReplace(expCtx, targetNss)); case OutputType::Merge: return boost::make_optional(translateOutMerge(expCtx, targetNss)); case OutputType::Reduce: - return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode)); + return boost::make_optional(translateOutReduce( + expCtx, targetNss, std::move(reduceCode), std::move(finalizeCode))); case OutputType::InMemory:; } return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{}; @@ -382,7 +397,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( parsedMr.getFinalize().map([&](auto&& finalize) { return translateFinalize(expCtx, parsedMr.getFinalize()->getCode()); }), - translateOut(expCtx, outType, std::move(outNss), parsedMr.getReduce().getCode())), + translateOut(expCtx, + outType, + std::move(outNss), + parsedMr.getReduce().getCode(), + parsedMr.getFinalize())), expCtx)); pipeline->optimizePipeline(); return pipeline; |