-rw-r--r-- | jstests/aggregation/extras/utils.js | 63
-rw-r--r-- | jstests/core/timeseries/bucket_unpacking_with_sort.js | 356
-rw-r--r-- | jstests/core/timeseries/timeseries_internal_bounded_sort.js | 8
-rw-r--r-- | jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js | 8
-rw-r--r-- | jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js | 4
-rw-r--r-- | src/mongo/db/exec/bucket_unpacker.cpp | 1
-rw-r--r-- | src/mongo/db/exec/bucket_unpacker.h | 17
-rw-r--r-- | src/mongo/db/exec/collection_scan.h | 4
-rw-r--r-- | src/mongo/db/exec/index_scan.h | 8
-rw-r--r-- | src/mongo/db/index/sort_key_generator.h | 4
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 36
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 93
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 99
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.cpp | 65
-rw-r--r-- | src/mongo/db/pipeline/document_source_sort.h | 14
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 191
16 files changed, 34 insertions, 937 deletions
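
The hunks that follow begin with jstests/aggregation/extras/utils.js, which drops the {inhibitOptimization, hint} options argument from the explain helper. A hedged usage sketch of the restored call shape (the "ts" collection name is illustrative, not from this change):

    load("jstests/libs/fixture_helpers.js");      // For FixtureHelpers, as the deleted test loaded it.
    load("jstests/aggregation/extras/utils.js");  // For getExplainedPipelineFromAggregation.
    // Restored three-argument form: the helper always prepends
    // $_internalInhibitOptimization and no longer accepts a hint.
    const stages = getExplainedPipelineFromAggregation(db, db.ts, [{$sort: {t: 1}}]);
    // Callers that passed {inhibitOptimization: false, hint: ...} (only the deleted
    // bucket_unpacking_with_sort.js did) have no equivalent after this change.
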
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js index 4c71328c027..efbe87fa3cf 100644 --- a/jstests/aggregation/extras/utils.js +++ b/jstests/aggregation/extras/utils.js @@ -441,75 +441,46 @@ function desugarSingleStageAggregation(db, coll, stage) { /** * Runs and asserts an explain command for an aggregation with the given pipeline. Returns just the * pipeline from the explain results regardless of cluster topology. - * The fourth parameter `options` is for a few options for unusual scenarios. - * options.inhibitOptimization defaults to true. This prepends an inhibitOptimization stage to the - * query and removes it before returning results. This is sub ideal for views. options.hint is an - * optional hint that will get passed on to the aggregation stage. It defaults to undefined. */ -function getExplainedPipelineFromAggregation( - db, coll, pipeline, {inhibitOptimization = true, hint} = {}) { +function getExplainedPipelineFromAggregation(db, coll, pipeline) { // Prevent stages from being absorbed into the .find() layer - if (inhibitOptimization) { - pipeline.unshift({$_internalInhibitOptimization: {}}); - } - - const aggOptions = hint ? {hint: hint} : {}; - - const result = coll.explain().aggregate(pipeline, aggOptions); + pipeline.unshift({$_internalInhibitOptimization: {}}); + const result = coll.explain().aggregate(pipeline); assert.commandWorked(result); - return getExplainPipelineFromAggregationResult(db, result, {inhibitOptimization}); + return getExplainPipelineFromAggregationResult(db, result); } -function getExplainPipelineFromAggregationResult(db, result, { - inhibitOptimization = true, -} = {}) { +function getExplainPipelineFromAggregationResult(db, result) { // We proceed by cases based on topology. if (!FixtureHelpers.isMongos(db)) { assert(Array.isArray(result.stages), result); - // The first two stages should be the .find() cursor and the inhibit-optimization stage (if - // enabled); the rest of the stages are what the user's 'stage' expanded to. + // The first two stages should be the .find() cursor and the inhibit-optimization stage; + // the rest of the stages are what the user's 'stage' expanded to. assert(result.stages[0].$cursor, result); - if (inhibitOptimization) { - assert(result.stages[1].$_internalInhibitOptimization, result); - return result.stages.slice(2); - } else { - return result.stages.slice(1); - } + assert(result.stages[1].$_internalInhibitOptimization, result); + return result.stages.slice(2); } else { if (result.splitPipeline) { - let shardsPart = null; - if (inhibitOptimization) { - assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result); - shardsPart = result.splitPipeline.shardsPart.slice(1); - } else { - shardsPart = result.splitPipeline.shardsPart; - } + assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result); + const shardsPart = result.splitPipeline.shardsPart.slice(1); assert(result.splitPipeline.mergerPart[0].$mergeCursors, result); const mergerPart = result.splitPipeline.mergerPart.slice(1); return [].concat(shardsPart).concat(mergerPart); } else if (result.stages) { // Required for aggregation_mongos_passthrough. assert(Array.isArray(result.stages), result); - // The first two stages should be the .find() cursor and the inhibit-optimization stage - // (if enabled); the rest of the stages are what the user's 'stage' expanded to. 
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage; + // the rest of the stages are what the user's 'stage' expanded to. assert(result.stages[0].$cursor, result); - if (inhibitOptimization) { - assert(result.stages[1].$_internalInhibitOptimization, result); - return result.stages.slice(2); - } else { - return result.stages.slice(1); - } + assert(result.stages[1].$_internalInhibitOptimization, result); + return result.stages.slice(2); } else { // Required for aggregation_one_shard_sharded_collections. assert(Array.isArray(result.shards["shard-rs0"].stages), result); assert(result.shards["shard-rs0"].stages[0].$cursor, result); - if (inhibitOptimization) { - assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result); - return result.shards["shard-rs0"].stages.slice(2); - } else { - return result.shards["shard-rs0"].stages.slice(1); - } + assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result); + return result.shards["shard-rs0"].stages.slice(2); } } } diff --git a/jstests/core/timeseries/bucket_unpacking_with_sort.js b/jstests/core/timeseries/bucket_unpacking_with_sort.js deleted file mode 100644 index 77c41091451..00000000000 --- a/jstests/core/timeseries/bucket_unpacking_with_sort.js +++ /dev/null @@ -1,356 +0,0 @@ -/** - * Test that the bucket unpacking with sorting rewrite is performed and doesn't cause incorrect - * results to be created. - * - * @tags: [ - * requires_fcv_61, - * # We need a timeseries collection. - * assumes_no_implicit_collection_creation_after_drop, - * # Bounded sorter is currently broken w/ sharding. - * assumes_unsharded_collection, - * # Cannot insert into a time-series collection in a multi-document transaction. - * does_not_support_transactions, - * # Refusing to run a test that issues an aggregation command with explain because it may - * # return incomplete results if interrupted by a stepdown. - * does_not_support_stepdowns, - * # This complicates aggregation extraction. - * do_not_wrap_aggregations_in_facets, - * # We need a timeseries collection. - * requires_timeseries, - * # Explain of a resolved view must be executed by mongos. - * directly_against_shardsvrs_incompatible, - * ] - */ -(function() { -"use strict"; - -const featureEnabled = - assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagBucketUnpackWithSort: 1})) - .featureFlagBucketUnpackWithSort.value; -if (!featureEnabled) { - jsTestLog("Skipping test because the BUS feature flag is disabled"); - return; -} - -load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. -load("jstests/aggregation/extras/utils.js"); // For getExplainedPipelineFromAggregation. -const collName = "bucket_unpacking_with_sort"; -const coll = db[collName]; -const metaCollName = "bucket_unpacking_with_sort_with_meta"; -const metaColl = db[metaCollName]; -const metaCollSubFieldsName = "bucket_unpacking_with_sort_with_meta_sub"; -const metaCollSubFields = db[metaCollSubFieldsName]; -const subFields = ["a", "b"]; - -const setupColl = (coll, collName, usesMeta, subFields = null) => { - coll.drop(); - if (usesMeta) { - db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}}); - } else { - db.createCollection(collName, {timeseries: {timeField: "t"}}); - } - // Create a few buckets. - let meta = usesMeta ? 
10 : 1; - let docs = []; - let numberOfItemsPerBucket = - db.adminCommand({getParameter: 1, timeseriesBucketMaxCount: 1}).timeseriesBucketMaxCount; - assert(Number.isInteger(numberOfItemsPerBucket)); - for (let j = 0; j < meta; ++j) { - let metaVal = j; - if (subFields) { - metaVal = subFields.reduce((memo, field) => { - return Object.assign(Object.assign({}, memo), {[field]: j}); - }, {}); - } - for (let i = 0; i < numberOfItemsPerBucket; ++i) { - docs.push({m: metaVal, t: new Date(i * 6000)}); - } - // Because the max bucket span is 3600 * 1000 milliseconds, we know that the above will have - // generated two buckets. Since we are now going back in time to before the minimum - // timestamp of the second bucket, we'll open a third and fourth bucket below. Crucially - // will overlap with the first two buckets. - for (let i = 0; i < numberOfItemsPerBucket; ++i) { - docs.push({m: metaVal, t: new Date(i * 6000 + 3000)}); - } - } - assert.commandWorked(coll.insert(docs)); -}; - -setupColl(coll, collName, false); -setupColl(metaColl, metaCollName, true); -setupColl(metaCollSubFields, metaCollSubFieldsName, true, subFields); - -// For use in reductions. -// Takes a memo (accumulator value) and a stage and returns if the stage was an internal bounded -// sort or the memo was true. -const stageIsInternalBoundedSort = (stage) => { - return stage.hasOwnProperty("$_internalBoundedSort"); -}; - -const hasInternalBoundedSort = (pipeline) => pipeline.some(stageIsInternalBoundedSort); - -const getIfMatch = (memo, stage) => { - if (memo) - return memo; - else if (stage.hasOwnProperty("$match")) - return stage; - else - return null; -}; - -const findFirstMatch = (pipeline) => pipeline.reduce(getIfMatch, null); - -const setup = (coll, createIndex = null) => { - // Create index. - if (createIndex) { - assert.commandWorked(coll.createIndex(createIndex)); - } -}; - -/** - * This test creates a time-series collection, inserts data (with multiple overlapping buckets), and - * tests that the BUS optimization is performed and produces correct results. - * - * `sortSpec` is the sort that we wish to have optimized. - * `createIndex` is the index that we need to create in order to perform the optimization. - * It defaults to null which signals that we don't create an index. - * `hint` is the hint that we specify in order to produce the optimization. - * traversing a min (resp. max) field index on a descending (resp. ascending) sort. - * `testColl` is the collection to use. - */ -const runRewritesTest = (sortSpec, - createIndex, - hint, - testColl, - precise, - intermediaryStages = [], - posteriorStages = []) => { - setup(testColl, createIndex); - - const options = (hint) ? 
{hint: hint} : {}; - - const match = {$match: {t: {$lt: new Date("2022-01-01")}}}; - // Get results - const optPipeline = [match, ...intermediaryStages, {$sort: sortSpec}, ...posteriorStages]; - const optResults = testColl.aggregate(optPipeline, options).toArray(); - const optExplainFull = testColl.explain().aggregate(optPipeline, options); - - const ogPipeline = [ - match, - ...intermediaryStages, - {$_internalInhibitOptimization: {}}, - {$sort: sortSpec}, - ...posteriorStages - ]; - const ogResults = testColl.aggregate(ogPipeline, options).toArray(); - const ogExplainFull = testColl.explain().aggregate(ogPipeline, options); - - // Assert correct - assert.docEq(optResults, ogResults, {optResults: optResults, ogResults: ogResults}); - - // Check contains stage - const optExplain = getExplainedPipelineFromAggregation( - db, testColl, optPipeline, {inhibitOptimization: false, hint: hint}); - assert(hasInternalBoundedSort(optExplain), optExplainFull); - - // Check doesn't contain stage - const ogExplain = getExplainedPipelineFromAggregation( - db, testColl, ogPipeline, {inhibitOptimization: false, hint: hint}); - assert(!hasInternalBoundedSort(ogExplain), ogExplainFull); - - let foundMatch = findFirstMatch(optExplain); - if (!precise) { - assert.docEq(foundMatch, { - $match: { - $expr: - {$lte: [{$subtract: ["$control.max.t", "$control.min.t"]}, {$const: 3600000}]} - } - }); - } else { - assert.docEq(foundMatch, match); - } -}; - -const runDoesntRewriteTest = (sortSpec, createIndex, hint, testColl, intermediaryStages = []) => { - setup(testColl, createIndex); - - const optPipeline = [ - {$match: {t: {$lt: new Date("2022-01-01")}}}, - ...intermediaryStages, - {$sort: sortSpec}, - ]; - - // Check doesn't contain stage - const optExplain = getExplainedPipelineFromAggregation( - db, testColl, optPipeline, {inhibitOptimization: false, hint: hint}); - const optExplainFull = testColl.explain().aggregate(optPipeline, {hint}); - const containsOptimization = optExplain.reduce(stageIsInternalBoundedSort, false); - assert(!containsOptimization, optExplainFull); -}; - -// Collscan cases -runRewritesTest({t: 1}, null, null, coll, true); -runRewritesTest({t: -1}, null, {$natural: -1}, coll, false); - -// Indexed cases -runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true); -runRewritesTest({t: -1}, {t: -1}, {t: -1}, coll, true); -runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true); -runRewritesTest({m: 1, t: -1}, {m: 1, t: -1}, {m: 1, t: -1}, metaColl, true); -runRewritesTest({m: -1, t: 1}, {m: -1, t: 1}, {m: -1, t: 1}, metaColl, true); -runRewritesTest({m: -1, t: -1}, {m: -1, t: -1}, {m: -1, t: -1}, metaColl, true); -runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true); - -// Intermediary projects that don't modify sorted fields are allowed. -runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {a: 0}}]); -runRewritesTest( - {m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {m: 1, t: 1}}]); - -// Test multiple meta fields -let metaIndexObj = subFields.reduce((memo, field) => { - return Object.assign(Object.assign({}, memo), {[`m.${field}`]: 1}); -}, {}); -Object.assign(metaIndexObj, {t: 1}); -runRewritesTest(metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true); -runRewritesTest( - metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true, [{$project: {m: 1, t: 1}}]); - -// Check sort-limit optimization. 
-runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true, [], [{$limit: 10}]); - -// Check set window fields is optimized as well. -// Since {k: 1} cannot provide a bounded sort we know if there's a bounded sort it comes form -// setWindowFields. -runRewritesTest({k: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [], [ - {$setWindowFields: {partitionBy: "$m", sortBy: {t: 1}, output: {arr: {$max: "$t"}}}} -]); - -// Negative tests -for (let m = -1; m < 2; m++) { - for (let t = -1; t < 2; t++) { - for (let k = -1; k < 2; k++) { - printjson({"currently running": "the following configuration...", m: m, t: t, k: k}); - let sort = null; - let createIndex = null; - let hint = null; - let usesMeta = null; - if (k != 0) { - // This is the case where we add an intermediary incompatible field. - if (m == 0) { - if (t == 0) { - sort = {k: k}; - } else { - sort = {k: k, t: t}; - } - } else { - if (t == 0) { - sort = {m: m, k: k}; - } else { - sort = {m: m, k: k, t: t}; - } - } - hint = sort; - createIndex = sort; - usesMeta = m != 0; - runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); - } else { - // This is the case where we do not add an intermediary incompatible field. - // Instead we enumerate the ways that the index and sort could disagree. - - // For the meta case, negate the time order. - // For the non-meta case, use a collscan with a negated order. - if (m == 0) { - if (t == 0) { - // Do not execute a test run. - } else { - sort = {t: t}; - hint = {$natural: -t}; - } - } else { - if (t == 0) { - // Do not execute a test run. - } else { - usesMeta = true; - sort = {m: m, t: t}; - hint = {m: m, t: -t}; - createIndex = hint; - } - } - - if (sort) - runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); - - sort = null; - hint = null; - createIndex = null; - usesMeta = false; - // For the meta case, negate the meta order. - // For the non-meta case, use an index instead of a collscan. - if (m == 0) { - if (t == 0) { - // Do not execute a test run. - } else { - sort = {t: t}; - createIndex = {t: -t}; - hint = createIndex; - } - } else { - if (t == 0) { - // Do not execute a test run. - } else { - usesMeta = true; - sort = {m: m, t: t}; - hint = {m: -m, t: t}; - createIndex = hint; - } - } - - if (sort) - runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); - - sort = null; - hint = null; - createIndex = null; - usesMeta = false; - // For the meta case, negate both meta and time. - if (m == 0) { - if (t == 0) { - // Do not execute a test run. - } else { - // Do not execute -- we've exhausted relevant cases. - } - } else { - if (t == 0) { - // Do not execute a test run. - } else { - usesMeta = true; - sort = {m: m, t: t}; - hint = {m: -m, t: -t}; - createIndex = hint; - } - } - - if (sort) - runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll); - } - } - } -} - -// Test mismatched meta paths don't produce the optimization. -runDoesntRewriteTest({m: 1, t: 1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields); -runDoesntRewriteTest( - {"m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, metaCollSubFields); -runDoesntRewriteTest({"m.a": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields); -runDoesntRewriteTest({"m.a": 1, "m.b": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields); -// Test matched meta-subpaths with mismatched directions don't produce the optimization. 
-runDoesntRewriteTest({"m.a": 1, t: -1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields); -// Test intermediary projections that exclude the sorted fields don't produce the optimizaiton. -runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {m: 0}}]); -runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {t: 0}}]); -runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {a: 1}}]); -runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {'m.a': 0}}]); -runDoesntRewriteTest({'m.a': 1, t: 1}, {'m.a': 1, t: 1}, {'m.a': 1, t: 1}, metaCollSubFields, [ - {$project: {'m': 0}} -]); -})(); diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort.js b/jstests/core/timeseries/timeseries_internal_bounded_sort.js index c52bb3efb65..30e58b7d42e 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort.js @@ -88,7 +88,7 @@ function runTest(ascending) { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, bound: ascending ? {base: "min"} - : {base: "min", offsetSeconds: bucketMaxSpanSeconds} + : {base: "min", offset: bucketMaxSpanSeconds} } }, ]) @@ -104,7 +104,7 @@ function runTest(ascending) { { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, - bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds} : {base: "max"} } }, @@ -136,7 +136,7 @@ function runTest(ascending) { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, bound: ascending ? {base: "min"} - : {base: "min", offsetSeconds: bucketMaxSpanSeconds}, + : {base: "min", offset: bucketMaxSpanSeconds}, limit: 100 } }, @@ -154,7 +154,7 @@ function runTest(ascending) { { $_internalBoundedSort: { sortKey: {t: ascending ? 1 : -1}, - bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds} : {base: "max"}, limit: 100 } diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js index 208d6a45b9a..2e73a808d85 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js @@ -131,7 +131,7 @@ function runTest(sortSpec) { $_internalBoundedSort: { sortKey: sortSpec, bound: sortSpec.t > 0 ? {base: "min"} - : {base: "min", offsetSeconds: bucketMaxSpanSeconds} + : {base: "min", offset: bucketMaxSpanSeconds} } }, ]; @@ -145,7 +145,7 @@ function runTest(sortSpec) { { $_internalBoundedSort: { sortKey: sortSpec, - bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds} : {base: "max"} } }, @@ -174,7 +174,7 @@ function runTest(sortSpec) { $_internalBoundedSort: { sortKey: sortSpec, bound: sortSpec.t > 0 ? {base: "min"} - : {base: "min", offsetSeconds: bucketMaxSpanSeconds}, + : {base: "min", offset: bucketMaxSpanSeconds}, limit: 100 } } @@ -190,7 +190,7 @@ function runTest(sortSpec) { { $_internalBoundedSort: { sortKey: sortSpec, - bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds} + bound: sortSpec.t > 0 ? 
{base: "max", offset: -bucketMaxSpanSeconds} : {base: "max"}, limit: 100 } diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js index 417818559d6..a986458a99f 100644 --- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js +++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js @@ -131,7 +131,7 @@ function runTest(sortSpec) { // Use a much looser bound than necessary, to exercise the partitioning logic more. // With such a loose bound, events are only released due to a partition boundary, // never a bucket boundary. - bound: {base: "min", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds}, + bound: {base: "min", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds}, } }, ]; @@ -152,7 +152,7 @@ function runTest(sortSpec) { // Use a much looser bound than necessary, to exercise the partitioning logic more. // With such a loose bound, events are only released due to a partition boundary, // never a bucket boundary. - bound: {base: "max", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds}, + bound: {base: "max", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds}, } }, ]; diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp index 0651aae78ee..7703c9d3949 100644 --- a/src/mongo/db/exec/bucket_unpacker.cpp +++ b/src/mongo/db/exec/bucket_unpacker.cpp @@ -42,6 +42,7 @@ #include "mongo/db/matcher/expression_tree.h" #include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/pipeline/expression.h" +#include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" namespace mongo { diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h index 287bd9f2540..90465257415 100644 --- a/src/mongo/db/exec/bucket_unpacker.h +++ b/src/mongo/db/exec/bucket_unpacker.h @@ -37,7 +37,6 @@ #include "mongo/db/exec/document_value/document.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/timeseries/timeseries_constants.h" namespace mongo { /** @@ -290,22 +289,6 @@ public: return _includeMaxTimeAsMetadata; } - const std::string& getTimeField() const { - return _spec.timeField(); - } - - const boost::optional<std::string>& getMetaField() const { - return _spec.metaField(); - } - - std::string getMinField(StringData field) const { - return std::string{timeseries::kControlMinFieldNamePrefix} + field; - } - - std::string getMaxField(StringData field) const { - return std::string{timeseries::kControlMaxFieldNamePrefix} + field; - } - void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); void setIncludeMinTimeAsMetadata(); void setIncludeMaxTimeAsMetadata(); diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index f9ce637dbad..0de33f09899 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -91,10 +91,6 @@ public: const SpecificStats* getSpecificStats() const final; - CollectionScanParams::Direction getDirection() const { - return _params.direction; - } - protected: void doSaveStateRequiresCollection() final; diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h index c0cf75bf02d..3efc3360d08 100644 --- a/src/mongo/db/exec/index_scan.h +++ b/src/mongo/db/exec/index_scan.h @@ -131,14 +131,6 @@ public: static const char* kStageType; - const 
BSONObj& getKeyPattern() const { - return _keyPattern; - } - - bool isForward() const { - return _forward; - } - protected: void doSaveStateRequiresIndex() final; diff --git a/src/mongo/db/index/sort_key_generator.h b/src/mongo/db/index/sort_key_generator.h index 40e270dac21..d9b73c75c38 100644 --- a/src/mongo/db/index/sort_key_generator.h +++ b/src/mongo/db/index/sort_key_generator.h @@ -75,10 +75,6 @@ public: return _sortPattern.isSingleElementKey(); } - const SortPattern& getSortPattern() const { - return _sortPattern; - } - private: // Returns the sort key for the input 'doc' as a Value. // diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index ff2aaf9f8ae..37a6b103140 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -49,7 +49,6 @@ #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/generic_cursor.h" #include "mongo/db/jsobj.h" -#include "mongo/db/matcher/expression_algo.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/expression_context.h" @@ -598,41 +597,6 @@ public: return newNames; } - bool canModify(const FieldPath& fieldPath) const { - switch (type) { - case Type::kAllPaths: - return true; - case Type::kNotSupported: - return true; - case Type::kFiniteSet: - // If there's a subpath that is modified this path may be modified. - for (size_t i = 0; i < fieldPath.getPathLength(); i++) { - if (paths.count(fieldPath.getSubpath(i).toString())) - return true; - } - - for (auto&& path : paths) { - // If there's a superpath that is modified this path may be modified. - if (expression::isPathPrefixOf(fieldPath.fullPath(), path)) { - return true; - } - } - - return false; - case Type::kAllExcept: - // If one of the subpaths is unmodified return false. - for (size_t i = 0; i < fieldPath.getPathLength(); i++) { - if (paths.count(fieldPath.getSubpath(i).toString())) - return false; - } - - // Otherwise return true; - return true; - } - // Cannot hit. - MONGO_UNREACHABLE_TASSERT(6434901); - } - Type type; std::set<std::string> paths; diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 6a1a57daa30..4aef3bc0c2d 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -1192,97 +1192,4 @@ DocumentSource::GetModPathsReturn DocumentSourceInternalUnpackBucket::getModifie return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}}; } -boost::optional<std::pair<DocumentSourceInternalUnpackBucket::IndexSortOrderAgree, - DocumentSourceInternalUnpackBucket::IndexOrderedByMinTime>> -DocumentSourceInternalUnpackBucket::supportsSort(PlanStage* root, const SortPattern& sort) const { - if (!root) - return boost::none; - - switch (root->stageType()) { - case STAGE_COLLSCAN: { - const CollectionScan* scan = static_cast<CollectionScan*>(root); - if (sort.size() == 1) { - auto part = sort[0]; - // Check the sort we're asking for is on time. - if (part.fieldPath && *part.fieldPath == _bucketUnpacker.getTimeField()) { - // Check that the directions agree. 
- if ((scan->getDirection() == CollectionScanParams::Direction::FORWARD) == - part.isAscending) - return std::pair{part.isAscending, true}; - } - } - return boost::none; - } - case STAGE_IXSCAN: { - const IndexScan* scan = static_cast<IndexScan*>(root); - - const auto keyPattern = scan->getKeyPattern(); - const bool forward = scan->isForward(); - - // Return none if the keyPattern cannot support the sort. - // Note We add one to sort size to account for the compounding on min/max for the time - // field. - if ((sort.size() + 1 > (unsigned int)keyPattern.nFields()) || (sort.size() < 1)) { - return boost::none; - } - - if (sort.size() == 1) { - auto part = sort[0]; - - // TOOD SERVER-65050: implement here. - - // Check the sort we're asking for is on time. - if (part.fieldPath && *part.fieldPath == _bucketUnpacker.getTimeField()) { - auto keyPatternIter = keyPattern.begin(); - - return checkTimeHelper( - keyPatternIter, forward, *part.fieldPath, part.isAscending); - } - } else if (sort.size() >= 2) { - size_t i = 0; - - - for (auto keyPatternIter = keyPattern.begin(); - i < sort.size() && keyPatternIter != keyPattern.end(); - (++keyPatternIter)) { - auto part = sort[i]; - if (!(part.fieldPath)) - return boost::none; - - if (i < sort.size() - 1) { - // Check the meta field index isn't special. - if (!(*keyPatternIter).isNumber() || - abs((*keyPatternIter).numberInt()) != 1) { - return boost::none; - } - - // True = ascending; false = descending. - bool direction = ((*keyPatternIter).numberInt() == 1); - direction = (forward) ? direction : !direction; - - // Return false if partOne and the first keyPattern part don't agree. - if (!sortAndKeyPatternPartAgreeAndOnMeta((*keyPatternIter).fieldName(), - *part.fieldPath) || - part.isAscending != direction) - return boost::none; - } else { - if (!part.fieldPath || - !(*part.fieldPath == _bucketUnpacker.getTimeField())) { - return boost::none; - } - - return checkTimeHelper( - keyPatternIter, forward, *part.fieldPath, part.isAscending); - } - - // Increment index - ++i; - } - } - return boost::none; - } - default: - return boost::none; - } -} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index 8d7506ffe48..96bca0e1680 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -33,9 +33,6 @@ #include <vector> #include "mongo/db/exec/bucket_unpacker.h" -#include "mongo/db/exec/collection_scan.h" -#include "mongo/db/exec/index_scan.h" -#include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_match.h" @@ -106,18 +103,6 @@ public: return DepsTracker::State::EXHAUSTIVE_ALL; } - int getBucketMaxSpanSeconds() const { - return _bucketMaxSpanSeconds; - } - - std::string getMinTimeField() const { - return _bucketUnpacker.getMinField(_bucketUnpacker.getTimeField()); - } - - std::string getMaxTimeField() const { - return _bucketUnpacker.getMaxField(_bucketUnpacker.getTimeField()); - } - boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; }; @@ -126,20 +111,6 @@ public: return _bucketUnpacker.copy(); } - typedef bool IndexSortOrderAgree; - typedef bool IndexOrderedByMinTime; - - /* - * Takes a leaf plan stage and a sort pattern and returns a pair if they support the Bucket -Unpacking with Sort Optimization. 
- * The pair includes whether the index order and sort order agree with each other as its first - * member and the order of the index as the second parameter. - * - * Note that the index scan order is different from the index order. - */ - boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> supportsSort( - PlanStage* root, const SortPattern& sort) const; - Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; @@ -193,14 +164,6 @@ Unpacking with Sort Optimization. _bucketMaxCount = bucketMaxCount; } - void setIncludeMinTimeAsMetadata() { - _bucketUnpacker.setIncludeMinTimeAsMetadata(); - } - - void setIncludeMaxTimeAsMetadata() { - _bucketUnpacker.setIncludeMaxTimeAsMetadata(); - } - boost::optional<long long> sampleSize() const { return _sampleSize; } @@ -255,68 +218,6 @@ Unpacking with Sort Optimization. GetModPathsReturn getModifiedPaths() const final override; private: - /* This is a helper method for supportsSort. It takes the current iterator for the index - * keyPattern, the direction of the index scan, the timeField path we're sorting on, and the - * direction of the sort. It returns a pair if this data agrees and none if it doesn't. - * - * The pair contains whether the index order and the sort order agree with each other as the - * firstmember and the order of the index as the second parameter. - * - * Note that the index scan order may be different from the index order. - * N.B.: It ASSUMES that there are two members left in the keyPatternIter iterator, and that the - * timeSortFieldPath is in fact the path on time. - */ - boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> checkTimeHelper( - BSONObj::iterator& keyPatternIter, - bool scanIsForward, - const FieldPath& timeSortFieldPath, - bool sortIsAscending) const { - bool wasMin = false; - bool wasMax = false; - - // Check that the index isn't special. - if ((*keyPatternIter).isNumber() && abs((*keyPatternIter).numberInt()) == 1) { - bool direction = ((*keyPatternIter).numberInt() == 1); - direction = (scanIsForward) ? direction : !direction; - - // Verify the direction and fieldNames match. - wasMin = ((*keyPatternIter).fieldName() == - _bucketUnpacker.getMinField(timeSortFieldPath.fullPath())); - wasMax = ((*keyPatternIter).fieldName() == - _bucketUnpacker.getMaxField(timeSortFieldPath.fullPath())); - // Terminate early if it wasn't max or min or if the directions don't match. - if ((wasMin || wasMax) && (sortIsAscending == direction)) - return std::pair{wasMin ? sortIsAscending : !sortIsAscending, wasMin}; - } - - return boost::none; - } - - bool sortAndKeyPatternPartAgreeAndOnMeta(const char* keyPatternFieldName, - const FieldPath& sortFieldPath) const { - FieldPath keyPatternFieldPath = FieldPath(keyPatternFieldName); - - // If they don't have the same path length they cannot agree. - if (keyPatternFieldPath.getPathLength() != sortFieldPath.getPathLength()) - return false; - - // Check these paths are on the meta field. - if (keyPatternFieldPath.getSubpath(0) != mongo::timeseries::kBucketMetaFieldName) - return false; - if (!_bucketUnpacker.getMetaField() || - sortFieldPath.getSubpath(0) != *_bucketUnpacker.getMetaField()) { - return false; - } - - // If meta was the only path component then return true. - // Note: We already checked that the path lengths are equal. - if (keyPatternFieldPath.getPathLength() == 1) - return true; - - // Otherwise return if the remaining path components are equal. 
- return (keyPatternFieldPath.tail() == sortFieldPath.tail()); - } - GetNextResult doGetNext() final; bool haveComputedMetaField() const; diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 2503a2e0857..bf9c72eb89d 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -59,6 +59,9 @@ using std::unique_ptr; using std::vector; namespace { +constexpr StringData kMin = "min"_sd; +constexpr StringData kMax = "max"_sd; + struct BoundMakerMin { const long long offset; // Offset in millis @@ -69,9 +72,7 @@ struct BoundMakerMin { } Document serialize() const { - // Convert from millis to seconds. - return Document{{{"base"_sd, DocumentSourceSort::kMin}, - {DocumentSourceSort::kOffset, (offset / 1000)}}}; + return Document{{{"base"_sd, kMin}, {"offset"_sd, offset}}}; } }; @@ -85,9 +86,7 @@ struct BoundMakerMax { } Document serialize() const { - // Convert from millis to seconds. - return Document{{{"base"_sd, DocumentSourceSort::kMax}, - {DocumentSourceSort::kOffset, (offset / 1000)}}}; + return Document{{{"base"_sd, kMax}, {"offset"_sd, offset}}}; } }; struct CompAsc { @@ -424,58 +423,6 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create( return pSort; } -intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort( - SortPattern pat, - StringData boundBase, - long long boundOffset, - boost::optional<long long> limit, - const intrusive_ptr<ExpressionContext>& expCtx) { - - auto ds = DocumentSourceSort::create(expCtx, pat); - - SortOptions opts; - opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load(); - if (expCtx->allowDiskUse) { - opts.extSortAllowed = true; - opts.tempDir = expCtx->tempDir; - } - - if (limit) { - opts.Limit(limit.get()); - } - - if (boundBase == kMin) { - if (pat.back().isAscending) { - ds->_timeSorter.reset( - new TimeSorterAscMin{opts, CompAsc{}, BoundMakerMin{boundOffset}}); - } else { - ds->_timeSorter.reset( - new TimeSorterDescMin{opts, CompDesc{}, BoundMakerMin{boundOffset}}); - } - ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime); - } else if (boundBase == kMax) { - if (pat.back().isAscending) { - ds->_timeSorter.reset( - new TimeSorterAscMax{opts, CompAsc{}, BoundMakerMax{boundOffset}}); - } else { - ds->_timeSorter.reset( - new TimeSorterDescMax{opts, CompDesc{}, BoundMakerMax{boundOffset}}); - } - ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime); - } else { - MONGO_UNREACHABLE; - } - - if (pat.size() > 1) { - SortPattern partitionKey = - std::vector<SortPattern::SortPatternPart>(pat.begin(), pat.end() - 1); - ds->_timeSorterPartitionKeyGen = - SortKeyGenerator{std::move(partitionKey), expCtx->getCollator()}; - } - - return ds; -} - intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { uassert(6369905, @@ -504,7 +451,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort( uassert( 6460200, "$_internalBoundedSort bound must be an object", bound && bound.type() == Object); - BSONElement boundOffsetElem = bound.Obj()[DocumentSourceSort::kOffset]; + BSONElement boundOffsetElem = bound.Obj()["offset"]; long long boundOffset = 0; if (boundOffsetElem && boundOffsetElem.isNumber()) { boundOffset = uassertStatusOK(boundOffsetElem.parseIntegerElementToLong()) * diff --git a/src/mongo/db/pipeline/document_source_sort.h 
b/src/mongo/db/pipeline/document_source_sort.h index ac592c08c6c..89e302330c2 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -42,10 +42,6 @@ namespace mongo { class DocumentSourceSort final : public DocumentSource { public: - static constexpr StringData kMin = "min"_sd; - static constexpr StringData kMax = "max"_sd; - static constexpr StringData kOffset = "offsetSeconds"_sd; - struct SortableDate { Date_t date; @@ -134,12 +130,6 @@ public: return create(pExpCtx, {sortOrder, pExpCtx}); } - static boost::intrusive_ptr<DocumentSourceSort> createBoundedSort( - SortPattern pat, - StringData boundBase, - long long boundOffset, - boost::optional<long long> limit, - const boost::intrusive_ptr<ExpressionContext>& expCtx); /** * Parse a stage that uses BoundedSorter. */ @@ -174,10 +164,6 @@ public: return _populated; }; - bool isBoundedSortStage() { - return (_timeSorter) ? true : false; - } - bool hasLimit() const { return _sortExecutor->hasLimit(); } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 8347e6ed458..0719e235bb9 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -28,7 +28,6 @@ */ #include "mongo/db/query/projection_parser.h" -#include "mongo/db/server_options.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -54,7 +53,6 @@ #include "mongo/db/exec/unpack_timeseries_bucket.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" -#include "mongo/db/matcher/expression_expr.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops_exec.h" @@ -79,7 +77,6 @@ #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor_factory.h" -#include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_knobs_gen.h" @@ -409,35 +406,6 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam return std::pair{sampleStage, unpackStage}; } - -std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort( - const Pipeline::SourceContainer& sources) { - DocumentSourceSort* sortStage = nullptr; - DocumentSourceInternalUnpackBucket* unpackStage = nullptr; - - auto sourcesIt = sources.begin(); - while (sourcesIt != sources.end()) { - if (!sortStage) { - sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get()); - - if (sortStage) { - // Do not double optimize - if (sortStage->isBoundedSortStage()) { - return {nullptr, nullptr}; - } - - return {unpackStage, sortStage}; - } - } - - if (!unpackStage) { - unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get()); - } - ++sourcesIt; - } - - return {unpackStage, sortStage}; -} } // namespace StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor( @@ -976,165 +944,6 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll Pipeline::kAllowedMatcherFeatures, &shouldProduceEmptyDocs)); - // If this is a query on a time-series collection then it may be eligible for a post-planning - // sort optimization. We check eligibility and perform the rewrite here. 
- auto [unpack, sort] = findUnpackThenSort(pipeline->_sources); - if (serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( - multiversion::FeatureCompatibilityVersion::kVersion_6_1) && - feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled( - serverGlobalParams.featureCompatibility) && - unpack && sort) { - auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get()); - if (execImpl) { - - // Get source stage - PlanStage* rootStage = execImpl->getRootStage(); - while (rootStage && rootStage->getChildren().size() == 1) { - switch (rootStage->stageType()) { - case STAGE_FETCH: - rootStage = rootStage->child().get(); - break; - case STAGE_SHARDING_FILTER: - rootStage = rootStage->child().get(); - break; - default: - rootStage = nullptr; - } - } - - if (rootStage && rootStage->getChildren().size() != 0) { - rootStage = nullptr; - } - - const auto& sortPattern = sort->getSortKeyPattern(); - if (auto agree = unpack->supportsSort(rootStage, sortPattern)) { - // Scan the pipeline to check if it's compatible with the optimization. - bool badStage = false; - bool seenSort = false; - std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter = - pipeline->_sources.begin(); - std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter = - pipeline->_sources.end(); - for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) { - if (dynamic_cast<const DocumentSourceSort*>(iter->get())) { - seenSort = true; - } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) { - // do nothing - } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>( - iter->get())) { - unpackIter = iter; - } else if (auto projection = - dynamic_cast<const DocumentSourceSingleDocumentTransformation*>( - iter->get())) { - auto modPaths = projection->getModifiedPaths(); - - // Check to see if the sort paths are modified. - for (auto sortIter = sortPattern.begin(); - !badStage && sortIter != sortPattern.end(); - ++sortIter) { - - auto fieldPath = sortIter->fieldPath; - // If they are then escap the loop & don't optimize. - if (!fieldPath || modPaths.canModify(*fieldPath)) { - badStage = true; - } - } - - } else { - badStage = true; - } - } - if (!badStage && seenSort) { - auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree; - // This is safe because we have seen a sort so we must have at least one stage - // to the left of the current iterator position. - --iter; - - if (indexOrderedByMinTime) { - unpack->setIncludeMinTimeAsMetadata(); - } else { - unpack->setIncludeMaxTimeAsMetadata(); - } - - if (indexSortOrderAgree) { - pipeline->_sources.insert( - iter, - DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(), - (indexOrderedByMinTime - ? DocumentSourceSort::kMin - : DocumentSourceSort::kMax), - 0, - sort->getLimit(), - expCtx)); - } else { - // Since the sortPattern and the direction of the index don't agree we must - // use the offset to get an estimate on the bounds of the bucket. - pipeline->_sources.insert( - iter, - DocumentSourceSort::createBoundedSort( - sort->getSortKeyPattern(), - (indexOrderedByMinTime ? DocumentSourceSort::kMin - : DocumentSourceSort::kMax), - ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds() - : -unpack->getBucketMaxSpanSeconds()) * - 1000, - sort->getLimit(), - expCtx)); - - /** - * We wish to create the following predicate to avoid returning incorrect - * results in the unlikely event bucketMaxSpanSeconds changes under us. 
- * - * {$expr: - * {$lte: [ - * {$subtract: [$control.max.timeField, $control.min.timeField]}, - * {$const: bucketMaxSpanSeconds, in milliseconds} - * ]}} - */ - auto minTime = unpack->getMinTimeField(); - auto maxTime = unpack->getMaxTimeField(); - auto match = std::make_unique<ExprMatchExpression>( - // This produces {$lte: ... } - make_intrusive<ExpressionCompare>( - expCtx.get(), - ExpressionCompare::CmpOp::LTE, - // This produces [...] - makeVector<boost::intrusive_ptr<Expression>>( - // This produces {$subtract: ... } - make_intrusive<ExpressionSubtract>( - expCtx.get(), - // This produces [...] - makeVector<boost::intrusive_ptr<Expression>>( - // This produces "$control.max.timeField" - ExpressionFieldPath::createPathFromString( - expCtx.get(), maxTime, expCtx->variablesParseState), - // This produces "$control.min.timeField" - ExpressionFieldPath::createPathFromString( - expCtx.get(), - minTime, - expCtx->variablesParseState))), - // This produces {$const: maxBucketSpanSeconds} - make_intrusive<ExpressionConstant>( - expCtx.get(), - Value{unpack->getBucketMaxSpanSeconds() * 1000}))), - expCtx); - pipeline->_sources.insert( - unpackIter, - make_intrusive<DocumentSourceMatch>(std::move(match), expCtx)); - } - // Ensure we're erasing the sort source. - tassert(6434901, - "we must erase a $sort stage and replace it with a bounded sort stage", - strcmp((*iter)->getSourceName(), - DocumentSourceSort::kStageName.rawData()) == 0); - pipeline->_sources.erase(iter); - pipeline->stitch(); - } - } - } - } - const auto cursorType = shouldProduceEmptyDocs ? DocumentSourceCursor::CursorType::kEmptyDocuments : DocumentSourceCursor::CursorType::kRegular; |
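
For reference, the deleted bucket_unpacking_with_sort.js above asserted that the rewrite surfaced as a $_internalBoundedSort stage, plus the $expr bucket-span $match shown in the reverted pipeline_d.cpp hunk for the imprecise case. A minimal post-revert sanity sketch, assuming an unsharded time-series collection "ts" with timeField "t" (names are illustrative):

    // With the rewrite removed, a time-sorted aggregation should explain as a
    // plain $sort; the bounded-sort stage should no longer appear.
    const explain = db.ts.explain().aggregate(
        [{$match: {t: {$lt: new Date("2022-01-01")}}}, {$sort: {t: 1}}]);
    const stages = explain.stages || [];  // single-shard explain shape assumed for brevity
    assert(!stages.some(s => s.hasOwnProperty("$_internalBoundedSort")), tojson(explain));
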