author    Katherine Wu <katherine.wu@mongodb.com>    2022-02-15 15:04:48 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-02-15 17:10:54 +0000
commit    e4544716efb8892741cfc6f79e81d802a894581b
tree      76663f86317226eb890d3b455f78ede146de3e69
parent    a5d73b3c4b3680ed6cb34aaa90a0af33f1176846
SERVER-61654 Implement lastpoint optimization for {meta.a: 1, ts: -1} use case
 jstests/core/timeseries/timeseries_lastpoint.js                                               | 103
 jstests/core/timeseries/timeseries_merge.js                                                   |   2
 src/mongo/db/pipeline/SConscript                                                              |   1
 src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                              | 221
 src/mongo/db/pipeline/document_source_internal_unpack_bucket.h                                |  25
 src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp | 179
 src/mongo/db/pipeline/document_source_lookup.cpp                                              |  10
 src/mongo/db/pipeline/document_source_lookup.h                                                |   5
 8 files changed, 530 insertions(+), 16 deletions(-)
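
This commit teaches $_internalUnpackBucket to rewrite "lastpoint" queries, i.e. pipelines that fetch the most recent event per series. A minimal sketch of the targeted shape, using the field names from the jstest below (the collection name "ts" is hypothetical):

    // Latest reading per host: sort each series by descending time, then
    // take the $first value of every metric within each group.
    // "tags.hostid" is a meta subfield; "time" is the time field.
    db.ts.aggregate([
        {$sort: {"tags.hostid": 1, time: -1}},
        {$group: {
            _id: "$tags.hostid",
            usage_user: {$first: "$usage_user"},
            usage_guest: {$first: "$usage_guest"},
            usage_idle: {$first: "$usage_idle"}
        }}
    ]);

Without the rewrite, every bucket must be unpacked before the $sort; with it, the server sorts and groups whole buckets first and unpacks only one bucket per series via a self-$lookup.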
diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js
new file mode 100644
index 00000000000..05344c194da
--- /dev/null
+++ b/jstests/core/timeseries/timeseries_lastpoint.js
@@ -0,0 +1,103 @@
+/**
+ * Tests the optimization of "lastpoint"-type queries on time-series collections.
+ *
+ * @tags: [
+ * does_not_support_stepdowns,
+ * does_not_support_transactions,
+ * requires_timeseries,
+ * requires_pipeline_optimization,
+ * requires_fcv_53,
+ * # TODO (SERVER-63590): Investigate presence of getmore tag in timeseries jstests.
+ * requires_getmore
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js");
+load("jstests/core/timeseries/libs/timeseries_agg_helpers.js");
+load('jstests/libs/analyze_plan.js');
+
+const testDB = TimeseriesAggTests.getTestDb();
+assert.commandWorked(testDB.dropDatabase());
+
+// Do not run the rest of the tests if the lastpoint optimization is disabled.
+const getLastpointParam = db.adminCommand({getParameter: 1, featureFlagLastPointQuery: 1});
+const isLastpointEnabled = getLastpointParam.hasOwnProperty("featureFlagLastPointQuery") &&
+ getLastpointParam.featureFlagLastPointQuery.value;
+if (!isLastpointEnabled) {
+ return;
+}
+
+function verifyTsResults(pipeline, index, precedingFilter) {
+ // Prepare collections.
+ const numHosts = 10;
+ const numIterations = 20;
+ const [tsColl, observerColl] =
+ TimeseriesAggTests.prepareInputCollections(numHosts, numIterations);
+ if (index) {
+ tsColl.createIndex(index);
+ }
+
+    // Verify the lastpoint optimization.
+ const explain = tsColl.explain().aggregate(pipeline);
+ if (index) {
+ // The query can utilize DISTINCT_SCAN.
+ assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
+
+ // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
+ assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+ } else {
+ // $sort can be pushed into the cursor layer.
+ assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
+
+ // At the bottom, there should be a COLLSCAN.
+ const collScanStage = getAggPlanStage(explain, "COLLSCAN");
+ assert.neq(collScanStage, null, explain);
+ if (precedingFilter) {
+ assert.eq(precedingFilter, collScanStage.filter, collScanStage);
+ }
+ }
+
+    // Assert that the time-series aggregation results match those of the observer collection.
+ const expectedResults = observerColl.aggregate(pipeline).toArray();
+ const actualResults = tsColl.aggregate(pipeline).toArray();
+ assert(resultsEq(actualResults, expectedResults),
+ `Expected ${tojson(expectedResults)} but got ${tojson(actualResults)}`);
+}
+
+function verifyTsResultsWithAndWithoutIndex(pipeline, index, precedingFilter) {
+ verifyTsResults(pipeline, undefined, precedingFilter);
+ verifyTsResults(pipeline, index, precedingFilter);
+}
+
+verifyTsResultsWithAndWithoutIndex(
+ [
+ {$sort: {"tags.hostid": 1, time: -1}},
+ {
+ $group: {
+ _id: "$tags.hostid",
+ usage_user: {$first: "$usage_user"},
+ usage_guest: {$first: "$usage_guest"},
+ usage_idle: {$first: "$usage_idle"}
+ }
+ }
+ ],
+ {"tags.hostid": 1, time: -1});
+
+verifyTsResultsWithAndWithoutIndex(
+ [
+ {$match: {"tags.hostid": "host_0"}},
+ {$sort: {"tags.hostid": 1, time: -1}},
+ {
+ $group: {
+ _id: "$tags.hostid",
+ usage_user: {$first: "$usage_user"},
+ usage_guest: {$first: "$usage_guest"},
+ usage_idle: {$first: "$usage_idle"}
+ }
+ }
+ ],
+ {"tags.hostid": 1, time: -1},
+ {"meta.hostid": {$eq: "host_0"}});
+})();
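
The two explain branches in verifyTsResults above differ only in whether a matching index exists. A shell sketch of the distinction (collection name "ts" is hypothetical; the index pattern is the one the test creates):

    const pipeline = [
        {$sort: {"tags.hostid": 1, time: -1}},
        {$group: {_id: "$tags.hostid", usage_user: {$first: "$usage_user"}}}
    ];

    // With an index matching the bucket-level sort, the rewritten pipeline
    // can be answered by a DISTINCT_SCAN with no blocking SORT stage.
    db.ts.createIndex({"tags.hostid": 1, time: -1});
    db.ts.explain().aggregate(pipeline);

    // Without the index, the plan bottoms out in a COLLSCAN with the $sort
    // pushed into the cursor layer: still correct, but it reads every bucket.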
diff --git a/jstests/core/timeseries/timeseries_merge.js b/jstests/core/timeseries/timeseries_merge.js
index ac24b40e01e..59d4dbee889 100644
--- a/jstests/core/timeseries/timeseries_merge.js
+++ b/jstests/core/timeseries/timeseries_merge.js
@@ -71,7 +71,7 @@ let runMergeOnErrorTestCase = () => {
*/
let runMergeOnTestCase = () => {
var mergePipeline = [
- {$project: {_id: 0, cpu: 1, idle: 1, "tags.hostid": 1, time: 1}},
+ {$project: {_id: 0, cpu: 1, idle_user: 1, "tags.hostid": 1, time: 1}},
{$sort: {time: 1}},
{
$merge: {
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0edf1244058..7410cc36679 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -546,6 +546,7 @@ env.CppUnitTest(
'document_source_internal_unpack_bucket_test/group_reorder_test.cpp',
'document_source_internal_unpack_bucket_test/internalize_project_test.cpp',
'document_source_internal_unpack_bucket_test/optimize_pipeline_test.cpp',
+ 'document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp',
'document_source_internal_unpack_bucket_test/pushdown_computed_meta_projections_test.cpp',
'document_source_internal_unpack_bucket_test/sample_reorder_test.cpp',
'document_source_internal_unpack_bucket_test/sort_reorder_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 6dccf7eaf37..5040fdc2445 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -49,8 +49,10 @@
#include "mongo/db/pipeline/document_source_add_fields.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -108,11 +110,28 @@ auto getIncludeExcludeProjectAndType(DocumentSource* src) {
return std::pair{BSONObj{}, false};
}
+auto isLastpointSortTimeAscending(const SortPattern& sortPattern, const std::string& timeField) {
+ for (auto entry : sortPattern) {
+ if (entry.fieldPath->fullPath() == timeField) {
+ return entry.isAscending;
+ }
+ }
+ // A lastpoint query will always have the time field as part of the sort pattern.
+ MONGO_UNREACHABLE;
+}
+
/**
* Checks if a sort stage's pattern following our internal unpack bucket is suitable to be reordered
 * before us. The sort stage must refer exclusively to the meta field or any of its subfields.
+ *
+ * If this check is being used for lastpoint, the sort stage can also refer to the time field,
+ * which should be the last field in the pattern.
*/
-bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData& metaFieldStr) {
+bool checkMetadataSortReorder(
+ const SortPattern& sortPattern,
+ const StringData& metaFieldStr,
+ const boost::optional<std::string&> lastpointTimeField = boost::none) {
+ auto timeFound = false;
for (const auto& sortKey : sortPattern) {
if (!sortKey.fieldPath.has_value()) {
return false;
@@ -121,27 +140,47 @@ bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData&
return false;
}
if (sortKey.fieldPath->getFieldName(0) != metaFieldStr) {
+ if (lastpointTimeField && sortKey.fieldPath->fullPath() == lastpointTimeField.get()) {
+ // If we are checking the sort pattern for the lastpoint case, 'time' is allowed.
+ timeFound = true;
+ continue;
+ }
return false;
+ } else {
+ if (lastpointTimeField && timeFound) {
+ // The time field was not the last field in the sort pattern.
+ return false;
+ }
}
}
- return true;
+ // If we are checking for lastpoint, make sure we encountered the time field.
+ return !lastpointTimeField || timeFound;
}
/**
 * Returns a new DocumentSourceSort to reorder before the current unpack bucket stage.
*/
boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
- const DocumentSourceSort& sort) {
+ const DocumentSourceSort& sort,
+ const boost::optional<std::string&> lastpointTimeField = boost::none) {
std::vector<SortPattern::SortPatternPart> updatedPattern;
for (const auto& entry : sort.getSortKeyPattern()) {
- // Repoint sort to use metadata field before renaming.
- auto updatedFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
- if (entry.fieldPath->getPathLength() > 1) {
- updatedFieldPath = updatedFieldPath.concat(entry.fieldPath->tail());
- }
-
updatedPattern.push_back(entry);
- updatedPattern.back().fieldPath = updatedFieldPath;
+
+ if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) {
+ updatedPattern.back().fieldPath =
+ FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get());
+ updatedPattern.push_back(SortPattern::SortPatternPart{
+ entry.isAscending,
+ FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()),
+ nullptr});
+ } else {
+ auto updated = FieldPath(timeseries::kBucketMetaFieldName);
+ if (entry.fieldPath->getPathLength() > 1) {
+ updated = updated.concat(entry.fieldPath->tail());
+ }
+ updatedPattern.back().fieldPath = updated;
+ }
}
boost::optional<uint64_t> maxMemoryUsageBytes;
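
Concretely, for meta field "tags" and time field "t", the lastpoint branch above expands the time key into the bucket's control bounds, while meta keys are repointed as before. A sketch of the translation (this exact shape is asserted by the unit test added later in this patch):

    // User-level sort over events:
    {$sort: {"tags.a": 1, t: -1}}

    // Bucket-level sort produced for the lastpoint rewrite; both control
    // bounds keep the time key's original direction.
    {$sort: {"meta.a": 1, "control.max.t": -1, "control.min.t": -1}}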
@@ -646,6 +685,159 @@ bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
_bucketUnpacker.bucketSpec().metaField().get());
}
+void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
+ const Pipeline::SourceContainer::const_iterator unpackIt,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ boost::intrusive_ptr<DocumentSourceGroup> group,
+ const boost::optional<std::string&> lastpointTimeField,
+ bool timeAscending) {
+ mongo::stdx::unordered_set<mongo::NamespaceString> nss;
+ auto&& ns = expCtx->ns;
+ nss.emplace(ns);
+ expCtx->addResolvedNamespaces(nss);
+
+ FieldPath metrics("metrics");
+ auto lookup = DocumentSourceLookUp::createFromBson(
+ BSON(DocumentSourceLookUp::kStageName << BSON(
+ DocumentSourceLookUp::kFromField
+ << ns.coll() << DocumentSourceLookUp::kLocalField << "bucket"
+ << DocumentSourceLookUp::kForeignField << "_id" << DocumentSourceLookUp::kAsField
+ << metrics.fullPath() << DocumentSourceLookUp::kPipelineField
+ << BSON_ARRAY(unpackIt->get()->serializeToBSONForDebug()
+ << BSON(DocumentSourceSort::kStageName << BSON(
+ lastpointTimeField.get() << (timeAscending ? 1 : -1)))
+ << BSON(DocumentSourceLimit::kStageName << 1))))
+ .firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, lookup);
+
+ auto unwind = DocumentSourceUnwind::createFromBson(
+ BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, unwind);
+
+ BSONObjBuilder fields;
+ fields << "_id"
+ << "$_id";
+ for (auto&& accumulator : group->getAccumulatedFields()) {
+ auto&& v = accumulator.expr.argument;
+ if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) {
+ auto&& newPath = metrics.concat(expr->getFieldPath().tail());
+ fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath();
+ }
+ }
+
+ auto replaceWith = DocumentSourceReplaceRoot::createFromBson(
+ BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, replaceWith);
+}
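
Taken together, the three stages built above re-fetch the event-level fields from the single winning bucket per series. Their serialized shape, per the unit test later in this patch (<unpack> stands for the serialized $_internalUnpackBucket spec; b and c are the grouped metric fields from that test):

    {$lookup: {
        from: <bucketColl>, as: "metrics",
        localField: "bucket", foreignField: "_id",
        pipeline: [<unpack>, {$sort: {t: -1}}, {$limit: 1}]
    }},
    {$unwind: "$metrics"},
    {$replaceWith: {_id: "$_id", b: "$metrics.b", c: "$metrics.c"}}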
+
+bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) {
+ // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order.
+ if (std::next(itr, 2) == container->end()) {
+ return false;
+ }
+
+ auto sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get());
+ auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get());
+
+ if (!sortStage || !groupStage) {
+ return false;
+ }
+
+ // Attempt to create a new bucket-level $sort.
+ if (sortStage->hasLimit()) {
+ // This $sort stage was previously followed by a $limit stage.
+ return false;
+ }
+
+ auto spec = _bucketUnpacker.bucketSpec();
+ auto metaField = spec.metaField();
+ auto timeField = spec.timeField();
+
+ if (!metaField || haveComputedMetaField()) {
+ return false;
+ }
+
+ if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) {
+ return false;
+ }
+
+ auto newSort = createMetadataSortForReorder(*sortStage, timeField);
+
+ // Attempt to create a new bucket-level $group.
+ auto groupIdFields = groupStage->getIdFields();
+ if (groupIdFields.size() != 1) {
+ return false;
+ }
+
+ auto groupId = dynamic_cast<ExpressionFieldPath*>(groupIdFields.cbegin()->second.get());
+ if (!groupId || groupId->isVariableReference()) {
+ return false;
+ }
+
+ const auto fieldPath = groupId->getFieldPath();
+ if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) {
+ return false;
+ }
+
+ auto newFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
+ if (fieldPath.tail().getPathLength() > 1) {
+ newFieldPath = newFieldPath.concat(fieldPath.tail().tail());
+ }
+ auto groupByExpr = ExpressionFieldPath::createPathFromString(
+ pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState);
+
+ for (auto&& accumulator : groupStage->getAccumulatedFields()) {
+ if (AccumulatorDocumentsNeeded::kFirstDocument !=
+ accumulator.makeAccumulator()->documentsNeeded()) {
+ return false;
+ }
+ }
+ auto newAccum =
+ AccumulationStatement::parseAccumulationStatement(pExpCtx.get(),
+ BSON("bucket" << BSON("$first"
+ << "$_id"))
+ .firstElement(),
+ pExpCtx->variablesParseState);
+ auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum});
+
+ container->insert(itr, newSort);
+ container->insert(itr, newGroup);
+
+ // Add $lookup, $unwind and $replaceWith stages.
+ addStagesToRetrieveEventLevelFields(
+ *container,
+ itr,
+ pExpCtx,
+ groupStage,
+ timeField,
+ isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField));
+
+ // Remove the $sort, $group and $_internalUnpackBucket stages.
+ tassert(6165401,
+ "unexpected stage in lastpoint aggregate, expected $_internalUnpackBucket",
+ itr->get()->getSourceName() == kStageNameInternal);
+ itr = container->erase(itr);
+
+ tassert(6165402,
+ "unexpected stage in lastpoint aggregate, expected $sort",
+ itr->get()->getSourceName() == DocumentSourceSort::kStageName);
+ itr = container->erase(itr);
+
+ tassert(6165403,
+ "unexpected stage in lastpoint aggregate, expected $group",
+ itr->get()->getSourceName() == DocumentSourceGroup::kStageName);
+ container->erase(itr);
+
+ return true;
+}
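
For reference, the bucket-level $group constructed above keeps only the _id of the first bucket per series; the $lookup stage then unpacks just that bucket. Its serialized form, as asserted in the unit test:

    {$group: {_id: "$meta.a", bucket: {$first: "$_id"}}}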
+
Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
@@ -772,6 +964,15 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
}
}
+    // Attempt to optimize lastpoint-type queries.
+ if (feature_flags::gfeatureFlagLastPointQuery.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ optimizeLastpoint(itr, container)) {
+ // If we are able to rewrite the aggregation, give the resulting pipeline a chance to
+ // perform further optimizations.
+ return container->begin();
+    }
+
// Attempt to map predicates on bucketed fields to predicates on the control field.
if (auto nextMatch = dynamic_cast<DocumentSourceMatch*>(std::next(itr)->get());
nextMatch && !_triedBucketLevelFieldsPredicatesPushdown) {
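
The rewrite is gated on featureFlagLastPointQuery. A shell sketch of probing the gate at runtime (this mirrors the guard at the top of the jstest above):

    const res = db.adminCommand({getParameter: 1, featureFlagLastPointQuery: 1});
    const enabled = res.hasOwnProperty("featureFlagLastPointQuery") &&
        res.featureFlagLastPointQuery.value;

Returning container->begin() after a successful rewrite restarts optimization from the front of the pipeline, giving the newly inserted bucket-level $sort and $group their own chance at further pushdowns.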
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index 32079ed2ceb..9238d87176f 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -190,6 +190,31 @@ public:
std::pair<bool, Pipeline::SourceContainer::iterator> rewriteGroupByMinMax(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container);
+ /**
+     * If the current aggregation is a lastpoint-type query (i.e. with a $sort on meta and time
+     * fields, and a $group with a meta _id and only $first or $last accumulators), we can rewrite
+     * it to avoid unpacking all buckets.
+     *
+     * Ex: a user aggregation of
+     *   [{$_internalUnpackBucket: {...}},
+     *    {$sort: {"myMeta.a": 1, myTime: -1}},
+     *    {$group: {_id: "$myMeta.a", otherFields: {$first: "$otherFields"}}}]
+     *
+     * will be rewritten into:
+     *   [{$sort: {"meta.a": 1, "control.max.myTime": -1, "control.min.myTime": -1}},
+     *    {$group: {_id: "$meta.a", bucket: {$first: "$_id"}}},
+     *    {$lookup: {
+     *       from: <bucketColl>,
+     *       as: "metrics",
+     *       localField: "bucket",
+     *       foreignField: "_id",
+     *       pipeline: [{$_internalUnpackBucket: {...}}, {$sort: {myTime: -1}}, {$limit: 1}]}},
+     *    {$unwind: "$metrics"},
+     *    {$replaceWith: {_id: "$_id", otherFields: "$metrics.otherFields"}}]
+ */
+ bool optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container);
+
GetModPathsReturn getModifiedPaths() const final override;
private:
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
new file mode 100644
index 00000000000..323602418ac
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/query/util/make_data_structure.h"
+#include "mongo/idl/server_parameter_test_util.h"
+
+namespace mongo {
+namespace {
+
+using InternalUnpackBucketOptimizeLastpointTest = AggregationContextFixture;
+
+TEST_F(InternalUnpackBucketOptimizeLastpointTest, NonLastpointDoesNotParticipateInOptimization) {
+ RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
+ {
+ // $sort must contain a time field.
+ auto unpackSpec = fromjson(
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto sortSpec = fromjson("{$sort: {'tags.a': 1}}");
+ auto groupSpec =
+ fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+ auto& container = pipeline->getSources();
+
+ ASSERT_EQ(container.size(), 3U);
+
+ auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+ ->optimizeLastpoint(container.begin(), &container);
+ ASSERT_FALSE(success);
+
+ // The pipeline is unchanged.
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(serialized.size(), 3u);
+ ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+ ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+ ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+ }
+ {
+ // $sort must have the time field as the last field in the sort key pattern.
+ auto unpackSpec = fromjson(
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto sortSpec = fromjson("{$sort: {t: -1, 'tags.a': 1}}");
+ auto groupSpec =
+ fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+ auto& container = pipeline->getSources();
+
+ ASSERT_EQ(container.size(), 3U);
+
+ auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+ ->optimizeLastpoint(container.begin(), &container);
+ ASSERT_FALSE(success);
+
+ // The pipeline is unchanged.
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(serialized.size(), 3u);
+ ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+ ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+ ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+ }
+ {
+ // $group's _id must be a meta field.
+ auto unpackSpec = fromjson(
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto sortSpec = fromjson("{$sort: {'tags.a': 1, t: -1}}");
+ auto groupSpec =
+ fromjson("{$group: {_id: '$nonMeta', b: {$first: '$b'}, c: {$first: '$c'}}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+ auto& container = pipeline->getSources();
+
+ ASSERT_EQ(container.size(), 3U);
+
+ auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+ ->optimizeLastpoint(container.begin(), &container);
+ ASSERT_FALSE(success);
+
+ // The pipeline is unchanged.
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(serialized.size(), 3u);
+ ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+ ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+ ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+ }
+ {
+ // For now, $group can only contain $first accumulators.
+ auto unpackSpec = fromjson(
+ "{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: 'tags', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto sortSpec = fromjson("{$sort: {'tags.a': 1, t: -1}}");
+ auto groupSpec =
+ fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$last: '$c'}}}");
+ auto pipeline = Pipeline::parse(makeVector(unpackSpec, sortSpec, groupSpec), getExpCtx());
+ auto& container = pipeline->getSources();
+
+ ASSERT_EQ(container.size(), 3U);
+
+ auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+ ->optimizeLastpoint(container.begin(), &container);
+ ASSERT_FALSE(success);
+
+ // The pipeline is unchanged.
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(serialized.size(), 3u);
+ ASSERT_BSONOBJ_EQ(serialized[0], unpackSpec);
+ ASSERT_BSONOBJ_EQ(serialized[1], sortSpec);
+ ASSERT_BSONOBJ_EQ(serialized[2], groupSpec);
+ }
+}
+
+TEST_F(InternalUnpackBucketOptimizeLastpointTest,
+ LastpointWithMetaSubfieldAscendingTimeDescending) {
+ RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
+ auto pipeline = Pipeline::parse(
+ makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
+ "'tags', bucketMaxSpanSeconds: 3600}}"),
+ fromjson("{$sort: {'tags.a': 1, t: -1}}"),
+ fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}")),
+ getExpCtx());
+ auto& container = pipeline->getSources();
+
+ ASSERT_EQ(container.size(), 3U);
+
+ auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
+ ->optimizeLastpoint(container.begin(), &container);
+ ASSERT_TRUE(success);
+
+ auto serialized = pipeline->serializeToBson();
+
+ ASSERT_EQ(serialized.size(), 5u);
+ ASSERT_BSONOBJ_EQ(serialized[0],
+ fromjson("{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}"));
+ ASSERT_BSONOBJ_EQ(serialized[1],
+ fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}"));
+ ASSERT_BSONOBJ_EQ(
+ serialized[2],
+ fromjson(
+ "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: "
+ "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', "
+ "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}"));
+ ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
+ ASSERT_BSONOBJ_EQ(
+ serialized[4],
+ fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}"));
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index d44f3fde71a..2f3cab580a5 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -1167,7 +1167,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
for (auto&& argument : elem.Obj()) {
const auto argName = argument.fieldNameStringData();
- if (argName == "pipeline"_sd) {
+ if (argName == kPipelineField) {
pipeline = parsePipelineFromBSON(argument);
hasPipeline = true;
continue;
@@ -1183,7 +1183,7 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
continue;
}
- if (argName == "from"_sd) {
+ if (argName == kFromField) {
fromNs = parseLookupFromAndResolveNamespace(argument, pExpCtx->ns.db());
continue;
}
@@ -1203,11 +1203,11 @@ intrusive_ptr<DocumentSource> DocumentSourceLookUp::createFromBson(
<< argument << ": " << argument.type(),
argument.type() == BSONType::String);
- if (argName == "as"_sd) {
+ if (argName == kAsField) {
as = argument.String();
- } else if (argName == "localField"_sd) {
+ } else if (argName == kLocalField) {
localField = argument.String();
- } else if (argName == "foreignField"_sd) {
+ } else if (argName == kForeignField) {
foreignField = argument.String();
} else {
uasserted(ErrorCodes::FailedToParse,
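
The constants introduced in the header below replace the string literals above, so the parser and the $lookup built by the lastpoint rewrite share one definition per argument name. For orientation, the arguments they name, in shell syntax (a generic sketch, not specific to this commit):

    {$lookup: {
        from: "otherColl",        // kFromField
        localField: "a",          // kLocalField
        foreignField: "b",        // kForeignField
        as: "joined",             // kAsField
        pipeline: [{$limit: 1}]   // kPipelineField
    }}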
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 20afde99bf3..d69530a67cf 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -49,6 +49,11 @@ namespace mongo {
class DocumentSourceLookUp final : public DocumentSource {
public:
static constexpr StringData kStageName = "$lookup"_sd;
+ static constexpr StringData kFromField = "from"_sd;
+ static constexpr StringData kLocalField = "localField"_sd;
+ static constexpr StringData kForeignField = "foreignField"_sd;
+ static constexpr StringData kPipelineField = "pipeline"_sd;
+ static constexpr StringData kAsField = "as"_sd;
struct LetVariable {
LetVariable(std::string name, boost::intrusive_ptr<Expression> expression, Variables::Id id)