-rw-r--r--  jstests/aggregation/extras/utils.js                                              |  63
-rw-r--r--  jstests/core/timeseries/bucket_unpacking_with_sort.js                            | 356
-rw-r--r--  jstests/core/timeseries/timeseries_internal_bounded_sort.js                      |   8
-rw-r--r--  jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js             |   8
-rw-r--r--  jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js |   4
-rw-r--r--  src/mongo/db/exec/bucket_unpacker.cpp                                            |   1
-rw-r--r--  src/mongo/db/exec/bucket_unpacker.h                                              |  17
-rw-r--r--  src/mongo/db/exec/collection_scan.h                                              |   4
-rw-r--r--  src/mongo/db/exec/index_scan.h                                                   |   8
-rw-r--r--  src/mongo/db/index/sort_key_generator.h                                          |   4
-rw-r--r--  src/mongo/db/pipeline/document_source.h                                          |  36
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                 |   1
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.h                   |  23
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.cpp                                   |  65
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.h                                     |  14
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                             | 288
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h                                               |  79
17 files changed, 944 insertions, 35 deletions
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js
index efbe87fa3cf..4c71328c027 100644
--- a/jstests/aggregation/extras/utils.js
+++ b/jstests/aggregation/extras/utils.js
@@ -441,46 +441,75 @@ function desugarSingleStageAggregation(db, coll, stage) {
/**
* Runs and asserts an explain command for an aggregation with the given pipeline. Returns just the
* pipeline from the explain results regardless of cluster topology.
+ * The fourth parameter, `options`, covers a few unusual scenarios.
+ * options.inhibitOptimization defaults to true. When true, this prepends a
+ * $_internalInhibitOptimization stage to the pipeline and removes it before returning results.
+ * This is not ideal for views. options.hint is an optional hint that gets passed on to the
+ * aggregate command; it defaults to undefined.
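+ * A hypothetical usage (the collection and index are illustrative):
+ *   getExplainedPipelineFromAggregation(
+ *       db, db.ts, [{$sort: {t: 1}}], {inhibitOptimization: false, hint: {t: 1}});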
*/
-function getExplainedPipelineFromAggregation(db, coll, pipeline) {
+function getExplainedPipelineFromAggregation(
+ db, coll, pipeline, {inhibitOptimization = true, hint} = {}) {
// Prevent stages from being absorbed into the .find() layer
- pipeline.unshift({$_internalInhibitOptimization: {}});
- const result = coll.explain().aggregate(pipeline);
+ if (inhibitOptimization) {
+ pipeline.unshift({$_internalInhibitOptimization: {}});
+ }
+
+ const aggOptions = hint ? {hint: hint} : {};
+
+ const result = coll.explain().aggregate(pipeline, aggOptions);
assert.commandWorked(result);
- return getExplainPipelineFromAggregationResult(db, result);
+ return getExplainPipelineFromAggregationResult(db, result, {inhibitOptimization});
}
-function getExplainPipelineFromAggregationResult(db, result) {
+function getExplainPipelineFromAggregationResult(db, result, {
+ inhibitOptimization = true,
+} = {}) {
// We proceed by cases based on topology.
if (!FixtureHelpers.isMongos(db)) {
assert(Array.isArray(result.stages), result);
- // The first two stages should be the .find() cursor and the inhibit-optimization stage;
- // the rest of the stages are what the user's 'stage' expanded to.
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage (if
+ // enabled); the rest of the stages are what the user's 'stage' expanded to.
assert(result.stages[0].$cursor, result);
- assert(result.stages[1].$_internalInhibitOptimization, result);
- return result.stages.slice(2);
+ if (inhibitOptimization) {
+ assert(result.stages[1].$_internalInhibitOptimization, result);
+ return result.stages.slice(2);
+ } else {
+ return result.stages.slice(1);
+ }
} else {
if (result.splitPipeline) {
- assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result);
- const shardsPart = result.splitPipeline.shardsPart.slice(1);
+ let shardsPart = null;
+ if (inhibitOptimization) {
+ assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result);
+ shardsPart = result.splitPipeline.shardsPart.slice(1);
+ } else {
+ shardsPart = result.splitPipeline.shardsPart;
+ }
assert(result.splitPipeline.mergerPart[0].$mergeCursors, result);
const mergerPart = result.splitPipeline.mergerPart.slice(1);
return [].concat(shardsPart).concat(mergerPart);
} else if (result.stages) {
// Required for aggregation_mongos_passthrough.
assert(Array.isArray(result.stages), result);
- // The first two stages should be the .find() cursor and the inhibit-optimization stage;
- // the rest of the stages are what the user's 'stage' expanded to.
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage
+ // (if enabled); the rest of the stages are what the user's 'stage' expanded to.
assert(result.stages[0].$cursor, result);
- assert(result.stages[1].$_internalInhibitOptimization, result);
- return result.stages.slice(2);
+ if (inhibitOptimization) {
+ assert(result.stages[1].$_internalInhibitOptimization, result);
+ return result.stages.slice(2);
+ } else {
+ return result.stages.slice(1);
+ }
} else {
// Required for aggregation_one_shard_sharded_collections.
assert(Array.isArray(result.shards["shard-rs0"].stages), result);
assert(result.shards["shard-rs0"].stages[0].$cursor, result);
- assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result);
- return result.shards["shard-rs0"].stages.slice(2);
+ if (inhibitOptimization) {
+ assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result);
+ return result.shards["shard-rs0"].stages.slice(2);
+ } else {
+ return result.shards["shard-rs0"].stages.slice(1);
+ }
}
}
}
diff --git a/jstests/core/timeseries/bucket_unpacking_with_sort.js b/jstests/core/timeseries/bucket_unpacking_with_sort.js
new file mode 100644
index 00000000000..77c41091451
--- /dev/null
+++ b/jstests/core/timeseries/bucket_unpacking_with_sort.js
@@ -0,0 +1,356 @@
+/**
+ * Test that the bucket unpacking with sorting rewrite is performed and doesn't cause incorrect
+ * results to be created.
+ *
+ * @tags: [
+ * requires_fcv_61,
+ * # We need a timeseries collection.
+ * assumes_no_implicit_collection_creation_after_drop,
+ * # Bounded sorter is currently broken w/ sharding.
+ * assumes_unsharded_collection,
+ * # Cannot insert into a time-series collection in a multi-document transaction.
+ * does_not_support_transactions,
+ * # Refusing to run a test that issues an aggregation command with explain because it may
+ * # return incomplete results if interrupted by a stepdown.
+ * does_not_support_stepdowns,
+ * # This complicates aggregation extraction.
+ * do_not_wrap_aggregations_in_facets,
+ * # We need a timeseries collection.
+ * requires_timeseries,
+ * # Explain of a resolved view must be executed by mongos.
+ * directly_against_shardsvrs_incompatible,
+ * ]
+ */
+(function() {
+"use strict";
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagBucketUnpackWithSort: 1}))
+ .featureFlagBucketUnpackWithSort.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the BUS feature flag is disabled");
+ return;
+}
+
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+load("jstests/aggregation/extras/utils.js"); // For getExplainedPipelineFromAggregation.
+const collName = "bucket_unpacking_with_sort";
+const coll = db[collName];
+const metaCollName = "bucket_unpacking_with_sort_with_meta";
+const metaColl = db[metaCollName];
+const metaCollSubFieldsName = "bucket_unpacking_with_sort_with_meta_sub";
+const metaCollSubFields = db[metaCollSubFieldsName];
+const subFields = ["a", "b"];
+
+const setupColl = (coll, collName, usesMeta, subFields = null) => {
+ coll.drop();
+ if (usesMeta) {
+ db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}});
+ } else {
+ db.createCollection(collName, {timeseries: {timeField: "t"}});
+ }
+ // Create a few buckets.
+ let meta = usesMeta ? 10 : 1;
+ let docs = [];
+ let numberOfItemsPerBucket =
+ db.adminCommand({getParameter: 1, timeseriesBucketMaxCount: 1}).timeseriesBucketMaxCount;
+ assert(Number.isInteger(numberOfItemsPerBucket));
+ for (let j = 0; j < meta; ++j) {
+ let metaVal = j;
+ if (subFields) {
+ metaVal = subFields.reduce((memo, field) => {
+ return Object.assign(Object.assign({}, memo), {[field]: j});
+ }, {});
+ }
+ for (let i = 0; i < numberOfItemsPerBucket; ++i) {
+ docs.push({m: metaVal, t: new Date(i * 6000)});
+ }
+ // Because the max bucket span is 3600 * 1000 milliseconds, we know that the above will have
+ // generated two buckets. Since we are now going back in time to before the minimum
+        // timestamp of the second bucket, we'll open a third and fourth bucket below. Crucially,
+        // these will overlap with the first two buckets.
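+        // (With the default timeseriesBucketMaxCount of 1000 and documents spaced 6 seconds
+        // apart, each loop spans roughly 6000 seconds, well over the 3600-second max span.)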
+ for (let i = 0; i < numberOfItemsPerBucket; ++i) {
+ docs.push({m: metaVal, t: new Date(i * 6000 + 3000)});
+ }
+ }
+ assert.commandWorked(coll.insert(docs));
+};
+
+setupColl(coll, collName, false);
+setupColl(metaColl, metaCollName, true);
+setupColl(metaCollSubFields, metaCollSubFieldsName, true, subFields);
+
+// Returns whether the given stage is a $_internalBoundedSort stage.
+const stageIsInternalBoundedSort = (stage) => {
+ return stage.hasOwnProperty("$_internalBoundedSort");
+};
+
+const hasInternalBoundedSort = (pipeline) => pipeline.some(stageIsInternalBoundedSort);
+
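+// For use in reductions: returns the memo once it is set, otherwise the stage if it is a $match,
+// otherwise null.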
+const getIfMatch = (memo, stage) => {
+ if (memo)
+ return memo;
+ else if (stage.hasOwnProperty("$match"))
+ return stage;
+ else
+ return null;
+};
+
+const findFirstMatch = (pipeline) => pipeline.reduce(getIfMatch, null);
+
+const setup = (coll, createIndex = null) => {
+ // Create index.
+ if (createIndex) {
+ assert.commandWorked(coll.createIndex(createIndex));
+ }
+};
+
+/**
+ * This test creates a time-series collection, inserts data (with multiple overlapping buckets), and
+ * tests that the BUS optimization is performed and produces correct results.
+ *
+ * `sortSpec` is the sort that we wish to have optimized.
+ * `createIndex` is the index that we need to create in order to perform the optimization.
+ * It defaults to null which signals that we don't create an index.
+ * `hint` is the hint that we specify in order to produce the optimization.
+ * `testColl` is the collection to use.
+ * `precise` denotes whether the $match on time survives the rewrite unchanged. It is false when
+ * the rewrite must add a bucket-span guard instead, e.g. when traversing a min (resp. max) field
+ * index on a descending (resp. ascending) sort.
+ */
+const runRewritesTest = (sortSpec,
+ createIndex,
+ hint,
+ testColl,
+ precise,
+ intermediaryStages = [],
+ posteriorStages = []) => {
+ setup(testColl, createIndex);
+
+ const options = (hint) ? {hint: hint} : {};
+
+ const match = {$match: {t: {$lt: new Date("2022-01-01")}}};
+ // Get results
+ const optPipeline = [match, ...intermediaryStages, {$sort: sortSpec}, ...posteriorStages];
+ const optResults = testColl.aggregate(optPipeline, options).toArray();
+ const optExplainFull = testColl.explain().aggregate(optPipeline, options);
+
+ const ogPipeline = [
+ match,
+ ...intermediaryStages,
+ {$_internalInhibitOptimization: {}},
+ {$sort: sortSpec},
+ ...posteriorStages
+ ];
+ const ogResults = testColl.aggregate(ogPipeline, options).toArray();
+ const ogExplainFull = testColl.explain().aggregate(ogPipeline, options);
+
+ // Assert correct
+ assert.docEq(optResults, ogResults, {optResults: optResults, ogResults: ogResults});
+
+ // Check contains stage
+ const optExplain = getExplainedPipelineFromAggregation(
+ db, testColl, optPipeline, {inhibitOptimization: false, hint: hint});
+ assert(hasInternalBoundedSort(optExplain), optExplainFull);
+
+ // Check doesn't contain stage
+ const ogExplain = getExplainedPipelineFromAggregation(
+ db, testColl, ogPipeline, {inhibitOptimization: false, hint: hint});
+ assert(!hasInternalBoundedSort(ogExplain), ogExplainFull);
+
+ let foundMatch = findFirstMatch(optExplain);
+ if (!precise) {
+ assert.docEq(foundMatch, {
+ $match: {
+ $expr:
+ {$lte: [{$subtract: ["$control.max.t", "$control.min.t"]}, {$const: 3600000}]}
+ }
+ });
+ } else {
+ assert.docEq(foundMatch, match);
+ }
+};
+
+const runDoesntRewriteTest = (sortSpec, createIndex, hint, testColl, intermediaryStages = []) => {
+ setup(testColl, createIndex);
+
+ const optPipeline = [
+ {$match: {t: {$lt: new Date("2022-01-01")}}},
+ ...intermediaryStages,
+ {$sort: sortSpec},
+ ];
+
+ // Check doesn't contain stage
+ const optExplain = getExplainedPipelineFromAggregation(
+ db, testColl, optPipeline, {inhibitOptimization: false, hint: hint});
+ const optExplainFull = testColl.explain().aggregate(optPipeline, {hint});
+    const containsOptimization = hasInternalBoundedSort(optExplain);
+ assert(!containsOptimization, optExplainFull);
+};
+
+// Collscan cases
+runRewritesTest({t: 1}, null, null, coll, true);
+runRewritesTest({t: -1}, null, {$natural: -1}, coll, false);
+
+// Indexed cases
+runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true);
+runRewritesTest({t: -1}, {t: -1}, {t: -1}, coll, true);
+runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true);
+runRewritesTest({m: 1, t: -1}, {m: 1, t: -1}, {m: 1, t: -1}, metaColl, true);
+runRewritesTest({m: -1, t: 1}, {m: -1, t: 1}, {m: -1, t: 1}, metaColl, true);
+runRewritesTest({m: -1, t: -1}, {m: -1, t: -1}, {m: -1, t: -1}, metaColl, true);
+runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true);
+
+// Intermediary projects that don't modify sorted fields are allowed.
+runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {a: 0}}]);
+runRewritesTest(
+ {m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {m: 1, t: 1}}]);
+
+// Test multiple meta fields
+let metaIndexObj = subFields.reduce((memo, field) => {
+ return Object.assign(Object.assign({}, memo), {[`m.${field}`]: 1});
+}, {});
+Object.assign(metaIndexObj, {t: 1});
+runRewritesTest(metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true);
+runRewritesTest(
+ metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true, [{$project: {m: 1, t: 1}}]);
+
+// Check sort-limit optimization.
+runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true, [], [{$limit: 10}]);
+
+// Check that $setWindowFields is optimized as well.
+// Since {k: 1} cannot provide a bounded sort, we know that if there's a bounded sort it comes
+// from $setWindowFields.
+runRewritesTest({k: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [], [
+ {$setWindowFields: {partitionBy: "$m", sortBy: {t: 1}, output: {arr: {$max: "$t"}}}}
+]);
+
+// Negative tests
+for (let m = -1; m < 2; m++) {
+ for (let t = -1; t < 2; t++) {
+ for (let k = -1; k < 2; k++) {
+ printjson({"currently running": "the following configuration...", m: m, t: t, k: k});
+ let sort = null;
+ let createIndex = null;
+ let hint = null;
+ let usesMeta = null;
+ if (k != 0) {
+ // This is the case where we add an intermediary incompatible field.
+ if (m == 0) {
+ if (t == 0) {
+ sort = {k: k};
+ } else {
+ sort = {k: k, t: t};
+ }
+ } else {
+ if (t == 0) {
+ sort = {m: m, k: k};
+ } else {
+ sort = {m: m, k: k, t: t};
+ }
+ }
+ hint = sort;
+ createIndex = sort;
+ usesMeta = m != 0;
+ runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
+ } else {
+ // This is the case where we do not add an intermediary incompatible field.
+ // Instead we enumerate the ways that the index and sort could disagree.
+
+ // For the meta case, negate the time order.
+ // For the non-meta case, use a collscan with a negated order.
+ if (m == 0) {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ sort = {t: t};
+ hint = {$natural: -t};
+ }
+ } else {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ usesMeta = true;
+ sort = {m: m, t: t};
+ hint = {m: m, t: -t};
+ createIndex = hint;
+ }
+ }
+
+ if (sort)
+ runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
+
+ sort = null;
+ hint = null;
+ createIndex = null;
+ usesMeta = false;
+ // For the meta case, negate the meta order.
+ // For the non-meta case, use an index instead of a collscan.
+ if (m == 0) {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ sort = {t: t};
+ createIndex = {t: -t};
+ hint = createIndex;
+ }
+ } else {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ usesMeta = true;
+ sort = {m: m, t: t};
+ hint = {m: -m, t: t};
+ createIndex = hint;
+ }
+ }
+
+ if (sort)
+ runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
+
+ sort = null;
+ hint = null;
+ createIndex = null;
+ usesMeta = false;
+ // For the meta case, negate both meta and time.
+ if (m == 0) {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ // Do not execute -- we've exhausted relevant cases.
+ }
+ } else {
+ if (t == 0) {
+ // Do not execute a test run.
+ } else {
+ usesMeta = true;
+ sort = {m: m, t: t};
+ hint = {m: -m, t: -t};
+ createIndex = hint;
+ }
+ }
+
+ if (sort)
+ runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
+ }
+ }
+ }
+}
+
+// Test mismatched meta paths don't produce the optimization.
+runDoesntRewriteTest({m: 1, t: 1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields);
+runDoesntRewriteTest(
+ {"m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, metaCollSubFields);
+runDoesntRewriteTest({"m.a": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields);
+runDoesntRewriteTest({"m.a": 1, "m.b": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields);
+// Test matched meta-subpaths with mismatched directions don't produce the optimization.
+runDoesntRewriteTest({"m.a": 1, t: -1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields);
+// Test intermediary projections that exclude the sorted fields don't produce the optimization.
+runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {m: 0}}]);
+runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {t: 0}}]);
+runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {a: 1}}]);
+runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {'m.a': 0}}]);
+runDoesntRewriteTest({'m.a': 1, t: 1}, {'m.a': 1, t: 1}, {'m.a': 1, t: 1}, metaCollSubFields, [
+ {$project: {'m': 0}}
+]);
+})();
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort.js b/jstests/core/timeseries/timeseries_internal_bounded_sort.js
index 30e58b7d42e..c52bb3efb65 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort.js
@@ -88,7 +88,7 @@ function runTest(ascending) {
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
bound: ascending ? {base: "min"}
- : {base: "min", offset: bucketMaxSpanSeconds}
+ : {base: "min", offsetSeconds: bucketMaxSpanSeconds}
}
},
])
@@ -104,7 +104,7 @@ function runTest(ascending) {
{
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
- bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds}
+ bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
: {base: "max"}
}
},
@@ -136,7 +136,7 @@ function runTest(ascending) {
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
bound: ascending ? {base: "min"}
- : {base: "min", offset: bucketMaxSpanSeconds},
+ : {base: "min", offsetSeconds: bucketMaxSpanSeconds},
limit: 100
}
},
@@ -154,7 +154,7 @@ function runTest(ascending) {
{
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
- bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds}
+ bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
: {base: "max"},
limit: 100
}
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
index 2e73a808d85..208d6a45b9a 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
@@ -131,7 +131,7 @@ function runTest(sortSpec) {
$_internalBoundedSort: {
sortKey: sortSpec,
bound: sortSpec.t > 0 ? {base: "min"}
- : {base: "min", offset: bucketMaxSpanSeconds}
+ : {base: "min", offsetSeconds: bucketMaxSpanSeconds}
}
},
];
@@ -145,7 +145,7 @@ function runTest(sortSpec) {
{
$_internalBoundedSort: {
sortKey: sortSpec,
- bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds}
+ bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
: {base: "max"}
}
},
@@ -174,7 +174,7 @@ function runTest(sortSpec) {
$_internalBoundedSort: {
sortKey: sortSpec,
bound: sortSpec.t > 0 ? {base: "min"}
- : {base: "min", offset: bucketMaxSpanSeconds},
+ : {base: "min", offsetSeconds: bucketMaxSpanSeconds},
limit: 100
}
}
@@ -190,7 +190,7 @@ function runTest(sortSpec) {
{
$_internalBoundedSort: {
sortKey: sortSpec,
- bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds}
+ bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
: {base: "max"},
limit: 100
}
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
index a986458a99f..417818559d6 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
@@ -131,7 +131,7 @@ function runTest(sortSpec) {
// Use a much looser bound than necessary, to exercise the partitioning logic more.
// With such a loose bound, events are only released due to a partition boundary,
// never a bucket boundary.
- bound: {base: "min", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds},
+ bound: {base: "min", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds},
}
},
];
@@ -152,7 +152,7 @@ function runTest(sortSpec) {
// Use a much looser bound than necessary, to exercise the partitioning logic more.
// With such a loose bound, events are only released due to a partition boundary,
// never a bucket boundary.
- bound: {base: "max", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds},
+ bound: {base: "max", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds},
}
},
];
diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp
index 7703c9d3949..0651aae78ee 100644
--- a/src/mongo/db/exec/bucket_unpacker.cpp
+++ b/src/mongo/db/exec/bucket_unpacker.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/matcher/expression_tree.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/pipeline/expression.h"
-#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
namespace mongo {
diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h
index 90465257415..287bd9f2540 100644
--- a/src/mongo/db/exec/bucket_unpacker.h
+++ b/src/mongo/db/exec/bucket_unpacker.h
@@ -37,6 +37,7 @@
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
namespace mongo {
/**
@@ -289,6 +290,22 @@ public:
return _includeMaxTimeAsMetadata;
}
+ const std::string& getTimeField() const {
+ return _spec.timeField();
+ }
+
+ const boost::optional<std::string>& getMetaField() const {
+ return _spec.metaField();
+ }
+
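+    // Computes the name of the bucket-level field holding the minimum of 'field', e.g.
+    // "control.min.t"; getMaxField below is the analogue for the maximum.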
+ std::string getMinField(StringData field) const {
+ return std::string{timeseries::kControlMinFieldNamePrefix} + field;
+ }
+
+ std::string getMaxField(StringData field) const {
+ return std::string{timeseries::kControlMaxFieldNamePrefix} + field;
+ }
+
void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
void setIncludeMinTimeAsMetadata();
void setIncludeMaxTimeAsMetadata();
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 0de33f09899..f9ce637dbad 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -91,6 +91,10 @@ public:
const SpecificStats* getSpecificStats() const final;
+ CollectionScanParams::Direction getDirection() const {
+ return _params.direction;
+ }
+
protected:
void doSaveStateRequiresCollection() final;
diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h
index 3efc3360d08..c0cf75bf02d 100644
--- a/src/mongo/db/exec/index_scan.h
+++ b/src/mongo/db/exec/index_scan.h
@@ -131,6 +131,14 @@ public:
static const char* kStageType;
+ const BSONObj& getKeyPattern() const {
+ return _keyPattern;
+ }
+
+ bool isForward() const {
+ return _forward;
+ }
+
protected:
void doSaveStateRequiresIndex() final;
diff --git a/src/mongo/db/index/sort_key_generator.h b/src/mongo/db/index/sort_key_generator.h
index d9b73c75c38..40e270dac21 100644
--- a/src/mongo/db/index/sort_key_generator.h
+++ b/src/mongo/db/index/sort_key_generator.h
@@ -75,6 +75,10 @@ public:
return _sortPattern.isSingleElementKey();
}
+ const SortPattern& getSortPattern() const {
+ return _sortPattern;
+ }
+
private:
// Returns the sort key for the input 'doc' as a Value.
//
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 37a6b103140..ff2aaf9f8ae 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -49,6 +49,7 @@
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/generic_cursor.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -597,6 +598,41 @@ public:
return newNames;
}
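+    /**
+     * Returns whether this stage could modify 'fieldPath'. This is conservative: a modified
+     * ancestor or descendant of 'fieldPath' counts as modifying 'fieldPath' itself.
+     */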
+ bool canModify(const FieldPath& fieldPath) const {
+ switch (type) {
+ case Type::kAllPaths:
+ return true;
+ case Type::kNotSupported:
+ return true;
+ case Type::kFiniteSet:
+ // If there's a subpath that is modified this path may be modified.
+ for (size_t i = 0; i < fieldPath.getPathLength(); i++) {
+ if (paths.count(fieldPath.getSubpath(i).toString()))
+ return true;
+ }
+
+ for (auto&& path : paths) {
+ // If there's a superpath that is modified this path may be modified.
+ if (expression::isPathPrefixOf(fieldPath.fullPath(), path)) {
+ return true;
+ }
+ }
+
+ return false;
+ case Type::kAllExcept:
+ // If one of the subpaths is unmodified return false.
+ for (size_t i = 0; i < fieldPath.getPathLength(); i++) {
+ if (paths.count(fieldPath.getSubpath(i).toString()))
+ return false;
+ }
+
+                // Otherwise, return true.
+ return true;
+ }
+ // Cannot hit.
+ MONGO_UNREACHABLE_TASSERT(6434901);
+ }
+
Type type;
std::set<std::string> paths;
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 4aef3bc0c2d..272d7a1a80c 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -1191,5 +1191,4 @@ DocumentSource::GetModPathsReturn DocumentSourceInternalUnpackBucket::getModifie
}
return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}};
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 96bca0e1680..df3bf93bff4 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -33,6 +33,9 @@
#include <vector>
#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
@@ -103,6 +106,18 @@ public:
return DepsTracker::State::EXHAUSTIVE_ALL;
}
+ int getBucketMaxSpanSeconds() const {
+ return _bucketMaxSpanSeconds;
+ }
+
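+    // Full dotted paths of the bucket-level min/max summaries of the time field, e.g.
+    // "control.min.t" and "control.max.t".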
+ std::string getMinTimeField() const {
+ return _bucketUnpacker.getMinField(_bucketUnpacker.getTimeField());
+ }
+
+ std::string getMaxTimeField() const {
+ return _bucketUnpacker.getMaxField(_bucketUnpacker.getTimeField());
+ }
+
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
};
@@ -164,6 +179,14 @@ public:
_bucketMaxCount = bucketMaxCount;
}
+ void setIncludeMinTimeAsMetadata() {
+ _bucketUnpacker.setIncludeMinTimeAsMetadata();
+ }
+
+ void setIncludeMaxTimeAsMetadata() {
+ _bucketUnpacker.setIncludeMaxTimeAsMetadata();
+ }
+
boost::optional<long long> sampleSize() const {
return _sampleSize;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index bf9c72eb89d..2503a2e0857 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -59,9 +59,6 @@ using std::unique_ptr;
using std::vector;
namespace {
-constexpr StringData kMin = "min"_sd;
-constexpr StringData kMax = "max"_sd;
-
struct BoundMakerMin {
const long long offset; // Offset in millis
@@ -72,7 +69,9 @@ struct BoundMakerMin {
}
Document serialize() const {
- return Document{{{"base"_sd, kMin}, {"offset"_sd, offset}}};
+ // Convert from millis to seconds.
+ return Document{{{"base"_sd, DocumentSourceSort::kMin},
+ {DocumentSourceSort::kOffset, (offset / 1000)}}};
}
};
@@ -86,7 +85,9 @@ struct BoundMakerMax {
}
Document serialize() const {
- return Document{{{"base"_sd, kMax}, {"offset"_sd, offset}}};
+ // Convert from millis to seconds.
+ return Document{{{"base"_sd, DocumentSourceSort::kMax},
+ {DocumentSourceSort::kOffset, (offset / 1000)}}};
}
};
struct CompAsc {
@@ -423,6 +424,58 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
return pSort;
}
+intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort(
+ SortPattern pat,
+ StringData boundBase,
+ long long boundOffset,
+ boost::optional<long long> limit,
+ const intrusive_ptr<ExpressionContext>& expCtx) {
+
+ auto ds = DocumentSourceSort::create(expCtx, pat);
+
+ SortOptions opts;
+ opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load();
+ if (expCtx->allowDiskUse) {
+ opts.extSortAllowed = true;
+ opts.tempDir = expCtx->tempDir;
+ }
+
+ if (limit) {
+ opts.Limit(limit.get());
+ }
+
+ if (boundBase == kMin) {
+ if (pat.back().isAscending) {
+ ds->_timeSorter.reset(
+ new TimeSorterAscMin{opts, CompAsc{}, BoundMakerMin{boundOffset}});
+ } else {
+ ds->_timeSorter.reset(
+ new TimeSorterDescMin{opts, CompDesc{}, BoundMakerMin{boundOffset}});
+ }
+ ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime);
+ } else if (boundBase == kMax) {
+ if (pat.back().isAscending) {
+ ds->_timeSorter.reset(
+ new TimeSorterAscMax{opts, CompAsc{}, BoundMakerMax{boundOffset}});
+ } else {
+ ds->_timeSorter.reset(
+ new TimeSorterDescMax{opts, CompDesc{}, BoundMakerMax{boundOffset}});
+ }
+ ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime);
+ } else {
+ MONGO_UNREACHABLE;
+ }
+
+ if (pat.size() > 1) {
+ SortPattern partitionKey =
+ std::vector<SortPattern::SortPatternPart>(pat.begin(), pat.end() - 1);
+ ds->_timeSorterPartitionKeyGen =
+ SortKeyGenerator{std::move(partitionKey), expCtx->getCollator()};
+ }
+
+ return ds;
+}
+
intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
uassert(6369905,
@@ -451,7 +504,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
uassert(
6460200, "$_internalBoundedSort bound must be an object", bound && bound.type() == Object);
- BSONElement boundOffsetElem = bound.Obj()["offset"];
+ BSONElement boundOffsetElem = bound.Obj()[DocumentSourceSort::kOffset];
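+    // The user-facing bound field is 'offsetSeconds'; it is converted to milliseconds below.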
long long boundOffset = 0;
if (boundOffsetElem && boundOffsetElem.isNumber()) {
boundOffset = uassertStatusOK(boundOffsetElem.parseIntegerElementToLong()) *
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 89e302330c2..ac592c08c6c 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -42,6 +42,10 @@ namespace mongo {
class DocumentSourceSort final : public DocumentSource {
public:
+ static constexpr StringData kMin = "min"_sd;
+ static constexpr StringData kMax = "max"_sd;
+ static constexpr StringData kOffset = "offsetSeconds"_sd;
+
struct SortableDate {
Date_t date;
@@ -130,6 +134,12 @@ public:
return create(pExpCtx, {sortOrder, pExpCtx});
}
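+    /**
+     * Creates a bounded sort stage ($_internalBoundedSort). 'boundBase' must be kMin or kMax,
+     * 'boundOffset' is in milliseconds, and the last component of 'pat' must be the time field.
+     */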
+ static boost::intrusive_ptr<DocumentSourceSort> createBoundedSort(
+ SortPattern pat,
+ StringData boundBase,
+ long long boundOffset,
+ boost::optional<long long> limit,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Parse a stage that uses BoundedSorter.
*/
@@ -164,6 +174,10 @@ public:
return _populated;
};
+    bool isBoundedSortStage() const {
+        return static_cast<bool>(_timeSorter);
+    }
+
bool hasLimit() const {
return _sortExecutor->hasLimit();
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 0719e235bb9..dbc96480c10 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/query/projection_parser.h"
+#include "mongo/db/server_options.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -53,6 +54,7 @@
#include "mongo/db/exec/unpack_timeseries_bucket.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/matcher/expression_expr.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops_exec.h"
@@ -77,6 +79,7 @@
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor_factory.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -406,6 +409,35 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam
return std::pair{sampleStage, unpackStage};
}
+
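+// Scans the pipeline for the first $_internalUnpackBucket stage and the first $sort stage that
+// follows it. Returns {nullptr, nullptr} if that sort is already bounded, so a pipeline is never
+// rewritten twice.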
+std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort(
+ const Pipeline::SourceContainer& sources) {
+ DocumentSourceSort* sortStage = nullptr;
+ DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
+
+ auto sourcesIt = sources.begin();
+ while (sourcesIt != sources.end()) {
+ if (!sortStage) {
+ sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get());
+
+ if (sortStage) {
+ // Do not double optimize
+ if (sortStage->isBoundedSortStage()) {
+ return {nullptr, nullptr};
+ }
+
+ return {unpackStage, sortStage};
+ }
+ }
+
+ if (!unpackStage) {
+ unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
+ }
+ ++sourcesIt;
+ }
+
+ return {unpackStage, sortStage};
+}
} // namespace
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor(
@@ -874,6 +906,103 @@ auto buildProjectionForPushdown(const DepsTracker& deps,
}
} // namespace
+boost::optional<std::pair<PipelineD::IndexSortOrderAgree, PipelineD::IndexOrderedByMinTime>>
+PipelineD::supportsSort(const BucketUnpacker& bucketUnpacker,
+ PlanStage* root,
+ const SortPattern& sort) {
+ if (!root)
+ return boost::none;
+
+ switch (root->stageType()) {
+ case STAGE_COLLSCAN: {
+ const CollectionScan* scan = static_cast<CollectionScan*>(root);
+ if (sort.size() == 1) {
+ auto part = sort[0];
+ // Check the sort we're asking for is on time.
+ if (part.fieldPath && *part.fieldPath == bucketUnpacker.getTimeField()) {
+ // Check that the directions agree.
+ if ((scan->getDirection() == CollectionScanParams::Direction::FORWARD) ==
+ part.isAscending)
+ return std::pair{part.isAscending, true};
+ }
+ }
+ return boost::none;
+ }
+ case STAGE_IXSCAN: {
+ const IndexScan* scan = static_cast<IndexScan*>(root);
+
+ const auto keyPattern = scan->getKeyPattern();
+ const bool forward = scan->isForward();
+
+ // Return none if the keyPattern cannot support the sort.
+            // Note: we add one to the sort size to account for the compounding of the time field
+            // into min/max control fields in the buckets index.
+ if ((sort.size() + 1 > (unsigned int)keyPattern.nFields()) || (sort.size() < 1)) {
+ return boost::none;
+ }
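+            // For example, a time-series index {m: 1, t: 1} maps to a buckets-collection index
+            // like {meta: 1, control.min.t: 1, control.max.t: 1}, so a sort of size 2 needs a
+            // key pattern with at least 3 fields.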
+
+ if (sort.size() == 1) {
+ auto part = sort[0];
+
+                // TODO SERVER-65050: implement here.
+
+ // Check the sort we're asking for is on time.
+ if (part.fieldPath && *part.fieldPath == bucketUnpacker.getTimeField()) {
+ auto keyPatternIter = keyPattern.begin();
+
+ return checkTimeHelper(
+ bucketUnpacker, keyPatternIter, forward, *part.fieldPath, part.isAscending);
+ }
+ } else if (sort.size() >= 2) {
+ size_t i = 0;
+
+ for (auto keyPatternIter = keyPattern.begin();
+ i < sort.size() && keyPatternIter != keyPattern.end();
+ (++keyPatternIter)) {
+ auto part = sort[i];
+ if (!(part.fieldPath))
+ return boost::none;
+
+ if (i < sort.size() - 1) {
+ // Check the meta field index isn't special.
+ if (!(*keyPatternIter).isNumber() ||
+ abs((*keyPatternIter).numberInt()) != 1) {
+ return boost::none;
+ }
+
+ // True = ascending; false = descending.
+ bool direction = ((*keyPatternIter).numberInt() == 1);
+ direction = (forward) ? direction : !direction;
+
+                    // Return boost::none if this sort part and the key pattern part don't agree.
+ if (!sortAndKeyPatternPartAgreeAndOnMeta(
+ bucketUnpacker, (*keyPatternIter).fieldName(), *part.fieldPath) ||
+ part.isAscending != direction)
+ return boost::none;
+ } else {
+ if (!part.fieldPath ||
+ !(*part.fieldPath == bucketUnpacker.getTimeField())) {
+ return boost::none;
+ }
+
+ return checkTimeHelper(bucketUnpacker,
+ keyPatternIter,
+ forward,
+ *part.fieldPath,
+ part.isAscending);
+ }
+
+ // Increment index
+ ++i;
+ }
+ }
+ return boost::none;
+ }
+ default:
+ return boost::none;
+ }
+}
+
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& collections,
const NamespaceString& nss,
@@ -944,6 +1073,165 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
Pipeline::kAllowedMatcherFeatures,
&shouldProduceEmptyDocs));
+ // If this is a query on a time-series collection then it may be eligible for a post-planning
+ // sort optimization. We check eligibility and perform the rewrite here.
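+    // For example, over a forward collection scan on time, a pipeline tail of
+    // [{$_internalUnpackBucket: ...}, {$sort: {t: 1}}] is rewritten to
+    // [{$_internalUnpackBucket: ...},
+    //  {$_internalBoundedSort: {sortKey: {t: 1}, bound: {base: "min"}}}].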
+ auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
+ if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
+ multiversion::FeatureCompatibilityVersion::kVersion_6_1) &&
+ feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ unpack && sort) {
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ if (execImpl) {
+
+            // Walk down through single-child wrapper stages to find the leaf scan stage.
+ PlanStage* rootStage = execImpl->getRootStage();
+ while (rootStage && rootStage->getChildren().size() == 1) {
+ switch (rootStage->stageType()) {
+ case STAGE_FETCH:
+ rootStage = rootStage->child().get();
+ break;
+ case STAGE_SHARDING_FILTER:
+ rootStage = rootStage->child().get();
+ break;
+ default:
+ rootStage = nullptr;
+ }
+ }
+
+ if (rootStage && rootStage->getChildren().size() != 0) {
+ rootStage = nullptr;
+ }
+
+ const auto& sortPattern = sort->getSortKeyPattern();
+ if (auto agree = supportsSort(unpack->bucketUnpacker(), rootStage, sortPattern)) {
+ // Scan the pipeline to check if it's compatible with the optimization.
+ bool badStage = false;
+ bool seenSort = false;
+ std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter =
+ pipeline->_sources.begin();
+ std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter =
+ pipeline->_sources.end();
+ for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) {
+ if (dynamic_cast<const DocumentSourceSort*>(iter->get())) {
+ seenSort = true;
+ } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) {
+ // do nothing
+ } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>(
+ iter->get())) {
+ unpackIter = iter;
+ } else if (auto projection =
+ dynamic_cast<const DocumentSourceSingleDocumentTransformation*>(
+ iter->get())) {
+ auto modPaths = projection->getModifiedPaths();
+
+ // Check to see if the sort paths are modified.
+ for (auto sortIter = sortPattern.begin();
+ !badStage && sortIter != sortPattern.end();
+ ++sortIter) {
+
+ auto fieldPath = sortIter->fieldPath;
+                        // If they are, then escape the loop & don't optimize.
+ if (!fieldPath || modPaths.canModify(*fieldPath)) {
+ badStage = true;
+ }
+ }
+
+ } else {
+ badStage = true;
+ }
+ }
+ if (!badStage && seenSort) {
+ auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree;
+ // This is safe because we have seen a sort so we must have at least one stage
+ // to the left of the current iterator position.
+ --iter;
+
+ if (indexOrderedByMinTime) {
+ unpack->setIncludeMinTimeAsMetadata();
+ } else {
+ unpack->setIncludeMaxTimeAsMetadata();
+ }
+
+ if (indexSortOrderAgree) {
+ pipeline->_sources.insert(
+ iter,
+ DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(),
+ (indexOrderedByMinTime
+ ? DocumentSourceSort::kMin
+ : DocumentSourceSort::kMax),
+ 0,
+ sort->getLimit(),
+ expCtx));
+ } else {
+                        // Since the sortPattern and the direction of the index don't agree, we
+                        // must use the bucketMaxSpanSeconds offset to get a conservative estimate
+                        // of the bounds of the bucket.
+ pipeline->_sources.insert(
+ iter,
+ DocumentSourceSort::createBoundedSort(
+ sort->getSortKeyPattern(),
+ (indexOrderedByMinTime ? DocumentSourceSort::kMin
+ : DocumentSourceSort::kMax),
+ ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds()
+ : -unpack->getBucketMaxSpanSeconds()) *
+ 1000,
+ sort->getLimit(),
+ expCtx));
+
+ /**
+ * We wish to create the following predicate to avoid returning incorrect
+ * results in the unlikely event bucketMaxSpanSeconds changes under us.
+ *
+ * {$expr:
+ * {$lte: [
+ * {$subtract: [$control.max.timeField, $control.min.timeField]},
+                     *         {$const: bucketMaxSpanSeconds (in milliseconds)}
+ * ]}}
+ */
+ auto minTime = unpack->getMinTimeField();
+ auto maxTime = unpack->getMaxTimeField();
+ auto match = std::make_unique<ExprMatchExpression>(
+ // This produces {$lte: ... }
+ make_intrusive<ExpressionCompare>(
+ expCtx.get(),
+ ExpressionCompare::CmpOp::LTE,
+ // This produces [...]
+ makeVector<boost::intrusive_ptr<Expression>>(
+ // This produces {$subtract: ... }
+ make_intrusive<ExpressionSubtract>(
+ expCtx.get(),
+ // This produces [...]
+ makeVector<boost::intrusive_ptr<Expression>>(
+ // This produces "$control.max.timeField"
+ ExpressionFieldPath::createPathFromString(
+ expCtx.get(), maxTime, expCtx->variablesParseState),
+ // This produces "$control.min.timeField"
+ ExpressionFieldPath::createPathFromString(
+ expCtx.get(),
+ minTime,
+ expCtx->variablesParseState))),
+ // This produces {$const: maxBucketSpanSeconds}
+ make_intrusive<ExpressionConstant>(
+ expCtx.get(),
+ Value{unpack->getBucketMaxSpanSeconds() * 1000}))),
+ expCtx);
+ pipeline->_sources.insert(
+ unpackIter,
+ make_intrusive<DocumentSourceMatch>(std::move(match), expCtx));
+ }
+ // Ensure we're erasing the sort source.
+ tassert(6434901,
+ "we must erase a $sort stage and replace it with a bounded sort stage",
+ strcmp((*iter)->getSourceName(),
+ DocumentSourceSort::kStageName.rawData()) == 0);
+ pipeline->_sources.erase(iter);
+ pipeline->stitch();
+ }
+ }
+ }
+ }
+
const auto cursorType = shouldProduceEmptyDocs
? DocumentSourceCursor::CursorType::kEmptyDocuments
: DocumentSourceCursor::CursorType::kRegular;
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index c7dcd8e9fec..4a88600b114 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/bucket_unpacker.h"
#include <boost/intrusive_ptr.hpp>
#include <memory>
@@ -251,6 +252,84 @@ private:
long long sampleSize,
long long numRecords,
boost::optional<BucketUnpacker> bucketUnpacker);
+
+ typedef bool IndexSortOrderAgree;
+ typedef bool IndexOrderedByMinTime;
+
+ /*
+     * Takes a leaf plan stage and a sort pattern and returns a pair if they support the Bucket
+     * Unpacking with Sort optimization, and boost::none otherwise.
+     * The pair includes whether the index order and sort order agree with each other as its first
+     * member and the order of the index as its second member.
+ *
+ * Note that the index scan order is different from the index order.
+ */
+ static boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> supportsSort(
+ const BucketUnpacker& bucketUnpacker, PlanStage* root, const SortPattern& sort);
+
+ /* This is a helper method for supportsSort. It takes the current iterator for the index
+ * keyPattern, the direction of the index scan, the timeField path we're sorting on, and the
+ * direction of the sort. It returns a pair if this data agrees and none if it doesn't.
+ *
+ * The pair contains whether the index order and the sort order agree with each other as the
+     * first member and the order of the index as its second member.
+ *
+ * Note that the index scan order may be different from the index order.
+ * N.B.: It ASSUMES that there are two members left in the keyPatternIter iterator, and that the
+ * timeSortFieldPath is in fact the path on time.
+ */
+ static boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> checkTimeHelper(
+ const BucketUnpacker& bucketUnpacker,
+ BSONObj::iterator& keyPatternIter,
+ bool scanIsForward,
+ const FieldPath& timeSortFieldPath,
+ bool sortIsAscending) {
+ bool wasMin = false;
+ bool wasMax = false;
+
+ // Check that the index isn't special.
+ if ((*keyPatternIter).isNumber() && abs((*keyPatternIter).numberInt()) == 1) {
+ bool direction = ((*keyPatternIter).numberInt() == 1);
+ direction = (scanIsForward) ? direction : !direction;
+
+ // Verify the direction and fieldNames match.
+ wasMin = ((*keyPatternIter).fieldName() ==
+ bucketUnpacker.getMinField(timeSortFieldPath.fullPath()));
+ wasMax = ((*keyPatternIter).fieldName() ==
+ bucketUnpacker.getMaxField(timeSortFieldPath.fullPath()));
+        // Succeed only when the field was min or max and the directions agree; otherwise fall
+        // through and return boost::none.
+ if ((wasMin || wasMax) && (sortIsAscending == direction))
+ return std::pair{wasMin ? sortIsAscending : !sortIsAscending, wasMin};
+ }
+
+ return boost::none;
+ }
+
+ static bool sortAndKeyPatternPartAgreeAndOnMeta(const BucketUnpacker& bucketUnpacker,
+ const char* keyPatternFieldName,
+ const FieldPath& sortFieldPath) {
+ FieldPath keyPatternFieldPath = FieldPath(keyPatternFieldName);
+
+ // If they don't have the same path length they cannot agree.
+ if (keyPatternFieldPath.getPathLength() != sortFieldPath.getPathLength())
+ return false;
+
+ // Check these paths are on the meta field.
+ if (keyPatternFieldPath.getSubpath(0) != mongo::timeseries::kBucketMetaFieldName)
+ return false;
+ if (!bucketUnpacker.getMetaField() ||
+ sortFieldPath.getSubpath(0) != *bucketUnpacker.getMetaField()) {
+ return false;
+ }
+
+ // If meta was the only path component then return true.
+ // Note: We already checked that the path lengths are equal.
+ if (keyPatternFieldPath.getPathLength() == 1)
+ return true;
+
+ // Otherwise return if the remaining path components are equal.
+ return (keyPatternFieldPath.tail() == sortFieldPath.tail());
+ }
};
} // namespace mongo