summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavis Haupt <davis.haupt@mongodb.com>2022-12-19 21:22:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-21 01:17:48 +0000
commit70bbc40c52b10395d0c54d63e4f7c35abadd8756 (patch)
tree2b19dc0d583f6b04af13042d5185f7f4817fef64
parent21b2615135067cdb67409f9cae670b315425b0df (diff)
downloadmongo-70bbc40c52b10395d0c54d63e4f7c35abadd8756.tar.gz
SERVER-72222 fix mapReduce single reduce optimization in sharded clusters
-rw-r--r--jstests/sharding/mr_single_reduce_split.js55
-rw-r--r--src/mongo/db/pipeline/accumulator_js_reduce.cpp83
2 files changed, 97 insertions, 41 deletions
diff --git a/jstests/sharding/mr_single_reduce_split.js b/jstests/sharding/mr_single_reduce_split.js
new file mode 100644
index 00000000000..1a6d20f0e1f
--- /dev/null
+++ b/jstests/sharding/mr_single_reduce_split.js
@@ -0,0 +1,55 @@
+/**
+ * This jstest verifies that the mrEnableSingleReduceOptimization flag works properly in a sharded
+ * cluster when there are documents on multiple chunks that need to be merged.
+ * @tags: [
+ * backport_required_multiversion,
+ * ]
+ */
+(function() {
+const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ other: {
+ mongosOptions: {setParameter: {mrEnableSingleReduceOptimization: true}},
+ shardOptions: {setParameter: {mrEnableSingleReduceOptimization: true}},
+ }
+});
+
+const mongosDB = st.s0.getDB(jsTestName());
+const mongosColl = mongosDB[jsTestName()];
+
+assert.commandWorked(mongosDB.dropDatabase());
+assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+// Split the collection into 2 chunks: [minKey, 0) and [0, maxKey].
+assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+// Move the [0, MaxKey) chunk to the second shard.
+assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 50}, to: st.shard1.shardName}));
+
+assert.commandWorked(mongosColl.insert({_id: -1}));
+const map = function() {
+ emit(0, {val: "mapped value"});
+};
+
+const reduce = function(key, values) {
+ return {val: "reduced value"};
+};
+
+let res = assert.commandWorked(mongosDB.runCommand(
+ {mapReduce: mongosColl.getName(), map: map, reduce: reduce, out: {inline: 1}}));
+assert.eq(res.results[0], {_id: 0, value: {val: "mapped value"}});
+
+assert.commandWorked(mongosColl.insert({_id: 1}));
+
+res = assert.commandWorked(mongosDB.runCommand(
+ {mapReduce: mongosColl.getName(), map: map, reduce: reduce, out: {inline: 1}}));
+assert.eq(res.results[0], {_id: 0, value: {val: "reduced value"}});
+
+st.stop();
+}());
diff --git a/src/mongo/db/pipeline/accumulator_js_reduce.cpp b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
index 50f0aa43c73..78058d5913d 100644
--- a/src/mongo/db/pipeline/accumulator_js_reduce.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_reduce.cpp
@@ -120,52 +120,53 @@ void AccumulatorInternalJsReduce::processInternal(const Value& input, bool mergi
Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) {
if (_values.size() < 1) {
return Value{};
- } else if (mrSingleReduceOptimizationEnabled && _values.size() == 1) {
+ }
+ Value result;
+ if (mrSingleReduceOptimizationEnabled && _values.size() == 1) {
// This optimization existed in the old Pre-4.4 MapReduce implementation. If the flag is
// set, then we should replicate the optimization. See SERVER-68766 for more details.
- return _values[0];
- }
-
- const auto keySize = _key.getApproximateSize();
+ result = std::move(_values[0]);
+ } else {
+ const auto keySize = _key.getApproximateSize();
+
+ // Keep reducing until we have exactly one value.
+ while (true) {
+ BSONArrayBuilder bsonValues;
+ size_t numLeft = _values.size();
+ for (; numLeft > 0; numLeft--) {
+ Value val = _values[numLeft - 1];
+
+ // Do not insert if doing so would exceed the the maximum allowed BSONObj size.
+ if (bsonValues.len() + keySize + val.getApproximateSize() > BSONObjMaxUserSize) {
+ // If we have reached the threshold for maximum allowed BSONObj size and only
+ // have a single value then no progress will be made on reduce. We must fail
+ // when this scenario is encountered.
+ size_t numNextReduce = _values.size() - numLeft;
+ uassert(31392, "Value too large to reduce", numNextReduce > 1);
+ break;
+ }
+ bsonValues << val;
+ }
- Value result;
- // Keep reducing until we have exactly one value.
- while (true) {
- BSONArrayBuilder bsonValues;
- size_t numLeft = _values.size();
- for (; numLeft > 0; numLeft--) {
- Value val = _values[numLeft - 1];
-
- // Do not insert if doing so would exceed the the maximum allowed BSONObj size.
- if (bsonValues.len() + keySize + val.getApproximateSize() > BSONObjMaxUserSize) {
- // If we have reached the threshold for maximum allowed BSONObj size and only have a
- // single value then no progress will be made on reduce. We must fail when this
- // scenario is encountered.
- size_t numNextReduce = _values.size() - numLeft;
- uassert(31392, "Value too large to reduce", numNextReduce > 1);
+ auto expCtx = getExpressionContext();
+ auto reduceFunc = makeJsFunc(expCtx, _funcSource);
+
+ // Function signature: reduce(key, values).
+ BSONObj params = BSON_ARRAY(_key << bsonValues.arr());
+ // For reduce, the key and values are both passed as 'params' so there's no need to set
+ // 'this'.
+ BSONObj thisObj;
+ Value reduceResult =
+ expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj);
+ if (numLeft == 0) {
+ result = reduceResult;
break;
+ } else {
+ // Remove all values which have been reduced.
+ _values.resize(numLeft);
+ // Include most recent result in the set of values to be reduced.
+ _values.push_back(reduceResult);
}
- bsonValues << val;
- }
-
- auto expCtx = getExpressionContext();
- auto reduceFunc = makeJsFunc(expCtx, _funcSource);
-
- // Function signature: reduce(key, values).
- BSONObj params = BSON_ARRAY(_key << bsonValues.arr());
- // For reduce, the key and values are both passed as 'params' so there's no need to set
- // 'this'.
- BSONObj thisObj;
- Value reduceResult =
- expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj);
- if (numLeft == 0) {
- result = reduceResult;
- break;
- } else {
- // Remove all values which have been reduced.
- _values.resize(numLeft);
- // Include most recent result in the set of values to be reduced.
- _values.push_back(reduceResult);
}
}