From e4544716efb8892741cfc6f79e81d802a894581b Mon Sep 17 00:00:00 2001
From: Katherine Wu
Date: Tue, 15 Feb 2022 15:04:48 +0000
Subject: SERVER-61654 Implement lastpoint optimization for {meta.a: 1, ts: -1} use case

---
 jstests/core/timeseries/timeseries_lastpoint.js    | 103 ++++++++++
 jstests/core/timeseries/timeseries_merge.js        |   2 +-
 src/mongo/db/pipeline/SConscript                   |   1 +
 .../document_source_internal_unpack_bucket.cpp     | 221 ++++++++++++++++++++-
 .../document_source_internal_unpack_bucket.h       |  25 +++
 .../optimize_lastpoint_test.cpp                    | 179 +++++++++++++++++
 src/mongo/db/pipeline/document_source_lookup.cpp   |  10 +-
 src/mongo/db/pipeline/document_source_lookup.h     |   5 +
 8 files changed, 530 insertions(+), 16 deletions(-)
 create mode 100644 jstests/core/timeseries/timeseries_lastpoint.js
 create mode 100644 src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp

diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js
new file mode 100644
index 00000000000..05344c194da
--- /dev/null
+++ b/jstests/core/timeseries/timeseries_lastpoint.js
@@ -0,0 +1,103 @@
+/**
+ * Tests the optimization of "lastpoint"-type queries on time-series collections.
+ *
+ * @tags: [
+ *   does_not_support_stepdowns,
+ *   does_not_support_transactions,
+ *   requires_timeseries,
+ *   requires_pipeline_optimization,
+ *   requires_fcv_53,
+ *   # TODO (SERVER-63590): Investigate presence of getmore tag in timeseries jstests.
+ *   requires_getmore
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js");
+load("jstests/core/timeseries/libs/timeseries_agg_helpers.js");
+load('jstests/libs/analyze_plan.js');
+
+const testDB = TimeseriesAggTests.getTestDb();
+assert.commandWorked(testDB.dropDatabase());
+
+// Do not run the rest of the tests if the lastpoint optimization is disabled.
+const getLastpointParam = db.adminCommand({getParameter: 1, featureFlagLastPointQuery: 1});
+const isLastpointEnabled = getLastpointParam.hasOwnProperty("featureFlagLastPointQuery") &&
+    getLastpointParam.featureFlagLastPointQuery.value;
+if (!isLastpointEnabled) {
+    return;
+}
+
+function verifyTsResults(pipeline, index, precedingFilter) {
+    // Prepare collections.
+    const numHosts = 10;
+    const numIterations = 20;
+    const [tsColl, observerColl] =
+        TimeseriesAggTests.prepareInputCollections(numHosts, numIterations);
+    if (index) {
+        tsColl.createIndex(index);
+    }
+
+    // Verify the lastpoint optimization.
+    const explain = tsColl.explain().aggregate(pipeline);
+    if (index) {
+        // The query can utilize DISTINCT_SCAN.
+        assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
+
+        // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
+        assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+    } else {
+        // $sort can be pushed into the cursor layer.
+        assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
+
+        // At the bottom, there should be a COLLSCAN.
+        const collScanStage = getAggPlanStage(explain, "COLLSCAN");
+        assert.neq(collScanStage, null, explain);
+        if (precedingFilter) {
+            assert.eq(precedingFilter, collScanStage.filter, collScanStage);
+        }
+    }
+
+    // Assert that the time-series aggregation results match those of the observer collection.
+    const expectedResults = observerColl.aggregate(pipeline).toArray();
+    const actualResults = tsColl.aggregate(pipeline).toArray();
+    assert(resultsEq(actualResults, expectedResults),
+           `Expected ${tojson(expectedResults)} but got ${tojson(actualResults)}`);
+}
+
+function verifyTsResultsWithAndWithoutIndex(pipeline, index, precedingFilter) {
+    verifyTsResults(pipeline, undefined, precedingFilter);
+    verifyTsResults(pipeline, index, precedingFilter);
+}
+
+verifyTsResultsWithAndWithoutIndex(
+    [
+        {$sort: {"tags.hostid": 1, time: -1}},
+        {
+            $group: {
+                _id: "$tags.hostid",
+                usage_user: {$first: "$usage_user"},
+                usage_guest: {$first: "$usage_guest"},
+                usage_idle: {$first: "$usage_idle"}
+            }
+        }
+    ],
+    {"tags.hostid": 1, time: -1});
+
+verifyTsResultsWithAndWithoutIndex(
+    [
+        {$match: {"tags.hostid": "host_0"}},
+        {$sort: {"tags.hostid": 1, time: -1}},
+        {
+            $group: {
+                _id: "$tags.hostid",
+                usage_user: {$first: "$usage_user"},
+                usage_guest: {$first: "$usage_guest"},
+                usage_idle: {$first: "$usage_idle"}
+            }
+        }
+    ],
+    {"tags.hostid": 1, time: -1},
+    {"meta.hostid": {$eq: "host_0"}});
+})();
diff --git a/jstests/core/timeseries/timeseries_merge.js b/jstests/core/timeseries/timeseries_merge.js
index ac24b40e01e..59d4dbee889 100644
--- a/jstests/core/timeseries/timeseries_merge.js
+++ b/jstests/core/timeseries/timeseries_merge.js
@@ -71,7 +71,7 @@ let runMergeOnErrorTestCase = () => {
  */
 let runMergeOnTestCase = () => {
     var mergePipeline = [
-        {$project: {_id: 0, cpu: 1, idle: 1, "tags.hostid": 1, time: 1}},
+        {$project: {_id: 0, cpu: 1, idle_user: 1, "tags.hostid": 1, time: 1}},
         {$sort: {time: 1}},
         {
             $merge: {
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0edf1244058..7410cc36679 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -546,6 +546,7 @@ env.CppUnitTest(
        'document_source_internal_unpack_bucket_test/group_reorder_test.cpp',
        'document_source_internal_unpack_bucket_test/internalize_project_test.cpp',
        'document_source_internal_unpack_bucket_test/optimize_pipeline_test.cpp',
+       'document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp',
        'document_source_internal_unpack_bucket_test/pushdown_computed_meta_projections_test.cpp',
        'document_source_internal_unpack_bucket_test/sample_reorder_test.cpp',
        'document_source_internal_unpack_bucket_test/sort_reorder_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 6dccf7eaf37..5040fdc2445 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -49,8 +49,10 @@
 #include "mongo/db/pipeline/document_source_add_fields.h"
 #include "mongo/db/pipeline/document_source_geo_near.h"
 #include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
 #include "mongo/db/pipeline/document_source_match.h"
 #include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
 #include "mongo/db/pipeline/document_source_sample.h"
 #include "mongo/db/pipeline/document_source_single_document_transformation.h"
 #include "mongo/db/pipeline/document_source_sort.h"
@@ -108,11 +110,28 @@ auto getIncludeExcludeProjectAndType(DocumentSource* src) {
     return std::pair{BSONObj{}, false};
 }
 
+auto isLastpointSortTimeAscending(const SortPattern& sortPattern, const std::string& timeField) {
+    for (auto entry : sortPattern) {
+        if (entry.fieldPath->fullPath() == timeField) {
+            return entry.isAscending;
+        }
+    }
+    // A lastpoint query will always have the time field as part of the sort pattern.
+    MONGO_UNREACHABLE;
+}
+
 /**
  * Checks if a sort stage's pattern following our internal unpack bucket is suitable to be reordered
  * before us. The sort stage must refer exclusively to the meta field or any subfields.
+ *
+ * If this check is being used for lastpoint, the sort stage can also refer to the time field,
+ * which should be the last field in the pattern.
  */
-bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData& metaFieldStr) {
+bool checkMetadataSortReorder(
+    const SortPattern& sortPattern,
+    const StringData& metaFieldStr,
+    const boost::optional<std::string> lastpointTimeField = boost::none) {
+    auto timeFound = false;
     for (const auto& sortKey : sortPattern) {
         if (!sortKey.fieldPath.has_value()) {
             return false;
@@ -121,27 +140,47 @@ bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData&
             return false;
         }
         if (sortKey.fieldPath->getFieldName(0) != metaFieldStr) {
+            if (lastpointTimeField && sortKey.fieldPath->fullPath() == lastpointTimeField.get()) {
+                // If we are checking the sort pattern for the lastpoint case, 'time' is allowed.
+                timeFound = true;
+                continue;
+            }
             return false;
+        } else {
+            if (lastpointTimeField && timeFound) {
+                // The time field was not the last field in the sort pattern.
+                return false;
+            }
         }
     }
-    return true;
+    // If we are checking for lastpoint, make sure we encountered the time field.
+    return !lastpointTimeField || timeFound;
 }
 
 /**
  * Returns a new DocumentSort to reorder before current unpack bucket document.
  */
 boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
-    const DocumentSourceSort& sort) {
+    const DocumentSourceSort& sort,
+    const boost::optional<std::string> lastpointTimeField = boost::none) {
     std::vector<SortPattern::SortPatternPart> updatedPattern;
     for (const auto& entry : sort.getSortKeyPattern()) {
-        // Repoint sort to use metadata field before renaming.
-        auto updatedFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
-        if (entry.fieldPath->getPathLength() > 1) {
-            updatedFieldPath = updatedFieldPath.concat(entry.fieldPath->tail());
-        }
-        updatedPattern.push_back(entry);
-        updatedPattern.back().fieldPath = updatedFieldPath;
+        updatedPattern.push_back(entry);
+
+        if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) {
+            updatedPattern.back().fieldPath =
+                FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get());
+            updatedPattern.push_back(SortPattern::SortPatternPart{
+                entry.isAscending,
+                FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()),
+                nullptr});
+        } else {
+            auto updated = FieldPath(timeseries::kBucketMetaFieldName);
+            if (entry.fieldPath->getPathLength() > 1) {
+                updated = updated.concat(entry.fieldPath->tail());
+            }
+            updatedPattern.back().fieldPath = updated;
+        }
     }
 
     boost::optional<uint64_t> maxMemoryUsageBytes;
@@ -646,6 +685,159 @@ bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
         _bucketUnpacker.bucketSpec().metaField().get());
 }
 
+void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
+                                         const Pipeline::SourceContainer::const_iterator unpackIt,
+                                         boost::intrusive_ptr<ExpressionContext> expCtx,
+                                         boost::intrusive_ptr<DocumentSourceGroup> group,
+                                         const boost::optional<std::string> lastpointTimeField,
+                                         bool timeAscending) {
+    mongo::stdx::unordered_set<NamespaceString> nss;
+    auto&& ns = expCtx->ns;
+    nss.emplace(ns);
+    expCtx->addResolvedNamespaces(nss);
+
+    FieldPath metrics("metrics");
+    auto lookup = DocumentSourceLookUp::createFromBson(
+        BSON(DocumentSourceLookUp::kStageName << BSON(
+                 DocumentSourceLookUp::kFromField
+                 << ns.coll() << DocumentSourceLookUp::kLocalField << "bucket"
+                 << DocumentSourceLookUp::kForeignField << "_id" << DocumentSourceLookUp::kAsField
+                 << metrics.fullPath() << DocumentSourceLookUp::kPipelineField
+                 << BSON_ARRAY(unpackIt->get()->serializeToBSONForDebug()
+                               << BSON(DocumentSourceSort::kStageName << BSON(
+                                           lastpointTimeField.get() << (timeAscending ? 1 : -1)))
+                               << BSON(DocumentSourceLimit::kStageName << 1))))
+            .firstElement(),
+        expCtx);
+
+    sources.insert(unpackIt, lookup);
+
+    auto unwind = DocumentSourceUnwind::createFromBson(
+        BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(),
+        expCtx);
+
+    sources.insert(unpackIt, unwind);
+
+    BSONObjBuilder fields;
+    fields << "_id"
+           << "$_id";
+    for (auto&& accumulator : group->getAccumulatedFields()) {
+        auto&& v = accumulator.expr.argument;
+        if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) {
+            auto&& newPath = metrics.concat(expr->getFieldPath().tail());
+            fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath();
+        }
+    }
+
+    auto replaceWith = DocumentSourceReplaceRoot::createFromBson(
+        BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(),
+        expCtx);
+
+    sources.insert(unpackIt, replaceWith);
+}
+
+bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
+                                                           Pipeline::SourceContainer* container) {
+    // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order.
+    if (std::next(itr, 2) == container->end()) {
+        return false;
+    }
+
+    auto sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get());
+    auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get());
+
+    if (!sortStage || !groupStage) {
+        return false;
+    }
+
+    // Attempt to create a new bucket-level $sort.
+    if (sortStage->hasLimit()) {
+        // This $sort stage was previously followed by a $limit stage.
+        return false;
+    }
+
+    auto spec = _bucketUnpacker.bucketSpec();
+    auto metaField = spec.metaField();
+    auto timeField = spec.timeField();
+
+    if (!metaField || haveComputedMetaField()) {
+        return false;
+    }
+
+    if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) {
+        return false;
+    }
+
+    auto newSort = createMetadataSortForReorder(*sortStage, timeField);
+
+    // Attempt to create a new bucket-level $group.
+    auto groupIdFields = groupStage->getIdFields();
+    if (groupIdFields.size() != 1) {
+        return false;
+    }
+
+    auto groupId = dynamic_cast<ExpressionFieldPath*>(groupIdFields.cbegin()->second.get());
+    if (!groupId || groupId->isVariableReference()) {
+        return false;
+    }
+
+    const auto fieldPath = groupId->getFieldPath();
+    if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) {
+        return false;
+    }
+
+    auto newFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
+    if (fieldPath.tail().getPathLength() > 1) {
+        newFieldPath = newFieldPath.concat(fieldPath.tail().tail());
+    }
+    auto groupByExpr = ExpressionFieldPath::createPathFromString(
+        pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState);
+
+    for (auto&& accumulator : groupStage->getAccumulatedFields()) {
+        if (AccumulatorDocumentsNeeded::kFirstDocument !=
+            accumulator.makeAccumulator()->documentsNeeded()) {
+            return false;
+        }
+    }
+
+    auto newAccum =
+        AccumulationStatement::parseAccumulationStatement(pExpCtx.get(),
+                                                          BSON("bucket" << BSON("$first"
+                                                                                << "$_id"))
+                                                              .firstElement(),
+                                                          pExpCtx->variablesParseState);
+    auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum});
+
+    container->insert(itr, newSort);
+    container->insert(itr, newGroup);
+
+    // Add $lookup, $unwind and $replaceWith stages.
+    addStagesToRetrieveEventLevelFields(
+        *container,
+        itr,
+        pExpCtx,
+        groupStage,
+        timeField,
+        isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField));
+
+    // Remove the $sort, $group and $_internalUnpackBucket stages.
+    tassert(6165401,
+            "unexpected stage in lastpoint aggregate, expected $_internalUnpackBucket",
+            itr->get()->getSourceName() == kStageNameInternal);
+    itr = container->erase(itr);
+
+    tassert(6165402,
+            "unexpected stage in lastpoint aggregate, expected $sort",
+            itr->get()->getSourceName() == DocumentSourceSort::kStageName);
+    itr = container->erase(itr);
+
+    tassert(6165403,
+            "unexpected stage in lastpoint aggregate, expected $group",
+            itr->get()->getSourceName() == DocumentSourceGroup::kStageName);
+    container->erase(itr);
+
+    return true;
+}
+
 Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
     Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
     invariant(*itr == this);
@@ -772,6 +964,15 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
         }
     }
 
+    // Attempt to optimize last-point type queries.
+    if (feature_flags::gfeatureFlagLastPointQuery.isEnabled(
+            serverGlobalParams.featureCompatibility) &&
+        optimizeLastpoint(itr, container)) {
+        // If we are able to rewrite the aggregation, give the resulting pipeline a chance to
+        // perform further optimizations.
+        return container->begin();
+    }
+
     // Attempt to map predicates on bucketed fields to predicates on the control field.
     if (auto nextMatch = dynamic_cast<DocumentSourceMatch*>(std::next(itr)->get());
         nextMatch && !_triedBucketLevelFieldsPredicatesPushdown) {
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 32079ed2ceb..9238d87176f 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -190,6 +190,31 @@ public:
     std::pair<bool, Pipeline::SourceContainer::iterator> rewriteGroupByMinMax(
         Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container);
 
+    /**
+     * If the current aggregation is a lastpoint-type query (i.e. with a $sort on meta and time
+     * fields, and a $group with a meta _id and only $first or $last accumulators) we can rewrite
+     * it to avoid unpacking all buckets.
+     *
+     * Ex: user aggregation of
+     *     [{_internalUnpackBucket: {...}},
+     *      {$sort: {myMeta.a: 1, myTime: -1}},
+     *      {$group: {_id: "$myMeta.a", otherFields: {$first: "$otherFields"}}}]
+     *
+     * will be rewritten into:
+     *     [{$sort: {meta.a: 1, time: -1}},
+     *      {$group: {_id: "$meta.a", bucket: {$first: "$_id"}}},
+     *      {$lookup: {
+     *           from: <ns.coll()>,
+     *           as: "metrics",
+     *           localField: "bucket",
+     *           foreignField: "_id",
+     *           pipeline: [{$_internalUnpackBucket: {...}}, {$sort: {myTime: -1}}, {$limit: 1}]}},
+     *      {$unwind: "$metrics"},
+     *      {$replaceWith: {_id: "$_id", otherFields: "$metrics.otherFields"}}]
+     */
+    bool optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
+                           Pipeline::SourceContainer* container);
+
     GetModPathsReturn getModifiedPaths() const final override;
 
 private:
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
new file mode 100644
index 00000000000..323602418ac
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/query/util/make_data_structure.h"
+#include "mongo/idl/server_parameter_test_util.h"
+
+namespace mongo {
+namespace {
+
+using InternalUnpackBucketOptimizeLastpointTest = AggregationContextFixture;
+
+TEST_F(InternalUnpackBucketOptimizeLastpointTest, NonLastpointDoesNotParticipateInOptimization) {
+    RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
+    {
+        // $sort must contain a time field.
+        auto unpackSpec = fromjson(
+            "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+            "bucketMaxSpanSeconds: 3600}}");
+        auto sortSpec = fromjson("{$sort: {'tags.a': 1}}");
+        auto groupSpec =
+            fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+        auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+        auto& container = pipeline->getSources();
+
+        ASSERT_EQ(container.size(), 3U);
+
+        auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+                           ->optimizeLastpoint(container.begin(), &container);
+        ASSERT_FALSE(success);
+
+        // The pipeline is unchanged.
+        auto serialized = pipeline->serializeToBson();
+        ASSERT_EQ(serialized.size(), 3u);
+        ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+        ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+        ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+    }
+    {
+        // $sort must have the time field as the last field in the sort key pattern.
+        auto unpackSpec = fromjson(
+            "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+            "bucketMaxSpanSeconds: 3600}}");
+        auto sortSpec = fromjson("{$sort: {t: -1, 'tags.a': 1}}");
+        auto groupSpec =
+            fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+        auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+        auto& container = pipeline->getSources();
+
+        ASSERT_EQ(container.size(), 3U);
+
+        auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+                           ->optimizeLastpoint(container.begin(), &container);
+        ASSERT_FALSE(success);
+
+        // The pipeline is unchanged.
+        auto serialized = pipeline->serializeToBson();
+        ASSERT_EQ(serialized.size(), 3u);
+        ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+        ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+        ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+    }
+    {
+        // $group's _id must be a meta field.
+        auto unpackSpec = fromjson(
+            "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+            "bucketMaxSpanSeconds: 3600}}");
+        auto sortSpec = fromjson("{$sort: {'tags.a': 1, t: -1}}");
+        auto groupSpec =
+            fromjson("{$group: {_id: '$nonMeta', b: {$first: '$b'}, c: {$first: '$c'}}}");
+        auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+        auto& container = pipeline->getSources();
+
+        ASSERT_EQ(container.size(), 3U);
+
+        auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+                           ->optimizeLastpoint(container.begin(), &container);
+        ASSERT_FALSE(success);
+
+        // The pipeline is unchanged.
+        auto serialized = pipeline->serializeToBson();
+        ASSERT_EQ(serialized.size(), 3u);
+        ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+        ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+        ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+    }
+    {
+        // For now, $group can only contain $first accumulators.
+        auto unpackSpec = fromjson(
+            "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+            "bucketMaxSpanSeconds: 3600}}");
+        auto sortSpec = fromjson("{$sort: {'tags.a': 1, t: -1}}");
+        auto groupSpec =
+            fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$last: '$c'}}}");
+        auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+        auto& container = pipeline->getSources();
+
+        ASSERT_EQ(container.size(), 3U);
+
+        auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+                           ->optimizeLastpoint(container.begin(), &container);
+        ASSERT_FALSE(success);
+
+        // The pipeline is unchanged.
+        auto serialized = pipeline->serializeToBson();
+        ASSERT_EQ(serialized.size(), 3u);
+        ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+        ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+        ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+    }
+}
+
+TEST_F(InternalUnpackBucketOptimizeLastpointTest,
+       LastpointWithMetaSubfieldAscendingTimeDescending) {
+    RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
+    auto pipeline = Pipeline::parse(
+        makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+                            "'tags', bucketMaxSpanSeconds: 3600}}"),
+                   fromjson("{$sort: {'tags.a': 1, t: -1}}"),
+                   fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}")),
+        getExpCtx());
+    auto& container = pipeline->getSources();
+
+    ASSERT_EQ(container.size(), 3U);
+
+    auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+                       ->optimizeLastpoint(container.begin(), &container);
+    ASSERT_TRUE(success);
+
+    auto serialized = pipeline->serializeToBson();
+
+    ASSERT_EQ(serialized.size(), 5u);
+    ASSERT_BSONOBJ_EQ(serialized[0],
+                      fromjson("{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}"));
+    ASSERT_BSONOBJ_EQ(serialized[1],
+                      fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}"));
+    ASSERT_BSONOBJ_EQ(
+        serialized[2],
+        fromjson(
+            "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: "
+            "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', "
+            "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}"));
+    ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
+    ASSERT_BSONOBJ_EQ(
+        serialized[4],
+        fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}"));
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index d44f3fde71a..2f3cab580a5 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -1167,7 +1167,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
     for (auto&& argument : elem.Obj()) {
         const auto argName = argument.fieldNameStringData();
 
-        if (argName == "pipeline"_sd) {
+        if (argName == kPipelineField) {
             pipeline = parsePipelineFromBSON(argument);
             hasPipeline = true;
             continue;
@@ -1183,7 +1183,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
             continue;
         }
 
-        if (argName == "from"_sd) {
+        if (argName == kFromField) {
             fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db());
             continue;
         }
@@ -1203,11 +1203,11 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
                           << argument << ": " << argument.type(),
             argument.type() == BSONType::String);
 
-        if (argName == "as"_sd) {
+        if (argName == kAsField) {
             as = argument.String();
-        } else if (argName == "localField"_sd) {
+        } else if (argName == kLocalField) {
             localField = argument.String();
-        } else if (argName == "foreignField"_sd) {
+        } else if (argName == kForeignField) {
             foreignField = argument.String();
         } else {
             uasserted(ErrorCodes::FailedToParse,
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 20afde99bf3..d69530a67cf 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -49,6 +49,11 @@ namespace mongo {
 class DocumentSourceLookUp final : public DocumentSource {
 public:
     static constexpr StringData kStageName = "$lookup"_sd;
+    static constexpr StringData kFromField = "from"_sd;
+    static constexpr StringData kLocalField = "localField"_sd;
+    static constexpr StringData kForeignField = "foreignField"_sd;
+    static constexpr StringData kPipelineField = "pipeline"_sd;
+    static constexpr StringData kAsField = "as"_sd;
 
     struct LetVariable {
         LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id)
-- 
cgit v1.2.1
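
Illustration (not part of the patch): the rewrite implemented above turns a "lastpoint"
aggregation over a time-series collection into a bucket-level pipeline that unpacks only
one bucket per meta value. A minimal mongo shell sketch, reusing the field names from
timeseries_lastpoint.js (metaField "tags", timeField "time"); the collection name
"sensors" and its default buckets namespace "system.buckets.sensors" are assumptions
for illustration:

    // Lastpoint query: the latest usage_user reading per host.
    db.sensors.aggregate([
        {$sort: {"tags.hostid": 1, time: -1}},
        {$group: {_id: "$tags.hostid", usage_user: {$first: "$usage_user"}}}
    ]);

    // Approximate shape of the rewritten pipeline on the buckets collection:
    // [
    //   // Order buckets by meta, then by the bucket-level time bounds.
    //   {$sort: {"meta.hostid": 1, "control.max.time": -1, "control.min.time": -1}},
    //   // Keep only the _id of the first (latest) bucket per meta value.
    //   {$group: {_id: "$meta.hostid", bucket: {$first: "$_id"}}},
    //   // Unpack just that bucket and take its latest event.
    //   {$lookup: {
    //       from: "system.buckets.sensors", localField: "bucket", foreignField: "_id",
    //       as: "metrics",
    //       pipeline: [{$_internalUnpackBucket: {...}}, {$sort: {time: -1}}, {$limit: 1}]}},
    //   {$unwind: "$metrics"},
    //   {$replaceRoot: {newRoot: {_id: "$_id", usage_user: "$metrics.usage_user"}}}
    // ]

Because the bucket-level $sort/$group runs on indexed meta and control fields, it can be
answered with a DISTINCT_SCAN when a matching {meta, time} index exists, which is exactly
what the jstest asserts.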