author     Alya Berciu <alya.berciu@mongodb.com>  2022-04-13 13:01:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-13 13:46:46 +0000
commit     3956cbffd669f9ceeeb0733730aa0b643e3bcc35 (patch)
tree       0529772f55b3bebce8484a0e0e55b77fb1470c28
parent     30a80b80911965bb9b60d3f8e912fc28524fb5bd (diff)
download   mongo-3956cbffd669f9ceeeb0733730aa0b643e3bcc35.tar.gz
SERVER-61656 Extend lastpoint rewrite to $top, $bottom, $topN, $bottomN
-rw-r--r--  jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js | 280
-rw-r--r--  jstests/core/timeseries/timeseries_lastpoint.js | 420
-rw-r--r--  jstests/core/timeseries/timeseries_lastpoint_top.js | 165
-rw-r--r--  src/mongo/db/pipeline/accumulator_multi.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/accumulator_multi.h | 40
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 143
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp | 236
7 files changed, 910 insertions, 376 deletions
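This change extends the existing lastpoint rewrite, which previously only recognized a $sort followed by a $group with $first/$last, so that a single $group using $top, $bottom, or $topN/$bottomN with n = 1 is also rewritten into a bucket-level sort and group, letting only the relevant bucket per series be unpacked. A rough sketch of the transformation, taken from the expected pipelines in the unit tests included below ('m' and 't' are the example meta and time fields those tests use):

    // Input pipeline, as the optimizer sees it once the time-series view is resolved:
    [
        {$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', bucketMaxSpanSeconds: 60}},
        {$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': 1, t: -1}}}}}
    ]

    // After optimizeLastpoint(), an equivalent bucket-level $sort + $group is prepended so that
    // only one bucket per meta value reaches the unpack stage; the original $group is kept to
    // preserve its exact semantics:
    [
        {$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}},
        {$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: {$first: '$control'}, data: {$first: '$data'}}},
        {$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', bucketMaxSpanSeconds: 60}},
        {$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': 1, t: -1}}}}}
    ]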
diff --git a/jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js b/jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js
new file mode 100644
index 00000000000..756317cb3b9
--- /dev/null
+++ b/jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js
@@ -0,0 +1,280 @@
+/**
+ * Helpers for testing lastpoint queries on time-series collections.
+ */
+
+load("jstests/aggregation/extras/utils.js");
+load("jstests/core/timeseries/libs/timeseries_agg_helpers.js");
+load("jstests/libs/analyze_plan.js");
+
+// These are functions instead of const variables to avoid tripping up the parallel jstests.
+function getEquivalentStrings() {
+ return ['a', 'A', 'b', 'B'];
+}
+
+function getEquivalentNumbers() {
+ return [7, NumberInt(7), NumberLong(7), NumberDecimal(7)];
+}
+
+function verifyLastpoint({tsColl, observerColl, pipeline, precedingFilter, expectStage}) {
+ // Verify the lastpoint optimization.
+ const explain = tsColl.explain().aggregate(pipeline);
+ expectStage({explain, precedingFilter});
+
+ // Assert that the time-series aggregation results match those of the observer collection.
+ const expected = observerColl.aggregate(pipeline).toArray();
+ const actual = tsColl.aggregate(pipeline).toArray();
+ assertArrayEq({actual, expected});
+}
+
+function createBoringCollections(includeIdleMeasurements = false) {
+ // Prepare collections. Note: we usually test without idle measurements (all meta subfields are
+ // non-null). If we allow the insertion of idle measurements, we will obtain multiple lastpoints
+ // per bucket, and may have different results on the observer and timeseries collections.
+ const numHosts = 10;
+ const numIterations = 20;
+ return TimeseriesAggTests.prepareInputCollections(
+ numHosts, numIterations, includeIdleMeasurements);
+}
+
+// Generate interesting values.
+function generateInterestingValues() {
+ const epoch = ISODate('1970-01-01');
+
+ // Pick values with interesting equality behavior.
+ let values = [
+ // Arrays whose highest or lowest element is equal.
+ [5],
+ [5, 99],
+ [99],
+ // Objects that differ only by field order.
+ {x: 1, y: 2},
+ {y: 2, x: 1},
+ // A variety of values that are "empty" somehow.
+ // Missing can't be represented as a JS value--handled later.
+ null,
+ // Undefined is not supported:
+ // "The $_internalUnpackBucket stage allows metadata to be absent or otherwise, it must
+ // not be the deprecated undefined bson type", code 5369600.
+ undefined,
+ [],
+ {},
+ "",
+ ];
+
+ // Test strings that differ only by case and numbers that differ only by type.
+ values = values.concat(getEquivalentStrings()).concat(getEquivalentNumbers());
+
+ // Also wrap each interesting value in an object or array.
+ // Some values that are "equal" at the top level may be distinguished when wrapped this way.
+ const arrayWrapped = values.map(v => [v]);
+ const objectWrapped = values.map(v => ({w: v}));
+ values = values.concat(arrayWrapped).concat(objectWrapped);
+
+ let docs = [];
+ // Each event's _id is an autoincrementing number, and its timestamp is epoch + _id.
+ // Run through the interesting values twice to ensure each one has two events.
+ // Do this in the outer loop to ensure all the intervals overlap.
+ for (const _ of [1, 2]) {
+ for (const m of values) {
+ docs.push({_id: docs.length, tags: {hostid: m}, time: new Date(+epoch + docs.length)});
+ }
+ // Handle 'missing' metaField.
+ docs.push({_id: docs.length, time: new Date(+epoch + docs.length)});
+ }
+
+ for (const m of values) {
+ // Push a second measurement an hour later to create another bucket for this meta field.
+ docs.push({
+ _id: docs.length,
+ tags: {hostid: m},
+ time: new Date(+epoch + docs.length + 60 * 60 * 1000)
+ });
+ }
+
+ return docs;
+}
+
+function getMapInterestingValuesToEquivalentsStage() {
+ const firstElemInId = {$arrayElemAt: ["$_id", 0]};
+ const isIdArray = {$isArray: "$_id"};
+ const equivalentStrings = getEquivalentStrings();
+ const equivalentNumbers = getEquivalentNumbers();
+ return {
+ $addFields: {
+ _id: {
+ $switch: {
+ branches: [
+ // Replace equivalent string cases with their lowercase counterparts.
+ {
+ case: {$in: ["$_id.w", equivalentStrings]},
+ then: {w: {$toLower: "$_id.w"}}
+ },
+ {
+ case: {$and: [isIdArray, {$in: [firstElemInId, equivalentStrings]}]},
+ then: [{$toLower: firstElemInId}]
+ },
+ {
+ case: {$and: [{$not: isIdArray}, {$in: ["$_id", equivalentStrings]}]},
+ then: {$toLower: "$_id"}
+ },
+ // Replace equal numbers with different numeric types with an int.
+ {case: {$in: ["$_id.w", equivalentNumbers]}, then: {w: 7}},
+ {
+ case: {$and: [isIdArray, {$in: [firstElemInId, equivalentNumbers]}]},
+ then: [7]
+ },
+ {
+ case: {$and: [{$not: isIdArray}, {$in: ["$_id", equivalentNumbers]}]},
+ then: 7
+ },
+ ],
+ default: "$_id"
+ }
+ }
+ }
+ };
+}
+
+function createInterestingCollections() {
+ const testDB = TimeseriesAggTests.getTestDb();
+ const collation = {locale: 'en_US', strength: 2};
+
+ // Prepare timeseries collection.
+ const tsCollName = "in";
+ assert.commandWorked(testDB.createCollection(
+ tsCollName, {timeseries: {timeField: "time", metaField: "tags"}, collation}));
+ const tsColl = testDB[tsCollName];
+
+ const interestingValues = generateInterestingValues();
+ assert.commandWorked(tsColl.insertMany(interestingValues));
+
+ // Prepare observer collection.
+ const observerCollName = "observer_in";
+ assert.commandWorked(testDB.createCollection(observerCollName, {collation}));
+ const observerColl = testDB[observerCollName];
+
+ // We can't just insert the values directly, because bucketing treats "interesting" metaField
+ // values differently than a regular collection would. For example, a true timeseries collection
+ // would treat objects that only differ by field order as being equivalent meta values. For the
+ // purposes of this test we don't care about the semantic difference between timeseries
+ // collection bucketing and regular collections, only about the accuracy of the lastpoint
+ // rewrite.
+ assert.commandWorked(observerColl.insertMany(tsColl.find().toArray()));
+
+ return [tsColl, observerColl];
+}
+
+function expectDistinctScan({explain}) {
+ // The query can utilize DISTINCT_SCAN.
+ assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
+
+ // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
+ assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+}
+
+function expectCollScan({explain, precedingFilter, noSortInCursor}) {
+ if (noSortInCursor) {
+ // We need a separate sort stage.
+ assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+ } else {
+ // $sort can be pushed into the cursor layer.
+ assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
+ }
+
+ // At the bottom, there should be a COLLSCAN.
+ const collScanStage = getAggPlanStage(explain, "COLLSCAN");
+ assert.neq(collScanStage, null, explain);
+ if (precedingFilter) {
+ assert.eq(precedingFilter, collScanStage.filter, collScanStage);
+ }
+}
+
+function expectIxscan({explain, noSortInCursor}) {
+ if (noSortInCursor) {
+ // We can rely on the index without a cursor $sort.
+ assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+ } else {
+ // $sort can be pushed into the cursor layer.
+ assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
+ }
+
+ // At the bottom, there should be an IXSCAN.
+ assert.neq(getAggPlanStage(explain, "IXSCAN"), null, explain);
+}
+
+/**
+ Test cases:
+ 1. Lastpoint queries on indexes with descending time and $first/$top (DISTINCT_SCAN).
+ 2. Lastpoint queries on indexes with ascending time and $last/$bottom (no DISTINCT_SCAN).
+ 3. Lastpoint queries on indexes with ascending time and $last/$bottom and an additional
+ secondary index so that we can use the DISTINCT_SCAN optimization.
+*/
+function testAllTimeMetaDirections(tsColl, observerColl, getTestCases) {
+ const testDB = TimeseriesAggTests.getTestDb();
+ const testCases = [
+ {time: -1, useBucketsIndex: false},
+ {time: 1, useBucketsIndex: false},
+ {time: 1, useBucketsIndex: true}
+ ];
+
+ for (const {time, useBucketsIndex} of testCases) {
+ const isTimeDescending = time < 0;
+ const canUseDistinct = isTimeDescending || useBucketsIndex;
+
+ // Test both directions of the metaField sort for each direction of time.
+ for (const metaDir of [1, -1]) {
+ const index = {"tags.hostid": metaDir, time};
+ const bucketsIndex = useBucketsIndex
+ ? {"meta.hostid": metaDir, "control.max.time": 1, "control.min.time": 1}
+ : undefined;
+
+ const tests = getTestCases({
+ canUseDistinct,
+ canSortOnTimeUseDistinct: (metaDir > 0) && (isTimeDescending || useBucketsIndex),
+ time,
+ index
+ });
+
+ // Run all tests without an index.
+ for (const {pipeline, expectStageNoIndex, precedingFilter} of tests) {
+ // Normally we expect to see a COLLSCAN with a SORT pushed into the cursor, but some
+ // test-cases may override this.
+ const expectStage = expectStageNoIndex || expectCollScan;
+ verifyLastpoint({tsColl, observerColl, pipeline, precedingFilter, expectStage});
+ }
+
+ // Create index on the timeseries collection.
+ const ixName = "tsIndex_time_" + time + "_meta_" + metaDir;
+ tsColl.createIndex(index, {name: ixName});
+
+ // Create an additional secondary index directly on the buckets collection so that we
+ // can test the DISTINCT_SCAN optimization when time is sorted in ascending order.
+ const bucketsColl = testDB["system.buckets.in"];
+ const bucketsIxName = "bucketsIndex_time_" + time + "_meta_" + metaDir;
+ if (bucketsIndex) {
+ bucketsColl.createIndex(bucketsIndex, {name: bucketsIxName});
+ }
+
+ // Re-run all tests with an index.
+ for (const {pipeline, expectStageWithIndex, precedingFilter} of tests) {
+ verifyLastpoint({
+ tsColl,
+ observerColl,
+ pipeline,
+ precedingFilter,
+ expectStage: expectStageWithIndex
+ });
+ }
+
+ // Drop indexes for next test.
+ tsColl.dropIndex(ixName);
+ if (bucketsIndex) {
+ bucketsColl.dropIndex(bucketsIxName);
+ }
+ }
+ }
+
+ // Drop collections at the end of the test.
+ tsColl.drop();
+ observerColl.drop();
+}
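
The two test suites that follow drive these helpers through a callback that returns a list of test cases. A minimal sketch of the expected shape (the pipeline and accumulator here are illustrative; the real cases live in the files below):

    // Each test case supplies a pipeline plus the plan-shape assertions to run with and
    // without the index created for that iteration.
    const [tsColl, observerColl] = createBoringCollections();
    testAllTimeMetaDirections(tsColl, observerColl, ({time, index, canUseDistinct}) => {
        const accumulator = time > 0 ? "$last" : "$first";
        return [{
            pipeline: [
                {$sort: index},
                {$group: {_id: "$tags.hostid", usage_user: {[accumulator]: "$usage_user"}}}
            ],
            expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectCollScan),
        }];
    });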
diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js
index f3cdb3ebd2e..d3bdc66d3bd 100644
--- a/jstests/core/timeseries/timeseries_lastpoint.js
+++ b/jstests/core/timeseries/timeseries_lastpoint.js
@@ -18,206 +18,26 @@
load("jstests/aggregation/extras/utils.js");
load("jstests/core/timeseries/libs/timeseries_agg_helpers.js");
-load('jstests/libs/analyze_plan.js');
+load("jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js");
+load("jstests/libs/analyze_plan.js");
const testDB = TimeseriesAggTests.getTestDb();
assert.commandWorked(testDB.dropDatabase());
// Do not run the rest of the tests if the lastpoint optimization is disabled.
-const getLastpointParam = db.adminCommand({getParameter: 1, featureFlagLastPointQuery: 1});
-const isLastpointEnabled = getLastpointParam.hasOwnProperty("featureFlagLastPointQuery") &&
- getLastpointParam.featureFlagLastPointQuery.value;
-if (!isLastpointEnabled) {
+if (!FeatureFlagUtil.isEnabled(db, "LastPointQuery")) {
return;
}
-function verifyLastpoint({tsColl, observerColl, pipeline, precedingFilter, expectStage}) {
- // Verify lastpoint optmization.
- const explain = tsColl.explain().aggregate(pipeline);
- expectStage({explain, precedingFilter});
-
- // Assert that the time-series aggregation results match that of the observer collection.
- const expected = observerColl.aggregate(pipeline).toArray();
- const actual = tsColl.aggregate(pipeline).toArray();
- assertArrayEq({actual, expected});
-}
-
-function createBoringCollections(includeIdleMeasurements = false) {
- // Prepare collections. Note: we usually test without idle measurements (all meta subfields are
- // non-null). If we allow the insertion of idle measurements, we will obtain multiple lastpoints
- // per bucket, and may have different results on the observer and timeseries collections.
- const numHosts = 10;
- const numIterations = 20;
- return TimeseriesAggTests.prepareInputCollections(
- numHosts, numIterations, includeIdleMeasurements);
-}
-
-// Generate interesting values.
-const equivalentStrings = ['a', 'A', 'b', 'B'];
-const equivalentNumbers = [7, NumberInt(7), NumberLong(7), NumberDecimal(7)];
-function generateInterestingValues() {
- const epoch = ISODate('1970-01-01');
-
- // Pick values with interesting equality behavior.
- let values = [
- // Arrays whose highest or lowest element is equal.
- [5],
- [5, 99],
- [99],
- // Objects that differ only by field order.
- {x: 1, y: 2},
- {y: 2, x: 1},
- // A variety of values that are "empty" somehow.
- // Missing can't be represented as a JS value--handled later.
- null,
- // Undefined is not supported:
- // "The $_internalUnpackBucket stage allows metadata to be absent or otherwise, it must
- // not be the deprecated undefined bson type", code 5369600.
- undefined,
- [],
- {},
- "",
- ];
-
- // Test strings that differ only by case and numbers that differ only by type.
- values = values.concat(equivalentStrings).concat(equivalentNumbers);
-
- // Also wrap each interesting value in an object or array.
- // Some values that are "equal" at the top level may be distinguished when wrapped this way.
- const arrayWrapped = values.map(v => [v]);
- const objectWrapped = values.map(v => ({w: v}));
- values = values.concat(arrayWrapped).concat(objectWrapped);
-
- let docs = [];
- // Each event's _id is an autoincrementing number, and its timestamp is epoch + _id.
- // Run through the interesting values twice to ensure each one has two events.
- // Do this in the outer loop to ensure all the intervals overlap.
- for (const _ of [1, 2]) {
- for (const m of values) {
- docs.push({_id: docs.length, tags: {hostid: m}, time: new Date(+epoch + docs.length)});
- }
- // Handle 'missing' metaField.
- docs.push({_id: docs.length, time: new Date(+epoch + docs.length)});
- }
-
- for (const m of values) {
- // Push a second measurement an hour later to create another bucket for this meta field.
- docs.push({
- _id: docs.length,
- tags: {hostid: m},
- time: new Date(+epoch + docs.length + 60 * 60 * 1000)
- });
- }
-
- return docs;
-}
-
-function getMapInterestingValuesToEquivalentsStage() {
- const firstElemInId = {$arrayElemAt: ["$_id", 0]};
- const isIdArray = {$isArray: "$_id"};
- return {
- $addFields: {
- _id: {
- $switch: {
- branches: [
- // Replace equivalent string cases with their lowercase counterparts.
- {
- case: {$in: ["$_id.w", equivalentStrings]},
- then: {w: {$toLower: "$_id.w"}}
- },
- {
- case: {$and: [isIdArray, {$in: [firstElemInId, equivalentStrings]}]},
- then: [{$toLower: firstElemInId}]
- },
- {
- case: {$and: [{$not: isIdArray}, {$in: ["$_id", equivalentStrings]}]},
- then: {$toLower: "$_id"}
- },
- // Replace equal numbers with different numeric types with an int.
- {case: {$in: ["$_id.w", equivalentNumbers]}, then: {w: 7}},
- {
- case: {$and: [isIdArray, {$in: [firstElemInId, equivalentNumbers]}]},
- then: [7]
- },
- {
- case: {$and: [{$not: isIdArray}, {$in: ["$_id", equivalentNumbers]}]},
- then: 7
- },
- ],
- default: "$_id"
- }
- }
- }
- };
-}
-
-function createInterestingCollections() {
- const collation = {locale: 'en_US', strength: 2};
-
- // Prepare timeseries collection.
- const tsCollName = "in";
- assert.commandWorked(testDB.createCollection(
- tsCollName, {timeseries: {timeField: "time", metaField: "tags"}, collation}));
- const tsColl = testDB[tsCollName];
-
- const interestingValues = generateInterestingValues();
- assert.commandWorked(tsColl.insertMany(interestingValues));
-
- // Prepare observer collection.
- const observerCollName = "observer_in";
- assert.commandWorked(testDB.createCollection(observerCollName, {collation}));
- const observerColl = testDB[observerCollName];
-
- // We can't just insert the values directly, because bucketing treats "interesting" metaField
- // values differently than a regular collection would. For example, a true timeseries collection
- // would treat objects that only differ by field order as being equivalent meta values. For the
- // purposes of this test we don't care about the semantic difference between timeseries
- // collection bucketing and regular collections, only about the accuracy of the lastpoint
- // rewrite.
- assert.commandWorked(observerColl.insertMany(tsColl.find().toArray()));
-
- return [tsColl, observerColl];
-}
-
-function expectDistinctScan({explain}) {
- // The query can utilize DISTINCT_SCAN.
- assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
-
- // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
- assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
-}
-
-function expectCollScan({explain, precedingFilter, noSortInCursor}) {
- if (noSortInCursor) {
- // We need a separate sort stage.
- assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
- } else {
- // $sort can be pushed into the cursor layer.
- assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
- }
-
- // At the bottom, there should be a COLLSCAN.
- const collScanStage = getAggPlanStage(explain, "COLLSCAN");
- assert.neq(collScanStage, null, explain);
- if (precedingFilter) {
- assert.eq(precedingFilter, collScanStage.filter, collScanStage);
- }
-}
-
-function expectIxscan({explain, noSortInCursor}) {
- if (noSortInCursor) {
- // We can rely on the index without a cursor $sort.
- assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
- } else {
- // $sort can be pushed into the cursor layer.
- assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
- }
-
- // At the bottom, there should be a IXSCAN.
- assert.neq(getAggPlanStage(explain, "IXSCAN"), null, explain);
-}
-
-function getGroupStage(accumulator, extraFields = []) {
+/**
+ * Returns a lastpoint $group stage of the form:
+ * {$group: {
+ * _id: "$tags.hostid",
+ * usage_user: {$first: "$usage_user"}, ...
+ * }}
+ */
+function getGroupStage(time, extraFields = []) {
+ const accumulator = time > 0 ? "$last" : "$first";
let innerGroup = {_id: "$tags.hostid"};
for (const f of extraFields.concat(["usage_user", "usage_guest", "usage_idle"])) {
innerGroup[f] = {[accumulator]: "$" + f};
@@ -225,140 +45,62 @@ function getGroupStage(accumulator, extraFields = []) {
return {$group: innerGroup};
}
-/**
- Test cases:
- 1. Lastpoint queries on indexes with descending time and $first (DISTINCT_SCAN).
- 2. Lastpoint queries on indexes with ascending time and $last (no DISTINCT_SCAN).
- 3. Lastpoint queries on indexes with ascending time and $last and an additional secondary
- index so that we can use the DISTINCT_SCAN optimization.
-*/
-function testAllTimeMetaDirections(tsColl, observerColl, getTestCases) {
- const testCases = [
- {time: -1, useBucketsIndex: false},
- {time: 1, useBucketsIndex: false},
- {time: 1, useBucketsIndex: true}
- ];
-
- for (const {time, useBucketsIndex} of testCases) {
- const isTimeDescending = time < 0;
- const canUseDistinct = isTimeDescending || useBucketsIndex;
- const accumulator = isTimeDescending ? "$first" : "$last";
- const groupStage = getGroupStage(accumulator);
-
- // Test both directions of the metaField sort for each direction of time.
- for (const metaDir of [1, -1]) {
- const index = {"tags.hostid": metaDir, time};
- const bucketsIndex = useBucketsIndex
- ? {"meta.hostid": metaDir, "control.max.time": 1, "control.min.time": 1}
- : undefined;
-
- const tests = getTestCases({
- canUseDistinct,
- canSortOnTimeUseDistinct: (metaDir > 0) && (isTimeDescending || useBucketsIndex),
- accumulator,
- groupStage,
- time,
- index,
- bucketsIndex,
- });
-
- // Run all tests without an index.
- for (const {pipeline, expectStageNoIndex, precedingFilter} of tests) {
- // Normally we expect to see a COLLSCAN with a SORT pushed into the cursor, but some
- // test-cases may override this.
- const expectStage = expectStageNoIndex || expectCollScan;
- verifyLastpoint({tsColl, observerColl, pipeline, precedingFilter, expectStage});
- }
-
- // Create index on the timeseries collection.
- const ixName = "tsIndex_time_" + time + "_meta_" + metaDir;
- tsColl.createIndex(index, {name: ixName});
-
- // Create an additional secondary index directly on the buckets collection so that we
- // can test the DISTINCT_SCAN optimization when time is sorted in ascending order.
- const bucketsColl = testDB["system.buckets.in"];
- const bucketsIxName = "bucketsIndex_time_" + time + "_meta_" + metaDir;
- if (bucketsIndex) {
- bucketsColl.createIndex(bucketsIndex, {name: bucketsIxName});
- }
-
- // Re-run all tests with an index.
- for (const {pipeline, expectStageWithIndex, precedingFilter} of tests) {
- verifyLastpoint({
- tsColl,
- observerColl,
- pipeline,
- precedingFilter,
- expectStage: expectStageWithIndex
- });
- }
-
- // Drop indexes for next test.
- tsColl.dropIndex(ixName);
- if (bucketsIndex) {
- bucketsColl.dropIndex(bucketsIxName);
- }
- }
- }
-
- // Drop collections at the end of the test.
- tsColl.drop();
- observerColl.drop();
-}
-
{
const [tsColl, observerColl] = createBoringCollections();
- testAllTimeMetaDirections(tsColl, observerColl, (t) => {
- const {time, canUseDistinct, canSortOnTimeUseDistinct, accumulator, groupStage, index} = t;
- const expectCollscanNoSort = ({explain}) => expectCollScan({explain, noSortInCursor: true});
- const getTestWithMatch = (matchStage, precedingFilter) => {
- return {
- precedingFilter,
- pipeline: [matchStage, {$sort: index}, groupStage],
- expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectIxscan),
+ testAllTimeMetaDirections(
+ tsColl, observerColl, ({time, canUseDistinct, canSortOnTimeUseDistinct, index}) => {
+ const groupStage = getGroupStage(time);
+ const expectCollscanNoSort = ({explain}) =>
+ expectCollScan({explain, noSortInCursor: true});
+ const getTestWithMatch = (matchStage, precedingFilter) => {
+ return {
+ precedingFilter,
+ pipeline: [matchStage, {$sort: index}, groupStage],
+ expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectIxscan),
+ };
};
- };
- return [
- // Test pipeline without a preceding $match stage with sort only on time.
- {
- pipeline: [{$sort: {time}}, groupStage],
- expectStageWithIndex:
- (canSortOnTimeUseDistinct ? expectDistinctScan : expectCollScan),
- },
-
- // Test pipeline without a preceding $match stage with a sort on the index.
- {
- pipeline: [{$sort: index}, groupStage],
- expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectCollScan),
- },
- // Test pipeline without a projection to ensure that we correctly evaluate
- // computedMetaProjFields in the rewrite. Note that we can't get a DISTINCT_SCAN here
- // due to the projection.
- {
- pipeline: [
- {$set: {abc: {$add: [1, "$tags.hostid"]}}},
- {$sort: index},
- getGroupStage(accumulator, ["abc"]),
- ],
- expectStageWithIndex: expectCollscanNoSort,
- expectStageNoIndex: expectCollscanNoSort,
- },
+ return [
+ // Test pipeline without a preceding $match stage with sort only on time.
+ {
+ pipeline: [{$sort: {time}}, groupStage],
+ expectStageWithIndex:
+ (canSortOnTimeUseDistinct ? expectDistinctScan : expectCollScan),
+ },
+
+ // Test pipeline without a preceding $match stage with a sort on the index.
+ {
+ pipeline: [{$sort: index}, groupStage],
+ expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectCollScan),
+ },
+
+ // Test pipeline with a projection to ensure that we correctly evaluate
+ // computedMetaProjFields in the rewrite. Note that we can't get a DISTINCT_SCAN
+ // here due to the projection.
+ {
+ pipeline: [
+ {$set: {abc: {$add: [1, "$tags.hostid"]}}},
+ {$sort: index},
+ getGroupStage(time, ["abc"]),
+ ],
+ expectStageWithIndex: expectCollscanNoSort,
+ expectStageNoIndex: expectCollscanNoSort,
+ },
- // Test pipeline with an equality $match stage.
- getTestWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}}),
+ // Test pipeline with an equality $match stage.
+ getTestWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}}),
- // Test pipeline with an inequality $match stage.
- getTestWithMatch({$match: {"tags.hostid": {$ne: 0}}},
- {"meta.hostid": {$not: {$eq: 0}}}),
+ // Test pipeline with an inequality $match stage.
+ getTestWithMatch({$match: {"tags.hostid": {$ne: 0}}},
+ {"meta.hostid": {$not: {$eq: 0}}}),
- // Test pipeline with a $match stage that uses a $gt query.
- getTestWithMatch({$match: {"tags.hostid": {$gt: 5}}}, {"meta.hostid": {$gt: 5}}),
+ // Test pipeline with a $match stage that uses a $gt query.
+ getTestWithMatch({$match: {"tags.hostid": {$gt: 5}}}, {"meta.hostid": {$gt: 5}}),
- // Test pipeline with a $match stage that uses a $lt query.
- getTestWithMatch({$match: {"tags.hostid": {$lt: 5}}}, {"meta.hostid": {$lt: 5}}),
- ];
- });
+ // Test pipeline with a $match stage that uses a $lt query.
+ getTestWithMatch({$match: {"tags.hostid": {$lt: 5}}}, {"meta.hostid": {$lt: 5}}),
+ ];
+ });
}
// Test pipeline without a preceding $match stage which has an extra idle measurement. This verifies
@@ -368,8 +110,8 @@ function testAllTimeMetaDirections(tsColl, observerColl, getTestCases) {
testAllTimeMetaDirections(
tsColl,
observerColl,
- ({canUseDistinct, groupStage, index}) => [{
- pipeline: [{$sort: index}, groupStage],
+ ({canUseDistinct, index, time}) => [{
+ pipeline: [{$sort: index}, getGroupStage(time)],
expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectCollScan),
}]);
}
@@ -383,23 +125,25 @@ function testAllTimeMetaDirections(tsColl, observerColl, getTestCases) {
const mapToEquivalentIdStage = getMapInterestingValuesToEquivalentsStage();
testAllTimeMetaDirections(
- tsColl,
- observerColl,
- ({canUseDistinct, canSortOnTimeUseDistinct, groupStage, time, index}) => [
- // Test pipeline with sort only on time and interesting metaField values.
- {
- pipeline: [{$sort: {time}}, groupStage, mapToEquivalentIdStage],
- // We get an index scan here because the index on interesting values is multikey.
- expectStageWithIndex:
- (canSortOnTimeUseDistinct ? expectIxscanNoSort : expectCollScan),
- },
- // Test pipeline without a preceding $match stage and interesting metaField values.
- {
- pipeline: [{$sort: index}, groupStage, mapToEquivalentIdStage],
- // We get an index scan here because the index on interesting values is multikey, so
- // we cannot have a DISTINCT_SCAN.
- expectStageWithIndex: (canUseDistinct ? expectIxscanNoSort : expectCollScan),
- },
- ]);
+ tsColl, observerColl, ({canUseDistinct, canSortOnTimeUseDistinct, time, index}) => {
+ const groupStage = getGroupStage(time);
+ return [
+ // Test pipeline with sort only on time and interesting metaField values.
+ {
+ pipeline: [{$sort: {time}}, groupStage, mapToEquivalentIdStage],
+ // We get an index scan here because the index on interesting values is
+ // multikey.
+ expectStageWithIndex:
+ (canSortOnTimeUseDistinct ? expectIxscanNoSort : expectCollScan),
+ },
+ // Test pipeline without a preceding $match stage and interesting metaField values.
+ {
+ pipeline: [{$sort: index}, groupStage, mapToEquivalentIdStage],
+ // We get an index scan here because the index on interesting values is
+ // multikey, so we cannot have a DISTINCT_SCAN.
+ expectStageWithIndex: (canUseDistinct ? expectIxscanNoSort : expectCollScan),
+ }
+ ];
+ });
}
})();
diff --git a/jstests/core/timeseries/timeseries_lastpoint_top.js b/jstests/core/timeseries/timeseries_lastpoint_top.js
new file mode 100644
index 00000000000..2efd352b574
--- /dev/null
+++ b/jstests/core/timeseries/timeseries_lastpoint_top.js
@@ -0,0 +1,165 @@
+/**
+ * Tests the optimization of "lastpoint"-type queries on time-series collections.
+ *
+ * @tags: [
+ * does_not_support_stepdowns,
+ * does_not_support_transactions,
+ * requires_timeseries,
+ * requires_pipeline_optimization,
+ * requires_fcv_53,
+ * # TODO (SERVER-63590): Investigate presence of getmore tag in timeseries jstests.
+ * requires_getmore,
+ * # Explain of a resolved view must be executed by mongos.
+ * directly_against_shardsvrs_incompatible,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js");
+load("jstests/core/timeseries/libs/timeseries_agg_helpers.js");
+load("jstests/core/timeseries/libs/timeseries_lastpoint_helpers.js");
+load("jstests/libs/analyze_plan.js");
+load("jstests/libs/feature_flag_util.js");
+
+const testDB = TimeseriesAggTests.getTestDb();
+assert.commandWorked(testDB.dropDatabase());
+
+// Do not run the rest of the tests if the lastpoint optimization is disabled.
+if (!FeatureFlagUtil.isEnabled(db, "LastPointQuery")) {
+ return;
+}
+
+/**
+ * Returns a lastpoint $group stage of the form:
+ * {$group: {
+ * _id: "$tags.hostid",
+ * mostRecent: {$topN: {
+ * n: 1, sortBy, output: {usage_user: "$usage_user", ...}
+ * }}
+ * }}
+ */
+function getGroupStage({time, sortBy, n, extraFields = []}) {
+ let output = {};
+ for (const f of extraFields.concat(["usage_user", "usage_guest", "usage_idle"])) {
+ output[f] = {[f]: "$" + f};
+ }
+
+ const accumulator = ((time < 0) ? "$top" : "$bottom");
+ const mostRecent =
+ n ? {[accumulator + "N"]: {sortBy, output, n}} : {[accumulator]: {sortBy, output}};
+ return {$group: {_id: "$tags.hostid", mostRecent}};
+}
+
+{
+ const [tsColl, observerColl] = createBoringCollections();
+ testAllTimeMetaDirections(
+ tsColl, observerColl, ({time, index, canUseDistinct, canSortOnTimeUseDistinct}) => {
+ const expectCollscanNoSort = ({explain}) =>
+ expectCollScan({explain, noSortInCursor: true});
+
+ // Try both $top/$bottom and $topN/$bottomN variations of the rewrite.
+ return [1, undefined].flatMap(n => {
+ const groupStage = getGroupStage({time, sortBy: index, n});
+ const getTestWithMatch = (matchStage, precedingFilter) => {
+ return {
+ precedingFilter,
+ pipeline: [matchStage, groupStage],
+ expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectIxscan),
+ };
+ };
+
+ return [
+ // Test pipeline without a preceding $match stage with sort only on time.
+ {
+ pipeline: [getGroupStage({time, sortBy: {time}, n})],
+ expectStageWithIndex:
+ (canSortOnTimeUseDistinct ? expectDistinctScan : expectCollScan),
+ },
+
+ // Test pipeline without a preceding $match stage with a sort on the index.
+ {
+ pipeline: [groupStage],
+ expectStageWithIndex:
+ (canUseDistinct ? expectDistinctScan : expectCollScan),
+ },
+
+ // Test pipeline with a projection to ensure that we correctly evaluate
+ // computedMetaProjFields in the rewrite. Note that we can't get a DISTINCT_SCAN
+ // here due to the projection.
+ {
+ pipeline: [
+ {$set: {abc: {$add: [1, "$tags.hostid"]}}},
+ getGroupStage({time, sortBy: index, n, extraFields: ["abc"]}),
+ ],
+ expectStageWithIndex: expectCollscanNoSort,
+ expectStageNoIndex: expectCollscanNoSort,
+ },
+
+ // Test pipeline with an equality $match stage.
+ getTestWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}}),
+
+ // Test pipeline with an inequality $match stage.
+ getTestWithMatch({$match: {"tags.hostid": {$ne: 0}}},
+ {"meta.hostid": {$not: {$eq: 0}}}),
+
+ // Test pipeline with a $match stage that uses a $gt query.
+ getTestWithMatch({$match: {"tags.hostid": {$gt: 5}}},
+ {"meta.hostid": {$gt: 5}}),
+
+ // Test pipeline with a $match stage that uses a $lt query.
+ getTestWithMatch({$match: {"tags.hostid": {$lt: 5}}},
+ {"meta.hostid": {$lt: 5}}),
+ ];
+ });
+ });
+}
+
+// Test pipeline without a preceding $match stage which has an extra idle measurement. This verifies
+// that the query rewrite correctly returns missing fields.
+{
+ const [tsColl, observerColl] = createBoringCollections(true /* includeIdleMeasurements */);
+ testAllTimeMetaDirections(
+ tsColl, observerColl, ({canUseDistinct, time, index}) => [1, undefined].map(n => {
+ return {
+ pipeline: [getGroupStage({time, sortBy: index, n})],
+ expectStageWithIndex: (canUseDistinct ? expectDistinctScan : expectCollScan),
+ };
+ }));
+}
+
+// Test interesting metaField values.
+{
+ const [tsColl, observerColl] = createInterestingCollections();
+ const expectIxscanNoSort = ({explain}) => expectIxscan({explain, noSortInCursor: true});
+
+ // Verifies that the '_id' of each group matches one of the equivalent '_id' values.
+ const mapToEquivalentIdStage = getMapInterestingValuesToEquivalentsStage();
+
+ testAllTimeMetaDirections(tsColl, observerColl, ({
+ canUseDistinct,
+ canSortOnTimeUseDistinct,
+ time,
+ index
+ }) => {
+ return [1, undefined].flatMap(
+ n => [
+ // Test pipeline with sort only on time and interesting metaField values.
+ {
+ pipeline: [getGroupStage({time, sortBy: {time}, n}), mapToEquivalentIdStage],
+ // We get an index scan here because the index on interesting values is
+ // multikey.
+ expectStageWithIndex:
+ (canSortOnTimeUseDistinct ? expectIxscanNoSort : expectCollScan),
+ },
+ // Test pipeline without a preceding $match stage and interesting metaField values.
+ {
+ pipeline: [getGroupStage({time, sortBy: index, n}), mapToEquivalentIdStage],
+ // We get an index scan here because the index on interesting values is
+ // multikey, so we cannot have a DISTINCT_SCAN.
+ expectStageWithIndex: (canUseDistinct ? expectIxscanNoSort : expectCollScan),
+ },
+ ]);
+ });
+}
+})();
diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp
index db945bff6ec..2f6971e1f22 100644
--- a/src/mongo/db/pipeline/accumulator_multi.cpp
+++ b/src/mongo/db/pipeline/accumulator_multi.cpp
@@ -740,7 +740,7 @@ Value AccumulatorTopBottomN<sense, single>::getValueConst(bool toBeMerged) const
std::vector<Value> result;
auto begin = _map->begin();
auto end = _map->end();
- if constexpr (sense == kBottom) {
+ if constexpr (sense == TopBottomSense::kBottom) {
// If this accumulator is removable there may be more than n elements in the map, so we must
// skip elements that shouldn't be in the result.
if (static_cast<long long>(_map->size()) > *_n) {
diff --git a/src/mongo/db/pipeline/accumulator_multi.h b/src/mongo/db/pipeline/accumulator_multi.h
index 554123924cc..13e7971ee0c 100644
--- a/src/mongo/db/pipeline/accumulator_multi.h
+++ b/src/mongo/db/pipeline/accumulator_multi.h
@@ -46,6 +46,8 @@ namespace mongo {
*/
class AccumulatorN : public AccumulatorState {
public:
+ enum AccumulatorType { kMinN, kMaxN, kFirstN, kLastN, kTopN, kTop, kBottomN, kBottom };
+
static constexpr auto kFieldNameN = "n"_sd;
static constexpr auto kFieldNameInput = "input"_sd;
@@ -64,6 +66,8 @@ public:
AccumulatorN(ExpressionContext* expCtx);
+ virtual AccumulatorType getAccumulatorType() const = 0;
+
/**
* Verifies that 'input' is a positive integer.
*/
@@ -159,6 +163,10 @@ public:
static const char* getName();
+ AccumulatorType getAccumulatorType() const override {
+ return AccumulatorType::kMinN;
+ }
+
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* expCtx);
};
@@ -170,6 +178,10 @@ public:
static const char* getName();
+ AccumulatorType getAccumulatorType() const override {
+ return AccumulatorType::kMaxN;
+ }
+
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* expCtx);
};
@@ -233,6 +245,10 @@ public:
static const char* getName();
+ AccumulatorType getAccumulatorType() const override {
+ return AccumulatorType::kFirstN;
+ }
+
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* expCtx);
};
@@ -244,6 +260,10 @@ public:
static const char* getName();
+ AccumulatorType getAccumulatorType() const override {
+ return AccumulatorType::kLastN;
+ }
+
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* expCtx);
};
@@ -315,6 +335,26 @@ public:
*/
void remove(const Value& val);
+ const SortPattern getSortPattern() const {
+ return _sortPattern;
+ }
+
+ AccumulatorType getAccumulatorType() const override {
+ if constexpr (single) {
+ if constexpr (sense == TopBottomSense::kTop) {
+ return AccumulatorType::kTop;
+ } else {
+ return AccumulatorType::kBottom;
+ }
+ } else {
+ if constexpr (sense == TopBottomSense::kTop) {
+ return AccumulatorType::kTopN;
+ } else {
+ return AccumulatorType::kBottomN;
+ }
+ }
+ }
+
private:
// top/bottom/topN/bottomN do NOT ignore null values, but MISSING values will be promoted to
// null so the users see them.
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 5e5ca839d43..4aef3bc0c2d 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/matcher/expression_geo.h"
#include "mongo/db/matcher/expression_internal_bucket_geo_within.h"
#include "mongo/db/matcher/expression_internal_expr_comparison.h"
+#include "mongo/db/pipeline/accumulator_multi.h"
#include "mongo/db/pipeline/document_source_add_fields.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_group.h"
@@ -737,15 +738,142 @@ bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
_bucketUnpacker.bucketSpec().metaField().get());
}
+template <TopBottomSense sense, bool single>
+bool extractFromAcc(const AccumulatorN* acc,
+ const boost::intrusive_ptr<Expression>& init,
+ boost::optional<BSONObj>& outputAccumulator,
+ boost::optional<BSONObj>& outputSortPattern) {
+ // If this accumulator will not return a single document then we cannot rewrite this query to
+ // use a $sort + a $group with $first or $last.
+ if constexpr (!single) {
+ // We may have a $topN or a $bottomN with n = 1; in this case, we may still be able to
+ // perform the lastpoint rewrite.
+ if (auto constInit = dynamic_cast<ExpressionConstant*>(init.get()); constInit) {
+ // Since this is a $const expression, the input to evaluate() should not matter.
+ auto constVal = constInit->evaluate(Document(), nullptr);
+ if (!constVal.numeric() || (constVal.coerceToLong() != 1)) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ // Retrieve sort pattern for an equivalent $sort.
+ const auto multiAc = dynamic_cast<const AccumulatorTopBottomN<sense, single>*>(acc);
+ invariant(multiAc);
+ outputSortPattern = multiAc->getSortPattern()
+ .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)
+ .toBson();
+
+ // Construct an equivalent accumulator statement that uses $first/$last to retrieve the entire
+ // document.
+ constexpr auto accumulator =
+ (sense == TopBottomSense::kTop) ? AccumulatorFirst::kName : AccumulatorLast::kName;
+ // Note: we don't need to preserve what the $top/$bottom accumulator outputs here. We only need
+ // a $group stage with the appropriate accumulator that retrieves the bucket in some way. For
+ // the rewrite we preserve the original group and insert a $group that returns all the data in
+ // the bucket selected by $first/$last for each _id.
+ outputAccumulator = BSON("bucket" << BSON(accumulator << "$$ROOT"));
+
+ return true;
+}
+
+bool extractFromAccIfTopBottomN(const AccumulatorN* multiAcc,
+ const boost::intrusive_ptr<Expression>& init,
+ boost::optional<BSONObj>& outputAccumulator,
+ boost::optional<BSONObj>& outputSortPattern) {
+ const auto accType = multiAcc->getAccumulatorType();
+ if (accType == AccumulatorN::kTopN) {
+ return extractFromAcc<TopBottomSense::kTop, false>(
+ multiAcc, init, outputAccumulator, outputSortPattern);
+ } else if (accType == AccumulatorN::kTop) {
+ return extractFromAcc<TopBottomSense::kTop, true>(
+ multiAcc, init, outputAccumulator, outputSortPattern);
+ } else if (accType == AccumulatorN::kBottomN) {
+ return extractFromAcc<TopBottomSense::kBottom, false>(
+ multiAcc, init, outputAccumulator, outputSortPattern);
+ } else if (accType == AccumulatorN::kBottom) {
+ return extractFromAcc<TopBottomSense::kBottom, true>(
+ multiAcc, init, outputAccumulator, outputSortPattern);
+ }
+ // This isn't a topN/bottomN/top/bottom accumulator.
+ return false;
+}
+
+std::pair<boost::intrusive_ptr<DocumentSourceSort>, boost::intrusive_ptr<DocumentSourceGroup>>
+tryRewriteGroupAsSortGroup(boost::intrusive_ptr<ExpressionContext> expCtx,
+ Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container,
+ DocumentSourceGroup* groupStage) {
+ const auto accumulators = groupStage->getAccumulatedFields();
+ if (accumulators.size() != 1) {
+ // If we have multiple accumulators, we fail to optimize for a lastpoint query.
+ return {nullptr, nullptr};
+ }
+
+ const auto init = accumulators[0].expr.initializer;
+ const auto accState = accumulators[0].makeAccumulator();
+ const AccumulatorN* multiAcc = dynamic_cast<const AccumulatorN*>(accState.get());
+ if (!multiAcc) {
+ return {nullptr, nullptr};
+ }
+
+ boost::optional<BSONObj> maybeAcc;
+ boost::optional<BSONObj> maybeSortPattern;
+ if (!extractFromAccIfTopBottomN(multiAcc, init, maybeAcc, maybeSortPattern)) {
+ // This isn't a topN/bottomN/top/bottom accumulator or N != 1.
+ return {nullptr, nullptr};
+ }
+
+ tassert(6165600,
+ "sort pattern and accumulator must be initialized if cast of $top or $bottom succeeds",
+ maybeSortPattern && maybeAcc);
+
+ auto newSortStage = DocumentSourceSort::create(expCtx, SortPattern(*maybeSortPattern, expCtx));
+ auto newAccState = AccumulationStatement::parseAccumulationStatement(
+ expCtx.get(), maybeAcc->firstElement(), expCtx->variablesParseState);
+ auto newGroupStage =
+ DocumentSourceGroup::create(expCtx, groupStage->getIdExpression(), {newAccState});
+ return {newSortStage, newGroupStage};
+}
+
+
bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) {
- // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order.
- if (std::next(itr, 2) == container->end()) {
+ // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order, or
+ // only a $group stage with a $top, $topN, $bottom, or $bottomN accumulator. This means we need
+ // at least one stage after $_internalUnpackBucket.
+ if (std::next(itr) == container->end()) {
return false;
}
- auto sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get());
- auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get());
+ // If we only have one stage after $_internalUnpackBucket, it must be a $group for the
+ // lastpoint rewrite to happen.
+ DocumentSourceSort* sortStage = nullptr;
+ auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr)->get());
+
+ // If we don't have a $sort + $group lastpoint query, we will need to replace the $group with
+ // equivalent $sort + $group stages for the rewrite.
+ boost::intrusive_ptr<DocumentSourceSort> sortStagePtr;
+ boost::intrusive_ptr<DocumentSourceGroup> groupStagePtr;
+ if (!groupStage && (std::next(itr, 2) != container->end())) {
+ // If the first stage is not a $group, we may have a $sort + $group lastpoint query.
+ sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get());
+ groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get());
+ } else if (groupStage) {
+ // Try to rewrite the $group to a $sort+$group-style lastpoint query before proceeding with
+ // the optimization.
+ std::tie(sortStagePtr, groupStagePtr) =
+ tryRewriteGroupAsSortGroup(pExpCtx, itr, container, groupStage);
+
+ // Both these stages should be discarded once we exit this function; either because the
+ // rewrite failed validation checks, or because we created updated versions of these stages
+ // in 'tryInsertBucketLevelSortAndGroup' below (which will be inserted into the pipeline).
+ // The intrusive_ptrs above handle this deletion gracefully.
+ sortStage = sortStagePtr.get();
+ groupStage = groupStagePtr.get();
+ }
if (!sortStage || !groupStage) {
return false;
@@ -833,9 +961,16 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
if (!groupOnlyUsesTargetAccum(accum) || !isSortValidForGroup(accum)) {
return false;
}
+
bool flipSort = (accum == AccumulatorDocumentsNeeded::kLastDocument);
auto newSort = createMetadataSortForReorder(*sortStage, timeField, newFieldPath, flipSort);
auto newGroup = createBucketGroupForReorder(pExpCtx, fieldsToInclude, newFieldPath);
+
+ // Note that we don't erase any of the original stages for this rewrite. This allows us to
+ // preserve the particular semantics of the original group (e.g. $top behaves differently
+ // than $topN with n = 1, $group accumulators have a special way of dealing with nulls, etc.)
+ // without constructing a specialized projection to exactly match what the original query
+ // would have returned.
container->insert(itr, newSort);
container->insert(itr, newGroup);
return true;
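
One detail worth calling out from extractFromAcc() above: $topN/$bottomN only qualify for the rewrite when their 'n' initializer is a constant expression that evaluates to 1, since otherwise the accumulator may return more than one document per group and cannot be modelled by $first/$last. A sketch of which $group shapes the rewrite accepts, mirroring the unit tests below:

    // Eligible: single-result accumulators keyed on the meta field, with time last in sortBy.
    {$group: {_id: '$m.a', lastpoint: {$top:  {output: {b: '$b'}, sortBy: {'m.a': 1, t: -1}}}}}
    {$group: {_id: '$m.a', lastpoint: {$topN: {n: 1, output: {b: '$b'}, sortBy: {'m.a': 1, t: -1}}}}}

    // Not eligible: n != 1 (or a non-constant n), so the group may keep multiple documents.
    {$group: {_id: '$m.a', lastpoint: {$topN: {n: 2, output: {b: '$b'}, sortBy: {'m.a': 1, t: -1}}}}}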
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
index bb2f219126b..375ee698ec1 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
@@ -71,60 +71,115 @@ void assertExpectedLastpointOpt(const boost::intrusive_ptr<ExpressionContext> ex
TEST_F(InternalUnpackBucketOptimizeLastpointTest, NonLastpointDoesNotParticipateInOptimization) {
RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
- auto assertPipelineUnoptimized = [&](const std::string& unpackStr,
- const std::string& sortStr,
- const std::string& groupStr) {
- std::vector stageStrs{unpackStr, sortStr, groupStr};
+ auto assertPipelineUnoptimized = [&](const std::vector<std::string>& stageStrs) {
assertExpectedLastpointOpt(getExpCtx(), stageStrs, stageStrs, /* expectedSuccess */ false);
};
// $sort must contain a time field.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
- "bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': 1}}",
- "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': 1}}",
+ "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': "
+ "1}}}}}"});
// $sort must have the time field as the last field in the sort key pattern.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
- "bucketMaxSpanSeconds: 60}}",
- "{$sort: {t: -1, 'm.a': 1}}",
- "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$sort: {t: -1, 'm.a': 1}}",
+ "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {t: -1, "
+ "'m.a': 1}}}}}"});
// $group's _id must be a meta field.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
- "bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': 1, t: -1}}",
- "{$group: {_id: '$nonMeta', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': 1, t: -1}}",
+ "{$group: {_id: '$nonMeta', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$nonMeta', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: "
+ "{'m.a': 1, t: -1}}}}}"});
- // $group can only contain $first or $last accumulators.
+ // $group can only contain $first or $last accumulators or one $top/$bottom accumulator.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
- "bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': 1, t: -1}}",
- "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$last: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': 1, t: -1}}",
+ "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$last: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$nonMeta', lastpoint1: {$top: {output: {b: '$b', c: '$c'}, sortBy: "
+ "{'m.a': 1, t: -1}}}}}, lastpoint2: {$bottom: {output: {b: '$b', c: '$c'}, sortBy: "
+ "{'m.a': 1, t: 1}}}}}"});
// We disallow the rewrite for firstpoint queries due to rounding behaviour on control.min.time.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
- "'m', bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': -1, t: 1}}",
- "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': -1, t: 1}}",
+ "{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': -1, t: -1}}",
+ "{$group: {_id: '$m.a', b: {$last: '$b'}, c: {$last: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': 1, "
+ "t: 1}}}}}"});
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
- "'m', bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': -1, t: -1}}",
- "{$group: {_id: '$m.a', b: {$last: '$b'}, c: {$last: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottom: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': "
+ "1, t: -1}}}}}"});
// The _id field in $group's must match the meta field in $sort.
assertPipelineUnoptimized(
- "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
- "bucketMaxSpanSeconds: 60}}",
- "{$sort: {'m.a': -1, t: -1}}",
- "{$group: {_id: '$m.z', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$sort: {'m.a': -1, t: -1}}",
+ "{$group: {_id: '$m.z', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.z', lastpoint: {$top: {output: {b: '$b', c: '$c'}, sortBy: {'m.a': 1, "
+ "t: -1}}}}}"});
+
+ // We cannot optimize for $topN or $bottomN with n != 1.
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$topN: {n: {$const: 2}, output: {b: '$b', c: '$c'}, "
+ "sortBy: "
+ "{'m.a': 1, t: -1}}}}}"});
+
+ assertPipelineUnoptimized(
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'm', "
+ "bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottomN: {n: {$const: 2}, output: {b: '$b', c: "
+ "'$c'}, sortBy: "
+ "{'m.a': 1, t: 1}}}}}"});
}
TEST_F(InternalUnpackBucketOptimizeLastpointTest,
@@ -144,6 +199,35 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest,
"'m', bucketMaxSpanSeconds: 60}}",
"{$sort: {'m.a': 1, t: -1}}",
"{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+ assertExpectedLastpointOpt(getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: -1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: -1}}}}}"});
+ assertExpectedLastpointOpt(
+ getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$topN: {n: {$const: 1}, output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: -1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$topN: {n: {$const: 1}, output: {b: '$b', c: '$c'}, "
+ "sortBy: {'m.a': 1, t: -1}}}}}"});
}
TEST_F(InternalUnpackBucketOptimizeLastpointTest,
@@ -163,6 +247,35 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest,
"'m', bucketMaxSpanSeconds: 60}}",
"{$sort: {'m.a': -1, t: -1}}",
"{$group: {_id: '$m.a', b: {$first: '$b'}, c: {$first: '$c'}}}"});
+ assertExpectedLastpointOpt(getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: -1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$top: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: -1}}}}}"});
+ assertExpectedLastpointOpt(
+ getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$topN: {n: {$const: 1}, output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: -1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$topN: {n: {$const: 1}, output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: -1}}}}}"});
}
TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithMetaSubfieldAscendingTimeAscending) {
@@ -181,6 +294,35 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithMetaSubfieldAscen
"'m', bucketMaxSpanSeconds: 60}}",
"{$sort: {'m.a': 1, t: 1}}",
"{$group: {_id: '$m.a', b: {$last: '$b'}, c: {$last: '$c'}}}"});
+ assertExpectedLastpointOpt(getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottom: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: 1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottom: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: 1}}}}}"});
+ assertExpectedLastpointOpt(
+ getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottomN: {n: {$const: 1}, output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: 1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottomN: {n: {$const: 1}, output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': 1, t: 1}}}}}"});
}
TEST_F(InternalUnpackBucketOptimizeLastpointTest,
@@ -200,6 +342,34 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest,
"'m', bucketMaxSpanSeconds: 60}}",
"{$sort: {'m.a': -1, t: 1}}",
"{$group: {_id: '$m.a', b: {$last: '$b'}, c: {$last: '$c'}}}"});
+ assertExpectedLastpointOpt(getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottom: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: 1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottom: {output: {b: '$b', c: "
+ "'$c'}, sortBy: {'m.a': -1, t: 1}}}}}"});
+ assertExpectedLastpointOpt(getExpCtx(),
+ /* inputPipelineStrs */
+ {"{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottomN: {n: {$const: 1}, "
+ "output: {b: '$b', c: '$c'}, sortBy: {'m.a': -1, t: 1}}}}}"},
+ /* expectedPipelineStrs */
+ {"{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}",
+ "{$group: {_id: '$meta.a', meta: {$first: '$meta'}, control: "
+ "{$first: '$control'}, data: {$first: '$data'}}}",
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'m', bucketMaxSpanSeconds: 60}}",
+ "{$group: {_id: '$m.a', lastpoint: {$bottomN: {n: {$const: 1}, "
+ "output: {b: '$b', c: '$c'}, sortBy: {'m.a': -1, t: 1}}}}}"});
}
TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithComputedMetaProjectionFields) {