summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicholas Zolnierz <nicholas.zolnierz@mongodb.com>2020-02-13 15:09:48 +0000
committerevergreen <evergreen@mongodb.com>2020-02-13 15:09:48 +0000
commite87edace22b258809b6f209a3cdc6c123149a16b (patch)
tree4a189176dce4ffc5c0a2718325e8b5c030e464a6
parente289b78c61033674f6440d9ddc402b50903717ac (diff)
downloadmongo-e87edace22b258809b6f209a3cdc6c123149a16b.tar.gz
SERVER-46087 Propagate allowDiskUse to shards for sharded $unionWith
-rw-r--r--jstests/aggregation/sources/unionWith/unionWith.js40
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp7
2 files changed, 43 insertions, 4 deletions
diff --git a/jstests/aggregation/sources/unionWith/unionWith.js b/jstests/aggregation/sources/unionWith/unionWith.js
index 28bb66a9926..f26f93a91d5 100644
--- a/jstests/aggregation/sources/unionWith/unionWith.js
+++ b/jstests/aggregation/sources/unionWith/unionWith.js
@@ -5,6 +5,7 @@
(function() {
"use strict";
load("jstests/aggregation/extras/utils.js"); // arrayEq
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const testDB = db.getSiblingDB(jsTestName());
const collA = testDB.A;
@@ -24,6 +25,7 @@ for (let i = 0; i < docsPerCollection; i++) {
}
function checkResults(resObj, expectedResult) {
+ assert.commandWorked(resObj);
assert(arrayEq(resObj.cursor.firstBatch, expectedResult),
"Expected:\n" + tojson(expectedResult) + "Got:\n" + tojson(resObj.cursor.firstBatch));
}
@@ -124,7 +126,13 @@ checkResults(testDB.runCommand({
}),
resSet);
// Test with $group. Set MaxMemory low to force spillToDisk.
-testDB.adminCommand({setParameter: 1, "internalDocumentSourceGroupMaxMemoryBytes": 1});
+FixtureHelpers.runCommandOnEachPrimary({
+ db: testDB.getSiblingDB("admin"),
+ cmdObj: {
+ setParameter: 1,
+ "internalDocumentSourceGroupMaxMemoryBytes": 1,
+ }
+});
resSet =
[{_id: 0, sum: 0}, {_id: 1, sum: 3}, {_id: 2, sum: 6}, {_id: 3, sum: 9}, {_id: 4, sum: 12}];
checkResults(testDB.runCommand({
@@ -176,9 +184,35 @@ checkResults(testDB.runCommand({
cursor: {}
}),
resSet);
+
+// Test that a $group within a $unionWith sub-pipeline correctly fails if it needs to spill but
+// 'allowDiskUse' is false.
+assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: collA.getName(),
+ pipeline: [
+ {
+ $unionWith: {
+ coll: collB.getName(),
+ pipeline: [
+ {"$group": {_id: "$groupKey", val: {$sum: "$val"}}},
+ {"$addFields": {groupKey: 1}}
+ ]
+ }
+ },
+ ],
+ allowDiskUse: false,
+ cursor: {}
+}),
+ 16945);
+
// Reset to default value.
-testDB.adminCommand(
- {setParameter: 1, "internalDocumentSourceGroupMaxMemoryBytes": 100 * 1024 * 1024});
+FixtureHelpers.runCommandOnEachPrimary({
+ db: testDB.getSiblingDB("admin"),
+ cmdObj: {
+ setParameter: 1,
+ "internalDocumentSourceGroupMaxMemoryBytes": 100 * 1024 * 1024,
+ }
+});
// Test with $limit and sort in a sub-pipeline.
const setBResult = collB.find().sort({b: 1}).limit(2).toArray();
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index fabc49ceb4e..d6d3569e5a8 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -128,7 +128,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON());
}
- cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(expCtx->inMongos);
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (explainVerbosity) {
@@ -955,6 +955,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
}();
AggregationRequest aggRequest(expCtx->ns, rawStages);
+
+ // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set
+ // on the originating command, so copy it from the ExpressionContext.
+ aggRequest.setAllowDiskUse(expCtx->allowDiskUse);
+
LiteParsedPipeline liteParsedPipeline(aggRequest);
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
auto shardDispatchResults = dispatchShardPipeline(