diff options
author | Svilen Mihaylov <svilen.mihaylov@mongodb.com> | 2022-08-29 22:07:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-29 22:40:58 +0000 |
commit | ccb0d93033900be8cead29d3cdd414d8a85f2b03 (patch) | |
tree | 7ac339ddb4c9b03f00891d5eb7937784dda0fb9f /src/mongo | |
parent | c1adfd31dfb0780155e7da768dbbd3e1cc4b9705 (diff) | |
download | mongo-ccb0d93033900be8cead29d3cdd414d8a85f2b03.tar.gz |
SERVER-67780 Incorrect $group rewrite for timeseries collection when the accumulator uses meta field
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp | 16 |
2 files changed, 73 insertions, 52 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index cfa58471fda..1729510a154 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -639,60 +639,70 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain return {}; } + if (!_bucketUnpacker.bucketSpec().metaField()) { + return {}; + } + const auto& metaField = *_bucketUnpacker.bucketSpec().metaField(); + const auto& idFields = groupPtr->getIdFields(); - if (idFields.size() != 1 || !_bucketUnpacker.bucketSpec().metaField().has_value()) { + if (idFields.size() != 1) { return {}; } const auto& exprId = idFields.cbegin()->second; + // TODO: SERVER-68811. Allow rewrites if expression is constant. const auto* exprIdPath = dynamic_cast<const ExpressionFieldPath*>(exprId.get()); if (exprIdPath == nullptr) { return {}; } const auto& idPath = exprIdPath->getFieldPath(); - if (idPath.getPathLength() < 2 || - idPath.getFieldName(1) != _bucketUnpacker.bucketSpec().metaField().value()) { + if (idPath.getPathLength() < 2 || idPath.getFieldName(1) != metaField) { return {}; } - bool suitable = true; std::vector<AccumulationStatement> accumulationStatements; for (const AccumulationStatement& stmt : groupPtr->getAccumulatedFields()) { - const auto op = stmt.expr.name; - const bool isMin = op == "$min"; - const bool isMax = op == "$max"; - - // Rewrite is valid only for min and max aggregates. - if (!isMin && !isMax) { - suitable = false; - break; - } - const auto* exprArg = stmt.expr.argument.get(); if (const auto* exprArgPath = dynamic_cast<const ExpressionFieldPath*>(exprArg)) { const auto& path = exprArgPath->getFieldPath(); - if (path.getPathLength() <= 1 || - path.getFieldName(1) == _bucketUnpacker.bucketSpec().timeField()) { + if (path.getPathLength() <= 1) { + return {}; + } + + const auto& rootFieldName = path.getFieldName(1); + if (rootFieldName == _bucketUnpacker.bucketSpec().timeField()) { // Rewrite not valid for time field. We want to eliminate the bucket // unpack stage here. - suitable = false; - break; + return {}; } - // Update aggregates to reference the control field. std::ostringstream os; - if (isMin) { - os << timeseries::kControlMinFieldNamePrefix; + if (rootFieldName == metaField) { + // Update aggregates to reference the meta field. + os << timeseries::kBucketMetaFieldName; + + for (size_t index = 2; index < path.getPathLength(); index++) { + os << "." << path.getFieldName(index); + } } else { - os << timeseries::kControlMaxFieldNamePrefix; - } + // Update aggregates to reference the control field. + const auto op = stmt.expr.name; + if (op == "$min") { + os << timeseries::kControlMinFieldNamePrefix; + } else if (op == "$max") { + os << timeseries::kControlMaxFieldNamePrefix; + } else { + // Rewrite is valid only for min and max aggregates. + return {}; + } - for (size_t index = 1; index < path.getPathLength(); index++) { - if (index > 1) { - os << "."; + for (size_t index = 1; index < path.getPathLength(); index++) { + if (index > 1) { + os << "."; + } + os << path.getFieldName(index); } - os << path.getFieldName(index); } const auto& newExpr = ExpressionFieldPath::createPathFromString( @@ -704,35 +714,30 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain } } - if (suitable) { - std::ostringstream os; - os << timeseries::kBucketMetaFieldName; - for (size_t index = 2; index < idPath.getPathLength(); index++) { - os << "." << idPath.getFieldName(index); - } - auto exprId1 = ExpressionFieldPath::createPathFromString( - pExpCtx.get(), os.str(), pExpCtx->variablesParseState); + std::ostringstream os; + os << timeseries::kBucketMetaFieldName; + for (size_t index = 2; index < idPath.getPathLength(); index++) { + os << "." << idPath.getFieldName(index); + } + auto exprId1 = ExpressionFieldPath::createPathFromString( + pExpCtx.get(), os.str(), pExpCtx->variablesParseState); - auto newGroup = DocumentSourceGroup::create(pExpCtx, - std::move(exprId1), - std::move(accumulationStatements), - groupPtr->getMaxMemoryUsageBytes()); + auto newGroup = DocumentSourceGroup::create(pExpCtx, + std::move(exprId1), + std::move(accumulationStatements), + groupPtr->getMaxMemoryUsageBytes()); - // Erase current stage and following group stage, and replace with updated - // group. - container->erase(std::next(itr)); - *itr = std::move(newGroup); + // Erase current stage and following group stage, and replace with updated group. + container->erase(std::next(itr)); + *itr = std::move(newGroup); - if (itr == container->begin()) { - // Optimize group stage. - return {true, itr}; - } else { - // Give chance of the previous stage to optimize against group stage. - return {true, std::prev(itr)}; - } + if (itr == container->begin()) { + // Optimize group stage. + return {true, itr}; + } else { + // Give chance of the previous stage to optimize against group stage. + return {true, std::prev(itr)}; } - - return {}; } bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const { diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp index 70b524c7510..6100897fd79 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp @@ -94,6 +94,22 @@ TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadata) { ASSERT_BSONOBJ_EQ(optimized, serialized[0]); } +TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetafield) { + auto unpackSpecObj = fromjson( + "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], metaField: 'meta1', timeField: 't', " + "bucketMaxSpanSeconds: 3600}}"); + auto groupSpecObj = fromjson("{$group: {_id: '$meta1.a.b', accmin: {$sum: '$meta1.f1'}}}"); + + auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx()); + pipeline->optimizePipeline(); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1, serialized.size()); + + auto optimized = fromjson("{$group: {_id: '$meta.a.b', accmin: {$sum: '$meta.f1'}}}"); + ASSERT_BSONOBJ_EQ(optimized, serialized[0]); +} + TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadataNegative) { auto unpackSpecObj = fromjson( "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], timeField: 't', metaField: 'meta', " |