diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 221 |
1 file changed, 211 insertions, 10 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 6dccf7eaf37..5040fdc2445 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -49,8 +49,10 @@ #include "mongo/db/pipeline/document_source_add_fields.h" #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_group.h" +#include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" +#include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" @@ -108,11 +110,28 @@ auto getIncludeExcludeProjectAndType(DocumentSource* src) { return std::pair{BSONObj{}, false}; } +auto isLastpointSortTimeAscending(const SortPattern& sortPattern, const std::string& timeField) { + for (auto entry : sortPattern) { + if (entry.fieldPath->fullPath() == timeField) { + return entry.isAscending; + } + } + // A lastpoint query will always have the time field as part of the sort pattern. + MONGO_UNREACHABLE; +} + /** * Checks if a sort stage's pattern following our internal unpack bucket is suitable to be reordered * before us. The sort stage must refer exclusively to the meta field or any subfields. + * + * If this check is being used for lastpoint, the sort stage can also refer to the time field, + * which should be the last field in the pattern. 
*/ -bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData& metaFieldStr) { +bool checkMetadataSortReorder( + const SortPattern& sortPattern, + const StringData& metaFieldStr, + const boost::optional<std::string&> lastpointTimeField = boost::none) { + auto timeFound = false; for (const auto& sortKey : sortPattern) { if (!sortKey.fieldPath.has_value()) { return false; @@ -121,27 +140,47 @@ bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData& return false; } if (sortKey.fieldPath->getFieldName(0) != metaFieldStr) { + if (lastpointTimeField && sortKey.fieldPath->fullPath() == lastpointTimeField.get()) { + // If we are checking the sort pattern for the lastpoint case, 'time' is allowed. + timeFound = true; + continue; + } return false; + } else { + if (lastpointTimeField && timeFound) { + // The time field was not the last field in the sort pattern. + return false; + } } } - return true; + // If we are checking for lastpoint, make sure we encountered the time field. + return !lastpointTimeField || timeFound; } /** * Returns a new DocumentSort to reorder before current unpack bucket document. */ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder( - const DocumentSourceSort& sort) { + const DocumentSourceSort& sort, + const boost::optional<std::string&> lastpointTimeField = boost::none) { std::vector<SortPattern::SortPatternPart> updatedPattern; for (const auto& entry : sort.getSortKeyPattern()) { - // Repoint sort to use metadata field before renaming. 
- auto updatedFieldPath = FieldPath(timeseries::kBucketMetaFieldName); - if (entry.fieldPath->getPathLength() > 1) { - updatedFieldPath = updatedFieldPath.concat(entry.fieldPath->tail()); - } - updatedPattern.push_back(entry); - updatedPattern.back().fieldPath = updatedFieldPath; + + if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) { + updatedPattern.back().fieldPath = + FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get()); + updatedPattern.push_back(SortPattern::SortPatternPart{ + entry.isAscending, + FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()), + nullptr}); + } else { + auto updated = FieldPath(timeseries::kBucketMetaFieldName); + if (entry.fieldPath->getPathLength() > 1) { + updated = updated.concat(entry.fieldPath->tail()); + } + updatedPattern.back().fieldPath = updated; + } } boost::optional<uint64_t> maxMemoryUsageBytes; @@ -646,6 +685,159 @@ bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const { _bucketUnpacker.bucketSpec().metaField().get()); } +void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources, + const Pipeline::SourceContainer::const_iterator unpackIt, + boost::intrusive_ptr<ExpressionContext> expCtx, + boost::intrusive_ptr<DocumentSourceGroup> group, + const boost::optional<std::string&> lastpointTimeField, + bool timeAscending) { + mongo::stdx::unordered_set<mongo::NamespaceString> nss; + auto&& ns = expCtx->ns; + nss.emplace(ns); + expCtx->addResolvedNamespaces(nss); + + FieldPath metrics("metrics"); + auto lookup = DocumentSourceLookUp::createFromBson( + BSON(DocumentSourceLookUp::kStageName << BSON( + DocumentSourceLookUp::kFromField + << ns.coll() << DocumentSourceLookUp::kLocalField << "bucket" + << DocumentSourceLookUp::kForeignField << "_id" << DocumentSourceLookUp::kAsField + << metrics.fullPath() << DocumentSourceLookUp::kPipelineField + << BSON_ARRAY(unpackIt->get()->serializeToBSONForDebug() + << 
BSON(DocumentSourceSort::kStageName << BSON( + lastpointTimeField.get() << (timeAscending ? 1 : -1))) + << BSON(DocumentSourceLimit::kStageName << 1)))) + .firstElement(), + expCtx); + + sources.insert(unpackIt, lookup); + + auto unwind = DocumentSourceUnwind::createFromBson( + BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(), + expCtx); + + sources.insert(unpackIt, unwind); + + BSONObjBuilder fields; + fields << "_id" + << "$_id"; + for (auto&& accumulator : group->getAccumulatedFields()) { + auto&& v = accumulator.expr.argument; + if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) { + auto&& newPath = metrics.concat(expr->getFieldPath().tail()); + fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath(); + } + } + + auto replaceWith = DocumentSourceReplaceRoot::createFromBson( + BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(), + expCtx); + + sources.insert(unpackIt, replaceWith); +} + +bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) { + // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order. + if (std::next(itr, 2) == container->end()) { + return false; + } + + auto sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get()); + auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get()); + + if (!sortStage || !groupStage) { + return false; + } + + // Attempt to create a new bucket-level $sort. + if (sortStage->hasLimit()) { + // This $sort stage was previously followed by a $limit stage. 
+ return false; + } + + auto spec = _bucketUnpacker.bucketSpec(); + auto metaField = spec.metaField(); + auto timeField = spec.timeField(); + + if (!metaField || haveComputedMetaField()) { + return false; + } + + if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) { + return false; + } + + auto newSort = createMetadataSortForReorder(*sortStage, timeField); + + // Attempt to create a new bucket-level $group. + auto groupIdFields = groupStage->getIdFields(); + if (groupIdFields.size() != 1) { + return false; + } + + auto groupId = dynamic_cast<ExpressionFieldPath*>(groupIdFields.cbegin()->second.get()); + if (!groupId || groupId->isVariableReference()) { + return false; + } + + const auto fieldPath = groupId->getFieldPath(); + if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) { + return false; + } + + auto newFieldPath = FieldPath(timeseries::kBucketMetaFieldName); + if (fieldPath.tail().getPathLength() > 1) { + newFieldPath = newFieldPath.concat(fieldPath.tail().tail()); + } + auto groupByExpr = ExpressionFieldPath::createPathFromString( + pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState); + + for (auto&& accumulator : groupStage->getAccumulatedFields()) { + if (AccumulatorDocumentsNeeded::kFirstDocument != + accumulator.makeAccumulator()->documentsNeeded()) { + return false; + } + } + auto newAccum = + AccumulationStatement::parseAccumulationStatement(pExpCtx.get(), + BSON("bucket" << BSON("$first" + << "$_id")) + .firstElement(), + pExpCtx->variablesParseState); + auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum}); + + container->insert(itr, newSort); + container->insert(itr, newGroup); + + // Add $lookup, $unwind and $replaceWith stages. 
+ addStagesToRetrieveEventLevelFields( + *container, + itr, + pExpCtx, + groupStage, + timeField, + isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField)); + + // Remove the $sort, $group and $_internalUnpackBucket stages. + tassert(6165401, + "unexpected stage in lastpoint aggregate, expected $_internalUnpackBucket", + itr->get()->getSourceName() == kStageNameInternal); + itr = container->erase(itr); + + tassert(6165402, + "unexpected stage in lastpoint aggregate, expected $sort", + itr->get()->getSourceName() == DocumentSourceSort::kStageName); + itr = container->erase(itr); + + tassert(6165403, + "unexpected stage in lastpoint aggregate, expected $group", + itr->get()->getSourceName() == DocumentSourceGroup::kStageName); + container->erase(itr); + + return true; +} + Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { invariant(*itr == this); @@ -772,6 +964,15 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi } } + // Attempt to optimize last-point type queries. + if (feature_flags::gfeatureFlagLastPointQuery.isEnabled( + serverGlobalParams.featureCompatibility) && + optimizeLastpoint(itr, container)) { + // If we are able to rewrite the aggregation, give the resulting pipeline a chance to + // perform further optimizations. + return container->begin(); + }; + // Attempt to map predicates on bucketed fields to predicates on the control field. if (auto nextMatch = dynamic_cast<DocumentSourceMatch*>(std::next(itr)->get()); nextMatch && !_triedBucketLevelFieldsPredicatesPushdown) { |