diff options
author | Alya Berciu <alyacarina@gmail.com> | 2022-03-15 11:43:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-15 12:46:03 +0000 |
commit | 811dc428a93105d0aa9091ca20e4dfb120e4e57d (patch) | |
tree | 5b5e8cfd639349d52d9cf1807c96b09b12765e5b | |
parent | e09abbd7289641317d9e213204fb79731655e004 (diff) | |
download | mongo-811dc428a93105d0aa9091ca20e4dfb120e4e57d.tar.gz |
SERVER-64235 Implement timeseries lastpoint rewrite for ascending time
4 files changed, 339 insertions, 99 deletions
diff --git a/jstests/core/timeseries/libs/timeseries_agg_helpers.js b/jstests/core/timeseries/libs/timeseries_agg_helpers.js index a2ae1ffcca6..a2a2f74393f 100644 --- a/jstests/core/timeseries/libs/timeseries_agg_helpers.js +++ b/jstests/core/timeseries/libs/timeseries_agg_helpers.js @@ -20,7 +20,7 @@ var TimeseriesAggTests = class { * @returns An array of a time-series collection and a non time-series collection, * respectively in this order. */ - static prepareInputCollections(numHosts, numIterations) { + static prepareInputCollections(numHosts, numIterations, includeIdleMeasurements = true) { const timeseriesCollOption = {timeseries: {timeField: "time", metaField: "tags"}}; Random.setRandomSeed(); @@ -60,7 +60,7 @@ var TimeseriesAggTests = class { assert.commandWorked(inColl.insert(newMeasurement)); assert.commandWorked(observerInColl.insert(newMeasurement)); - if (i % 2) { + if (includeIdleMeasurements && (i % 2)) { let idleMeasurement = { tags: host.tags, time: new Date(currTime + i), diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js index 05344c194da..3be28645c7b 100644 --- a/jstests/core/timeseries/timeseries_lastpoint.js +++ b/jstests/core/timeseries/timeseries_lastpoint.js @@ -29,75 +29,173 @@ if (!isLastpointEnabled) { return; } -function verifyTsResults(pipeline, index, precedingFilter) { - // Prepare collections. - const numHosts = 10; - const numIterations = 20; - const [tsColl, observerColl] = - TimeseriesAggTests.prepareInputCollections(numHosts, numIterations); - if (index) { - tsColl.createIndex(index); +// Timeseries test parameters. +const numHosts = 10; +const numIterations = 20; + +function verifyTsResults({pipeline, precedingFilter, expectStage, prepareTest}) { + // Prepare collections. Note: we test without idle measurements (all meta subfields are + // non-null). If we allow the insertion of idle measurements, we will obtain multiple lastpoints + // per bucket, and may have different results on the observer and timeseries collections. + const [tsColl, observerColl] = TimeseriesAggTests.prepareInputCollections( + numHosts, numIterations, false /* includeIdleMeasurements */); + + // Additional preparation before running the test. + if (prepareTest) { + prepareTest(tsColl, observerColl); } // Verify lastpoint optmization. const explain = tsColl.explain().aggregate(pipeline); - if (index) { - // The query can utilize DISTINCT_SCAN. - assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain); - - // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort. - assert.eq(getAggPlanStage(explain, "SORT"), null, explain); - } else { - // $sort can be pushed into the cursor layer. - assert.neq(getAggPlanStage(explain, "SORT"), null, explain); - - // At the bottom, there should be a COLLSCAN. - const collScanStage = getAggPlanStage(explain, "COLLSCAN"); - assert.neq(collScanStage, null, explain); - if (precedingFilter) { - assert.eq(precedingFilter, collScanStage.filter, collScanStage); - } - } + expectStage({explain, precedingFilter}); // Assert that the time-series aggregation results match that of the observer collection. - const expectedResults = observerColl.aggregate(pipeline).toArray(); - const actualResults = tsColl.aggregate(pipeline).toArray(); - assert(resultsEq(actualResults, expectedResults), - `Expected ${tojson(expectedResults)} but got ${tojson(actualResults)}`); -} + const expected = observerColl.aggregate(pipeline).toArray(); + const actual = tsColl.aggregate(pipeline).toArray(); + assertArrayEq({actual, expected}); -function verifyTsResultsWithAndWithoutIndex(pipeline, index, precedingFilter) { - verifyTsResults(pipeline, undefined, precedingFilter); - verifyTsResults(pipeline, index, precedingFilter); + // Drop collections. + tsColl.drop(); + observerColl.drop(); } -verifyTsResultsWithAndWithoutIndex( - [ - {$sort: {"tags.hostid": 1, time: -1}}, - { - $group: { - _id: "$tags.hostid", - usage_user: {$first: "$usage_user"}, - usage_guest: {$first: "$usage_guest"}, - usage_idle: {$first: "$usage_idle"} +function verifyTsResultsWithAndWithoutIndex( + {pipeline, index, bucketsIndex, precedingFilter, expectStage, prePrepareTest}) { + verifyTsResults( + {pipeline, precedingFilter, expectStage: expectCollScan, prepareTest: prePrepareTest}); + verifyTsResults({ + pipeline, + precedingFilter, + expectStage, + prepareTest: (testColl, observerColl) => { + // Optionally do extra test preparation. + if (prePrepareTest) { + prePrepareTest(testColl, observerColl); } + + // Create index on the timeseries collection. + testColl.createIndex(index); + + // Create an additional secondary index directly on the buckets collection so that we + // can test the DISTINCT_SCAN optimization when time is sorted in ascending order. + if (bucketsIndex) { + const bucketsColl = testDB["system.buckets.in"]; + bucketsColl.createIndex(bucketsIndex); + } + } + }); +} + +function expectDistinctScan({explain}) { + // The query can utilize DISTINCT_SCAN. + assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain); + + // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort. + assert.eq(getAggPlanStage(explain, "SORT"), null, explain); +} + +function expectCollScan({explain, precedingFilter}) { + // $sort can be pushed into the cursor layer. + assert.neq(getAggPlanStage(explain, "SORT"), null, explain); + + // At the bottom, there should be a COLLSCAN. + const collScanStage = getAggPlanStage(explain, "COLLSCAN"); + assert.neq(collScanStage, null, explain); + if (precedingFilter) { + assert.eq(precedingFilter, collScanStage.filter, collScanStage); + } +} + +function expectIxscan({explain}) { + // $sort can be pushed into the cursor layer. + assert.neq(getAggPlanStage(explain, "SORT"), null, explain); + + // At the bottom, there should be a IXSCAN. + assert.neq(getAggPlanStage(explain, "IXSCAN"), null, explain); +} + +function getGroupStage(accumulator) { + return { + $group: { + _id: "$tags.hostid", + usage_user: {[accumulator]: "$usage_user"}, + usage_guest: {[accumulator]: "$usage_guest"}, + usage_idle: {[accumulator]: "$usage_idle"} } - ], - {"tags.hostid": 1, time: -1}); - -verifyTsResultsWithAndWithoutIndex( - [ - {$match: {"tags.hostid": "host_0"}}, - {$sort: {"tags.hostid": 1, time: -1}}, - { - $group: { - _id: "$tags.hostid", - usage_user: {$first: "$usage_user"}, - usage_guest: {$first: "$usage_guest"}, - usage_idle: {$first: "$usage_idle"} + }; +} + +/** + Test cases: + 1. Lastpoint queries on indexes with descending time and $first (DISTINCT_SCAN). + 2. Lastpoint queries on indexes with ascending time and $last (no DISTINCT_SCAN). + 3. Lastpoint queries on indexes with ascending time and $last and an additional secondary + index so that we can use the DISTINCT_SCAN optimization. +*/ +const testCases = [ + {time: -1}, + {time: 1}, + {time: -1, bucketsIndex: {"meta.hostid": -1, "control.max.time": 1, "control.min.time": 1}} +]; + +for (const {time, bucketsIndex} of testCases) { + const isTimeDescending = time < 0; + const canUseDistinct = isTimeDescending || bucketsIndex; + const groupStage = isTimeDescending ? getGroupStage("$first") : getGroupStage("$last"); + + // Test both directions of the metaField sort for each direction of time. + for (const index of [{"tags.hostid": 1, time}, {"tags.hostid": -1, time}]) { + // Test pipeline without a preceding $match stage. + verifyTsResultsWithAndWithoutIndex({ + pipeline: [{$sort: index}, groupStage], + index, + bucketsIndex, + expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan) + }); + + // Test pipeline without a preceding $match stage which has an extra idle measurement. This + // verifies that the query rewrite correctly returns missing fields. + verifyTsResultsWithAndWithoutIndex({ + pipeline: [{$sort: index}, groupStage], + index, + bucketsIndex, + expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan), + prePrepareTest: (testColl, observerColl) => { + const currTime = new Date(); + for (const host of TimeseriesTest.generateHosts(numHosts)) { + const idleMeasurement = { + tags: host.tags, + time: new Date(currTime + numIterations), // Ensure this is the lastpoint. + idle_user: 100 - TimeseriesTest.getRandomUsage() + }; + assert.commandWorked(testColl.insert(idleMeasurement)); + assert.commandWorked(observerColl.insert(idleMeasurement)); + } } + }); + + // Test pipeline with a preceding $match stage. + function testWithMatch(matchStage, precedingFilter) { + verifyTsResultsWithAndWithoutIndex({ + pipeline: [matchStage, {$sort: index}, groupStage], + index, + bucketsIndex, + expectStage: canUseDistinct ? expectDistinctScan : expectIxscan, + precedingFilter + }); } - ], - {"tags.hostid": 1, time: -1}, - {"meta.hostid": {$eq: "host_0"}}); + + // Test pipeline with an equality $match stage. + testWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}}); + + // Test pipeline with an inequality $match stage. + testWithMatch({$match: {"tags.hostid": {$ne: 0}}}, {"meta.hostid": {$not: {$eq: 0}}}); + + // Test pipeline with a $match stage that uses a $gt query. + testWithMatch({$match: {"tags.hostid": {$gt: 5}}}, {"meta.hostid": {$gt: 5}}); + + // Test pipeline with a $match stage that uses a $lt query. + testWithMatch({$match: {"tags.hostid": {$lt: 5}}}, {"meta.hostid": {$lt: 5}}); + } +} })(); diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 5040fdc2445..c1608ee0142 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -58,6 +58,7 @@ #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" @@ -162,17 +163,30 @@ bool checkMetadataSortReorder( */ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( const DocumentSourceSort& sort, - const boost::optional<std::string&> lastpointTimeField = boost::none) { + const boost::optional<std::string&> lastpointTimeField = boost::none, + bool flipSort = false) { + auto sortPattern = flipSort + ? SortPattern( + QueryPlannerCommon::reverseSortObj( + sort.getSortKeyPattern() + .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) + .toBson()), + sort.getContext()) + : sort.getSortKeyPattern(); std::vector<SortPattern::SortPatternPart> updatedPattern; - for (const auto& entry : sort.getSortKeyPattern()) { + for (const auto& entry : sortPattern) { updatedPattern.push_back(entry); if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) { updatedPattern.back().fieldPath = - FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get()); + FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix + : timeseries::kControlMaxFieldNamePrefix) + + lastpointTimeField.get()); updatedPattern.push_back(SortPattern::SortPatternPart{ entry.isAscending, - FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()), + FieldPath((entry.isAscending ? timeseries::kControlMaxFieldNamePrefix + : timeseries::kControlMinFieldNamePrefix) + + lastpointTimeField.get()), nullptr}); } else { auto updated = FieldPath(timeseries::kBucketMetaFieldName); @@ -192,6 +206,16 @@ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes); } +boost::intrusive_ptr<DocumentSourceGroup> createGroupForReorder( + const boost::intrusive_ptr<ExpressionContext>& expCtx, FieldPath& fieldPath) { + auto elem = BSON("bucket" << BSON(AccumulatorFirst::kName << "$_id")).firstElement(); + auto newAccum = AccumulationStatement::parseAccumulationStatement( + expCtx.get(), elem, expCtx->variablesParseState); + auto groupByExpr = ExpressionFieldPath::createPathFromString( + expCtx.get(), fieldPath.fullPath(), expCtx->variablesParseState); + return DocumentSourceGroup::create(expCtx, groupByExpr, {newAccum}); +} + // Optimize the section of the pipeline before the $_internalUnpackBucket stage. void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { auto prefix = Pipeline::SourceContainer(container->begin(), itr); @@ -709,13 +733,11 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, << BSON(DocumentSourceLimit::kStageName << 1)))) .firstElement(), expCtx); - sources.insert(unpackIt, lookup); auto unwind = DocumentSourceUnwind::createFromBson( BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(), expCtx); - sources.insert(unpackIt, unwind); BSONObjBuilder fields; @@ -725,14 +747,17 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, auto&& v = accumulator.expr.argument; if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) { auto&& newPath = metrics.concat(expr->getFieldPath().tail()); - fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath(); + // This is necessary to preserve $first, $last null-check semantics for handling + // nullish fields, e.g. returning missing field paths as null. + auto ifNullCheck = BSON( + "$ifNull" << BSONArray(BSON("0" << ("$" + newPath.fullPath()) << "1" << BSONNULL))); + fields << StringData{accumulator.fieldName} << ifNullCheck; } } - auto replaceWith = DocumentSourceReplaceRoot::createFromBson( - BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(), - expCtx); - + auto replaceWithBson = BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()); + auto replaceWith = + DocumentSourceReplaceRoot::createFromBson(replaceWithBson.firstElement(), expCtx); sources.insert(unpackIt, replaceWith); } @@ -750,27 +775,23 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta return false; } - // Attempt to create a new bucket-level $sort. if (sortStage->hasLimit()) { // This $sort stage was previously followed by a $limit stage. return false; } auto spec = _bucketUnpacker.bucketSpec(); - auto metaField = spec.metaField(); + auto maybeMetaField = spec.metaField(); auto timeField = spec.timeField(); - - if (!metaField || haveComputedMetaField()) { + if (!maybeMetaField || haveComputedMetaField()) { return false; } - if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) { + auto metaField = maybeMetaField.get(); + if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) { return false; } - auto newSort = createMetadataSortForReorder(*sortStage, timeField); - - // Attempt to create a new bucket-level $group. auto groupIdFields = groupStage->getIdFields(); if (groupIdFields.size() != 1) { return false; @@ -782,7 +803,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta } const auto fieldPath = groupId->getFieldPath(); - if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) { + if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) { return false; } @@ -790,25 +811,33 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta if (fieldPath.tail().getPathLength() > 1) { newFieldPath = newFieldPath.concat(fieldPath.tail().tail()); } - auto groupByExpr = ExpressionFieldPath::createPathFromString( - pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState); - for (auto&& accumulator : groupStage->getAccumulatedFields()) { - if (AccumulatorDocumentsNeeded::kFirstDocument != - accumulator.makeAccumulator()->documentsNeeded()) { - return false; - } - } - auto newAccum = - AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), - BSON("bucket" << BSON("$first" - << "$_id")) - .firstElement(), - pExpCtx->variablesParseState); - auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum}); + // Insert bucket-level $sort and $group stages before we unpack any buckets. + boost::intrusive_ptr<DocumentSourceSort> newSort; + auto insertBucketLevelSortAndGroup = [&](bool flipSort) { + newSort = createMetadataSortForReorder(*sortStage, timeField, flipSort); + auto newGroup = createGroupForReorder(pExpCtx, newFieldPath); + container->insert(itr, newSort); + container->insert(itr, newGroup); + }; - container->insert(itr, newSort); - container->insert(itr, newGroup); + auto accumulators = groupStage->getAccumulatedFields(); + auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) { + return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) { + return targetAccum == accum.makeAccumulator()->documentsNeeded(); + }); + }; + + std::string newTimeField; + if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kFirstDocument)) { + insertBucketLevelSortAndGroup(false); + newTimeField = timeseries::kControlMinFieldNamePrefix + timeField; + } else if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kLastDocument)) { + insertBucketLevelSortAndGroup(true); + newTimeField = timeseries::kControlMaxFieldNamePrefix + timeField; + } else { + return false; + } // Add $lookup, $unwind and $replaceWith stages. addStagesToRetrieveEventLevelFields( @@ -817,7 +846,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta pExpCtx, groupStage, timeField, - isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField)); + isLastpointSortTimeAscending(newSort->getSortKeyPattern(), newTimeField)); // Remove the $sort, $group and $_internalUnpackBucket stages. tassert(6165401, diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp index 323602418ac..642852bed40 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp @@ -172,7 +172,120 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest, ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); ASSERT_BSONOBJ_EQ( serialized[4], - fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}")); + fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " + "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); +} + +TEST_F(InternalUnpackBucketOptimizeLastpointTest, + LastpointWithMetaSubfieldDescendingTimeDescending) { + RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " + "'tags', bucketMaxSpanSeconds: 3600}}"), + fromjson("{$sort: {'tags.a': -1, t: -1}}"), + fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + + ASSERT_EQ(container.size(), 3U); + + auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) + ->optimizeLastpoint(container.begin(), &container); + ASSERT_TRUE(success); + + auto serialized = pipeline->serializeToBson(); + + ASSERT_EQ(serialized.size(), 5u); + ASSERT_BSONOBJ_EQ( + serialized[0], + fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}")); + ASSERT_BSONOBJ_EQ(serialized[1], + fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); + ASSERT_BSONOBJ_EQ( + serialized[2], + fromjson( + "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " + "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " + "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); + ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); + ASSERT_BSONOBJ_EQ( + serialized[4], + fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " + "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); +} + +TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithMetaSubfieldAscendingTimeAscending) { + RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " + "'tags', bucketMaxSpanSeconds: 3600}}"), + fromjson("{$sort: {'tags.a': 1, t: 1}}"), + fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + + ASSERT_EQ(container.size(), 3U); + + auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) + ->optimizeLastpoint(container.begin(), &container); + ASSERT_TRUE(success); + + auto serialized = pipeline->serializeToBson(); + + ASSERT_EQ(serialized.size(), 5u); + ASSERT_BSONOBJ_EQ( + serialized[0], + fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}")); + ASSERT_BSONOBJ_EQ(serialized[1], + fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); + ASSERT_BSONOBJ_EQ( + serialized[2], + fromjson( + "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " + "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " + "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); + ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); + ASSERT_BSONOBJ_EQ( + serialized[4], + fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " + "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); +} + +TEST_F(InternalUnpackBucketOptimizeLastpointTest, + LastpointWithMetaSubfieldDescendingTimeAscending) { + RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); + auto pipeline = Pipeline::parse( + makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " + "'tags', bucketMaxSpanSeconds: 3600}}"), + fromjson("{$sort: {'tags.a': -1, t: 1}}"), + fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")), + getExpCtx()); + auto& container = pipeline->getSources(); + + ASSERT_EQ(container.size(), 3U); + + auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) + ->optimizeLastpoint(container.begin(), &container); + ASSERT_TRUE(success); + + auto serialized = pipeline->serializeToBson(); + + ASSERT_EQ(serialized.size(), 5u); + ASSERT_BSONOBJ_EQ(serialized[0], + fromjson("{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}")); + ASSERT_BSONOBJ_EQ(serialized[1], + fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); + ASSERT_BSONOBJ_EQ( + serialized[2], + fromjson( + "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " + "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " + "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); + ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); + ASSERT_BSONOBJ_EQ( + serialized[4], + fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " + "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); } } // namespace |