diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-26 11:46:58 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-27 22:12:14 -0400 |
commit | 55637833c707998f685f997d43624c52cde99b45 (patch) | |
tree | bbc00a719c14983e8984d1dbe8dbddd074e023a7 /jstests | |
parent | 22c34669f744ea245c14a64c556d61f8932ceda9 (diff) | |
download | mongo-55637833c707998f685f997d43624c52cde99b45.tar.gz |
SERVER-30871 Permit blocking aggregation stages to run on mongoS if allowDiskUse is false
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/aggregation/mongos_merge.js | 335 |
1 files changed, 227 insertions, 108 deletions
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index fd49ef4c7e5..b8763a45545 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -1,19 +1,10 @@ /** * Tests that split aggregations whose merge pipelines are eligible to run on mongoS do so, and - * produce the expected results. + * produce the expected results. Stages which are eligible to merge on mongoS include: * - * Splittable stages whose merge components are eligible to run on mongoS include: - * - $sort (iff merging pre-sorted streams) - * - $skip - * - $limit - * - $sample - * - * Non-splittable stages such as those listed below are eligible to run in a mongoS merge pipeline: - * - $match - * - $project - * - $addFields - * - $unwind - * - $redact + * - Splitpoints whose merge components are non-blocking, e.g. $skip, $limit, $sort, $sample. + * - Non-splittable streaming stages, e.g. $match, $project, $unwind. + * - Blocking stages in cases where 'allowDiskUse' is false, e.g. $group, $bucketAuto. * * Because wrapping these aggregations in a $facet stage will affect how the pipeline can be merged, * and will therefore invalidate the results of the test cases below, we tag this test to prevent it @@ -23,12 +14,14 @@ */ (function() { - load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions. + load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions. + load('jstests/libs/geo_near_random.js'); // For GeoNearRandomTest. const st = new ShardingTest({shards: 2, mongos: 1, config: 1}); const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; + const unshardedColl = mongosDB[jsTestName() + "_unsharded"]; const shard0DB = primaryShardDB = st.shard0.getDB(jsTestName()); const shard1DB = st.shard1.getDB(jsTestName()); @@ -52,6 +45,9 @@ assert.commandWorked( mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + // We will need to test $geoNear on this collection, so create a 2dsphere index. + assert.commandWorked(mongosColl.createIndex({geo: "2dsphere"})); + // Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey). assert.commandWorked( mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}})); @@ -66,29 +62,47 @@ assert.commandWorked(mongosDB.adminCommand( {moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: "shard0001"})); + // Create a random geo co-ord generator for testing. + var georng = new GeoNearRandomTest(mongosColl); + // Write 400 documents across the 4 chunks. for (let i = -200; i < 200; i++) { - assert.writeOK(mongosColl.insert({_id: i, a: [i], b: {redactThisDoc: true}, c: true})); + assert.writeOK(mongosColl.insert( + {_id: i, a: [i], b: {redactThisDoc: true}, c: true, geo: georng.mkPt()})); + assert.writeOK(unshardedColl.insert({_id: i, x: i})); } + let testNameHistory = new Set(); + /** * Runs the aggregation specified by 'pipeline', verifying that: * - The number of documents returned by the aggregation matches 'expectedCount'. * - The merge was performed on a mongoS if 'mergeType' is 'mongos', and on a shard otherwise. */ - function assertMergeBehaviour({testName, pipeline, mergeType, batchSize, expectedCount}) { - // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our - // expectation. + function assertMergeBehaviour( + {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) { + // Ensure that this test has a unique name. + assert(!testNameHistory.has(testName)); + testNameHistory.add(testName); + + // Create the aggregation options from the given arguments. + const opts = { + comment: testName, + cursor: (batchSize ? {batchSize: batchSize} : {}), + }; + + if (allowDiskUse !== undefined) { + opts.allowDiskUse = allowDiskUse; + } + + // Verify that the explain() output's 'mergeType' field matches our expectation. assert.eq( - assert.commandWorked(mongosColl.explain().aggregate(pipeline, {comment: testName})) + assert.commandWorked(mongosColl.explain().aggregate(pipeline, Object.extend({}, opts))) .mergeType, mergeType); - assert.eq( - mongosColl - .aggregate(pipeline, {comment: testName, cursor: {batchSize: (batchSize || 101)}}) - .itcount(), - expectedCount); + // Verify that the aggregation returns the expected number of results. + assert.eq(mongosColl.aggregate(pipeline, opts).itcount(), expectedCount); // Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeType' is not // 'mongos', and that no such aggregation ran otherwise. @@ -107,12 +121,13 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase is not performed on the mongoS. */ - function assertMergeOnMongoS({testName, pipeline, batchSize, expectedCount}) { + function assertMergeOnMongoS({testName, pipeline, batchSize, allowDiskUse, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: "mongos", - batchSize: (batchSize || 101), + batchSize: (batchSize || 10), + allowDiskUse: allowDiskUse, expectedCount: expectedCount }); } @@ -121,94 +136,214 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase was not performed on a shard. */ - function assertMergeOnMongoD({testName, pipeline, mergeType, batchSize, expectedCount}) { + function assertMergeOnMongoD( + {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: (mergeType || "anyShard"), - batchSize: (batchSize || 101), + batchSize: (batchSize || 10), + allowDiskUse: allowDiskUse, expectedCount: expectedCount }); } - // - // Test cases. - // + /** + * Runs a series of test cases which will consistently merge on mongoS or mongoD regardless of + * whether 'allowDiskUse' is true, false or omitted. + */ + function runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse) { + // Test that a $match pipeline with an empty merge stage is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_match_only", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); - let testName; + // Test that a $sort stage which merges pre-sorted streams is run on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sort_presorted", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); - // Test that a $match pipeline with an empty merge stage is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_match_only", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $skip is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_skip", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}], + allowDiskUse: allowDiskUse, + expectedCount: 100 + }); - // Test that a $sort stage which merges pre-sorted streams is run on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_sort_presorted", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $limit is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_limit", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that a $sort stage which must sort the dataset from scratch is NOT run on mongoS. - assertMergeOnMongoD({ - testName: "agg_mongos_merge_sort_in_mem", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $sample is merged on mongoS if it is the splitpoint, since this will result in + // a merging $sort of presorted streams in the merge pipeline. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sample_splitpoint", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that a merge pipeline which needs to run on the primary shard is NOT merged on mongoS. - assertMergeOnMongoD({ - testName: "agg_mongos_merge_primary_shard", - pipeline: [ - {$match: {_id: {$gte: -200, $lte: 200}}}, - {$_internalSplitPipeline: {mergeType: "primaryShard"}} - ], - mergeType: "primaryShard", - batchSize: 10, - expectedCount: 400 - }); + // Test that $geoNear is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_geo_near", + pipeline: [ + {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 300}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that $skip is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_skip", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}], - batchSize: 10, - expectedCount: 100 - }); + // Test that a pipeline whose merging half can be run on mongos using only the mongos + // execution machinery returns the correct results. + // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$sort: {_id: -1}}, + {$skip: 150}, + {$limit: 150}, + {$skip: 5}, + {$limit: 1}, + ], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); - // Test that $limit is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_limit", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}], - batchSize: 10, - expectedCount: 300 - }); + // Test that a merge pipeline which needs to run on a shard is NOT merged on mongoS + // regardless of 'allowDiskUse'. + assertMergeOnMongoD({ + testName: "agg_mongos_merge_primary_shard_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$_internalSplitPipeline: {mergeType: "anyShard"}} + ], + mergeType: "anyShard", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + } - // Test that $sample is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_sample", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}], - batchSize: 10, - expectedCount: 300 - }); + /** + * Runs a series of test cases which will always merge on mongoD when 'allowDiskUse' is true, + * and on mongoS when 'allowDiskUse' is false or omitted. + */ + function runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse) { + // All test cases should merge on mongoD if allowDiskUse is true, mongoS otherwise. + const assertMergeOnMongoX = (allowDiskUse ? assertMergeOnMongoD : assertMergeOnMongoS); + + // Test that a blocking $sort is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_blocking_sort_no_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + + // Test that $group is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_group_allow_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$group: {_id: {$mod: ["$_id", 150]}}}], + allowDiskUse: allowDiskUse, + expectedCount: 299 + }); + + // Test that a blocking $sample is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_blocking_sample_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$sample: {size: 300}}, + {$sample: {size: 200}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 200 + }); + + // Test that $bucketAuto is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_bucket_auto_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$bucketAuto: {groupBy: "$_id", buckets: 10}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 10 + }); + + // + // Test composite stages. + // + + // Test that $bucket ($group->$sort) is merged on mongoS iff 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_bucket_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $bucket: { + groupBy: "$_id", + boundaries: [-200, -150, -100, -50, 0, 50, 100, 150, 200] + } + } + ], + allowDiskUse: allowDiskUse, + expectedCount: 8 + }); + + // Test that $sortByCount ($group->$sort) is merged on mongoS iff 'allowDiskUse' isn't set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_sort_by_count_allow_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sortByCount: {$mod: ["$_id", 150]}}], + allowDiskUse: allowDiskUse, + expectedCount: 299 + }); + + // Test that $count ($group->$project) is merged on mongoS iff 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_count_allow_disk_use", + pipeline: [{$match: {_id: {$gte: -150, $lte: 1500}}}, {$count: "doc_count"}], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); + } + + // Run all test cases for each potential value of 'allowDiskUse'. + for (let allowDiskUse of[false, undefined, true]) { + runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse); + runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse); + testNameHistory.clear(); + } // Test that merge pipelines containing all mongos-runnable stages produce the expected output. assertMergeOnMongoS({ testName: "agg_mongos_merge_all_mongos_runnable_stages", pipeline: [ - {$match: {_id: {$gte: -200, $lte: 200}}}, - {$sort: {_id: 1}}, + {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 400}}, + {$sort: {a: 1}}, {$skip: 150}, {$limit: 150}, {$addFields: {d: true}}, {$unwind: "$a"}, {$sample: {size: 100}}, - {$project: {c: 0}}, + {$project: {c: 0, geo: 0, distance: 0}}, + {$group: {_id: "$_id", doc: {$push: "$$CURRENT"}}}, + {$unwind: "$doc"}, + {$replaceRoot: {newRoot: "$doc"}}, { $redact: { $cond: @@ -218,33 +353,17 @@ { $match: { _id: {$gte: -50, $lte: 100}, - a: {$gte: -50, $lte: 100}, + a: {$type: "number", $gte: -50, $lte: 100}, b: {$exists: false}, c: {$exists: false}, - d: true + d: true, + geo: {$exists: false}, + distance: {$exists: false} } } ], - batchSize: 10, expectedCount: 100 }); - // Test that a pipeline whose merging half can be run on mongos using only the mongos execution - // machinery returns the correct results. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages", - pipeline: [ - {$match: {_id: {$gte: -5, $lte: 100}}}, - {$sort: {_id: -1}}, - {$skip: 5}, - {$limit: 10}, - {$skip: 5}, - {$limit: 1}, - ], - batchSize: 10, - expectedCount: 1 - }); - // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. - st.stop(); })(); |