diff options
4 files changed, 99 insertions, 339 deletions
diff --git a/jstests/core/timeseries/libs/timeseries_agg_helpers.js b/jstests/core/timeseries/libs/timeseries_agg_helpers.js index a2a2f74393f..a2ae1ffcca6 100644 --- a/jstests/core/timeseries/libs/timeseries_agg_helpers.js +++ b/jstests/core/timeseries/libs/timeseries_agg_helpers.js @@ -20,7 +20,7 @@ var TimeseriesAggTests = class { * @returns An array of a time-series collection and a non time-series collection, * respectively in this order. */ - static prepareInputCollections(numHosts, numIterations, includeIdleMeasurements = true) { + static prepareInputCollections(numHosts, numIterations) { const timeseriesCollOption = {timeseries: {timeField: "time", metaField: "tags"}}; Random.setRandomSeed(); @@ -60,7 +60,7 @@ var TimeseriesAggTests = class { assert.commandWorked(inColl.insert(newMeasurement)); assert.commandWorked(observerInColl.insert(newMeasurement)); - if (includeIdleMeasurements && (i % 2)) { + if (i % 2) { let idleMeasurement = { tags: host.tags, time: new Date(currTime + i), diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js index 3be28645c7b..05344c194da 100644 --- a/jstests/core/timeseries/timeseries_lastpoint.js +++ b/jstests/core/timeseries/timeseries_lastpoint.js @@ -29,173 +29,75 @@ if (!isLastpointEnabled) { return; } -// Timeseries test parameters. -const numHosts = 10; -const numIterations = 20; - -function verifyTsResults({pipeline, precedingFilter, expectStage, prepareTest}) { - // Prepare collections. Note: we test without idle measurements (all meta subfields are - // non-null). If we allow the insertion of idle measurements, we will obtain multiple lastpoints - // per bucket, and may have different results on the observer and timeseries collections. - const [tsColl, observerColl] = TimeseriesAggTests.prepareInputCollections( - numHosts, numIterations, false /* includeIdleMeasurements */); - - // Additional preparation before running the test. - if (prepareTest) { - prepareTest(tsColl, observerColl); +function verifyTsResults(pipeline, index, precedingFilter) { + // Prepare collections. + const numHosts = 10; + const numIterations = 20; + const [tsColl, observerColl] = + TimeseriesAggTests.prepareInputCollections(numHosts, numIterations); + if (index) { + tsColl.createIndex(index); } // Verify lastpoint optmization. const explain = tsColl.explain().aggregate(pipeline); - expectStage({explain, precedingFilter}); - - // Assert that the time-series aggregation results match that of the observer collection. - const expected = observerColl.aggregate(pipeline).toArray(); - const actual = tsColl.aggregate(pipeline).toArray(); - assertArrayEq({actual, expected}); - - // Drop collections. - tsColl.drop(); - observerColl.drop(); -} - -function verifyTsResultsWithAndWithoutIndex( - {pipeline, index, bucketsIndex, precedingFilter, expectStage, prePrepareTest}) { - verifyTsResults( - {pipeline, precedingFilter, expectStage: expectCollScan, prepareTest: prePrepareTest}); - verifyTsResults({ - pipeline, - precedingFilter, - expectStage, - prepareTest: (testColl, observerColl) => { - // Optionally do extra test preparation. - if (prePrepareTest) { - prePrepareTest(testColl, observerColl); - } - - // Create index on the timeseries collection. - testColl.createIndex(index); - - // Create an additional secondary index directly on the buckets collection so that we - // can test the DISTINCT_SCAN optimization when time is sorted in ascending order. - if (bucketsIndex) { - const bucketsColl = testDB["system.buckets.in"]; - bucketsColl.createIndex(bucketsIndex); - } + if (index) { + // The query can utilize DISTINCT_SCAN. + assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain); + + // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort. + assert.eq(getAggPlanStage(explain, "SORT"), null, explain); + } else { + // $sort can be pushed into the cursor layer. + assert.neq(getAggPlanStage(explain, "SORT"), null, explain); + + // At the bottom, there should be a COLLSCAN. + const collScanStage = getAggPlanStage(explain, "COLLSCAN"); + assert.neq(collScanStage, null, explain); + if (precedingFilter) { + assert.eq(precedingFilter, collScanStage.filter, collScanStage); } - }); -} - -function expectDistinctScan({explain}) { - // The query can utilize DISTINCT_SCAN. - assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain); - - // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort. - assert.eq(getAggPlanStage(explain, "SORT"), null, explain); -} - -function expectCollScan({explain, precedingFilter}) { - // $sort can be pushed into the cursor layer. - assert.neq(getAggPlanStage(explain, "SORT"), null, explain); - - // At the bottom, there should be a COLLSCAN. - const collScanStage = getAggPlanStage(explain, "COLLSCAN"); - assert.neq(collScanStage, null, explain); - if (precedingFilter) { - assert.eq(precedingFilter, collScanStage.filter, collScanStage); } -} - -function expectIxscan({explain}) { - // $sort can be pushed into the cursor layer. - assert.neq(getAggPlanStage(explain, "SORT"), null, explain); - // At the bottom, there should be a IXSCAN. - assert.neq(getAggPlanStage(explain, "IXSCAN"), null, explain); + // Assert that the time-series aggregation results match that of the observer collection. + const expectedResults = observerColl.aggregate(pipeline).toArray(); + const actualResults = tsColl.aggregate(pipeline).toArray(); + assert(resultsEq(actualResults, expectedResults), + `Expected ${tojson(expectedResults)} but got ${tojson(actualResults)}`); } -function getGroupStage(accumulator) { - return { - $group: { - _id: "$tags.hostid", - usage_user: {[accumulator]: "$usage_user"}, - usage_guest: {[accumulator]: "$usage_guest"}, - usage_idle: {[accumulator]: "$usage_idle"} - } - }; +function verifyTsResultsWithAndWithoutIndex(pipeline, index, precedingFilter) { + verifyTsResults(pipeline, undefined, precedingFilter); + verifyTsResults(pipeline, index, precedingFilter); } -/** - Test cases: - 1. Lastpoint queries on indexes with descending time and $first (DISTINCT_SCAN). - 2. Lastpoint queries on indexes with ascending time and $last (no DISTINCT_SCAN). - 3. Lastpoint queries on indexes with ascending time and $last and an additional secondary - index so that we can use the DISTINCT_SCAN optimization. -*/ -const testCases = [ - {time: -1}, - {time: 1}, - {time: -1, bucketsIndex: {"meta.hostid": -1, "control.max.time": 1, "control.min.time": 1}} -]; - -for (const {time, bucketsIndex} of testCases) { - const isTimeDescending = time < 0; - const canUseDistinct = isTimeDescending || bucketsIndex; - const groupStage = isTimeDescending ? getGroupStage("$first") : getGroupStage("$last"); - - // Test both directions of the metaField sort for each direction of time. - for (const index of [{"tags.hostid": 1, time}, {"tags.hostid": -1, time}]) { - // Test pipeline without a preceding $match stage. - verifyTsResultsWithAndWithoutIndex({ - pipeline: [{$sort: index}, groupStage], - index, - bucketsIndex, - expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan) - }); - - // Test pipeline without a preceding $match stage which has an extra idle measurement. This - // verifies that the query rewrite correctly returns missing fields. - verifyTsResultsWithAndWithoutIndex({ - pipeline: [{$sort: index}, groupStage], - index, - bucketsIndex, - expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan), - prePrepareTest: (testColl, observerColl) => { - const currTime = new Date(); - for (const host of TimeseriesTest.generateHosts(numHosts)) { - const idleMeasurement = { - tags: host.tags, - time: new Date(currTime + numIterations), // Ensure this is the lastpoint. - idle_user: 100 - TimeseriesTest.getRandomUsage() - }; - assert.commandWorked(testColl.insert(idleMeasurement)); - assert.commandWorked(observerColl.insert(idleMeasurement)); - } +verifyTsResultsWithAndWithoutIndex( + [ + {$sort: {"tags.hostid": 1, time: -1}}, + { + $group: { + _id: "$tags.hostid", + usage_user: {$first: "$usage_user"}, + usage_guest: {$first: "$usage_guest"}, + usage_idle: {$first: "$usage_idle"} } - }); - - // Test pipeline with a preceding $match stage. - function testWithMatch(matchStage, precedingFilter) { - verifyTsResultsWithAndWithoutIndex({ - pipeline: [matchStage, {$sort: index}, groupStage], - index, - bucketsIndex, - expectStage: canUseDistinct ? expectDistinctScan : expectIxscan, - precedingFilter - }); } - - // Test pipeline with an equality $match stage. - testWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}}); - - // Test pipeline with an inequality $match stage. - testWithMatch({$match: {"tags.hostid": {$ne: 0}}}, {"meta.hostid": {$not: {$eq: 0}}}); - - // Test pipeline with a $match stage that uses a $gt query. - testWithMatch({$match: {"tags.hostid": {$gt: 5}}}, {"meta.hostid": {$gt: 5}}); - - // Test pipeline with a $match stage that uses a $lt query. - testWithMatch({$match: {"tags.hostid": {$lt: 5}}}, {"meta.hostid": {$lt: 5}}); - } -} + ], + {"tags.hostid": 1, time: -1}); + +verifyTsResultsWithAndWithoutIndex( + [ + {$match: {"tags.hostid": "host_0"}}, + {$sort: {"tags.hostid": 1, time: -1}}, + { + $group: { + _id: "$tags.hostid", + usage_user: {$first: "$usage_user"}, + usage_guest: {$first: "$usage_guest"}, + usage_idle: {$first: "$usage_idle"} + } + } + ], + {"tags.hostid": 1, time: -1}, + {"meta.hostid": {$eq: "host_0"}}); })(); diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index c1608ee0142..5040fdc2445 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -58,7 +58,6 @@ #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" @@ -163,30 +162,17 @@ bool checkMetadataSortReorder( */ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( const DocumentSourceSort& sort, - const boost::optional<std::string&> lastpointTimeField = boost::none, - bool flipSort = false) { - auto sortPattern = flipSort - ? SortPattern( - QueryPlannerCommon::reverseSortObj( - sort.getSortKeyPattern() - .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) - .toBson()), - sort.getContext()) - : sort.getSortKeyPattern(); + const boost::optional<std::string&> lastpointTimeField = boost::none) { std::vector<SortPattern::SortPatternPart> updatedPattern; - for (const auto& entry : sortPattern) { + for (const auto& entry : sort.getSortKeyPattern()) { updatedPattern.push_back(entry); if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) { updatedPattern.back().fieldPath = - FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix - : timeseries::kControlMaxFieldNamePrefix) + - lastpointTimeField.get()); + FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get()); updatedPattern.push_back(SortPattern::SortPatternPart{ entry.isAscending, - FieldPath((entry.isAscending ? timeseries::kControlMaxFieldNamePrefix - : timeseries::kControlMinFieldNamePrefix) + - lastpointTimeField.get()), + FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()), nullptr}); } else { auto updated = FieldPath(timeseries::kBucketMetaFieldName); @@ -206,16 +192,6 @@ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes); } -boost::intrusive_ptr<DocumentSourceGroup> createGroupForReorder( - const boost::intrusive_ptr<ExpressionContext>& expCtx, FieldPath& fieldPath) { - auto elem = BSON("bucket" << BSON(AccumulatorFirst::kName << "$_id")).firstElement(); - auto newAccum = AccumulationStatement::parseAccumulationStatement( - expCtx.get(), elem, expCtx->variablesParseState); - auto groupByExpr = ExpressionFieldPath::createPathFromString( - expCtx.get(), fieldPath.fullPath(), expCtx->variablesParseState); - return DocumentSourceGroup::create(expCtx, groupByExpr, {newAccum}); -} - // Optimize the section of the pipeline before the $_internalUnpackBucket stage. void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { auto prefix = Pipeline::SourceContainer(container->begin(), itr); @@ -733,11 +709,13 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, << BSON(DocumentSourceLimit::kStageName << 1)))) .firstElement(), expCtx); + sources.insert(unpackIt, lookup); auto unwind = DocumentSourceUnwind::createFromBson( BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(), expCtx); + sources.insert(unpackIt, unwind); BSONObjBuilder fields; @@ -747,17 +725,14 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, auto&& v = accumulator.expr.argument; if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) { auto&& newPath = metrics.concat(expr->getFieldPath().tail()); - // This is necessary to preserve $first, $last null-check semantics for handling - // nullish fields, e.g. returning missing field paths as null. - auto ifNullCheck = BSON( - "$ifNull" << BSONArray(BSON("0" << ("$" + newPath.fullPath()) << "1" << BSONNULL))); - fields << StringData{accumulator.fieldName} << ifNullCheck; + fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath(); } } - auto replaceWithBson = BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()); - auto replaceWith = - DocumentSourceReplaceRoot::createFromBson(replaceWithBson.firstElement(), expCtx); + auto replaceWith = DocumentSourceReplaceRoot::createFromBson( + BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(), + expCtx); + sources.insert(unpackIt, replaceWith); } @@ -775,23 +750,27 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta return false; } + // Attempt to create a new bucket-level $sort. if (sortStage->hasLimit()) { // This $sort stage was previously followed by a $limit stage. return false; } auto spec = _bucketUnpacker.bucketSpec(); - auto maybeMetaField = spec.metaField(); + auto metaField = spec.metaField(); auto timeField = spec.timeField(); - if (!maybeMetaField || haveComputedMetaField()) { + + if (!metaField || haveComputedMetaField()) { return false; } - auto metaField = maybeMetaField.get(); - if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) { + if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) { return false; } + auto newSort = createMetadataSortForReorder(*sortStage, timeField); + + // Attempt to create a new bucket-level $group. auto groupIdFields = groupStage->getIdFields(); if (groupIdFields.size() != 1) { return false; @@ -803,7 +782,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta } const auto fieldPath = groupId->getFieldPath(); - if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) { + if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) { return false; } @@ -811,33 +790,25 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta if (fieldPath.tail().getPathLength() > 1) { newFieldPath = newFieldPath.concat(fieldPath.tail().tail()); } + auto groupByExpr = ExpressionFieldPath::createPathFromString( + pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState); - // Insert bucket-level $sort and $group stages before we unpack any buckets. - boost::intrusive_ptr<DocumentSourceSort> newSort; - auto insertBucketLevelSortAndGroup = [&](bool flipSort) { - newSort = createMetadataSortForReorder(*sortStage, timeField, flipSort); - auto newGroup = createGroupForReorder(pExpCtx, newFieldPath); - container->insert(itr, newSort); - container->insert(itr, newGroup); - }; - - auto accumulators = groupStage->getAccumulatedFields(); - auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) { - return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) { - return targetAccum == accum.makeAccumulator()->documentsNeeded(); - }); - }; - - std::string newTimeField; - if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kFirstDocument)) { - insertBucketLevelSortAndGroup(false); - newTimeField = timeseries::kControlMinFieldNamePrefix + timeField; - } else if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kLastDocument)) { - insertBucketLevelSortAndGroup(true); - newTimeField = timeseries::kControlMaxFieldNamePrefix + timeField; - } else { - return false; + for (auto&& accumulator : groupStage->getAccumulatedFields()) { + if (AccumulatorDocumentsNeeded::kFirstDocument != + accumulator.makeAccumulator()->documentsNeeded()) { + return false; + } } + auto newAccum = + AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), + BSON("bucket" << BSON("$first" + << "$_id")) + .firstElement(), + pExpCtx->variablesParseState); + auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum}); + + container->insert(itr, newSort); + container->insert(itr, newGroup); // Add $lookup, $unwind and $replaceWith stages. addStagesToRetrieveEventLevelFields( @@ -846,7 +817,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta pExpCtx, groupStage, timeField, - isLastpointSortTimeAscending(newSort->getSortKeyPattern(), newTimeField)); + isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField)); // Remove the $sort, $group and $_internalUnpackBucket stages. tassert(6165401, diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp index 642852bed40..323602418ac 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp @@ -172,120 +172,7 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest, ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); ASSERT_BSONOBJ_EQ( serialized[4], - fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " - "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); -} - -TEST_F(InternalUnpackBucketOptimizeLastpointTest, - LastpointWithMetaSubfieldDescendingTimeDescending) { - RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); - auto pipeline = Pipeline::parse( - makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " - "'tags', bucketMaxSpanSeconds: 3600}}"), - fromjson("{$sort: {'tags.a': -1, t: -1}}"), - fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}")), - getExpCtx()); - auto& container = pipeline->getSources(); - - ASSERT_EQ(container.size(), 3U); - - auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) - ->optimizeLastpoint(container.begin(), &container); - ASSERT_TRUE(success); - - auto serialized = pipeline->serializeToBson(); - - ASSERT_EQ(serialized.size(), 5u); - ASSERT_BSONOBJ_EQ( - serialized[0], - fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}")); - ASSERT_BSONOBJ_EQ(serialized[1], - fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); - ASSERT_BSONOBJ_EQ( - serialized[2], - fromjson( - "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " - "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " - "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); - ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); - ASSERT_BSONOBJ_EQ( - serialized[4], - fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " - "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); -} - -TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithMetaSubfieldAscendingTimeAscending) { - RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); - auto pipeline = Pipeline::parse( - makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " - "'tags', bucketMaxSpanSeconds: 3600}}"), - fromjson("{$sort: {'tags.a': 1, t: 1}}"), - fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")), - getExpCtx()); - auto& container = pipeline->getSources(); - - ASSERT_EQ(container.size(), 3U); - - auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) - ->optimizeLastpoint(container.begin(), &container); - ASSERT_TRUE(success); - - auto serialized = pipeline->serializeToBson(); - - ASSERT_EQ(serialized.size(), 5u); - ASSERT_BSONOBJ_EQ( - serialized[0], - fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}")); - ASSERT_BSONOBJ_EQ(serialized[1], - fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); - ASSERT_BSONOBJ_EQ( - serialized[2], - fromjson( - "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " - "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " - "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); - ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); - ASSERT_BSONOBJ_EQ( - serialized[4], - fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " - "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); -} - -TEST_F(InternalUnpackBucketOptimizeLastpointTest, - LastpointWithMetaSubfieldDescendingTimeAscending) { - RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true); - auto pipeline = Pipeline::parse( - makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: " - "'tags', bucketMaxSpanSeconds: 3600}}"), - fromjson("{$sort: {'tags.a': -1, t: 1}}"), - fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")), - getExpCtx()); - auto& container = pipeline->getSources(); - - ASSERT_EQ(container.size(), 3U); - - auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get()) - ->optimizeLastpoint(container.begin(), &container); - ASSERT_TRUE(success); - - auto serialized = pipeline->serializeToBson(); - - ASSERT_EQ(serialized.size(), 5u); - ASSERT_BSONOBJ_EQ(serialized[0], - fromjson("{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}")); - ASSERT_BSONOBJ_EQ(serialized[1], - fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}")); - ASSERT_BSONOBJ_EQ( - serialized[2], - fromjson( - "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: " - "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', " - "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}")); - ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}")); - ASSERT_BSONOBJ_EQ( - serialized[4], - fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: " - "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}")); + fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}")); } } // namespace |