From 897c960e0faf9ada0bc247567e57cb8b876324ee Mon Sep 17 00:00:00 2001 From: Davis Haupt Date: Mon, 19 Dec 2022 21:22:35 +0000 Subject: SERVER-72222 fix mapReduce single reduce optimization in sharded clusters --- jstests/sharding/mr_single_reduce_split.js | 55 ++++++++++++++++ src/mongo/db/pipeline/accumulator_js_reduce.cpp | 83 +++++++++++++------------ 2 files changed, 97 insertions(+), 41 deletions(-) create mode 100644 jstests/sharding/mr_single_reduce_split.js diff --git a/jstests/sharding/mr_single_reduce_split.js b/jstests/sharding/mr_single_reduce_split.js new file mode 100644 index 00000000000..1a6d20f0e1f --- /dev/null +++ b/jstests/sharding/mr_single_reduce_split.js @@ -0,0 +1,55 @@ +/** + * This jstest verifies that the mrEnableSingleReduceOptimization flag works properly in a sharded + * cluster when there are documents on multiple chunks that need to be merged. + * @tags: [ + * backport_required_multiversion, + * ] + */ +(function() { +const st = new ShardingTest({ + shards: 2, + mongos: 1, + other: { + mongosOptions: {setParameter: {mrEnableSingleReduceOptimization: true}}, + shardOptions: {setParameter: {mrEnableSingleReduceOptimization: true}}, + } +}); + +const mongosDB = st.s0.getDB(jsTestName()); +const mongosColl = mongosDB[jsTestName()]; + +assert.commandWorked(mongosDB.dropDatabase()); +assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); +st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + +assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + +// Split the collection into 2 chunks: [minKey, 0) and [0, maxKey]. +assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + +// Move the [0, MaxKey) chunk to the second shard. +assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 50}, to: st.shard1.shardName})); + +assert.commandWorked(mongosColl.insert({_id: -1})); +const map = function() { + emit(0, {val: "mapped value"}); +}; + +const reduce = function(key, values) { + return {val: "reduced value"}; +}; + +let res = assert.commandWorked(mongosDB.runCommand( + {mapReduce: mongosColl.getName(), map: map, reduce: reduce, out: {inline: 1}})); +assert.eq(res.results[0], {_id: 0, value: {val: "mapped value"}}); + +assert.commandWorked(mongosColl.insert({_id: 1})); + +res = assert.commandWorked(mongosDB.runCommand( + {mapReduce: mongosColl.getName(), map: map, reduce: reduce, out: {inline: 1}})); +assert.eq(res.results[0], {_id: 0, value: {val: "reduced value"}}); + +st.stop(); +}()); diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp index acd77254b9d..dff07b0e815 100644 --- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp +++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp @@ -117,52 +117,53 @@ void AccumulatorInternalJsReduce::processInternal(const Value& input, bool mergi Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) { if (_values.size() < 1) { return Value{}; - } else if (mrSingleReduceOptimizationEnabled && _values.size() == 1) { + } + Value result; + if (mrSingleReduceOptimizationEnabled && _values.size() == 1) { // This optimization existed in the old Pre-4.4 MapReduce implementation. If the flag is // set, then we should replicate the optimization. See SERVER-68766 for more details. - return _values[0]; - } - - const auto keySize = _key.getApproximateSize(); + result = std::move(_values[0]); + } else { + const auto keySize = _key.getApproximateSize(); + + // Keep reducing until we have exactly one value. + while (true) { + BSONArrayBuilder bsonValues; + size_t numLeft = _values.size(); + for (; numLeft > 0; numLeft--) { + Value val = _values[numLeft - 1]; + + // Do not insert if doing so would exceed the the maximum allowed BSONObj size. + if (bsonValues.len() + keySize + val.getApproximateSize() > BSONObjMaxUserSize) { + // If we have reached the threshold for maximum allowed BSONObj size and only + // have a single value then no progress will be made on reduce. We must fail + // when this scenario is encountered. + size_t numNextReduce = _values.size() - numLeft; + uassert(31392, "Value too large to reduce", numNextReduce > 1); + break; + } + bsonValues << val; + } - Value result; - // Keep reducing until we have exactly one value. - while (true) { - BSONArrayBuilder bsonValues; - size_t numLeft = _values.size(); - for (; numLeft > 0; numLeft--) { - Value val = _values[numLeft - 1]; - - // Do not insert if doing so would exceed the the maximum allowed BSONObj size. - if (bsonValues.len() + keySize + val.getApproximateSize() > BSONObjMaxUserSize) { - // If we have reached the threshold for maximum allowed BSONObj size and only have a - // single value then no progress will be made on reduce. We must fail when this - // scenario is encountered. - size_t numNextReduce = _values.size() - numLeft; - uassert(31392, "Value too large to reduce", numNextReduce > 1); + auto expCtx = getExpressionContext(); + auto reduceFunc = makeJsFunc(expCtx, _funcSource); + + // Function signature: reduce(key, values). + BSONObj params = BSON_ARRAY(_key << bsonValues.arr()); + // For reduce, the key and values are both passed as 'params' so there's no need to set + // 'this'. + BSONObj thisObj; + Value reduceResult = + expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj); + if (numLeft == 0) { + result = reduceResult; break; + } else { + // Remove all values which have been reduced. + _values.resize(numLeft); + // Include most recent result in the set of values to be reduced. + _values.push_back(reduceResult); } - bsonValues << val; - } - - auto expCtx = getExpressionContext(); - auto reduceFunc = makeJsFunc(expCtx, _funcSource); - - // Function signature: reduce(key, values). - BSONObj params = BSON_ARRAY(_key << bsonValues.arr()); - // For reduce, the key and values are both passed as 'params' so there's no need to set - // 'this'. - BSONObj thisObj; - Value reduceResult = - expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj); - if (numLeft == 0) { - result = reduceResult; - break; - } else { - // Remove all values which have been reduced. - _values.resize(numLeft); - // Include most recent result in the set of values to be reduced. - _values.push_back(reduceResult); } } -- cgit v1.2.1