author    Alya Berciu <alyacarina@gmail.com>  2022-03-16 10:56:39 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-03-16 11:56:47 +0000
commit    e102dbba6cd1f5b99991089b90e841007bef16d7 (patch)
tree      c9362281be8d5c1c68276f8d279af77bb7c68b62
parent    539165defa601981015f0e89eba9a0d957c3b031 (diff)
download  mongo-e102dbba6cd1f5b99991089b90e841007bef16d7.tar.gz
Revert "SERVER-64235 Implement timeseries lastpoint rewrite for ascending time"
This reverts commit 811dc428a93105d0aa9091ca20e4dfb120e4e57d.
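For context, a "lastpoint" query returns the most recent measurement for each series in a time-series collection. A minimal sketch of the pipeline shape this optimization targets, using the field names from the jstests in this patch (the collection name "in" comes from the test helpers; otherwise illustrative):

    // Time-series collection with timeField "time" and metaField "tags":
    // for each host, keep the metrics of its most recent measurement.
    db.in.aggregate([
        {$sort: {"tags.hostid": 1, time: -1}},
        {$group: {
            _id: "$tags.hostid",
            usage_user: {$first: "$usage_user"},
            usage_guest: {$first: "$usage_guest"},
            usage_idle: {$first: "$usage_idle"}
        }}
    ]);

The reverted commit extended the bucket-level rewrite to the ascending-time form ({time: 1} paired with $last accumulators); after this revert, only the descending-time/$first form is rewritten.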
-rw-r--r--  jstests/core/timeseries/libs/timeseries_agg_helpers.js                                        |   4
-rw-r--r--  jstests/core/timeseries/timeseries_lastpoint.js                                                | 214
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                               | 105
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp | 115
4 files changed, 99 insertions, 339 deletions
diff --git a/jstests/core/timeseries/libs/timeseries_agg_helpers.js b/jstests/core/timeseries/libs/timeseries_agg_helpers.js
index a2a2f74393f..a2ae1ffcca6 100644
--- a/jstests/core/timeseries/libs/timeseries_agg_helpers.js
+++ b/jstests/core/timeseries/libs/timeseries_agg_helpers.js
@@ -20,7 +20,7 @@ var TimeseriesAggTests = class {
* @returns An array of a time-series collection and a non time-series collection,
* respectively in this order.
*/
- static prepareInputCollections(numHosts, numIterations, includeIdleMeasurements = true) {
+ static prepareInputCollections(numHosts, numIterations) {
const timeseriesCollOption = {timeseries: {timeField: "time", metaField: "tags"}};
Random.setRandomSeed();
@@ -60,7 +60,7 @@ var TimeseriesAggTests = class {
assert.commandWorked(inColl.insert(newMeasurement));
assert.commandWorked(observerInColl.insert(newMeasurement));
- if (includeIdleMeasurements && (i % 2)) {
+ if (i % 2) {
let idleMeasurement = {
tags: host.tags,
time: new Date(currTime + i),
diff --git a/jstests/core/timeseries/timeseries_lastpoint.js b/jstests/core/timeseries/timeseries_lastpoint.js
index 3be28645c7b..05344c194da 100644
--- a/jstests/core/timeseries/timeseries_lastpoint.js
+++ b/jstests/core/timeseries/timeseries_lastpoint.js
@@ -29,173 +29,75 @@ if (!isLastpointEnabled) {
return;
}
-// Timeseries test parameters.
-const numHosts = 10;
-const numIterations = 20;
-
-function verifyTsResults({pipeline, precedingFilter, expectStage, prepareTest}) {
- // Prepare collections. Note: we test without idle measurements (all meta subfields are
- // non-null). If we allow the insertion of idle measurements, we will obtain multiple lastpoints
- // per bucket, and may have different results on the observer and timeseries collections.
- const [tsColl, observerColl] = TimeseriesAggTests.prepareInputCollections(
- numHosts, numIterations, false /* includeIdleMeasurements */);
-
- // Additional preparation before running the test.
- if (prepareTest) {
- prepareTest(tsColl, observerColl);
+function verifyTsResults(pipeline, index, precedingFilter) {
+ // Prepare collections.
+ const numHosts = 10;
+ const numIterations = 20;
+ const [tsColl, observerColl] =
+ TimeseriesAggTests.prepareInputCollections(numHosts, numIterations);
+ if (index) {
+ tsColl.createIndex(index);
}
// Verify lastpoint optimization.
const explain = tsColl.explain().aggregate(pipeline);
- expectStage({explain, precedingFilter});
-
- // Assert that the time-series aggregation results match that of the observer collection.
- const expected = observerColl.aggregate(pipeline).toArray();
- const actual = tsColl.aggregate(pipeline).toArray();
- assertArrayEq({actual, expected});
-
- // Drop collections.
- tsColl.drop();
- observerColl.drop();
-}
-
-function verifyTsResultsWithAndWithoutIndex(
- {pipeline, index, bucketsIndex, precedingFilter, expectStage, prePrepareTest}) {
- verifyTsResults(
- {pipeline, precedingFilter, expectStage: expectCollScan, prepareTest: prePrepareTest});
- verifyTsResults({
- pipeline,
- precedingFilter,
- expectStage,
- prepareTest: (testColl, observerColl) => {
- // Optionally do extra test preparation.
- if (prePrepareTest) {
- prePrepareTest(testColl, observerColl);
- }
-
- // Create index on the timeseries collection.
- testColl.createIndex(index);
-
- // Create an additional secondary index directly on the buckets collection so that we
- // can test the DISTINCT_SCAN optimization when time is sorted in ascending order.
- if (bucketsIndex) {
- const bucketsColl = testDB["system.buckets.in"];
- bucketsColl.createIndex(bucketsIndex);
- }
+ if (index) {
+ // The query can utilize DISTINCT_SCAN.
+ assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
+
+ // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
+ assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
+ } else {
+ // $sort can be pushed into the cursor layer.
+ assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
+
+ // At the bottom, there should be a COLLSCAN.
+ const collScanStage = getAggPlanStage(explain, "COLLSCAN");
+ assert.neq(collScanStage, null, explain);
+ if (precedingFilter) {
+ assert.eq(precedingFilter, collScanStage.filter, collScanStage);
}
- });
-}
-
-function expectDistinctScan({explain}) {
- // The query can utilize DISTINCT_SCAN.
- assert.neq(getAggPlanStage(explain, "DISTINCT_SCAN"), null, explain);
-
- // Pipelines that use the DISTINCT_SCAN optimization should not also have a blocking sort.
- assert.eq(getAggPlanStage(explain, "SORT"), null, explain);
-}
-
-function expectCollScan({explain, precedingFilter}) {
- // $sort can be pushed into the cursor layer.
- assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
-
- // At the bottom, there should be a COLLSCAN.
- const collScanStage = getAggPlanStage(explain, "COLLSCAN");
- assert.neq(collScanStage, null, explain);
- if (precedingFilter) {
- assert.eq(precedingFilter, collScanStage.filter, collScanStage);
}
-}
-
-function expectIxscan({explain}) {
- // $sort can be pushed into the cursor layer.
- assert.neq(getAggPlanStage(explain, "SORT"), null, explain);
-    // At the bottom, there should be an IXSCAN.
- assert.neq(getAggPlanStage(explain, "IXSCAN"), null, explain);
+ // Assert that the time-series aggregation results match that of the observer collection.
+ const expectedResults = observerColl.aggregate(pipeline).toArray();
+ const actualResults = tsColl.aggregate(pipeline).toArray();
+ assert(resultsEq(actualResults, expectedResults),
+ `Expected ${tojson(expectedResults)} but got ${tojson(actualResults)}`);
}
-function getGroupStage(accumulator) {
- return {
- $group: {
- _id: "$tags.hostid",
- usage_user: {[accumulator]: "$usage_user"},
- usage_guest: {[accumulator]: "$usage_guest"},
- usage_idle: {[accumulator]: "$usage_idle"}
- }
- };
+function verifyTsResultsWithAndWithoutIndex(pipeline, index, precedingFilter) {
+ verifyTsResults(pipeline, undefined, precedingFilter);
+ verifyTsResults(pipeline, index, precedingFilter);
}
-/**
- Test cases:
- 1. Lastpoint queries on indexes with descending time and $first (DISTINCT_SCAN).
- 2. Lastpoint queries on indexes with ascending time and $last (no DISTINCT_SCAN).
- 3. Lastpoint queries on indexes with ascending time and $last and an additional secondary
- index so that we can use the DISTINCT_SCAN optimization.
-*/
-const testCases = [
- {time: -1},
- {time: 1},
- {time: -1, bucketsIndex: {"meta.hostid": -1, "control.max.time": 1, "control.min.time": 1}}
-];
-
-for (const {time, bucketsIndex} of testCases) {
- const isTimeDescending = time < 0;
- const canUseDistinct = isTimeDescending || bucketsIndex;
- const groupStage = isTimeDescending ? getGroupStage("$first") : getGroupStage("$last");
-
- // Test both directions of the metaField sort for each direction of time.
- for (const index of [{"tags.hostid": 1, time}, {"tags.hostid": -1, time}]) {
- // Test pipeline without a preceding $match stage.
- verifyTsResultsWithAndWithoutIndex({
- pipeline: [{$sort: index}, groupStage],
- index,
- bucketsIndex,
- expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan)
- });
-
- // Test pipeline without a preceding $match stage which has an extra idle measurement. This
- // verifies that the query rewrite correctly returns missing fields.
- verifyTsResultsWithAndWithoutIndex({
- pipeline: [{$sort: index}, groupStage],
- index,
- bucketsIndex,
- expectStage: (canUseDistinct ? expectDistinctScan : expectCollScan),
- prePrepareTest: (testColl, observerColl) => {
- const currTime = new Date();
- for (const host of TimeseriesTest.generateHosts(numHosts)) {
- const idleMeasurement = {
- tags: host.tags,
- time: new Date(currTime + numIterations), // Ensure this is the lastpoint.
- idle_user: 100 - TimeseriesTest.getRandomUsage()
- };
- assert.commandWorked(testColl.insert(idleMeasurement));
- assert.commandWorked(observerColl.insert(idleMeasurement));
- }
+verifyTsResultsWithAndWithoutIndex(
+ [
+ {$sort: {"tags.hostid": 1, time: -1}},
+ {
+ $group: {
+ _id: "$tags.hostid",
+ usage_user: {$first: "$usage_user"},
+ usage_guest: {$first: "$usage_guest"},
+ usage_idle: {$first: "$usage_idle"}
}
- });
-
- // Test pipeline with a preceding $match stage.
- function testWithMatch(matchStage, precedingFilter) {
- verifyTsResultsWithAndWithoutIndex({
- pipeline: [matchStage, {$sort: index}, groupStage],
- index,
- bucketsIndex,
- expectStage: canUseDistinct ? expectDistinctScan : expectIxscan,
- precedingFilter
- });
}
-
- // Test pipeline with an equality $match stage.
- testWithMatch({$match: {"tags.hostid": 0}}, {"meta.hostid": {$eq: 0}});
-
- // Test pipeline with an inequality $match stage.
- testWithMatch({$match: {"tags.hostid": {$ne: 0}}}, {"meta.hostid": {$not: {$eq: 0}}});
-
- // Test pipeline with a $match stage that uses a $gt query.
- testWithMatch({$match: {"tags.hostid": {$gt: 5}}}, {"meta.hostid": {$gt: 5}});
-
- // Test pipeline with a $match stage that uses a $lt query.
- testWithMatch({$match: {"tags.hostid": {$lt: 5}}}, {"meta.hostid": {$lt: 5}});
- }
-}
+ ],
+ {"tags.hostid": 1, time: -1});
+
+verifyTsResultsWithAndWithoutIndex(
+ [
+ {$match: {"tags.hostid": "host_0"}},
+ {$sort: {"tags.hostid": 1, time: -1}},
+ {
+ $group: {
+ _id: "$tags.hostid",
+ usage_user: {$first: "$usage_user"},
+ usage_guest: {$first: "$usage_guest"},
+ usage_idle: {$first: "$usage_idle"}
+ }
+ }
+ ],
+ {"tags.hostid": 1, time: -1},
+ {"meta.hostid": {$eq: "host_0"}});
})();
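The deleted test cases above also exercised the ascending-time variant, which pairs an ascending time sort with $last accumulators. Sketched from the removed testCases table (illustrative shell syntax):

    // Ascending time with $last: the mirror image of the descending/$first form.
    db.in.createIndex({"tags.hostid": 1, time: 1});
    db.in.aggregate([
        {$sort: {"tags.hostid": 1, time: 1}},
        {$group: {
            _id: "$tags.hostid",
            usage_user: {$last: "$usage_user"},
            usage_guest: {$last: "$usage_guest"},
            usage_idle: {$last: "$usage_idle"}
        }}
    ]);

Per the removed comments, this form reached a DISTINCT_SCAN plan only when an additional secondary index was created directly on the system.buckets collection.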
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index c1608ee0142..5040fdc2445 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -58,7 +58,6 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/util/make_data_structure.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
@@ -163,30 +162,17 @@ bool checkMetadataSortReorder(
*/
boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
const DocumentSourceSort& sort,
- const boost::optional<std::string&> lastpointTimeField = boost::none,
- bool flipSort = false) {
- auto sortPattern = flipSort
- ? SortPattern(
- QueryPlannerCommon::reverseSortObj(
- sort.getSortKeyPattern()
- .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)
- .toBson()),
- sort.getContext())
- : sort.getSortKeyPattern();
+ const boost::optional<std::string&> lastpointTimeField = boost::none) {
std::vector<SortPattern::SortPatternPart> updatedPattern;
- for (const auto& entry : sortPattern) {
+ for (const auto& entry : sort.getSortKeyPattern()) {
updatedPattern.push_back(entry);
if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) {
updatedPattern.back().fieldPath =
- FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix
- : timeseries::kControlMaxFieldNamePrefix) +
- lastpointTimeField.get());
+ FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get());
updatedPattern.push_back(SortPattern::SortPatternPart{
entry.isAscending,
- FieldPath((entry.isAscending ? timeseries::kControlMaxFieldNamePrefix
- : timeseries::kControlMinFieldNamePrefix) +
- lastpointTimeField.get()),
+ FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()),
nullptr});
} else {
auto updated = FieldPath(timeseries::kBucketMetaFieldName);
@@ -206,16 +192,6 @@ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes);
}
-boost::intrusive_ptr<DocumentSourceGroup> createGroupForReorder(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, FieldPath& fieldPath) {
- auto elem = BSON("bucket" << BSON(AccumulatorFirst::kName << "$_id")).firstElement();
- auto newAccum = AccumulationStatement::parseAccumulationStatement(
- expCtx.get(), elem, expCtx->variablesParseState);
- auto groupByExpr = ExpressionFieldPath::createPathFromString(
- expCtx.get(), fieldPath.fullPath(), expCtx->variablesParseState);
- return DocumentSourceGroup::create(expCtx, groupByExpr, {newAccum});
-}
-
// Optimize the section of the pipeline before the $_internalUnpackBucket stage.
void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
auto prefix = Pipeline::SourceContainer(container->begin(), itr);
@@ -733,11 +709,13 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
<< BSON(DocumentSourceLimit::kStageName << 1))))
.firstElement(),
expCtx);
+
sources.insert(unpackIt, lookup);
auto unwind = DocumentSourceUnwind::createFromBson(
BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(),
expCtx);
+
sources.insert(unpackIt, unwind);
BSONObjBuilder fields;
@@ -747,17 +725,14 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
auto&& v = accumulator.expr.argument;
if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) {
auto&& newPath = metrics.concat(expr->getFieldPath().tail());
- // This is necessary to preserve $first, $last null-check semantics for handling
- // nullish fields, e.g. returning missing field paths as null.
- auto ifNullCheck = BSON(
- "$ifNull" << BSONArray(BSON("0" << ("$" + newPath.fullPath()) << "1" << BSONNULL)));
- fields << StringData{accumulator.fieldName} << ifNullCheck;
+ fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath();
}
}
- auto replaceWithBson = BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj());
- auto replaceWith =
- DocumentSourceReplaceRoot::createFromBson(replaceWithBson.firstElement(), expCtx);
+ auto replaceWith = DocumentSourceReplaceRoot::createFromBson(
+ BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(),
+ expCtx);
+
sources.insert(unpackIt, replaceWith);
}
@@ -775,23 +750,27 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
return false;
}
+ // Attempt to create a new bucket-level $sort.
if (sortStage->hasLimit()) {
// This $sort stage was previously followed by a $limit stage.
return false;
}
auto spec = _bucketUnpacker.bucketSpec();
- auto maybeMetaField = spec.metaField();
+ auto metaField = spec.metaField();
auto timeField = spec.timeField();
- if (!maybeMetaField || haveComputedMetaField()) {
+
+ if (!metaField || haveComputedMetaField()) {
return false;
}
- auto metaField = maybeMetaField.get();
- if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) {
+ if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) {
return false;
}
+ auto newSort = createMetadataSortForReorder(*sortStage, timeField);
+
+ // Attempt to create a new bucket-level $group.
auto groupIdFields = groupStage->getIdFields();
if (groupIdFields.size() != 1) {
return false;
@@ -803,7 +782,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
}
const auto fieldPath = groupId->getFieldPath();
- if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) {
+ if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) {
return false;
}
@@ -811,33 +790,25 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
if (fieldPath.tail().getPathLength() > 1) {
newFieldPath = newFieldPath.concat(fieldPath.tail().tail());
}
+ auto groupByExpr = ExpressionFieldPath::createPathFromString(
+ pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState);
- // Insert bucket-level $sort and $group stages before we unpack any buckets.
- boost::intrusive_ptr<DocumentSourceSort> newSort;
- auto insertBucketLevelSortAndGroup = [&](bool flipSort) {
- newSort = createMetadataSortForReorder(*sortStage, timeField, flipSort);
- auto newGroup = createGroupForReorder(pExpCtx, newFieldPath);
- container->insert(itr, newSort);
- container->insert(itr, newGroup);
- };
-
- auto accumulators = groupStage->getAccumulatedFields();
- auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) {
- return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) {
- return targetAccum == accum.makeAccumulator()->documentsNeeded();
- });
- };
-
- std::string newTimeField;
- if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kFirstDocument)) {
- insertBucketLevelSortAndGroup(false);
- newTimeField = timeseries::kControlMinFieldNamePrefix + timeField;
- } else if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kLastDocument)) {
- insertBucketLevelSortAndGroup(true);
- newTimeField = timeseries::kControlMaxFieldNamePrefix + timeField;
- } else {
- return false;
+ for (auto&& accumulator : groupStage->getAccumulatedFields()) {
+ if (AccumulatorDocumentsNeeded::kFirstDocument !=
+ accumulator.makeAccumulator()->documentsNeeded()) {
+ return false;
+ }
}
+ auto newAccum =
+ AccumulationStatement::parseAccumulationStatement(pExpCtx.get(),
+ BSON("bucket" << BSON("$first"
+ << "$_id"))
+ .firstElement(),
+ pExpCtx->variablesParseState);
+ auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum});
+
+ container->insert(itr, newSort);
+ container->insert(itr, newGroup);
// Add $lookup, $unwind and $replaceWith stages.
addStagesToRetrieveEventLevelFields(
@@ -846,7 +817,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
pExpCtx,
groupStage,
timeField,
- isLastpointSortTimeAscending(newSort->getSortKeyPattern(), newTimeField));
+ isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField));
// Remove the $sort, $group and $_internalUnpackBucket stages.
tassert(6165401,
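One detail reverted above is the $ifNull wrapper in the $replaceWith projection, which preserved $group's $first/$last handling of nullish fields: per the deleted comment, those accumulators surface a missing field path as null, whereas a bare path in $replaceRoot simply drops the field. A hedged illustration, assuming a document in which metrics.b is missing:

    // {$replaceRoot: {newRoot: {b: "$metrics.b"}}}                      yields {}
    // {$replaceRoot: {newRoot: {b: {$ifNull: ["$metrics.b", null]}}}}   yields {b: null}

After the revert, the rewrite emits the bare-path form again.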
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
index 642852bed40..323602418ac 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/optimize_lastpoint_test.cpp
@@ -172,120 +172,7 @@ TEST_F(InternalUnpackBucketOptimizeLastpointTest,
ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
ASSERT_BSONOBJ_EQ(
serialized[4],
- fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: "
- "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}"));
-}
-
-TEST_F(InternalUnpackBucketOptimizeLastpointTest,
- LastpointWithMetaSubfieldDescendingTimeDescending) {
- RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
- auto pipeline = Pipeline::parse(
- makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
- "'tags', bucketMaxSpanSeconds: 3600}}"),
- fromjson("{$sort: {'tags.a': -1, t: -1}}"),
- fromjson("{$group: {_id: '$tags.a', b: {$first: '$b'}, c: {$first: '$c'}}}")),
- getExpCtx());
- auto& container = pipeline->getSources();
-
- ASSERT_EQ(container.size(), 3U);
-
- auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
- ->optimizeLastpoint(container.begin(), &container);
- ASSERT_TRUE(success);
-
- auto serialized = pipeline->serializeToBson();
-
- ASSERT_EQ(serialized.size(), 5u);
- ASSERT_BSONOBJ_EQ(
- serialized[0],
- fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}"));
- ASSERT_BSONOBJ_EQ(serialized[1],
- fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[2],
- fromjson(
- "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: "
- "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', "
- "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}"));
- ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[4],
- fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: "
- "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}"));
-}
-
-TEST_F(InternalUnpackBucketOptimizeLastpointTest, LastpointWithMetaSubfieldAscendingTimeAscending) {
- RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
- auto pipeline = Pipeline::parse(
- makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
- "'tags', bucketMaxSpanSeconds: 3600}}"),
- fromjson("{$sort: {'tags.a': 1, t: 1}}"),
- fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")),
- getExpCtx());
- auto& container = pipeline->getSources();
-
- ASSERT_EQ(container.size(), 3U);
-
- auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
- ->optimizeLastpoint(container.begin(), &container);
- ASSERT_TRUE(success);
-
- auto serialized = pipeline->serializeToBson();
-
- ASSERT_EQ(serialized.size(), 5u);
- ASSERT_BSONOBJ_EQ(
- serialized[0],
- fromjson("{$sort: {'meta.a': -1, 'control.max.t': -1, 'control.min.t': -1}}"));
- ASSERT_BSONOBJ_EQ(serialized[1],
- fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[2],
- fromjson(
- "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: "
- "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', "
- "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}"));
- ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[4],
- fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: "
- "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}"));
-}
-
-TEST_F(InternalUnpackBucketOptimizeLastpointTest,
- LastpointWithMetaSubfieldDescendingTimeAscending) {
- RAIIServerParameterControllerForTest controller("featureFlagLastPointQuery", true);
- auto pipeline = Pipeline::parse(
- makeVector(fromjson("{$_internalUnpackBucket: {exclude: [], timeField: 't', metaField: "
- "'tags', bucketMaxSpanSeconds: 3600}}"),
- fromjson("{$sort: {'tags.a': -1, t: 1}}"),
- fromjson("{$group: {_id: '$tags.a', b: {$last: '$b'}, c: {$last: '$c'}}}")),
- getExpCtx());
- auto& container = pipeline->getSources();
-
- ASSERT_EQ(container.size(), 3U);
-
- auto success = dynamic_cast<DocumentSourceInternalUnpackBucket*>(container.front().get())
- ->optimizeLastpoint(container.begin(), &container);
- ASSERT_TRUE(success);
-
- auto serialized = pipeline->serializeToBson();
-
- ASSERT_EQ(serialized.size(), 5u);
- ASSERT_BSONOBJ_EQ(serialized[0],
- fromjson("{$sort: {'meta.a': 1, 'control.max.t': -1, 'control.min.t': -1}}"));
- ASSERT_BSONOBJ_EQ(serialized[1],
- fromjson("{$group: {_id: '$meta.a', bucket: {$first: '$_id'}}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[2],
- fromjson(
- "{$lookup: {from: 'pipeline_test', as: 'metrics', localField: 'bucket', foreignField: "
- "'_id', let: {}, pipeline: [{$_internalUnpackBucket: {exclude: [], timeField: 't', "
- "metaField: 'tags', bucketMaxSpanSeconds: 3600}}, {$sort: {t: -1}}, {$limit: 1}]}}"));
- ASSERT_BSONOBJ_EQ(serialized[3], fromjson("{$unwind: {path: '$metrics'}}"));
- ASSERT_BSONOBJ_EQ(
- serialized[4],
- fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: {$ifNull: ['$metrics.b', {$const: "
- "null}]}, c: {$ifNull: ['$metrics.c', {$const: null}]}}}}"));
+ fromjson("{$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}"));
}
} // namespace
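Taken together, the assertions in the deleted LastpointWithMetaSubfieldDescendingTimeDescending test spell out the full bucket-level rewrite produced by optimizeLastpoint, reassembled here from the test's fromjson() literals ('pipeline_test' is the test fixture's collection name):

    [
        {$sort: {"meta.a": -1, "control.max.t": -1, "control.min.t": -1}},
        {$group: {_id: "$meta.a", bucket: {$first: "$_id"}}},
        {$lookup: {
            from: "pipeline_test", as: "metrics",
            localField: "bucket", foreignField: "_id", let: {},
            pipeline: [
                {$_internalUnpackBucket: {exclude: [], timeField: "t",
                                          metaField: "tags", bucketMaxSpanSeconds: 3600}},
                {$sort: {t: -1}},
                {$limit: 1}
            ]}},
        {$unwind: {path: "$metrics"}},
        {$replaceRoot: {newRoot: {_id: "$_id",
                                  b: {$ifNull: ["$metrics.b", null]},
                                  c: {$ifNull: ["$metrics.c", null]}}}}
    ]

After the revert, the surviving test asserts the same shape with bare paths in the final stage ({$replaceRoot: {newRoot: {_id: '$_id', b: '$metrics.b', c: '$metrics.c'}}}).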