diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 105 |
1 files changed, 67 insertions, 38 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 5040fdc2445..c1608ee0142 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -58,6 +58,7 @@ #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" @@ -162,17 +163,30 @@ bool checkMetadataSortReorder( */ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( const DocumentSourceSort& sort, - const boost::optional<std::string&> lastpointTimeField = boost::none) { + const boost::optional<std::string&> lastpointTimeField = boost::none, + bool flipSort = false) { + auto sortPattern = flipSort + ? SortPattern( + QueryPlannerCommon::reverseSortObj( + sort.getSortKeyPattern() + .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) + .toBson()), + sort.getContext()) + : sort.getSortKeyPattern(); std::vector<SortPattern::SortPatternPart> updatedPattern; - for (const auto& entry : sort.getSortKeyPattern()) { + for (const auto& entry : sortPattern) { updatedPattern.push_back(entry); if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) { updatedPattern.back().fieldPath = - FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get()); + FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix + : timeseries::kControlMaxFieldNamePrefix) + + lastpointTimeField.get()); updatedPattern.push_back(SortPattern::SortPatternPart{ entry.isAscending, - FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()), + FieldPath((entry.isAscending ? timeseries::kControlMaxFieldNamePrefix + : timeseries::kControlMinFieldNamePrefix) + + lastpointTimeField.get()), nullptr}); } else { auto updated = FieldPath(timeseries::kBucketMetaFieldName); @@ -192,6 +206,16 @@ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes); } +boost::intrusive_ptr<DocumentSourceGroup> createGroupForReorder( + const boost::intrusive_ptr<ExpressionContext>& expCtx, FieldPath& fieldPath) { + auto elem = BSON("bucket" << BSON(AccumulatorFirst::kName << "$_id")).firstElement(); + auto newAccum = AccumulationStatement::parseAccumulationStatement( + expCtx.get(), elem, expCtx->variablesParseState); + auto groupByExpr = ExpressionFieldPath::createPathFromString( + expCtx.get(), fieldPath.fullPath(), expCtx->variablesParseState); + return DocumentSourceGroup::create(expCtx, groupByExpr, {newAccum}); +} + // Optimize the section of the pipeline before the $_internalUnpackBucket stage. void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { auto prefix = Pipeline::SourceContainer(container->begin(), itr); @@ -709,13 +733,11 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, << BSON(DocumentSourceLimit::kStageName << 1)))) .firstElement(), expCtx); - sources.insert(unpackIt, lookup); auto unwind = DocumentSourceUnwind::createFromBson( BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(), expCtx); - sources.insert(unpackIt, unwind); BSONObjBuilder fields; @@ -725,14 +747,17 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, auto&& v = accumulator.expr.argument; if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) { auto&& newPath = metrics.concat(expr->getFieldPath().tail()); - fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath(); + // This is necessary to preserve $first, $last null-check semantics for handling + // nullish fields, e.g. returning missing field paths as null. + auto ifNullCheck = BSON( + "$ifNull" << BSONArray(BSON("0" << ("$" + newPath.fullPath()) << "1" << BSONNULL))); + fields << StringData{accumulator.fieldName} << ifNullCheck; } } - auto replaceWith = DocumentSourceReplaceRoot::createFromBson( - BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(), - expCtx); - + auto replaceWithBson = BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()); + auto replaceWith = + DocumentSourceReplaceRoot::createFromBson(replaceWithBson.firstElement(), expCtx); sources.insert(unpackIt, replaceWith); } @@ -750,27 +775,23 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta return false; } - // Attempt to create a new bucket-level $sort. if (sortStage->hasLimit()) { // This $sort stage was previously followed by a $limit stage. return false; } auto spec = _bucketUnpacker.bucketSpec(); - auto metaField = spec.metaField(); + auto maybeMetaField = spec.metaField(); auto timeField = spec.timeField(); - - if (!metaField || haveComputedMetaField()) { + if (!maybeMetaField || haveComputedMetaField()) { return false; } - if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) { + auto metaField = maybeMetaField.get(); + if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) { return false; } - auto newSort = createMetadataSortForReorder(*sortStage, timeField); - - // Attempt to create a new bucket-level $group. auto groupIdFields = groupStage->getIdFields(); if (groupIdFields.size() != 1) { return false; @@ -782,7 +803,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta } const auto fieldPath = groupId->getFieldPath(); - if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) { + if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) { return false; } @@ -790,25 +811,33 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta if (fieldPath.tail().getPathLength() > 1) { newFieldPath = newFieldPath.concat(fieldPath.tail().tail()); } - auto groupByExpr = ExpressionFieldPath::createPathFromString( - pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState); - for (auto&& accumulator : groupStage->getAccumulatedFields()) { - if (AccumulatorDocumentsNeeded::kFirstDocument != - accumulator.makeAccumulator()->documentsNeeded()) { - return false; - } - } - auto newAccum = - AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), - BSON("bucket" << BSON("$first" - << "$_id")) - .firstElement(), - pExpCtx->variablesParseState); - auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum}); + // Insert bucket-level $sort and $group stages before we unpack any buckets. + boost::intrusive_ptr<DocumentSourceSort> newSort; + auto insertBucketLevelSortAndGroup = [&](bool flipSort) { + newSort = createMetadataSortForReorder(*sortStage, timeField, flipSort); + auto newGroup = createGroupForReorder(pExpCtx, newFieldPath); + container->insert(itr, newSort); + container->insert(itr, newGroup); + }; - container->insert(itr, newSort); - container->insert(itr, newGroup); + auto accumulators = groupStage->getAccumulatedFields(); + auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) { + return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) { + return targetAccum == accum.makeAccumulator()->documentsNeeded(); + }); + }; + + std::string newTimeField; + if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kFirstDocument)) { + insertBucketLevelSortAndGroup(false); + newTimeField = timeseries::kControlMinFieldNamePrefix + timeField; + } else if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kLastDocument)) { + insertBucketLevelSortAndGroup(true); + newTimeField = timeseries::kControlMaxFieldNamePrefix + timeField; + } else { + return false; + } // Add $lookup, $unwind and $replaceWith stages. addStagesToRetrieveEventLevelFields( @@ -817,7 +846,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta pExpCtx, groupStage, timeField, - isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField)); + isLastpointSortTimeAscending(newSort->getSortKeyPattern(), newTimeField)); // Remove the $sort, $group and $_internalUnpackBucket stages. tassert(6165401, |