diff options
3 files changed, 116 insertions(+), 52 deletions(-)
diff --git a/jstests/core/timeseries/timeseries_groupby_reorder.js b/jstests/core/timeseries/timeseries_groupby_reorder.js new file mode 100644 index 00000000000..29d07f8b4fa --- /dev/null +++ b/jstests/core/timeseries/timeseries_groupby_reorder.js @@ -0,0 +1,43 @@ +/** + * Test the behavior of $group on time-series collections. + * + * @tags: [ + * directly_against_shardsvrs_incompatible, + * does_not_support_stepdowns, + * does_not_support_transactions, + * requires_fcv_61, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/fixture_helpers.js"); +load("jstests/core/timeseries/libs/timeseries.js"); + +const coll = db.timeseries_groupby_reorder; +coll.drop(); + +assert.commandWorked( + db.createCollection(coll.getName(), {timeseries: {metaField: "meta", timeField: "t"}})); + +const t = new Date(); +assert.commandWorked(coll.insert({_id: 0, t: t, b: 1, c: 1})); +assert.commandWorked(coll.insert({_id: 0, t: t, b: 2, c: 2})); +assert.commandWorked(coll.insert({_id: 0, t: t, b: 3, c: 3})); + +// Test reordering the groupby and internal unpack buckets. 
+if (!isMongos(db)) { + const res = coll.explain("queryPlanner").aggregate([ + {$group: {_id: '$meta', accmin: {$min: '$b'}, accmax: {$max: '$c'}}} + ]); + + assert.docEq(res.stages[1], { + "$group": + {_id: "$meta", accmin: {"$min": "$control.min.b"}, accmax: {"$max": "$control.max.c"}} + }); +} + +const res = coll.aggregate([{$group: {_id: '$meta', accmin: {$min: '$b'}, accmax: {$max: '$c'}}}]) + .toArray(); +assert.docEq(res, [{"_id": null, "accmin": 1, "accmax": 3}]); +})(); diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 16d0ec9517f..79dafdb4618 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -939,60 +939,70 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain return {}; } + if (!_bucketUnpacker.bucketSpec().metaField()) { + return {}; + } + const auto& metaField = *_bucketUnpacker.bucketSpec().metaField(); + const auto& idFields = groupPtr->getIdFields(); - if (idFields.size() != 1 || !_bucketUnpacker.bucketSpec().metaField.has_value()) { + if (idFields.size() != 1) { return {}; } const auto& exprId = idFields.cbegin()->second; + // TODO: SERVER-68811. Allow rewrites if expression is constant. 
const auto* exprIdPath = dynamic_cast<const ExpressionFieldPath*>(exprId.get()); if (exprIdPath == nullptr) { return {}; } const auto& idPath = exprIdPath->getFieldPath(); - if (idPath.getPathLength() < 2 || - idPath.getFieldName(1) != _bucketUnpacker.bucketSpec().metaField.get()) { + if (idPath.getPathLength() < 2 || idPath.getFieldName(1) != metaField) { return {}; } - bool suitable = true; std::vector<AccumulationStatement> accumulationStatements; for (const AccumulationStatement& stmt : groupPtr->getAccumulatedFields()) { - const std::string& op = stmt.makeAccumulator()->getOpName(); - const bool isMin = op == "$min"; - const bool isMax = op == "$max"; - - // Rewrite is valid only for min and max aggregates. - if (!isMin && !isMax) { - suitable = false; - break; - } - const auto* exprArg = stmt.expr.argument.get(); if (const auto* exprArgPath = dynamic_cast<const ExpressionFieldPath*>(exprArg)) { const auto& path = exprArgPath->getFieldPath(); - if (path.getPathLength() <= 1 || - path.getFieldName(1) == _bucketUnpacker.bucketSpec().timeField) { + if (path.getPathLength() <= 1) { + return {}; + } + + const auto& rootFieldName = path.getFieldName(1); + if (rootFieldName == _bucketUnpacker.bucketSpec().timeField()) { // Rewrite not valid for time field. We want to eliminate the bucket // unpack stage here. - suitable = false; - break; + return {}; } - // Update aggregates to reference the control field. std::ostringstream os; - if (isMin) { - os << timeseries::kControlMinFieldNamePrefix; + if (rootFieldName == metaField) { + // Update aggregates to reference the meta field. + os << timeseries::kBucketMetaFieldName; + + for (size_t index = 2; index < path.getPathLength(); index++) { + os << "." << path.getFieldName(index); + } } else { - os << timeseries::kControlMaxFieldNamePrefix; - } + // Update aggregates to reference the control field. 
+ const auto op = stmt.expr.name; + if (op == "$min") { + os << timeseries::kControlMinFieldNamePrefix; + } else if (op == "$max") { + os << timeseries::kControlMaxFieldNamePrefix; + } else { + // Rewrite is valid only for min and max aggregates. + return {}; + } - for (size_t index = 1; index < path.getPathLength(); index++) { - if (index > 1) { - os << "."; + for (size_t index = 1; index < path.getPathLength(); index++) { + if (index > 1) { + os << "."; + } + os << path.getFieldName(index); } - os << path.getFieldName(index); } const auto& newExpr = ExpressionFieldPath::createPathFromString( @@ -1004,35 +1014,30 @@ DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContain } } - if (suitable) { - std::ostringstream os; - os << timeseries::kBucketMetaFieldName; - for (size_t index = 2; index < idPath.getPathLength(); index++) { - os << "." << idPath.getFieldName(index); - } - auto exprId1 = ExpressionFieldPath::createPathFromString( - pExpCtx.get(), os.str(), pExpCtx->variablesParseState); + std::ostringstream os; + os << timeseries::kBucketMetaFieldName; + for (size_t index = 2; index < idPath.getPathLength(); index++) { + os << "." << idPath.getFieldName(index); + } + auto exprId1 = ExpressionFieldPath::createPathFromString( + pExpCtx.get(), os.str(), pExpCtx->variablesParseState); - auto newGroup = DocumentSourceGroup::create(pExpCtx, - std::move(exprId1), - std::move(accumulationStatements), - groupPtr->getMaxMemoryUsageBytes()); + auto newGroup = DocumentSourceGroup::create(pExpCtx, + std::move(exprId1), + std::move(accumulationStatements), + groupPtr->getMaxMemoryUsageBytes()); - // Erase current stage and following group stage, and replace with updated - // group. - container->erase(std::next(itr)); - *itr = std::move(newGroup); + // Erase current stage and following group stage, and replace with updated group. 
+ container->erase(std::next(itr)); + *itr = std::move(newGroup); - if (itr == container->begin()) { - // Optimize group stage. - return {true, itr}; - } else { - // Give chance of the previous stage to optimize against group stage. - return {true, std::prev(itr)}; - } + if (itr == container->begin()) { + // Optimize group stage. + return {true, itr}; + } else { + // Give chance of the previous stage to optimize against group stage. + return {true, std::prev(itr)}; } - - return {}; } bool findSequentialDocumentCache(Pipeline::SourceContainer::iterator start, diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp index 70b524c7510..6100897fd79 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/group_reorder_test.cpp @@ -94,6 +94,22 @@ TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadata) { ASSERT_BSONOBJ_EQ(optimized, serialized[0]); } +TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetafield) { + auto unpackSpecObj = fromjson( + "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], metaField: 'meta1', timeField: 't', " + "bucketMaxSpanSeconds: 3600}}"); + auto groupSpecObj = fromjson("{$group: {_id: '$meta1.a.b', accmin: {$sum: '$meta1.f1'}}}"); + + auto pipeline = Pipeline::parse(makeVector(unpackSpecObj, groupSpecObj), getExpCtx()); + pipeline->optimizePipeline(); + + auto serialized = pipeline->serializeToBson(); + ASSERT_EQ(1, serialized.size()); + + auto optimized = fromjson("{$group: {_id: '$meta.a.b', accmin: {$sum: '$meta.f1'}}}"); + ASSERT_BSONOBJ_EQ(optimized, serialized[0]); +} + TEST_F(InternalUnpackBucketGroupReorder, MinMaxGroupOnMetadataNegative) { auto unpackSpecObj = fromjson( "{$_internalUnpackBucket: { include: ['a', 'b', 'c'], timeField: 't', metaField: 'meta', " |