-rw-r--r--  jstests/core/timeseries/timeseries_groupby_reorder.js                                      43
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp                           109
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp   16
3 files changed, 116 insertions, 52 deletions
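
This patch generalizes DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax: $min/$max accumulators on measurement fields are still redirected to the bucket-level control.min/control.max values, and accumulators whose arguments are paths into the user metaField are now redirected to the stored bucket "meta" field. A minimal mongo-shell sketch of the eligible shape (collection and field names here are illustrative, not taken from the patch):

    // Time-series collection whose user-facing metaField is "mm".
    db.createCollection("ts_example", {timeseries: {metaField: "mm", timeField: "t"}});
    assert.commandWorked(db.ts_example.insert({mm: {region: "eu"}, t: new Date(), b: 5, c: 7}));

    // The group key is a metaField path and every accumulator is $min/$max
    // on a measurement field, so the rewrite applies.
    db.ts_example.aggregate(
        [{$group: {_id: "$mm.region", lo: {$min: "$b"}, hi: {$max: "$c"}}}]);

    // After optimization the unpack stage is gone and the group runs over
    // raw buckets:
    //   {$group: {_id: "$meta.region", lo: {$min: "$control.min.b"},
    //             hi: {$max: "$control.max.c"}}}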
diff --git a/jstests/core/timeseries/timeseries_groupby_reorder.js b/jstests/core/timeseries/timeseries_groupby_reorder.js
new file mode 100644
index 00000000000..29d07f8b4fa
--- /dev/null
+++ b/jstests/core/timeseries/timeseries_groupby_reorder.js
@@ -0,0 +1,43 @@
+/**
+ * Test the behavior of $group on time-series collections.
+ *
+ * @tags: [
+ * directly_against_shardsvrs_incompatible,
+ * does_not_support_stepdowns,
+ * does_not_support_transactions,
+ * requires_fcv_61,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fixture_helpers.js");
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const coll = db.timeseries_groupby_reorder;
+coll.drop();
+
+assert.commandWorked(
+ db.createCollection(coll.getName(), {timeseries: {metaField: "meta", timeField: "t"}}));
+
+const t = new Date();
+assert.commandWorked(coll.insert({_id: 0, t: t, b: 1, c: 1}));
+assert.commandWorked(coll.insert({_id: 0, t: t, b: 2, c: 2}));
+assert.commandWorked(coll.insert({_id: 0, t: t, b: 3, c: 3}));
+
+// Test that $group is rewritten to operate directly on the buckets,
+// eliminating the internal unpack bucket stage.
+if (!isMongos(db)) {
+ const res = coll.explain("queryPlanner").aggregate([
+ {$group: {_id: '$meta', accmin: {$min: '$b'}, accmax: {$max: '$c'}}}
+ ]);
+
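+ // Stage 0 is the $cursor source, so the rewritten $group is expected at
+ // stage index 1.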
+ assert.docEq(res.stages[1], {
+ "$group":
+ {_id: "$meta", accmin: {"$min": "$control.min.b"}, accmax: {"$max": "$control.max.c"}}
+ });
+}
+
+const res = coll.aggregate([{$group: {_id: '$meta', accmin: {$min: '$b'}, accmax: {$max: '$c'}}}])
+ .toArray();
+assert.docEq(res, [{"_id": null, "accmin": 1, "accmax": 3}]);
+})();
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 16d0ec9517f..79dafdb4618 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -939,60 +939,70 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain
return {};
}
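+ // The group key must be a path into the meta field, so a collection
+ // created without a metaField can never qualify for this rewrite.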
+ if (!_bucketUnpacker.bucketSpec().metaField()) {
+ return {};
+ }
+ const auto& metaField = *_bucketUnpacker.bucketSpec().metaField();
+
const auto& idFields = groupPtr->getIdFields();
- if (idFields.size() != 1 || !_bucketUnpacker.bucketSpec().metaField.has_value()) {
+ if (idFields.size() != 1) {
return {};
}
const auto& exprId = idFields.cbegin()->second;
+ // TODO: SERVER-68811. Allow rewrites if expression is constant.
const auto* exprIdPath = dynamic_cast<const ExpressionFieldPath*>(exprId.get());
if (exprIdPath == nullptr) {
return {};
}
const auto& idPath = exprIdPath->getFieldPath();
- if (idPath.getPathLength() < 2 ||
- idPath.getFieldName(1) != _bucketUnpacker.bucketSpec().metaField.get()) {
+ if (idPath.getPathLength() < 2 || idPath.getFieldName(1) != metaField) {
return {};
}
- bool suitable = true;
std::vector<AccumulationStatement> accumulationStatements;
for (const AccumulationStatement& stmt : groupPtr->getAccumulatedFields()) {
- const std::string& op = stmt.makeAccumulator()->getOpName();
- const bool isMin = op == "$min";
- const bool isMax = op == "$max";
-
- // Rewrite is valid only for min and max aggregates.
- if (!isMin && !isMax) {
- suitable = false;
- break;
- }
-
const auto* exprArg = stmt.expr.argument.get();
if (const auto* exprArgPath = dynamic_cast<const ExpressionFieldPath*>(exprArg)) {
const auto& path = exprArgPath->getFieldPath();
- if (path.getPathLength() <= 1 ||
- path.getFieldName(1) == _bucketUnpacker.bucketSpec().timeField) {
+ if (path.getPathLength() <= 1) {
+ return {};
+ }
+
+ const auto& rootFieldName = path.getFieldName(1);
+ if (rootFieldName == _bucketUnpacker.bucketSpec().timeField()) {
// Rewrite not valid for time field. We want to eliminate the bucket
// unpack stage here.
- suitable = false;
- break;
+ return {};
}
- // Update aggregates to reference the control field.
std::ostringstream os;
- if (isMin) {
- os << timeseries::kControlMinFieldNamePrefix;
+ if (rootFieldName == metaField) {
+ // Update aggregates to reference the meta field.
+ os << timeseries::kBucketMetaFieldName;
+
+ for (size_t index = 2; index < path.getPathLength(); index++) {
+ os << "." << path.getFieldName(index);
+ }
} else {
- os << timeseries::kControlMaxFieldNamePrefix;
- }
+ // Update aggregates to reference the control field.
+ const auto op = stmt.expr.name;
+ if (op == "$min") {
+ os << timeseries::kControlMinFieldNamePrefix;
+ } else if (op == "$max") {
+ os << timeseries::kControlMaxFieldNamePrefix;
+ } else {
+ // Rewrite is valid only for min and max aggregates.
+ return {};
+ }
- for (size_t index = 1; index < path.getPathLength(); index++) {
- if (index > 1) {
- os << ".";
+ for (size_t index = 1; index < path.getPathLength(); index++) {
+ if (index > 1) {
+ os << ".";
+ }
+ os << path.getFieldName(index);
}
- os << path.getFieldName(index);
}
const auto& newExpr = ExpressionFieldPath::createPathFromString(
@@ -1004,35 +1014,30 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain
}
}
- if (suitable) {
- std::ostringstream os;
- os << timeseries::kBucketMetaFieldName;
- for (size_t index = 2; index < idPath.getPathLength(); index++) {
- os << "." << idPath.getFieldName(index);
- }
- auto exprId1 = ExpressionFieldPath::createPathFromString(
- pExpCtx.get(), os.str(), pExpCtx->variablesParseState);
+ std::ostringstream os;
+ os << timeseries::kBucketMetaFieldName;
+ for (size_t index = 2; index < idPath.getPathLength(); index++) {
+ os << "." << idPath.getFieldName(index);
+ }
+ auto exprId1 = ExpressionFieldPath::createPathFromString(
+ pExpCtx.get(), os.str(), pExpCtx->variablesParseState);
- auto newGroup = DocumentSourceGroup::create(pExpCtx,
- std::move(exprId1),
- std::move(accumulationStatements),
- groupPtr->getMaxMemoryUsageBytes());
+ auto newGroup = DocumentSourceGroup::create(pExpCtx,
+ std::move(exprId1),
+ std::move(accumulationStatements),
+ groupPtr->getMaxMemoryUsageBytes());
- // Erase current stage and following group stage, and replace with updated
- // group.
- container->erase(std::next(itr));
- *itr = std::move(newGroup);
+ // Remove the following group stage and replace the current unpack stage
+ // with the rewritten group.
+ container->erase(std::next(itr));
+ *itr = std::move(newGroup);
- if (itr == container->begin()) {
- // Optimize group stage.
- return {true, itr};
- } else {
- // Give chance of the previous stage to optimize against group stage.
- return {true, std::prev(itr)};
- }
+ if (itr == container->begin()) {
+ // Optimize group stage.
+ return {true, itr};
+ } else {
+ // Give the previous stage a chance to optimize against the group stage.
+ return {true, std::prev(itr)};
}
-
- return {};
}
bool findSequentialDocumentCache(Pipeline::SourceContainer::iterator start,
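
Note the effect of the new meta-field branch above: when an accumulator's argument is a path into the user metaField, the argument is rewritten to the bucket-level "meta" path regardless of the accumulator operator, so even $sum qualifies. A sketch mirroring the unit test added below (field names are illustrative):

    db.ts_example.aggregate(
        [{$group: {_id: "$mm.a.b", acc: {$sum: "$mm.f1"}}}]);
    // Rewritten form, as asserted in group_reorder_test.cpp:
    //   {$group: {_id: "$meta.a.b", acc: {$sum: "$meta.f1"}}}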
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
index 70b524c7510..6100897fd79 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
@@ -94,6 +94,22 @@ TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadata) {
ASSERT_BSONOBJ_EQ(optimized, serialized[0]);
}
+TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetafield) {
+ auto unpackSpecObj = fromjson(
+ "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], metaField: 'meta1', timeField: 't', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto groupSpecObj = fromjson("{$group: {_id: '$meta1.a.b', accmin: {$sum: '$meta1.f1'}}}");
+
+ auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx());
+ pipeline->optimizePipeline();
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1, serialized.size());
+
+ auto optimized = fromjson("{$group: {_id: '$meta.a.b', accmin: {$sum: '$meta.f1'}}}");
+ ASSERT_BSONOBJ_EQ(optimized, serialized[0]);
+}
+
TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadataNegative) {
auto unpackSpecObj = fromjson(
"{$_internalUnpackBucket: { include: ['a', 'b', 'c'], timeField: 't', metaField: 'meta', "