summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/aggregation/extras/utils.js63
-rw-r--r--jstests/core/timeseries/bucket_unpacking_with_sort.js356
-rw-r--r--jstests/core/timeseries/timeseries_internal_bounded_sort.js8
-rw-r--r--jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js8
-rw-r--r--jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js4
-rw-r--r--src/mongo/db/exec/bucket_unpacker.cpp1
-rw-r--r--src/mongo/db/exec/bucket_unpacker.h17
-rw-r--r--src/mongo/db/exec/collection_scan.h4
-rw-r--r--src/mongo/db/exec/index_scan.h8
-rw-r--r--src/mongo/db/index/sort_key_generator.h4
-rw-r--r--src/mongo/db/pipeline/document_source.h36
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp93
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.h99
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp65
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h14
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp191
16 files changed, 34 insertions, 937 deletions
diff --git a/jstests/aggregation/extras/utils.js b/jstests/aggregation/extras/utils.js
index 4c71328c027..efbe87fa3cf 100644
--- a/jstests/aggregation/extras/utils.js
+++ b/jstests/aggregation/extras/utils.js
@@ -441,75 +441,46 @@ function desugarSingleStageAggregation(db, coll, stage) {
/**
* Runs and asserts an explain command for an aggregation with the given pipeline. Returns just the
* pipeline from the explain results regardless of cluster topology.
- * The fourth parameter `options` is for a few options for unusual scenarios.
- * options.inhibitOptimization defaults to true. This prepends an inhibitOptimization stage to the
- * query and removes it before returning results. This is sub ideal for views. options.hint is an
- * optional hint that will get passed on to the aggregation stage. It defaults to undefined.
*/
-function getExplainedPipelineFromAggregation(
- db, coll, pipeline, {inhibitOptimization = true, hint} = {}) {
+function getExplainedPipelineFromAggregation(db, coll, pipeline) {
// Prevent stages from being absorbed into the .find() layer
- if (inhibitOptimization) {
- pipeline.unshift({$_internalInhibitOptimization: {}});
- }
-
- const aggOptions = hint ? {hint: hint} : {};
-
- const result = coll.explain().aggregate(pipeline, aggOptions);
+ pipeline.unshift({$_internalInhibitOptimization: {}});
+ const result = coll.explain().aggregate(pipeline);
assert.commandWorked(result);
- return getExplainPipelineFromAggregationResult(db, result, {inhibitOptimization});
+ return getExplainPipelineFromAggregationResult(db, result);
}
-function getExplainPipelineFromAggregationResult(db, result, {
- inhibitOptimization = true,
-} = {}) {
+function getExplainPipelineFromAggregationResult(db, result) {
// We proceed by cases based on topology.
if (!FixtureHelpers.isMongos(db)) {
assert(Array.isArray(result.stages), result);
- // The first two stages should be the .find() cursor and the inhibit-optimization stage (if
- // enabled); the rest of the stages are what the user's 'stage' expanded to.
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage;
+ // the rest of the stages are what the user's 'stage' expanded to.
assert(result.stages[0].$cursor, result);
- if (inhibitOptimization) {
- assert(result.stages[1].$_internalInhibitOptimization, result);
- return result.stages.slice(2);
- } else {
- return result.stages.slice(1);
- }
+ assert(result.stages[1].$_internalInhibitOptimization, result);
+ return result.stages.slice(2);
} else {
if (result.splitPipeline) {
- let shardsPart = null;
- if (inhibitOptimization) {
- assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result);
- shardsPart = result.splitPipeline.shardsPart.slice(1);
- } else {
- shardsPart = result.splitPipeline.shardsPart;
- }
+ assert(result.splitPipeline.shardsPart[0].$_internalInhibitOptimization, result);
+ const shardsPart = result.splitPipeline.shardsPart.slice(1);
assert(result.splitPipeline.mergerPart[0].$mergeCursors, result);
const mergerPart = result.splitPipeline.mergerPart.slice(1);
return [].concat(shardsPart).concat(mergerPart);
} else if (result.stages) {
// Required for aggregation_mongos_passthrough.
assert(Array.isArray(result.stages), result);
- // The first two stages should be the .find() cursor and the inhibit-optimization stage
- // (if enabled); the rest of the stages are what the user's 'stage' expanded to.
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage;
+ // the rest of the stages are what the user's 'stage' expanded to.
assert(result.stages[0].$cursor, result);
- if (inhibitOptimization) {
- assert(result.stages[1].$_internalInhibitOptimization, result);
- return result.stages.slice(2);
- } else {
- return result.stages.slice(1);
- }
+ assert(result.stages[1].$_internalInhibitOptimization, result);
+ return result.stages.slice(2);
} else {
// Required for aggregation_one_shard_sharded_collections.
assert(Array.isArray(result.shards["shard-rs0"].stages), result);
assert(result.shards["shard-rs0"].stages[0].$cursor, result);
- if (inhibitOptimization) {
- assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result);
- return result.shards["shard-rs0"].stages.slice(2);
- } else {
- return result.shards["shard-rs0"].stages.slice(1);
- }
+ assert(result.shards["shard-rs0"].stages[1].$_internalInhibitOptimization, result);
+ return result.shards["shard-rs0"].stages.slice(2);
}
}
}
diff --git a/jstests/core/timeseries/bucket_unpacking_with_sort.js b/jstests/core/timeseries/bucket_unpacking_with_sort.js
deleted file mode 100644
index 77c41091451..00000000000
--- a/jstests/core/timeseries/bucket_unpacking_with_sort.js
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Test that the bucket unpacking with sorting rewrite is performed and doesn't cause incorrect
- * results to be created.
- *
- * @tags: [
- * requires_fcv_61,
- * # We need a timeseries collection.
- * assumes_no_implicit_collection_creation_after_drop,
- * # Bounded sorter is currently broken w/ sharding.
- * assumes_unsharded_collection,
- * # Cannot insert into a time-series collection in a multi-document transaction.
- * does_not_support_transactions,
- * # Refusing to run a test that issues an aggregation command with explain because it may
- * # return incomplete results if interrupted by a stepdown.
- * does_not_support_stepdowns,
- * # This complicates aggregation extraction.
- * do_not_wrap_aggregations_in_facets,
- * # We need a timeseries collection.
- * requires_timeseries,
- * # Explain of a resolved view must be executed by mongos.
- * directly_against_shardsvrs_incompatible,
- * ]
- */
-(function() {
-"use strict";
-
-const featureEnabled =
- assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagBucketUnpackWithSort: 1}))
- .featureFlagBucketUnpackWithSort.value;
-if (!featureEnabled) {
- jsTestLog("Skipping test because the BUS feature flag is disabled");
- return;
-}
-
-load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-load("jstests/aggregation/extras/utils.js"); // For getExplainedPipelineFromAggregation.
-const collName = "bucket_unpacking_with_sort";
-const coll = db[collName];
-const metaCollName = "bucket_unpacking_with_sort_with_meta";
-const metaColl = db[metaCollName];
-const metaCollSubFieldsName = "bucket_unpacking_with_sort_with_meta_sub";
-const metaCollSubFields = db[metaCollSubFieldsName];
-const subFields = ["a", "b"];
-
-const setupColl = (coll, collName, usesMeta, subFields = null) => {
- coll.drop();
- if (usesMeta) {
- db.createCollection(collName, {timeseries: {timeField: "t", metaField: "m"}});
- } else {
- db.createCollection(collName, {timeseries: {timeField: "t"}});
- }
- // Create a few buckets.
- let meta = usesMeta ? 10 : 1;
- let docs = [];
- let numberOfItemsPerBucket =
- db.adminCommand({getParameter: 1, timeseriesBucketMaxCount: 1}).timeseriesBucketMaxCount;
- assert(Number.isInteger(numberOfItemsPerBucket));
- for (let j = 0; j < meta; ++j) {
- let metaVal = j;
- if (subFields) {
- metaVal = subFields.reduce((memo, field) => {
- return Object.assign(Object.assign({}, memo), {[field]: j});
- }, {});
- }
- for (let i = 0; i < numberOfItemsPerBucket; ++i) {
- docs.push({m: metaVal, t: new Date(i * 6000)});
- }
- // Because the max bucket span is 3600 * 1000 milliseconds, we know that the above will have
- // generated two buckets. Since we are now going back in time to before the minimum
- // timestamp of the second bucket, we'll open a third and fourth bucket below. Crucially
- // will overlap with the first two buckets.
- for (let i = 0; i < numberOfItemsPerBucket; ++i) {
- docs.push({m: metaVal, t: new Date(i * 6000 + 3000)});
- }
- }
- assert.commandWorked(coll.insert(docs));
-};
-
-setupColl(coll, collName, false);
-setupColl(metaColl, metaCollName, true);
-setupColl(metaCollSubFields, metaCollSubFieldsName, true, subFields);
-
-// For use in reductions.
-// Takes a memo (accumulator value) and a stage and returns if the stage was an internal bounded
-// sort or the memo was true.
-const stageIsInternalBoundedSort = (stage) => {
- return stage.hasOwnProperty("$_internalBoundedSort");
-};
-
-const hasInternalBoundedSort = (pipeline) => pipeline.some(stageIsInternalBoundedSort);
-
-const getIfMatch = (memo, stage) => {
- if (memo)
- return memo;
- else if (stage.hasOwnProperty("$match"))
- return stage;
- else
- return null;
-};
-
-const findFirstMatch = (pipeline) => pipeline.reduce(getIfMatch, null);
-
-const setup = (coll, createIndex = null) => {
- // Create index.
- if (createIndex) {
- assert.commandWorked(coll.createIndex(createIndex));
- }
-};
-
-/**
- * This test creates a time-series collection, inserts data (with multiple overlapping buckets), and
- * tests that the BUS optimization is performed and produces correct results.
- *
- * `sortSpec` is the sort that we wish to have optimized.
- * `createIndex` is the index that we need to create in order to perform the optimization.
- * It defaults to null which signals that we don't create an index.
- * `hint` is the hint that we specify in order to produce the optimization.
- * traversing a min (resp. max) field index on a descending (resp. ascending) sort.
- * `testColl` is the collection to use.
- */
-const runRewritesTest = (sortSpec,
- createIndex,
- hint,
- testColl,
- precise,
- intermediaryStages = [],
- posteriorStages = []) => {
- setup(testColl, createIndex);
-
- const options = (hint) ? {hint: hint} : {};
-
- const match = {$match: {t: {$lt: new Date("2022-01-01")}}};
- // Get results
- const optPipeline = [match, ...intermediaryStages, {$sort: sortSpec}, ...posteriorStages];
- const optResults = testColl.aggregate(optPipeline, options).toArray();
- const optExplainFull = testColl.explain().aggregate(optPipeline, options);
-
- const ogPipeline = [
- match,
- ...intermediaryStages,
- {$_internalInhibitOptimization: {}},
- {$sort: sortSpec},
- ...posteriorStages
- ];
- const ogResults = testColl.aggregate(ogPipeline, options).toArray();
- const ogExplainFull = testColl.explain().aggregate(ogPipeline, options);
-
- // Assert correct
- assert.docEq(optResults, ogResults, {optResults: optResults, ogResults: ogResults});
-
- // Check contains stage
- const optExplain = getExplainedPipelineFromAggregation(
- db, testColl, optPipeline, {inhibitOptimization: false, hint: hint});
- assert(hasInternalBoundedSort(optExplain), optExplainFull);
-
- // Check doesn't contain stage
- const ogExplain = getExplainedPipelineFromAggregation(
- db, testColl, ogPipeline, {inhibitOptimization: false, hint: hint});
- assert(!hasInternalBoundedSort(ogExplain), ogExplainFull);
-
- let foundMatch = findFirstMatch(optExplain);
- if (!precise) {
- assert.docEq(foundMatch, {
- $match: {
- $expr:
- {$lte: [{$subtract: ["$control.max.t", "$control.min.t"]}, {$const: 3600000}]}
- }
- });
- } else {
- assert.docEq(foundMatch, match);
- }
-};
-
-const runDoesntRewriteTest = (sortSpec, createIndex, hint, testColl, intermediaryStages = []) => {
- setup(testColl, createIndex);
-
- const optPipeline = [
- {$match: {t: {$lt: new Date("2022-01-01")}}},
- ...intermediaryStages,
- {$sort: sortSpec},
- ];
-
- // Check doesn't contain stage
- const optExplain = getExplainedPipelineFromAggregation(
- db, testColl, optPipeline, {inhibitOptimization: false, hint: hint});
- const optExplainFull = testColl.explain().aggregate(optPipeline, {hint});
- const containsOptimization = optExplain.reduce(stageIsInternalBoundedSort, false);
- assert(!containsOptimization, optExplainFull);
-};
-
-// Collscan cases
-runRewritesTest({t: 1}, null, null, coll, true);
-runRewritesTest({t: -1}, null, {$natural: -1}, coll, false);
-
-// Indexed cases
-runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true);
-runRewritesTest({t: -1}, {t: -1}, {t: -1}, coll, true);
-runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true);
-runRewritesTest({m: 1, t: -1}, {m: 1, t: -1}, {m: 1, t: -1}, metaColl, true);
-runRewritesTest({m: -1, t: 1}, {m: -1, t: 1}, {m: -1, t: 1}, metaColl, true);
-runRewritesTest({m: -1, t: -1}, {m: -1, t: -1}, {m: -1, t: -1}, metaColl, true);
-runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true);
-
-// Intermediary projects that don't modify sorted fields are allowed.
-runRewritesTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {a: 0}}]);
-runRewritesTest(
- {m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [{$project: {m: 1, t: 1}}]);
-
-// Test multiple meta fields
-let metaIndexObj = subFields.reduce((memo, field) => {
- return Object.assign(Object.assign({}, memo), {[`m.${field}`]: 1});
-}, {});
-Object.assign(metaIndexObj, {t: 1});
-runRewritesTest(metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true);
-runRewritesTest(
- metaIndexObj, metaIndexObj, metaIndexObj, metaCollSubFields, true, [{$project: {m: 1, t: 1}}]);
-
-// Check sort-limit optimization.
-runRewritesTest({t: 1}, {t: 1}, {t: 1}, coll, true, [], [{$limit: 10}]);
-
-// Check set window fields is optimized as well.
-// Since {k: 1} cannot provide a bounded sort we know if there's a bounded sort it comes form
-// setWindowFields.
-runRewritesTest({k: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, true, [], [
- {$setWindowFields: {partitionBy: "$m", sortBy: {t: 1}, output: {arr: {$max: "$t"}}}}
-]);
-
-// Negative tests
-for (let m = -1; m < 2; m++) {
- for (let t = -1; t < 2; t++) {
- for (let k = -1; k < 2; k++) {
- printjson({"currently running": "the following configuration...", m: m, t: t, k: k});
- let sort = null;
- let createIndex = null;
- let hint = null;
- let usesMeta = null;
- if (k != 0) {
- // This is the case where we add an intermediary incompatible field.
- if (m == 0) {
- if (t == 0) {
- sort = {k: k};
- } else {
- sort = {k: k, t: t};
- }
- } else {
- if (t == 0) {
- sort = {m: m, k: k};
- } else {
- sort = {m: m, k: k, t: t};
- }
- }
- hint = sort;
- createIndex = sort;
- usesMeta = m != 0;
- runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
- } else {
- // This is the case where we do not add an intermediary incompatible field.
- // Instead we enumerate the ways that the index and sort could disagree.
-
- // For the meta case, negate the time order.
- // For the non-meta case, use a collscan with a negated order.
- if (m == 0) {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- sort = {t: t};
- hint = {$natural: -t};
- }
- } else {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- usesMeta = true;
- sort = {m: m, t: t};
- hint = {m: m, t: -t};
- createIndex = hint;
- }
- }
-
- if (sort)
- runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
-
- sort = null;
- hint = null;
- createIndex = null;
- usesMeta = false;
- // For the meta case, negate the meta order.
- // For the non-meta case, use an index instead of a collscan.
- if (m == 0) {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- sort = {t: t};
- createIndex = {t: -t};
- hint = createIndex;
- }
- } else {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- usesMeta = true;
- sort = {m: m, t: t};
- hint = {m: -m, t: t};
- createIndex = hint;
- }
- }
-
- if (sort)
- runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
-
- sort = null;
- hint = null;
- createIndex = null;
- usesMeta = false;
- // For the meta case, negate both meta and time.
- if (m == 0) {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- // Do not execute -- we've exhausted relevant cases.
- }
- } else {
- if (t == 0) {
- // Do not execute a test run.
- } else {
- usesMeta = true;
- sort = {m: m, t: t};
- hint = {m: -m, t: -t};
- createIndex = hint;
- }
- }
-
- if (sort)
- runDoesntRewriteTest(sort, createIndex, hint, usesMeta ? metaColl : coll);
- }
- }
- }
-}
-
-// Test mismatched meta paths don't produce the optimization.
-runDoesntRewriteTest({m: 1, t: 1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields);
-runDoesntRewriteTest(
- {"m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, {"m.a": 1, "m.b": 1, t: 1}, metaCollSubFields);
-runDoesntRewriteTest({"m.a": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields);
-runDoesntRewriteTest({"m.a": 1, "m.b": 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaCollSubFields);
-// Test matched meta-subpaths with mismatched directions don't produce the optimization.
-runDoesntRewriteTest({"m.a": 1, t: -1}, {"m.a": 1, t: 1}, {"m.a": 1, t: 1}, metaCollSubFields);
-// Test intermediary projections that exclude the sorted fields don't produce the optimizaiton.
-runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {m: 0}}]);
-runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {t: 0}}]);
-runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {a: 1}}]);
-runDoesntRewriteTest({m: 1, t: 1}, {m: 1, t: 1}, {m: 1, t: 1}, metaColl, [{$project: {'m.a': 0}}]);
-runDoesntRewriteTest({'m.a': 1, t: 1}, {'m.a': 1, t: 1}, {'m.a': 1, t: 1}, metaCollSubFields, [
- {$project: {'m': 0}}
-]);
-})();
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort.js b/jstests/core/timeseries/timeseries_internal_bounded_sort.js
index c52bb3efb65..30e58b7d42e 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort.js
@@ -88,7 +88,7 @@ function runTest(ascending) {
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
bound: ascending ? {base: "min"}
- : {base: "min", offsetSeconds: bucketMaxSpanSeconds}
+ : {base: "min", offset: bucketMaxSpanSeconds}
}
},
])
@@ -104,7 +104,7 @@ function runTest(ascending) {
{
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
- bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
+ bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds}
: {base: "max"}
}
},
@@ -136,7 +136,7 @@ function runTest(ascending) {
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
bound: ascending ? {base: "min"}
- : {base: "min", offsetSeconds: bucketMaxSpanSeconds},
+ : {base: "min", offset: bucketMaxSpanSeconds},
limit: 100
}
},
@@ -154,7 +154,7 @@ function runTest(ascending) {
{
$_internalBoundedSort: {
sortKey: {t: ascending ? 1 : -1},
- bound: ascending ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
+ bound: ascending ? {base: "max", offset: -bucketMaxSpanSeconds}
: {base: "max"},
limit: 100
}
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
index 208d6a45b9a..2e73a808d85 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound.js
@@ -131,7 +131,7 @@ function runTest(sortSpec) {
$_internalBoundedSort: {
sortKey: sortSpec,
bound: sortSpec.t > 0 ? {base: "min"}
- : {base: "min", offsetSeconds: bucketMaxSpanSeconds}
+ : {base: "min", offset: bucketMaxSpanSeconds}
}
},
];
@@ -145,7 +145,7 @@ function runTest(sortSpec) {
{
$_internalBoundedSort: {
sortKey: sortSpec,
- bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
+ bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds}
: {base: "max"}
}
},
@@ -174,7 +174,7 @@ function runTest(sortSpec) {
$_internalBoundedSort: {
sortKey: sortSpec,
bound: sortSpec.t > 0 ? {base: "min"}
- : {base: "min", offsetSeconds: bucketMaxSpanSeconds},
+ : {base: "min", offset: bucketMaxSpanSeconds},
limit: 100
}
}
@@ -190,7 +190,7 @@ function runTest(sortSpec) {
{
$_internalBoundedSort: {
sortKey: sortSpec,
- bound: sortSpec.t > 0 ? {base: "max", offsetSeconds: -bucketMaxSpanSeconds}
+ bound: sortSpec.t > 0 ? {base: "max", offset: -bucketMaxSpanSeconds}
: {base: "max"},
limit: 100
}
diff --git a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
index 417818559d6..a986458a99f 100644
--- a/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
+++ b/jstests/core/timeseries/timeseries_internal_bounded_sort_compound_mixed_types.js
@@ -131,7 +131,7 @@ function runTest(sortSpec) {
// Use a much looser bound than necessary, to exercise the partitioning logic more.
// With such a loose bound, events are only released due to a partition boundary,
// never a bucket boundary.
- bound: {base: "min", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds},
+ bound: {base: "min", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds},
}
},
];
@@ -152,7 +152,7 @@ function runTest(sortSpec) {
// Use a much looser bound than necessary, to exercise the partitioning logic more.
// With such a loose bound, events are only released due to a partition boundary,
// never a bucket boundary.
- bound: {base: "max", offsetSeconds: -sortSpec.t * 10 * bucketMaxSpanSeconds},
+ bound: {base: "max", offset: -sortSpec.t * 10 * bucketMaxSpanSeconds},
}
},
];
diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp
index 0651aae78ee..7703c9d3949 100644
--- a/src/mongo/db/exec/bucket_unpacker.cpp
+++ b/src/mongo/db/exec/bucket_unpacker.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/matcher/expression_tree.h"
#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
namespace mongo {
diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h
index 287bd9f2540..90465257415 100644
--- a/src/mongo/db/exec/bucket_unpacker.h
+++ b/src/mongo/db/exec/bucket_unpacker.h
@@ -37,7 +37,6 @@
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/matcher/expression.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/timeseries/timeseries_constants.h"
namespace mongo {
/**
@@ -290,22 +289,6 @@ public:
return _includeMaxTimeAsMetadata;
}
- const std::string& getTimeField() const {
- return _spec.timeField();
- }
-
- const boost::optional<std::string>& getMetaField() const {
- return _spec.metaField();
- }
-
- std::string getMinField(StringData field) const {
- return std::string{timeseries::kControlMinFieldNamePrefix} + field;
- }
-
- std::string getMaxField(StringData field) const {
- return std::string{timeseries::kControlMaxFieldNamePrefix} + field;
- }
-
void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
void setIncludeMinTimeAsMetadata();
void setIncludeMaxTimeAsMetadata();
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index f9ce637dbad..0de33f09899 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -91,10 +91,6 @@ public:
const SpecificStats* getSpecificStats() const final;
- CollectionScanParams::Direction getDirection() const {
- return _params.direction;
- }
-
protected:
void doSaveStateRequiresCollection() final;
diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h
index c0cf75bf02d..3efc3360d08 100644
--- a/src/mongo/db/exec/index_scan.h
+++ b/src/mongo/db/exec/index_scan.h
@@ -131,14 +131,6 @@ public:
static const char* kStageType;
- const BSONObj& getKeyPattern() const {
- return _keyPattern;
- }
-
- bool isForward() const {
- return _forward;
- }
-
protected:
void doSaveStateRequiresIndex() final;
diff --git a/src/mongo/db/index/sort_key_generator.h b/src/mongo/db/index/sort_key_generator.h
index 40e270dac21..d9b73c75c38 100644
--- a/src/mongo/db/index/sort_key_generator.h
+++ b/src/mongo/db/index/sort_key_generator.h
@@ -75,10 +75,6 @@ public:
return _sortPattern.isSingleElementKey();
}
- const SortPattern& getSortPattern() const {
- return _sortPattern;
- }
-
private:
// Returns the sort key for the input 'doc' as a Value.
//
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index ff2aaf9f8ae..37a6b103140 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -49,7 +49,6 @@
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/generic_cursor.h"
#include "mongo/db/jsobj.h"
-#include "mongo/db/matcher/expression_algo.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -598,41 +597,6 @@ public:
return newNames;
}
- bool canModify(const FieldPath& fieldPath) const {
- switch (type) {
- case Type::kAllPaths:
- return true;
- case Type::kNotSupported:
- return true;
- case Type::kFiniteSet:
- // If there's a subpath that is modified this path may be modified.
- for (size_t i = 0; i < fieldPath.getPathLength(); i++) {
- if (paths.count(fieldPath.getSubpath(i).toString()))
- return true;
- }
-
- for (auto&& path : paths) {
- // If there's a superpath that is modified this path may be modified.
- if (expression::isPathPrefixOf(fieldPath.fullPath(), path)) {
- return true;
- }
- }
-
- return false;
- case Type::kAllExcept:
- // If one of the subpaths is unmodified return false.
- for (size_t i = 0; i < fieldPath.getPathLength(); i++) {
- if (paths.count(fieldPath.getSubpath(i).toString()))
- return false;
- }
-
- // Otherwise return true;
- return true;
- }
- // Cannot hit.
- MONGO_UNREACHABLE_TASSERT(6434901);
- }
-
Type type;
std::set<std::string> paths;
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 6a1a57daa30..4aef3bc0c2d 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -1192,97 +1192,4 @@ DocumentSource::GetModPathsReturn DocumentSourceInternalUnpackBucket::getModifie
return {GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}};
}
-boost::optional<std::pair<DocumentSourceInternalUnpackBucket::IndexSortOrderAgree,
- DocumentSourceInternalUnpackBucket::IndexOrderedByMinTime>>
-DocumentSourceInternalUnpackBucket::supportsSort(PlanStage* root, const SortPattern& sort) const {
- if (!root)
- return boost::none;
-
- switch (root->stageType()) {
- case STAGE_COLLSCAN: {
- const CollectionScan* scan = static_cast<CollectionScan*>(root);
- if (sort.size() == 1) {
- auto part = sort[0];
- // Check the sort we're asking for is on time.
- if (part.fieldPath && *part.fieldPath == _bucketUnpacker.getTimeField()) {
- // Check that the directions agree.
- if ((scan->getDirection() == CollectionScanParams::Direction::FORWARD) ==
- part.isAscending)
- return std::pair{part.isAscending, true};
- }
- }
- return boost::none;
- }
- case STAGE_IXSCAN: {
- const IndexScan* scan = static_cast<IndexScan*>(root);
-
- const auto keyPattern = scan->getKeyPattern();
- const bool forward = scan->isForward();
-
- // Return none if the keyPattern cannot support the sort.
- // Note We add one to sort size to account for the compounding on min/max for the time
- // field.
- if ((sort.size() + 1 > (unsigned int)keyPattern.nFields()) || (sort.size() < 1)) {
- return boost::none;
- }
-
- if (sort.size() == 1) {
- auto part = sort[0];
-
- // TOOD SERVER-65050: implement here.
-
- // Check the sort we're asking for is on time.
- if (part.fieldPath && *part.fieldPath == _bucketUnpacker.getTimeField()) {
- auto keyPatternIter = keyPattern.begin();
-
- return checkTimeHelper(
- keyPatternIter, forward, *part.fieldPath, part.isAscending);
- }
- } else if (sort.size() >= 2) {
- size_t i = 0;
-
-
- for (auto keyPatternIter = keyPattern.begin();
- i < sort.size() && keyPatternIter != keyPattern.end();
- (++keyPatternIter)) {
- auto part = sort[i];
- if (!(part.fieldPath))
- return boost::none;
-
- if (i < sort.size() - 1) {
- // Check the meta field index isn't special.
- if (!(*keyPatternIter).isNumber() ||
- abs((*keyPatternIter).numberInt()) != 1) {
- return boost::none;
- }
-
- // True = ascending; false = descending.
- bool direction = ((*keyPatternIter).numberInt() == 1);
- direction = (forward) ? direction : !direction;
-
- // Return false if partOne and the first keyPattern part don't agree.
- if (!sortAndKeyPatternPartAgreeAndOnMeta((*keyPatternIter).fieldName(),
- *part.fieldPath) ||
- part.isAscending != direction)
- return boost::none;
- } else {
- if (!part.fieldPath ||
- !(*part.fieldPath == _bucketUnpacker.getTimeField())) {
- return boost::none;
- }
-
- return checkTimeHelper(
- keyPatternIter, forward, *part.fieldPath, part.isAscending);
- }
-
- // Increment index
- ++i;
- }
- }
- return boost::none;
- }
- default:
- return boost::none;
- }
-}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 8d7506ffe48..96bca0e1680 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -33,9 +33,6 @@
#include <vector>
#include "mongo/db/exec/bucket_unpacker.h"
-#include "mongo/db/exec/collection_scan.h"
-#include "mongo/db/exec/index_scan.h"
-#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
@@ -106,18 +103,6 @@ public:
return DepsTracker::State::EXHAUSTIVE_ALL;
}
- int getBucketMaxSpanSeconds() const {
- return _bucketMaxSpanSeconds;
- }
-
- std::string getMinTimeField() const {
- return _bucketUnpacker.getMinField(_bucketUnpacker.getTimeField());
- }
-
- std::string getMaxTimeField() const {
- return _bucketUnpacker.getMaxField(_bucketUnpacker.getTimeField());
- }
-
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
};
@@ -126,20 +111,6 @@ public:
return _bucketUnpacker.copy();
}
- typedef bool IndexSortOrderAgree;
- typedef bool IndexOrderedByMinTime;
-
- /*
- * Takes a leaf plan stage and a sort pattern and returns a pair if they support the Bucket
-Unpacking with Sort Optimization.
- * The pair includes whether the index order and sort order agree with each other as its first
- * member and the order of the index as the second parameter.
- *
- * Note that the index scan order is different from the index order.
- */
- boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> supportsSort(
- PlanStage* root, const SortPattern& sort) const;
-
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
@@ -193,14 +164,6 @@ Unpacking with Sort Optimization.
_bucketMaxCount = bucketMaxCount;
}
- void setIncludeMinTimeAsMetadata() {
- _bucketUnpacker.setIncludeMinTimeAsMetadata();
- }
-
- void setIncludeMaxTimeAsMetadata() {
- _bucketUnpacker.setIncludeMaxTimeAsMetadata();
- }
-
boost::optional<long long> sampleSize() const {
return _sampleSize;
}
@@ -255,68 +218,6 @@ Unpacking with Sort Optimization.
GetModPathsReturn getModifiedPaths() const final override;
private:
- /* This is a helper method for supportsSort. It takes the current iterator for the index
- * keyPattern, the direction of the index scan, the timeField path we're sorting on, and the
- * direction of the sort. It returns a pair if this data agrees and none if it doesn't.
- *
- * The pair contains whether the index order and the sort order agree with each other as the
- * firstmember and the order of the index as the second parameter.
- *
- * Note that the index scan order may be different from the index order.
- * N.B.: It ASSUMES that there are two members left in the keyPatternIter iterator, and that the
- * timeSortFieldPath is in fact the path on time.
- */
- boost::optional<std::pair<IndexSortOrderAgree, IndexOrderedByMinTime>> checkTimeHelper(
- BSONObj::iterator& keyPatternIter,
- bool scanIsForward,
- const FieldPath& timeSortFieldPath,
- bool sortIsAscending) const {
- bool wasMin = false;
- bool wasMax = false;
-
- // Check that the index isn't special.
- if ((*keyPatternIter).isNumber() && abs((*keyPatternIter).numberInt()) == 1) {
- bool direction = ((*keyPatternIter).numberInt() == 1);
- direction = (scanIsForward) ? direction : !direction;
-
- // Verify the direction and fieldNames match.
- wasMin = ((*keyPatternIter).fieldName() ==
- _bucketUnpacker.getMinField(timeSortFieldPath.fullPath()));
- wasMax = ((*keyPatternIter).fieldName() ==
- _bucketUnpacker.getMaxField(timeSortFieldPath.fullPath()));
- // Terminate early if it wasn't max or min or if the directions don't match.
- if ((wasMin || wasMax) && (sortIsAscending == direction))
- return std::pair{wasMin ? sortIsAscending : !sortIsAscending, wasMin};
- }
-
- return boost::none;
- }
-
- bool sortAndKeyPatternPartAgreeAndOnMeta(const char* keyPatternFieldName,
- const FieldPath& sortFieldPath) const {
- FieldPath keyPatternFieldPath = FieldPath(keyPatternFieldName);
-
- // If they don't have the same path length they cannot agree.
- if (keyPatternFieldPath.getPathLength() != sortFieldPath.getPathLength())
- return false;
-
- // Check these paths are on the meta field.
- if (keyPatternFieldPath.getSubpath(0) != mongo::timeseries::kBucketMetaFieldName)
- return false;
- if (!_bucketUnpacker.getMetaField() ||
- sortFieldPath.getSubpath(0) != *_bucketUnpacker.getMetaField()) {
- return false;
- }
-
- // If meta was the only path component then return true.
- // Note: We already checked that the path lengths are equal.
- if (keyPatternFieldPath.getPathLength() == 1)
- return true;
-
- // Otherwise return if the remaining path components are equal.
- return (keyPatternFieldPath.tail() == sortFieldPath.tail());
- }
-
GetNextResult doGetNext() final;
bool haveComputedMetaField() const;
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 2503a2e0857..bf9c72eb89d 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -59,6 +59,9 @@ using std::unique_ptr;
using std::vector;
namespace {
+constexpr StringData kMin = "min"_sd;
+constexpr StringData kMax = "max"_sd;
+
struct BoundMakerMin {
const long long offset; // Offset in millis
@@ -69,9 +72,7 @@ struct BoundMakerMin {
}
Document serialize() const {
- // Convert from millis to seconds.
- return Document{{{"base"_sd, DocumentSourceSort::kMin},
- {DocumentSourceSort::kOffset, (offset / 1000)}}};
+ return Document{{{"base"_sd, kMin}, {"offset"_sd, offset}}};
}
};
@@ -85,9 +86,7 @@ struct BoundMakerMax {
}
Document serialize() const {
- // Convert from millis to seconds.
- return Document{{{"base"_sd, DocumentSourceSort::kMax},
- {DocumentSourceSort::kOffset, (offset / 1000)}}};
+ return Document{{{"base"_sd, kMax}, {"offset"_sd, offset}}};
}
};
struct CompAsc {
@@ -424,58 +423,6 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
return pSort;
}
-intrusive_ptr<DocumentSourceSort> DocumentSourceSort::createBoundedSort(
- SortPattern pat,
- StringData boundBase,
- long long boundOffset,
- boost::optional<long long> limit,
- const intrusive_ptr<ExpressionContext>& expCtx) {
-
- auto ds = DocumentSourceSort::create(expCtx, pat);
-
- SortOptions opts;
- opts.maxMemoryUsageBytes = internalQueryMaxBlockingSortMemoryUsageBytes.load();
- if (expCtx->allowDiskUse) {
- opts.extSortAllowed = true;
- opts.tempDir = expCtx->tempDir;
- }
-
- if (limit) {
- opts.Limit(limit.get());
- }
-
- if (boundBase == kMin) {
- if (pat.back().isAscending) {
- ds->_timeSorter.reset(
- new TimeSorterAscMin{opts, CompAsc{}, BoundMakerMin{boundOffset}});
- } else {
- ds->_timeSorter.reset(
- new TimeSorterDescMin{opts, CompDesc{}, BoundMakerMin{boundOffset}});
- }
- ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime);
- } else if (boundBase == kMax) {
- if (pat.back().isAscending) {
- ds->_timeSorter.reset(
- new TimeSorterAscMax{opts, CompAsc{}, BoundMakerMax{boundOffset}});
- } else {
- ds->_timeSorter.reset(
- new TimeSorterDescMax{opts, CompDesc{}, BoundMakerMax{boundOffset}});
- }
- ds->_requiredMetadata.set(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime);
- } else {
- MONGO_UNREACHABLE;
- }
-
- if (pat.size() > 1) {
- SortPattern partitionKey =
- std::vector<SortPattern::SortPatternPart>(pat.begin(), pat.end() - 1);
- ds->_timeSorterPartitionKeyGen =
- SortKeyGenerator{std::move(partitionKey), expCtx->getCollator()};
- }
-
- return ds;
-}
-
intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
uassert(6369905,
@@ -504,7 +451,7 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::parseBoundedSort(
uassert(
6460200, "$_internalBoundedSort bound must be an object", bound && bound.type() == Object);
- BSONElement boundOffsetElem = bound.Obj()[DocumentSourceSort::kOffset];
+ BSONElement boundOffsetElem = bound.Obj()["offset"];
long long boundOffset = 0;
if (boundOffsetElem && boundOffsetElem.isNumber()) {
boundOffset = uassertStatusOK(boundOffsetElem.parseIntegerElementToLong()) *
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index ac592c08c6c..89e302330c2 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -42,10 +42,6 @@ namespace mongo {
class DocumentSourceSort final : public DocumentSource {
public:
- static constexpr StringData kMin = "min"_sd;
- static constexpr StringData kMax = "max"_sd;
- static constexpr StringData kOffset = "offsetSeconds"_sd;
-
struct SortableDate {
Date_t date;
@@ -134,12 +130,6 @@ public:
return create(pExpCtx, {sortOrder, pExpCtx});
}
- static boost::intrusive_ptr<DocumentSourceSort> createBoundedSort(
- SortPattern pat,
- StringData boundBase,
- long long boundOffset,
- boost::optional<long long> limit,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
* Parse a stage that uses BoundedSorter.
*/
@@ -174,10 +164,6 @@ public:
return _populated;
};
- bool isBoundedSortStage() {
- return (_timeSorter) ? true : false;
- }
-
bool hasLimit() const {
return _sortExecutor->hasLimit();
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 8347e6ed458..0719e235bb9 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -28,7 +28,6 @@
*/
#include "mongo/db/query/projection_parser.h"
-#include "mongo/db/server_options.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -54,7 +53,6 @@
#include "mongo/db/exec/unpack_timeseries_bucket.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
-#include "mongo/db/matcher/expression_expr.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops_exec.h"
@@ -79,7 +77,6 @@
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor_factory.h"
-#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -409,35 +406,6 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam
return std::pair{sampleStage, unpackStage};
}
-
-std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort(
- const Pipeline::SourceContainer& sources) {
- DocumentSourceSort* sortStage = nullptr;
- DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
-
- auto sourcesIt = sources.begin();
- while (sourcesIt != sources.end()) {
- if (!sortStage) {
- sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get());
-
- if (sortStage) {
- // Do not double optimize
- if (sortStage->isBoundedSortStage()) {
- return {nullptr, nullptr};
- }
-
- return {unpackStage, sortStage};
- }
- }
-
- if (!unpackStage) {
- unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
- }
- ++sourcesIt;
- }
-
- return {unpackStage, sortStage};
-}
} // namespace
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor(
@@ -976,165 +944,6 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
Pipeline::kAllowedMatcherFeatures,
&shouldProduceEmptyDocs));
- // If this is a query on a time-series collection then it may be eligible for a post-planning
- // sort optimization. We check eligibility and perform the rewrite here.
- auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
- if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
- multiversion::FeatureCompatibilityVersion::kVersion_6_1) &&
- feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled(
- serverGlobalParams.featureCompatibility) &&
- unpack && sort) {
- auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
- if (execImpl) {
-
- // Get source stage
- PlanStage* rootStage = execImpl->getRootStage();
- while (rootStage && rootStage->getChildren().size() == 1) {
- switch (rootStage->stageType()) {
- case STAGE_FETCH:
- rootStage = rootStage->child().get();
- break;
- case STAGE_SHARDING_FILTER:
- rootStage = rootStage->child().get();
- break;
- default:
- rootStage = nullptr;
- }
- }
-
- if (rootStage && rootStage->getChildren().size() != 0) {
- rootStage = nullptr;
- }
-
- const auto& sortPattern = sort->getSortKeyPattern();
- if (auto agree = unpack->supportsSort(rootStage, sortPattern)) {
- // Scan the pipeline to check if it's compatible with the optimization.
- bool badStage = false;
- bool seenSort = false;
- std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter =
- pipeline->_sources.begin();
- std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter =
- pipeline->_sources.end();
- for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) {
- if (dynamic_cast<const DocumentSourceSort*>(iter->get())) {
- seenSort = true;
- } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) {
- // do nothing
- } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>(
- iter->get())) {
- unpackIter = iter;
- } else if (auto projection =
- dynamic_cast<const DocumentSourceSingleDocumentTransformation*>(
- iter->get())) {
- auto modPaths = projection->getModifiedPaths();
-
- // Check to see if the sort paths are modified.
- for (auto sortIter = sortPattern.begin();
- !badStage && sortIter != sortPattern.end();
- ++sortIter) {
-
- auto fieldPath = sortIter->fieldPath;
- // If they are then escap the loop & don't optimize.
- if (!fieldPath || modPaths.canModify(*fieldPath)) {
- badStage = true;
- }
- }
-
- } else {
- badStage = true;
- }
- }
- if (!badStage && seenSort) {
- auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree;
- // This is safe because we have seen a sort so we must have at least one stage
- // to the left of the current iterator position.
- --iter;
-
- if (indexOrderedByMinTime) {
- unpack->setIncludeMinTimeAsMetadata();
- } else {
- unpack->setIncludeMaxTimeAsMetadata();
- }
-
- if (indexSortOrderAgree) {
- pipeline->_sources.insert(
- iter,
- DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(),
- (indexOrderedByMinTime
- ? DocumentSourceSort::kMin
- : DocumentSourceSort::kMax),
- 0,
- sort->getLimit(),
- expCtx));
- } else {
- // Since the sortPattern and the direction of the index don't agree we must
- // use the offset to get an estimate on the bounds of the bucket.
- pipeline->_sources.insert(
- iter,
- DocumentSourceSort::createBoundedSort(
- sort->getSortKeyPattern(),
- (indexOrderedByMinTime ? DocumentSourceSort::kMin
- : DocumentSourceSort::kMax),
- ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds()
- : -unpack->getBucketMaxSpanSeconds()) *
- 1000,
- sort->getLimit(),
- expCtx));
-
- /**
- * We wish to create the following predicate to avoid returning incorrect
- * results in the unlikely event bucketMaxSpanSeconds changes under us.
- *
- * {$expr:
- * {$lte: [
- * {$subtract: [$control.max.timeField, $control.min.timeField]},
- * {$const: bucketMaxSpanSeconds, in milliseconds}
- * ]}}
- */
- auto minTime = unpack->getMinTimeField();
- auto maxTime = unpack->getMaxTimeField();
- auto match = std::make_unique<ExprMatchExpression>(
- // This produces {$lte: ... }
- make_intrusive<ExpressionCompare>(
- expCtx.get(),
- ExpressionCompare::CmpOp::LTE,
- // This produces [...]
- makeVector<boost::intrusive_ptr<Expression>>(
- // This produces {$subtract: ... }
- make_intrusive<ExpressionSubtract>(
- expCtx.get(),
- // This produces [...]
- makeVector<boost::intrusive_ptr<Expression>>(
- // This produces "$control.max.timeField"
- ExpressionFieldPath::createPathFromString(
- expCtx.get(), maxTime, expCtx->variablesParseState),
- // This produces "$control.min.timeField"
- ExpressionFieldPath::createPathFromString(
- expCtx.get(),
- minTime,
- expCtx->variablesParseState))),
- // This produces {$const: maxBucketSpanSeconds}
- make_intrusive<ExpressionConstant>(
- expCtx.get(),
- Value{unpack->getBucketMaxSpanSeconds() * 1000}))),
- expCtx);
- pipeline->_sources.insert(
- unpackIter,
- make_intrusive<DocumentSourceMatch>(std::move(match), expCtx));
- }
- // Ensure we're erasing the sort source.
- tassert(6434901,
- "we must erase a $sort stage and replace it with a bounded sort stage",
- strcmp((*iter)->getSourceName(),
- DocumentSourceSort::kStageName.rawData()) == 0);
- pipeline->_sources.erase(iter);
- pipeline->stitch();
- }
- }
- }
- }
-
const auto cursorType = shouldProduceEmptyDocs
? DocumentSourceCursor::CursorType::kEmptyDocuments
: DocumentSourceCursor::CursorType::kRegular;