diff options
author | Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> | 2020-02-13 15:09:48 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-13 15:09:48 +0000 |
commit | e87edace22b258809b6f209a3cdc6c123149a16b (patch) | |
tree | 4a189176dce4ffc5c0a2718325e8b5c030e464a6 | |
parent | e289b78c61033674f6440d9ddc402b50903717ac (diff) | |
download | mongo-e87edace22b258809b6f209a3cdc6c123149a16b.tar.gz |
SERVER-46087 Propagate allowDiskUse to shards for sharded $unionWith
-rw-r--r-- | jstests/aggregation/sources/unionWith/unionWith.js | 40 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 7 |
2 files changed, 43 insertions, 4 deletions
diff --git a/jstests/aggregation/sources/unionWith/unionWith.js b/jstests/aggregation/sources/unionWith/unionWith.js
index 28bb66a9926..f26f93a91d5 100644
--- a/jstests/aggregation/sources/unionWith/unionWith.js
+++ b/jstests/aggregation/sources/unionWith/unionWith.js
@@ -5,6 +5,7 @@
 (function() {
 "use strict";
 load("jstests/aggregation/extras/utils.js");  // arrayEq
+load("jstests/libs/fixture_helpers.js");      // For FixtureHelpers.
 const testDB = db.getSiblingDB(jsTestName());
 const collA = testDB.A;
@@ -24,6 +25,7 @@ for (let i = 0; i < docsPerCollection; i++) {
 }
 
 function checkResults(resObj, expectedResult) {
+    assert.commandWorked(resObj);
     assert(arrayEq(resObj.cursor.firstBatch, expectedResult),
            "Expected:\n" + tojson(expectedResult) + "Got:\n" + tojson(resObj.cursor.firstBatch));
 }
@@ -124,7 +126,13 @@ checkResults(testDB.runCommand({
 }),
              resSet);
 
 // Test with $group. Set MaxMemory low to force spillToDisk.
-testDB.adminCommand({setParameter: 1, "internalDocumentSourceGroupMaxMemoryBytes": 1});
+FixtureHelpers.runCommandOnEachPrimary({
+    db: testDB.getSiblingDB("admin"),
+    cmdObj: {
+        setParameter: 1,
+        "internalDocumentSourceGroupMaxMemoryBytes": 1,
+    }
+});
 resSet = [{_id: 0, sum: 0}, {_id: 1, sum: 3}, {_id: 2, sum: 6}, {_id: 3, sum: 9}, {_id: 4, sum: 12}];
 checkResults(testDB.runCommand({
@@ -176,9 +184,35 @@ checkResults(testDB.runCommand({
     cursor: {}
 }),
              resSet);
+
+// Test that a $group within a $unionWith sub-pipeline correctly fails if it needs to spill but
+// 'allowDiskUse' is false.
+assert.commandFailedWithCode(testDB.runCommand({
+    aggregate: collA.getName(),
+    pipeline: [
+        {
+            $unionWith: {
+                coll: collB.getName(),
+                pipeline: [
+                    {"$group": {_id: "$groupKey", val: {$sum: "$val"}}},
+                    {"$addFields": {groupKey: 1}}
+                ]
+            }
+        },
+    ],
+    allowDiskUse: false,
+    cursor: {}
+}),
+                             16945);
+
 // Reset to default value.
-testDB.adminCommand(
-    {setParameter: 1, "internalDocumentSourceGroupMaxMemoryBytes": 100 * 1024 * 1024});
+FixtureHelpers.runCommandOnEachPrimary({
+    db: testDB.getSiblingDB("admin"),
+    cmdObj: {
+        setParameter: 1,
+        "internalDocumentSourceGroupMaxMemoryBytes": 100 * 1024 * 1024,
+    }
+});
 
 // Test with $limit and sort in a sub-pipeline.
 const setBResult = collB.find().sort({b: 1}).limit(2).toArray();
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index fabc49ceb4e..d6d3569e5a8 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -128,7 +128,7 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
         cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON());
     }
 
-    cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+    cmdForShards[AggregationRequest::kFromMongosName] = Value(expCtx->inMongos);
     // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
     // explain command.
     if (explainVerbosity) {
@@ -955,6 +955,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
     }();
 
     AggregationRequest aggRequest(expCtx->ns, rawStages);
+
+    // The default value for 'allowDiskUse' in the AggregationRequest may not match what was set
+    // on the originating command, so copy it from the ExpressionContext.
+    aggRequest.setAllowDiskUse(expCtx->allowDiskUse);
+
    LiteParsedPipeline liteParsedPipeline(aggRequest);
    auto hasChangeStream = liteParsedPipeline.hasChangeStream();
    auto shardDispatchResults = dispatchShardPipeline(