diff options
author | samontea <merciers.merciers@gmail.com> | 2022-05-03 17:26:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-03 17:56:04 +0000 |
commit | 3151bf2f90871da056873554f5455a09647ee16e (patch) | |
tree | dac3227675c0a42da51df095b357003536750ddf | |
parent | 20c54417144fc3cbae632a27ea306c80361bbb8f (diff) | |
download | mongo-3151bf2f90871da056873554f5455a09647ee16e.tar.gz |
SERVER-64349 Add heuristic-based planning support for bucket unpacking with sort
19 files changed, 981 insertions, 67 deletions
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js index efbe87fa3cf..4c71328c027 100644 --- a/jstests/aggregation/extras/utils.js +++ b/jstests/aggregation/extras/utils.js @@ -441,46 +441,75 @@ function desugarSingleStageAggregation(db, coll, stage) { /** * Runs and asserts an explain command for an aggregation with the given pipeline. Returns just the * pipeline from the explain results regardless of cluster topology. + * The fourth parameter `options` is for a few options for unusual scenarios. + * options.inhibitOptimization defaults to true. This prepends an inhibitOptimization stage to the + * query and removes it before returning results. This is sub ideal for views. options.hint is an + * optional hint that will get passed on to the aggregation stage. It defaults to undefined. */ -function getExplainedPipelineFromAggregation(db, coll, pipeline) { +function getExplainedPipelineFromAggregation( + db, coll, pipeline, {inhibitOptimization = true, hint} = {}) { // Prevent stages from being absorbed into the .find() layer - pipeline.unshift({$_internalInhibitOptimization: {}}); - const result = coll.explain().aggregate(pipeline); + if (inhibitOptimization) { + pipeline.unshift({$_internalInhibitOptimization: {}}); + } + + const aggOptions = hint ? {hint: hint} : {}; + + const result = coll.explain().aggregate(pipeline, aggOptions); assert.commandWorked(result); - return getExplainPipelineFromAggregationResult(db, result); + return getExplainPipelineFromAggregationResult(db, result, {inhibitOptimization}); } -function getExplainPipelineFromAggregationResult(db, result) { +function getExplainPipelineFromAggregationResult(db, result, { + inhibitOptimization = true, +} = {}) { // We proceed by cases based on topology. if (!FixtureHelpers.isMongos(db)) { assert(Array.isArray(result.stages), result); - // The first two stages should be the .find() cursor and the inhibit-optimization stage; - // the rest of the stages are what the user's 'stage' expanded to. + // The first two stages should be the .find() cursor and the inhibit-optimization stage (if + // enabled); the rest of the stages are what the user's 'stage' expanded to. assert(result.stages[0].$cursor, result); - assert(result.stages[1].$_internalInhibitOptimization, result); - return result.stages.slice(2); + if (inhibitOptimization) { + assert(result.stages[1].$_internalInhibitOptimization, result); + return result.stages.slice(2); + } else { + return result.stages.slice(1); + } } else { if (result.splitPipeline) { - assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result); - const shardsPart = result.splitPipeline.shardsPart.slice(1); + let shardsPart = null; + if (inhibitOptimization) { + assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result); + shardsPart = result.splitPipeline.shardsPart.slice(1); + } else { + shardsPart = result.splitPipeline.shardsPart; + } assert(result.splitPipeline.mergerPart[0].$mergeCursors, result); const mergerPart = result.splitPipeline.mergerPart.slice(1); return [].concat(shardsPart).concat(mergerPart); } else if (result.stages) { // Required for aggregation_mongos_passthrough. assert(Array.isArray(result.stages), result); - // The first two stages should be the .find() cursor and the inhibit-optimization stage; - // the rest of the stages are what the user's 'stage' expanded to. + // The first two stages should be the .find() cursor and the inhibit-optimization stage + // (if enabled); the rest of the stages are what the user's 'stage' expanded to. assert(result.stages[0].$cursor, result); - assert(result.stages[1].$_internalInhibitOptimization, result); - return result.stages.slice(2); + if (inhibitOptimization) { + assert(result.stages[1].$_internalInhibitOptimization, result); + return result.stages.slice(2); + } else { + return result.stages.slice(1); + } } else { // Required for aggregation_one_shard_sharded_collections. assert(Array.isArray(result.shards["shard-rs0"].stages), result); assert(result.shards["shard-rs0"].stages[0].$cursor, result); - assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result); - return result.shards["shard-rs0"].stages.slice(2); + if (inhibitOptimization) { + assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result); + return result.shards["shard-rs0"].stages.slice(2); + } else { + return result.shards["shard-rs0"].stages.slice(1); + } } } } diff --git a/jstests/core/timeseries/bucket_unpacking_with_sort.js b/jstests/core/timeseries/bucket_unpacking_with_sort.js new file mode 100644 index 00000000000..77c41091451 --- /dev/null +++ b/jstests/core/timeseries/bucket_unpacking_with_sort.js @@ -0,0 +1,356 @@ +/** + * Test that the bucket unpacking with sorting rewrite is performed and doesn't cause incorrect + * results to be created. + * + * @tags: [ + * requires_fcv_61, + * # We need a timeseries collection. + * assumes_no_implicit_collection_creation_after_drop, + * # Bounded sorter is currently broken w/ sharding. + * assumes_unsharded_collection, + * # Cannot insert into a time-series collection in a multi-document transaction. + * does_not_support_transactions, + * # Refusing to run a test that issues an aggregation command with explain because it may + * # return incomplete results if interrupted by a stepdown. + * does_not_support_stepdowns, + * # This complicates aggregation extraction. + * do_not_wrap_aggregations_in_facets, + * # We need a timeseries collection. + * requires_timeseries, + * # Explain of a resolved view must be executed by mongos. + * directly_against_shardsvrs_incompatible, + * ] + */ +(function() { +"use strict"; + +const featureEnabled = + assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagBucketUnpackWithSort: 1})) + .featureFlagBucketUnpackWithSort.value; +if (!featureEnabled) { + jsTestLog("Skipping test because the BUS feature flag is disabled"); + return; +} + +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. +load("jstests/aggregation/extras/utils.js"); // For getExplainedPipelineFromAggregation. +const collName = "bucket_unpacking_with_sort"; +const coll = db[collName]; +const metaCollName = "bucket_unpacking_with_sort_with_meta"; +const metaColl = db[metaCollName]; +const metaCollSubFieldsName = "bucket_unpacking_with_sort_with_meta_sub"; +const metaCollSubFields = db[metaCollSubFieldsName]; +const subFields = ["a", "b"]; + +const setupColl = (coll, collName, usesMeta, subFields = null) => { + coll.drop(); + if (usesMeta) { + db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}}); + } else { + db.createCollection(collName, {timeseries: {timeField: "t"}}); + } + // Create a few buckets. + let meta = usesMeta ? 10 : 1; + let docs = []; + let numberOfItemsPerBucket = + db.adminCommand({getParameter: 1, timeseriesBucketMaxCount: 1}).timeseriesBucketMaxCount; + assert(Number.isInteger(numberOfItemsPerBucket)); + for (let j = 0; j < meta; ++j) { + let metaVal = j; + if (subFields) { + metaVal = subFields.reduce((memo, field) => { + return Object.assign(Object.assign({}, memo), {[field]: j}); + }, {}); + } + for (let i = 0; i < numberOfItemsPerBucket; ++i) { + docs.push({m: metaVal, t: new Date(i * 6000)}); + } + // Because the max bucket span is 3600 * 1000 milliseconds, we know that the above will have + // generated two buckets. Since we are now going back in time to before the minimum + // timestamp of the second bucket, we'll open a third and fourth bucket below. Crucially + // will overlap with the first two buckets. + for (let i = 0; i < numberOfItemsPerBucket; ++i) { + docs.push({m: metaVal, t: new Date(i * 6000 + 3000)}); + } + } + assert.commandWorked(coll.insert(docs)); +}; + +setupColl(coll, collName, false); +setupColl(metaColl, metaCollName, true); +setupColl(metaCollSubFields, metaCollSubFieldsName, true, subFields); + +// For use in reductions. +// Takes a memo (accumulator value) and a stage and returns if the stage was an internal bounded +// sort or the memo was true. +const stageIsInternalBoundedSort = (stage) => { + return stage.hasOwnProperty("$_internalBoundedSort"); +}; + +const hasInternalBoundedSort = (pipeline) => pipeline.some(stageIsInternalBoundedSort); + +const getIfMatch = (memo, stage) => { + if (memo) + return memo; + else if (stage.hasOwnProperty("$match")) + return stage; + else + return null; +}; + +const findFirstMatch = (pipeline) => pipeline.reduce(getIfMatch, null); + +const setup = (coll, createIndex = null) => { + // Create index. + if (createIndex) { + assert.commandWorked(coll.createIndex(createIndex)); + } +}; + +/** + * This test creates a time-series collection, inserts data (with multiple overlapping buckets), and + * tests that the BUS optimization is performed and produces correct results. + * + * `sortSpec` is the sort that we wish to have optimized. + * `createIndex` is the index that we need to create in order to perform the optimization. + * It defaults to null which signals that we don't create an index. + * `hint` is the hint that we specify in order to produce the optimization. + * traversing a min (resp. max) field index on a descending (resp. ascending) sort. + * `testColl` is the collection to use. + */ +const runRewritesTest = (sortSpec, + createIndex, + hint, + testColl, + precise, + intermediaryStages = [], + posteriorStages = []) => { + setup(testColl, createIndex); + + const options = (hint) ? {hint: hint} : {}; + + const match = {$match: {t: {$lt: new Date("2022-01-01")}}}; + // Get results + const optPipeline = [match, ...intermediaryStages, {$sort: sortSpec}, ...posteriorStages]; + const optResults = testColl.aggregate(optPipeline, options).toArray(); + const optExplainFull = testColl.explain().aggregate(optPipeline, options); + + const ogPipeline = [ + match, + ...intermediaryStages, + {$_internalInhibitOptimization: {}}, + {$sort: sortSpec}, + ...posteriorStages + ]; + const ogResults = testColl.aggregate(ogPipeline, options).toArray(); + const ogExplainFull = testColl.explain().aggregate(ogPipeline, options); + + // Assert correct + assert.docEq(optResults, ogResults, {optResults: optResults, ogResults: ogResults}); + + // Check contains stage + const optExplain = getExplainedPipelineFromAggregation( + db, testColl, optPipeline, {inhibitOptimization: false, hint: hint}); + assert(hasInternalBoundedSort(optExplain), optExplainFull); + + // Check doesn't contain stage + const ogExplain = getExplainedPipelineFromAggregation( + db, testColl, ogPipeline, {inhibitOptimization: false, hint: hint}); + assert(!hasInternalBoundedSort(ogExplain), ogExplainFull); + + let foundMatch = findFirstMatch(optExplain); + if (!precise) { + assert.docEq(foundMatch, { + $match: { + $expr: + {$lte: [{$subtract: ["$control.max.t", "$control.min.t"]}, {$const: 3600000}]} + } + }); + } else { + assert.docEq(foundMatch, match); + } +}; + +const runDoesntRewriteTest = (sortSpec, createIndex, hint, testColl, intermediaryStages = []) => { + setup(testColl, createIndex); + + const optPipeline = [ + {$match: {t: {$lt: new Date("2022-01-01")}}}, + ...intermediaryStages, + {$sort: sortSpec}, + ]; + + // Check doesn't contain stage + const optExplain = getExplainedPipelineFromAggregation( + db, testColl, optPipeline, {inhibitOptimization: false, hint: hint}); + const optExplainFull = testColl.explain().aggregate(optPipeline, {hint}); + const containsOptimization = optExplain.reduce(stageIsInternalBoundedSort, false); + assert(!containsOptimization, optExplainFull); +}; + +// Collscan cases +runRewritesTest({t: 1}, null, null, coll, true); +runRewritesTest({t: -1}, null, {$natural: -1}, coll, false); + +// Indexed cases +runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true); +runRewritesTest({t: -1}, {t: -1}, {t: -1}, coll, true); +runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true); +runRewritesTest({m: 1, t: -1}, {m: 1, t: -1}, {m: 1, t: -1}, metaColl, true); +runRewritesTest({m: -1, t: 1}, {m: -1, t: 1}, {m: -1, t: 1}, metaColl, true); +runRewritesTest({m: -1, t: -1}, {m: -1, t: -1}, {m: -1, t: -1}, metaColl, true); +runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true); + +// Intermediary projects that don't modify sorted fields are allowed. +runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {a: 0}}]); +runRewritesTest( + {m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {m: 1, t: 1}}]); + +// Test multiple meta fields +let metaIndexObj = subFields.reduce((memo, field) => { + return Object.assign(Object.assign({}, memo), {[`m.${field}`]: 1}); +}, {}); +Object.assign(metaIndexObj, {t: 1}); +runRewritesTest(metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true); +runRewritesTest( + metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true, [{$project: {m: 1, t: 1}}]); + +// Check sort-limit optimization. +runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true, [], [{$limit: 10}]); + +// Check set window fields is optimized as well. +// Since {k: 1} cannot provide a bounded sort we know if there's a bounded sort it comes form +// setWindowFields. +runRewritesTest({k: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [], [ + {$setWindowFields: {partitionBy: "$m", sortBy: {t: 1}, output: {arr: {$max: "$t"}}}} +]); + +// Negative tests +for (let m = -1; m < 2; m++) { + for (let t = -1; t < 2; t++) { + for (let k = -1; k < 2; k++) { + printjson({"currently running": "the following configuration...", m: m, t: t, k: k}); + let sort = null; + let createIndex = null; + let hint = null; + let usesMeta = null; + if (k != 0) { + // This is the case where we add an intermediary incompatible field. + if (m == 0) { + if (t == 0) { + sort = {k: k}; + } else { + sort = {k: k, t: t}; + } + } else { + if (t == 0) { + sort = {m: m, k: k}; + } else { + sort = {m: m, k: k, t: t}; + } + } + hint = sort; + createIndex = sort; + usesMeta = m != 0; + runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); + } else { + // This is the case where we do not add an intermediary incompatible field. + // Instead we enumerate the ways that the index and sort could disagree. + + // For the meta case, negate the time order. + // For the non-meta case, use a collscan with a negated order. + if (m == 0) { + if (t == 0) { + // Do not execute a test run. + } else { + sort = {t: t}; + hint = {$natural: -t}; + } + } else { + if (t == 0) { + // Do not execute a test run. + } else { + usesMeta = true; + sort = {m: m, t: t}; + hint = {m: m, t: -t}; + createIndex = hint; + } + } + + if (sort) + runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); + + sort = null; + hint = null; + createIndex = null; + usesMeta = false; + // For the meta case, negate the meta order. + // For the non-meta case, use an index instead of a collscan. + if (m == 0) { + if (t == 0) { + // Do not execute a test run. + } else { + sort = {t: t}; + createIndex = {t: -t}; + hint = createIndex; + } + } else { + if (t == 0) { + // Do not execute a test run. + } else { + usesMeta = true; + sort = {m: m, t: t}; + hint = {m: -m, t: t}; + createIndex = hint; + } + } + + if (sort) + runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); + + sort = null; + hint = null; + createIndex = null; + usesMeta = false; + // For the meta case, negate both meta and time. + if (m == 0) { + if (t == 0) { + // Do not execute a test run. + } else { + // Do not execute -- we've exhausted relevant cases. + } + } else { + if (t == 0) { + // Do not execute a test run. + } else { + usesMeta = true; + sort = {m: m, t: t}; + hint = {m: -m, t: -t}; + createIndex = hint; + } + } + + if (sort) + runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); + } + } + } +} + +// Test mismatched meta paths don't produce the optimization. +runDoesntRewriteTest({m: 1, t: 1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields); +runDoesntRewriteTest( + {"m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, metaCollSubFields); +runDoesntRewriteTest({"m.a": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields); +runDoesntRewriteTest({"m.a": 1, "m.b": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields); +// Test matched meta-subpaths with mismatched directions don't produce the optimization. +runDoesntRewriteTest({"m.a": 1, t: -1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields); +// Test intermediary projections that exclude the sorted fields don't produce the optimizaiton. +runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {m: 0}}]); +runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {t: 0}}]); +runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {a: 1}}]); +runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {'m.a': 0}}]); +runDoesntRewriteTest({'m.a': 1, t: 1}, {'m.a': 1, t: 1}, {'m.a': 1, t: 1}, metaCollSubFields, [ + {$project: {'m': 0}} +]); +})(); diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort.js b/jstests/core/timeseries/timeseries_internal_bounded_sort.js index 67cdc4c1c31..c52bb3efb65 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort.js @@ -88,7 +88,7 @@ function runTest(ascending) { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, bound: ascending ? {base: "min"} - : {base: "min", offset: bucketMaxSpanSeconds} + : {base: "min", offsetSeconds: bucketMaxSpanSeconds} } }, ]) @@ -104,7 +104,7 @@ function runTest(ascending) { { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, - bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds} + bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} : {base: "max"} } }, @@ -136,10 +136,10 @@ function runTest(ascending) { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, bound: ascending ? {base: "min"} - : {base: "min", offset: bucketMaxSpanSeconds} + : {base: "min", offsetSeconds: bucketMaxSpanSeconds}, + limit: 100 } }, - {$limit: 100}, ]) .toArray(); assertSorted(optFromMin, ascending); @@ -154,11 +154,11 @@ function runTest(ascending) { { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, - bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds} - : {base: "max"} + bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + : {base: "max"}, + limit: 100 } - }, - {$limit: 100}, + } ]) .toArray(); assertSorted(optFromMax, ascending); diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js index 9f290b1774b..208d6a45b9a 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js @@ -131,7 +131,7 @@ function runTest(sortSpec) { $_internalBoundedSort: { sortKey: sortSpec, bound: sortSpec.t > 0 ? {base: "min"} - : {base: "min", offset: bucketMaxSpanSeconds} + : {base: "min", offsetSeconds: bucketMaxSpanSeconds} } }, ]; @@ -145,7 +145,7 @@ function runTest(sortSpec) { { $_internalBoundedSort: { sortKey: sortSpec, - bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds} + bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} : {base: "max"} } }, @@ -174,10 +174,10 @@ function runTest(sortSpec) { $_internalBoundedSort: { sortKey: sortSpec, bound: sortSpec.t > 0 ? {base: "min"} - : {base: "min", offset: bucketMaxSpanSeconds} + : {base: "min", offsetSeconds: bucketMaxSpanSeconds}, + limit: 100 } - }, - {$limit: 100}, + } ]; const optFromMin = buckets.aggregate(optFromMinQuery).toArray(); assertSorted(optFromMin, sortSpec); @@ -190,11 +190,11 @@ function runTest(sortSpec) { { $_internalBoundedSort: { sortKey: sortSpec, - bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds} - : {base: "max"} + bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + : {base: "max"}, + limit: 100 } - }, - {$limit: 100}, + } ]; const optFromMax = buckets.aggregate(optFromMaxQuery).toArray(); assertSorted(optFromMax, sortSpec); diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js index a986458a99f..417818559d6 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js @@ -131,7 +131,7 @@ function runTest(sortSpec) { // Use a much looser bound than necessary, to exercise the partitioning logic more. // With such a loose bound, events are only released due to a partition boundary, // never a bucket boundary. - bound: {base: "min", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds}, + bound: {base: "min", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds}, } }, ]; @@ -152,7 +152,7 @@ function runTest(sortSpec) { // Use a much looser bound than necessary, to exercise the partitioning logic more. // With such a loose bound, events are only released due to a partition boundary, // never a bucket boundary. - bound: {base: "max", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds}, + bound: {base: "max", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds}, } }, ]; diff --git a/jstests/noPassthrough/timeseries_internal_bounded_sort_spilling.js b/jstests/noPassthrough/timeseries_internal_bounded_sort_spilling.js index bd5d7e9bcca..0014dc64d95 100644 --- a/jstests/noPassthrough/timeseries_internal_bounded_sort_spilling.js +++ b/jstests/noPassthrough/timeseries_internal_bounded_sort_spilling.js @@ -158,21 +158,16 @@ function assertSorted(result) { assertSorted(naive); assert.eq(100, naive.length); - const opt = buckets - .aggregate( - [ - {$sort: {'control.min.t': 1}}, - unpackStage, - { - $_internalBoundedSort: { - sortKey: {t: 1}, - bound: {base: "min"}, - } - }, - {$limit: 100}, - ], - {allowDiskUse: true}) - .toArray(); + const opt = + buckets + .aggregate( + [ + {$sort: {'control.min.t': 1}}, + unpackStage, + {$_internalBoundedSort: {sortKey: {t: 1}, bound: {base: "min"}, limit: 100}} + ], + {allowDiskUse: true}) + .toArray(); assertSorted(opt); assert.eq(100, opt.length); diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp index 0804da7b831..aa28e4f3e0b 100644 --- a/src/mongo/db/exec/bucket_unpacker.cpp +++ b/src/mongo/db/exec/bucket_unpacker.cpp @@ -42,7 +42,6 @@ #include "mongo/db/matcher/expression_tree.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/pipeline/expression.h" -#include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" namespace mongo { diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h index 90465257415..287bd9f2540 100644 --- a/src/mongo/db/exec/bucket_unpacker.h +++ b/src/mongo/db/exec/bucket_unpacker.h @@ -37,6 +37,7 @@ #include "mongo/db/exec/document_value/document.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/timeseries/timeseries_constants.h" namespace mongo { /** @@ -289,6 +290,22 @@ public: return _includeMaxTimeAsMetadata; } + const std::string& getTimeField() const { + return _spec.timeField(); + } + + const boost::optional<std::string>& getMetaField() const { + return _spec.metaField(); + } + + std::string getMinField(StringData field) const { + return std::string{timeseries::kControlMinFieldNamePrefix} + field; + } + + std::string getMaxField(StringData field) const { + return std::string{timeseries::kControlMaxFieldNamePrefix} + field; + } + void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); void setIncludeMinTimeAsMetadata(); void setIncludeMaxTimeAsMetadata(); diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 0de33f09899..f9ce637dbad 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -91,6 +91,10 @@ public: const SpecificStats* getSpecificStats() const final; + CollectionScanParams::Direction getDirection() const { + return _params.direction; + } + protected: void doSaveStateRequiresCollection() final; diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h index 3efc3360d08..c0cf75bf02d 100644 --- a/src/mongo/db/exec/index_scan.h +++ b/src/mongo/db/exec/index_scan.h @@ -131,6 +131,14 @@ public: static const char* kStageType; + const BSONObj& getKeyPattern() const { + return _keyPattern; + } + + bool isForward() const { + return _forward; + } + protected: void doSaveStateRequiresIndex() final; diff --git a/src/mongo/db/index/sort_key_generator.h b/src/mongo/db/index/sort_key_generator.h index d9b73c75c38..40e270dac21 100644 --- a/src/mongo/db/index/sort_key_generator.h +++ b/src/mongo/db/index/sort_key_generator.h @@ -75,6 +75,10 @@ public: return _sortPattern.isSingleElementKey(); } + const SortPattern& getSortPattern() const { + return _sortPattern; + } + private: // Returns the sort key for the input 'doc' as a Value. // diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 37a6b103140..ff2aaf9f8ae 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -49,6 +49,7 @@ #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/generic_cursor.h" #include "mongo/db/jsobj.h" +#include "mongo/db/matcher/expression_algo.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/expression_context.h" @@ -597,6 +598,41 @@ public: return newNames; } + bool canModify(const FieldPath& fieldPath) const { + switch (type) { + case Type::kAllPaths: + return true; + case Type::kNotSupported: + return true; + case Type::kFiniteSet: + // If there's a subpath that is modified this path may be modified. + for (size_t i = 0; i < fieldPath.getPathLength(); i++) { + if (paths.count(fieldPath.getSubpath(i).toString())) + return true; + } + + for (auto&& path : paths) { + // If there's a superpath that is modified this path may be modified. + if (expression::isPathPrefixOf(fieldPath.fullPath(), path)) { + return true; + } + } + + return false; + case Type::kAllExcept: + // If one of the subpaths is unmodified return false. + for (size_t i = 0; i < fieldPath.getPathLength(); i++) { + if (paths.count(fieldPath.getSubpath(i).toString())) + return false; + } + + // Otherwise return true; + return true; + } + // Cannot hit. + MONGO_UNREACHABLE_TASSERT(6434901); + } + Type type; std::set<std::string> paths; diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 4aef3bc0c2d..272d7a1a80c 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -1191,5 +1191,4 @@ DocumentSource::GetModPathsReturn DocumentSourceInternalUnpackBucket::getModifie } return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}}; } - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index 96bca0e1680..df3bf93bff4 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -33,6 +33,9 @@ #include <vector> #include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/index_scan.h" +#include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_match.h" @@ -103,6 +106,18 @@ public: return DepsTracker::State::EXHAUSTIVE_ALL; } + int getBucketMaxSpanSeconds() const { + return _bucketMaxSpanSeconds; + } + + std::string getMinTimeField() const { + return _bucketUnpacker.getMinField(_bucketUnpacker.getTimeField()); + } + + std::string getMaxTimeField() const { + return _bucketUnpacker.getMaxField(_bucketUnpacker.getTimeField()); + } + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; }; @@ -164,6 +179,14 @@ public: _bucketMaxCount = bucketMaxCount; } + void setIncludeMinTimeAsMetadata() { + _bucketUnpacker.setIncludeMinTimeAsMetadata(); + } + + void setIncludeMaxTimeAsMetadata() { + _bucketUnpacker.setIncludeMaxTimeAsMetadata(); + } + boost::optional<long long> sampleSize() const { return _sampleSize; } diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 3fbede89ffc..2503a2e0857 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -59,9 +59,6 @@ using std::unique_ptr; using std::vector; namespace { -constexpr StringData kMin = "min"_sd; -constexpr StringData kMax = "max"_sd; - struct BoundMakerMin { const long long offset; // Offset in millis @@ -72,7 +69,9 @@ struct BoundMakerMin { } Document serialize() const { - return Document{{{"base"_sd, kMin}, {"offset"_sd, offset}}}; + // Convert from millis to seconds. + return Document{{{"base"_sd, DocumentSourceSort::kMin}, + {DocumentSourceSort::kOffset, (offset / 1000)}}}; } }; @@ -86,7 +85,9 @@ struct BoundMakerMax { } Document serialize() const { - return Document{{{"base"_sd, kMax}, {"offset"_sd, offset}}}; + // Convert from millis to seconds. + return Document{{{"base"_sd, DocumentSourceSort::kMax}, + {DocumentSourceSort::kOffset, (offset / 1000)}}}; } }; struct CompAsc { @@ -296,10 +297,9 @@ void DocumentSourceSort::serializeToArray( MutableDocument mutDoc{Document{{ {"$_internalBoundedSort"_sd, - Document{{ - {"sortKey"_sd, std::move(sortKey)}, - {"bound"_sd, _timeSorter->serializeBound()}, - }}}, + Document{{{"sortKey"_sd, std::move(sortKey)}, + {"bound"_sd, _timeSorter->serializeBound()}, + {"limit"_sd, static_cast<long long>(_timeSorter->limit())}}}}, }}}; if (explain >= ExplainOptions::Verbosity::kExecStats) { @@ -424,6 +424,58 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( return pSort; } +intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort( + SortPattern pat, + StringData boundBase, + long long boundOffset, + boost::optional<long long> limit, + const intrusive_ptr<ExpressionContext>& expCtx) { + + auto ds = DocumentSourceSort::create(expCtx, pat); + + SortOptions opts; + opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load(); + if (expCtx->allowDiskUse) { + opts.extSortAllowed = true; + opts.tempDir = expCtx->tempDir; + } + + if (limit) { + opts.Limit(limit.get()); + } + + if (boundBase == kMin) { + if (pat.back().isAscending) { + ds->_timeSorter.reset( + new TimeSorterAscMin{opts, CompAsc{}, BoundMakerMin{boundOffset}}); + } else { + ds->_timeSorter.reset( + new TimeSorterDescMin{opts, CompDesc{}, BoundMakerMin{boundOffset}}); + } + ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime); + } else if (boundBase == kMax) { + if (pat.back().isAscending) { + ds->_timeSorter.reset( + new TimeSorterAscMax{opts, CompAsc{}, BoundMakerMax{boundOffset}}); + } else { + ds->_timeSorter.reset( + new TimeSorterDescMax{opts, CompDesc{}, BoundMakerMax{boundOffset}}); + } + ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime); + } else { + MONGO_UNREACHABLE; + } + + if (pat.size() > 1) { + SortPattern partitionKey = + std::vector<SortPattern::SortPatternPart>(pat.begin(), pat.end() - 1); + ds->_timeSorterPartitionKeyGen = + SortKeyGenerator{std::move(partitionKey), expCtx->getCollator()}; + } + + return ds; +} + intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { uassert(6369905, @@ -452,7 +504,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( uassert( 6460200, "$_internalBoundedSort bound must be an object", bound && bound.type() == Object); - BSONElement boundOffsetElem = bound.Obj()["offset"]; + BSONElement boundOffsetElem = bound.Obj()[DocumentSourceSort::kOffset]; long long boundOffset = 0; if (boundOffsetElem && boundOffsetElem.isNumber()) { boundOffset = uassertStatusOK(boundOffsetElem.parseIntegerElementToLong()) * @@ -470,10 +522,16 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( boundBase == kMin || boundBase == kMax); SortOptions opts; - opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load(); + opts.MaxMemoryUsageBytes(internalQueryMaxBlockingSortMemoryUsageBytes.load()); if (expCtx->allowDiskUse) { - opts.extSortAllowed = true; - opts.tempDir = expCtx->tempDir; + opts.ExtSortAllowed(true); + opts.TempDir(expCtx->tempDir); + } + if (BSONElement limitElem = args["limit"]) { + uassert(6588100, + "$_internalBoundedSort limit must be a non-negative number if specified", + limitElem.isNumber() && limitElem.numberLong() >= 0); + opts.Limit(limitElem.numberLong()); } auto ds = DocumentSourceSort::create(expCtx, pat); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 89e302330c2..ac592c08c6c 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -42,6 +42,10 @@ namespace mongo { class DocumentSourceSort final : public DocumentSource { public: + static constexpr StringData kMin = "min"_sd; + static constexpr StringData kMax = "max"_sd; + static constexpr StringData kOffset = "offsetSeconds"_sd; + struct SortableDate { Date_t date; @@ -130,6 +134,12 @@ public: return create(pExpCtx, {sortOrder, pExpCtx}); } + static boost::intrusive_ptr<DocumentSourceSort> createBoundedSort( + SortPattern pat, + StringData boundBase, + long long boundOffset, + boost::optional<long long> limit, + const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Parse a stage that uses BoundedSorter. */ @@ -164,6 +174,10 @@ public: return _populated; }; + bool isBoundedSortStage() { + return (_timeSorter) ? true : false; + } + bool hasLimit() const { return _sortExecutor->hasLimit(); } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index b4e9334ff3d..d013ed96dc9 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/query/projection_parser.h" +#include "mongo/db/server_options.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -54,6 +55,7 @@ #include "mongo/db/exec/unpack_timeseries_bucket.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" +#include "mongo/db/matcher/expression_expr.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops_exec.h" @@ -78,6 +80,7 @@ #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor_factory.h" +#include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_knobs_gen.h" @@ -407,6 +410,35 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam return std::pair{sampleStage, unpackStage}; } + +std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort( + const Pipeline::SourceContainer& sources) { + DocumentSourceSort* sortStage = nullptr; + DocumentSourceInternalUnpackBucket* unpackStage = nullptr; + + auto sourcesIt = sources.begin(); + while (sourcesIt != sources.end()) { + if (!sortStage) { + sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get()); + + if (sortStage) { + // Do not double optimize + if (sortStage->isBoundedSortStage()) { + return {nullptr, nullptr}; + } + + return {unpackStage, sortStage}; + } + } + + if (!unpackStage) { + unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get()); + } + ++sourcesIt; + } + + return {unpackStage, sortStage}; +} } // namespace StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor( @@ -875,6 +907,103 @@ auto buildProjectionForPushdown(const DepsTracker& deps, } } // namespace +boost::optional<std::pair<PipelineD::IndexSortOrderAgree, PipelineD::IndexOrderedByMinTime>> +PipelineD::supportsSort(const BucketUnpacker& bucketUnpacker, + PlanStage* root, + const SortPattern& sort) { + if (!root) + return boost::none; + + switch (root->stageType()) { + case STAGE_COLLSCAN: { + const CollectionScan* scan = static_cast<CollectionScan*>(root); + if (sort.size() == 1) { + auto part = sort[0]; + // Check the sort we're asking for is on time. + if (part.fieldPath && *part.fieldPath == bucketUnpacker.getTimeField()) { + // Check that the directions agree. + if ((scan->getDirection() == CollectionScanParams::Direction::FORWARD) == + part.isAscending) + return std::pair{part.isAscending, true}; + } + } + return boost::none; + } + case STAGE_IXSCAN: { + const IndexScan* scan = static_cast<IndexScan*>(root); + + const auto keyPattern = scan->getKeyPattern(); + const bool forward = scan->isForward(); + + // Return none if the keyPattern cannot support the sort. + // Note We add one to sort size to account for the compounding on min/max for the time + // field. + if ((sort.size() + 1 > (unsigned int)keyPattern.nFields()) || (sort.size() < 1)) { + return boost::none; + } + + if (sort.size() == 1) { + auto part = sort[0]; + + // TOOD SERVER-65050: implement here. + + // Check the sort we're asking for is on time. + if (part.fieldPath && *part.fieldPath == bucketUnpacker.getTimeField()) { + auto keyPatternIter = keyPattern.begin(); + + return checkTimeHelper( + bucketUnpacker, keyPatternIter, forward, *part.fieldPath, part.isAscending); + } + } else if (sort.size() >= 2) { + size_t i = 0; + + for (auto keyPatternIter = keyPattern.begin(); + i < sort.size() && keyPatternIter != keyPattern.end(); + (++keyPatternIter)) { + auto part = sort[i]; + if (!(part.fieldPath)) + return boost::none; + + if (i < sort.size() - 1) { + // Check the meta field index isn't special. + if (!(*keyPatternIter).isNumber() || + abs((*keyPatternIter).numberInt()) != 1) { + return boost::none; + } + + // True = ascending; false = descending. + bool direction = ((*keyPatternIter).numberInt() == 1); + direction = (forward) ? direction : !direction; + + // Return false if partOne and the first keyPattern part don't agree. + if (!sortAndKeyPatternPartAgreeAndOnMeta( + bucketUnpacker, (*keyPatternIter).fieldName(), *part.fieldPath) || + part.isAscending != direction) + return boost::none; + } else { + if (!part.fieldPath || + !(*part.fieldPath == bucketUnpacker.getTimeField())) { + return boost::none; + } + + return checkTimeHelper(bucketUnpacker, + keyPatternIter, + forward, + *part.fieldPath, + part.isAscending); + } + + // Increment index + ++i; + } + } + return boost::none; + } + default: + return boost::none; + } +} + std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& collections, const NamespaceString& nss, @@ -945,6 +1074,165 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll Pipeline::kAllowedMatcherFeatures, &shouldProduceEmptyDocs)); + // If this is a query on a time-series collection then it may be eligible for a post-planning + // sort optimization. We check eligibility and perform the rewrite here. + auto [unpack, sort] = findUnpackThenSort(pipeline->_sources); + if (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( + multiversion::FeatureCompatibilityVersion::kVersion_6_0) && + feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled( + serverGlobalParams.featureCompatibility) && + unpack && sort) { + auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get()); + if (execImpl) { + + // Get source stage + PlanStage* rootStage = execImpl->getRootStage(); + while (rootStage && rootStage->getChildren().size() == 1) { + switch (rootStage->stageType()) { + case STAGE_FETCH: + rootStage = rootStage->child().get(); + break; + case STAGE_SHARDING_FILTER: + rootStage = rootStage->child().get(); + break; + default: + rootStage = nullptr; + } + } + + if (rootStage && rootStage->getChildren().size() != 0) { + rootStage = nullptr; + } + + const auto& sortPattern = sort->getSortKeyPattern(); + if (auto agree = supportsSort(unpack->bucketUnpacker(), rootStage, sortPattern)) { + // Scan the pipeline to check if it's compatible with the optimization. + bool badStage = false; + bool seenSort = false; + std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter = + pipeline->_sources.begin(); + std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter = + pipeline->_sources.end(); + for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) { + if (dynamic_cast<const DocumentSourceSort*>(iter->get())) { + seenSort = true; + } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) { + // do nothing + } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>( + iter->get())) { + unpackIter = iter; + } else if (auto projection = + dynamic_cast<const DocumentSourceSingleDocumentTransformation*>( + iter->get())) { + auto modPaths = projection->getModifiedPaths(); + + // Check to see if the sort paths are modified. + for (auto sortIter = sortPattern.begin(); + !badStage && sortIter != sortPattern.end(); + ++sortIter) { + + auto fieldPath = sortIter->fieldPath; + // If they are then escap the loop & don't optimize. + if (!fieldPath || modPaths.canModify(*fieldPath)) { + badStage = true; + } + } + + } else { + badStage = true; + } + } + if (!badStage && seenSort) { + auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree; + // This is safe because we have seen a sort so we must have at least one stage + // to the left of the current iterator position. + --iter; + + if (indexOrderedByMinTime) { + unpack->setIncludeMinTimeAsMetadata(); + } else { + unpack->setIncludeMaxTimeAsMetadata(); + } + + if (indexSortOrderAgree) { + pipeline->_sources.insert( + iter, + DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(), + (indexOrderedByMinTime + ? DocumentSourceSort::kMin + : DocumentSourceSort::kMax), + 0, + sort->getLimit(), + expCtx)); + } else { + // Since the sortPattern and the direction of the index don't agree we must + // use the offset to get an estimate on the bounds of the bucket. + pipeline->_sources.insert( + iter, + DocumentSourceSort::createBoundedSort( + sort->getSortKeyPattern(), + (indexOrderedByMinTime ? DocumentSourceSort::kMin + : DocumentSourceSort::kMax), + ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds() + : -unpack->getBucketMaxSpanSeconds()) * + 1000, + sort->getLimit(), + expCtx)); + + /** + * We wish to create the following predicate to avoid returning incorrect + * results in the unlikely event bucketMaxSpanSeconds changes under us. + * + * {$expr: + * {$lte: [ + * {$subtract: [$control.max.timeField, $control.min.timeField]}, + * {$const: bucketMaxSpanSeconds, in milliseconds} + * ]}} + */ + auto minTime = unpack->getMinTimeField(); + auto maxTime = unpack->getMaxTimeField(); + auto match = std::make_unique<ExprMatchExpression>( + // This produces {$lte: ... } + make_intrusive<ExpressionCompare>( + expCtx.get(), + ExpressionCompare::CmpOp::LTE, + // This produces [...] + makeVector<boost::intrusive_ptr<Expression>>( + // This produces {$subtract: ... } + make_intrusive<ExpressionSubtract>( + expCtx.get(), + // This produces [...] + makeVector<boost::intrusive_ptr<Expression>>( + // This produces "$control.max.timeField" + ExpressionFieldPath::createPathFromString( + expCtx.get(), maxTime, expCtx->variablesParseState), + // This produces "$control.min.timeField" + ExpressionFieldPath::createPathFromString( + expCtx.get(), + minTime, + expCtx->variablesParseState))), + // This produces {$const: maxBucketSpanSeconds} + make_intrusive<ExpressionConstant>( + expCtx.get(), + Value{unpack->getBucketMaxSpanSeconds() * 1000}))), + expCtx); + pipeline->_sources.insert( + unpackIter, + make_intrusive<DocumentSourceMatch>(std::move(match), expCtx)); + } + // Ensure we're erasing the sort source. + tassert(6434901, + "we must erase a $sort stage and replace it with a bounded sort stage", + strcmp((*iter)->getSourceName(), + DocumentSourceSort::kStageName.rawData()) == 0); + pipeline->_sources.erase(iter); + pipeline->stitch(); + } + } + } + } + const auto cursorType = shouldProduceEmptyDocs ? DocumentSourceCursor::CursorType::kEmptyDocuments : DocumentSourceCursor::CursorType::kRegular; diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index c7dcd8e9fec..4a88600b114 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/bucket_unpacker.h" #include <boost/intrusive_ptr.hpp> #include <memory> @@ -251,6 +252,84 @@ private: long long sampleSize, long long numRecords, boost::optional<BucketUnpacker> bucketUnpacker); + + typedef bool IndexSortOrderAgree; + typedef bool IndexOrderedByMinTime; + + /* + * Takes a leaf plan stage and a sort pattern and returns a pair if they support the Bucket +Unpacking with Sort Optimization. + * The pair includes whether the index order and sort order agree with each other as its first + * member and the order of the index as the second parameter. + * + * Note that the index scan order is different from the index order. + */ + static boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> supportsSort( + const BucketUnpacker& bucketUnpacker, PlanStage* root, const SortPattern& sort); + + /* This is a helper method for supportsSort. It takes the current iterator for the index + * keyPattern, the direction of the index scan, the timeField path we're sorting on, and the + * direction of the sort. It returns a pair if this data agrees and none if it doesn't. + * + * The pair contains whether the index order and the sort order agree with each other as the + * firstmember and the order of the index as the second parameter. + * + * Note that the index scan order may be different from the index order. + * N.B.: It ASSUMES that there are two members left in the keyPatternIter iterator, and that the + * timeSortFieldPath is in fact the path on time. + */ + static boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> checkTimeHelper( + const BucketUnpacker& bucketUnpacker, + BSONObj::iterator& keyPatternIter, + bool scanIsForward, + const FieldPath& timeSortFieldPath, + bool sortIsAscending) { + bool wasMin = false; + bool wasMax = false; + + // Check that the index isn't special. + if ((*keyPatternIter).isNumber() && abs((*keyPatternIter).numberInt()) == 1) { + bool direction = ((*keyPatternIter).numberInt() == 1); + direction = (scanIsForward) ? direction : !direction; + + // Verify the direction and fieldNames match. + wasMin = ((*keyPatternIter).fieldName() == + bucketUnpacker.getMinField(timeSortFieldPath.fullPath())); + wasMax = ((*keyPatternIter).fieldName() == + bucketUnpacker.getMaxField(timeSortFieldPath.fullPath())); + // Terminate early if it wasn't max or min or if the directions don't match. + if ((wasMin || wasMax) && (sortIsAscending == direction)) + return std::pair{wasMin ? sortIsAscending : !sortIsAscending, wasMin}; + } + + return boost::none; + } + + static bool sortAndKeyPatternPartAgreeAndOnMeta(const BucketUnpacker& bucketUnpacker, + const char* keyPatternFieldName, + const FieldPath& sortFieldPath) { + FieldPath keyPatternFieldPath = FieldPath(keyPatternFieldName); + + // If they don't have the same path length they cannot agree. + if (keyPatternFieldPath.getPathLength() != sortFieldPath.getPathLength()) + return false; + + // Check these paths are on the meta field. + if (keyPatternFieldPath.getSubpath(0) != mongo::timeseries::kBucketMetaFieldName) + return false; + if (!bucketUnpacker.getMetaField() || + sortFieldPath.getSubpath(0) != *bucketUnpacker.getMetaField()) { + return false; + } + + // If meta was the only path component then return true. + // Note: We already checked that the path lengths are equal. + if (keyPatternFieldPath.getPathLength() == 1) + return true; + + // Otherwise return if the remaining path components are equal. + return (keyPatternFieldPath.tail() == sortFieldPath.tail()); + } }; } // namespace mongo diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 2bb11ee4bb9..57fd9e2b077 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -418,6 +418,7 @@ public: virtual size_t totalDataSizeBytes() const = 0; virtual size_t numSpills() const = 0; + virtual size_t limit() const = 0; // By default, uassert that the input meets our assumptions of being almost-sorted. // But if _checkInput is false, don't do that check. @@ -506,6 +507,10 @@ public: return _numSpills; } + size_t limit() const { + return _opts.limit; + } + bool checkInput() const { return _checkInput; } @@ -529,7 +534,7 @@ private: size_t _numSorted = 0; // Keeps track of the number of keys sorted. uint64_t _totalDataSizeSorted = 0; // Keeps track of the total size of data sorted. - SortOptions _opts; + const SortOptions _opts; using KV = std::pair<Key, Value>; std::priority_queue<KV, std::vector<KV>, Greater> _heap; |