summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSvilen Mihaylov <svilen.mihaylov@mongodb.com>2022-08-29 22:07:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-29 22:40:58 +0000
commitccb0d93033900be8cead29d3cdd414d8a85f2b03 (patch)
tree7ac339ddb4c9b03f00891d5eb7937784dda0fb9f /src/mongo
parentc1adfd31dfb0780155e7da768dbbd3e1cc4b9705 (diff)
downloadmongo-ccb0d93033900be8cead29d3cdd414d8a85f2b03.tar.gz
SERVER-67780 Incorrect $group rewrite for timeseries collection when the accumulator uses meta field
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp109
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp16
2 files changed, 73 insertions, 52 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index cfa58471fda..1729510a154 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -639,60 +639,70 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain
return {};
}
+ if (!_bucketUnpacker.bucketSpec().metaField()) {
+ return {};
+ }
+ const auto& metaField = *_bucketUnpacker.bucketSpec().metaField();
+
const auto& idFields = groupPtr->getIdFields();
- if (idFields.size() != 1 || !_bucketUnpacker.bucketSpec().metaField().has_value()) {
+ if (idFields.size() != 1) {
return {};
}
const auto& exprId = idFields.cbegin()->second;
+ // TODO: SERVER-68811. Allow rewrites if expression is constant.
const auto* exprIdPath = dynamic_cast<const ExpressionFieldPath*>(exprId.get());
if (exprIdPath == nullptr) {
return {};
}
const auto& idPath = exprIdPath->getFieldPath();
- if (idPath.getPathLength() < 2 ||
- idPath.getFieldName(1) != _bucketUnpacker.bucketSpec().metaField().value()) {
+ if (idPath.getPathLength() < 2 || idPath.getFieldName(1) != metaField) {
return {};
}
- bool suitable = true;
std::vector<AccumulationStatement> accumulationStatements;
for (const AccumulationStatement& stmt : groupPtr->getAccumulatedFields()) {
- const auto op = stmt.expr.name;
- const bool isMin = op == "$min";
- const bool isMax = op == "$max";
-
- // Rewrite is valid only for min and max aggregates.
- if (!isMin && !isMax) {
- suitable = false;
- break;
- }
-
const auto* exprArg = stmt.expr.argument.get();
if (const auto* exprArgPath = dynamic_cast<const ExpressionFieldPath*>(exprArg)) {
const auto& path = exprArgPath->getFieldPath();
- if (path.getPathLength() <= 1 ||
- path.getFieldName(1) == _bucketUnpacker.bucketSpec().timeField()) {
+ if (path.getPathLength() <= 1) {
+ return {};
+ }
+
+ const auto& rootFieldName = path.getFieldName(1);
+ if (rootFieldName == _bucketUnpacker.bucketSpec().timeField()) {
// Rewrite not valid for time field. We want to eliminate the bucket
// unpack stage here.
- suitable = false;
- break;
+ return {};
}
- // Update aggregates to reference the control field.
std::ostringstream os;
- if (isMin) {
- os << timeseries::kControlMinFieldNamePrefix;
+ if (rootFieldName == metaField) {
+ // Update aggregates to reference the meta field.
+ os << timeseries::kBucketMetaFieldName;
+
+ for (size_t index = 2; index < path.getPathLength(); index++) {
+ os << "." << path.getFieldName(index);
+ }
} else {
- os << timeseries::kControlMaxFieldNamePrefix;
- }
+ // Update aggregates to reference the control field.
+ const auto op = stmt.expr.name;
+ if (op == "$min") {
+ os << timeseries::kControlMinFieldNamePrefix;
+ } else if (op == "$max") {
+ os << timeseries::kControlMaxFieldNamePrefix;
+ } else {
+ // Rewrite is valid only for min and max aggregates.
+ return {};
+ }
- for (size_t index = 1; index < path.getPathLength(); index++) {
- if (index > 1) {
- os << ".";
+ for (size_t index = 1; index < path.getPathLength(); index++) {
+ if (index > 1) {
+ os << ".";
+ }
+ os << path.getFieldName(index);
}
- os << path.getFieldName(index);
}
const auto& newExpr = ExpressionFieldPath::createPathFromString(
@@ -704,35 +714,30 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain
}
}
- if (suitable) {
- std::ostringstream os;
- os << timeseries::kBucketMetaFieldName;
- for (size_t index = 2; index < idPath.getPathLength(); index++) {
- os << "." << idPath.getFieldName(index);
- }
- auto exprId1 = ExpressionFieldPath::createPathFromString(
- pExpCtx.get(), os.str(), pExpCtx->variablesParseState);
+ std::ostringstream os;
+ os << timeseries::kBucketMetaFieldName;
+ for (size_t index = 2; index < idPath.getPathLength(); index++) {
+ os << "." << idPath.getFieldName(index);
+ }
+ auto exprId1 = ExpressionFieldPath::createPathFromString(
+ pExpCtx.get(), os.str(), pExpCtx->variablesParseState);
- auto newGroup = DocumentSourceGroup::create(pExpCtx,
- std::move(exprId1),
- std::move(accumulationStatements),
- groupPtr->getMaxMemoryUsageBytes());
+ auto newGroup = DocumentSourceGroup::create(pExpCtx,
+ std::move(exprId1),
+ std::move(accumulationStatements),
+ groupPtr->getMaxMemoryUsageBytes());
- // Erase current stage and following group stage, and replace with updated
- // group.
- container->erase(std::next(itr));
- *itr = std::move(newGroup);
+ // Erase current stage and following group stage, and replace with updated group.
+ container->erase(std::next(itr));
+ *itr = std::move(newGroup);
- if (itr == container->begin()) {
- // Optimize group stage.
- return {true, itr};
- } else {
- // Give chance of the previous stage to optimize against group stage.
- return {true, std::prev(itr)};
- }
+ if (itr == container->begin()) {
+ // Optimize group stage.
+ return {true, itr};
+ } else {
+ // Give chance of the previous stage to optimize against group stage.
+ return {true, std::prev(itr)};
}
-
- return {};
}
bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
index 70b524c7510..6100897fd79 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp
@@ -94,6 +94,22 @@ TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadata) {
ASSERT_BSONOBJ_EQ(optimized, serialized[0]);
}
+TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetafield) {
+ auto unpackSpecObj = fromjson(
+ "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], metaField: 'meta1', timeField: 't', "
+ "bucketMaxSpanSeconds: 3600}}");
+ auto groupSpecObj = fromjson("{$group: {_id: '$meta1.a.b', accmin: {$sum: '$meta1.f1'}}}");
+
+ auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx());
+ pipeline->optimizePipeline();
+
+ auto serialized = pipeline->serializeToBson();
+ ASSERT_EQ(1, serialized.size());
+
+ auto optimized = fromjson("{$group: {_id: '$meta.a.b', accmin: {$sum: '$meta.f1'}}}");
+ ASSERT_BSONOBJ_EQ(optimized, serialized[0]);
+}
+
TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadataNegative) {
auto unpackSpecObj = fromjson(
"{$_internalUnpackBucket: { include: ['a', 'b', 'c'], timeField: 't', metaField: 'meta', "