summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp  221
1 file changed, 211 insertions(+), 10 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 6dccf7eaf37..5040fdc2445 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -49,8 +49,10 @@
#include "mongo/db/pipeline/document_source_add_fields.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/document_source_sample.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -108,11 +110,28 @@ auto getIncludeExcludeProjectAndType(DocumentSource* src) {
return std::pair{BSONObj{}, false};
}
+auto isLastpointSortTimeAscending(const SortPattern& sortPattern, const std::string& timeField) {
+ for (auto entry : sortPattern) {
+ if (entry.fieldPath->fullPath() == timeField) {
+ return entry.isAscending;
+ }
+ }
+ // A lastpoint query will always have the time field as part of the sort pattern.
+ MONGO_UNREACHABLE;
+}
+
/**
* Checks if a sort stage's pattern following our internal unpack bucket is suitable to be reordered
* before us. The sort stage must refer exclusively to the meta field or any subfields.
+ *
+ * If this check is being used for lastpoint, the sort stage can also refer to the time field,
+ * which should be the last field in the pattern.
*/
-bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData& metaFieldStr) {
+bool checkMetadataSortReorder(
+ const SortPattern& sortPattern,
+ const StringData& metaFieldStr,
+ const boost::optional<std::string&> lastpointTimeField = boost::none) {
+ auto timeFound = false;
for (const auto& sortKey : sortPattern) {
if (!sortKey.fieldPath.has_value()) {
return false;
@@ -121,27 +140,47 @@ bool checkMetadataSortReorder(const SortPattern& sortPattern, const StringData&
return false;
}
if (sortKey.fieldPath->getFieldName(0) != metaFieldStr) {
+ if (lastpointTimeField && sortKey.fieldPath->fullPath() == lastpointTimeField.get()) {
+ // If we are checking the sort pattern for the lastpoint case, 'time' is allowed.
+ timeFound = true;
+ continue;
+ }
return false;
+ } else {
+ if (lastpointTimeField && timeFound) {
+ // The time field was not the last field in the sort pattern.
+ return false;
+ }
}
}
- return true;
+ // If we are checking for lastpoint, make sure we encountered the time field.
+ return !lastpointTimeField || timeFound;
}
/**
* Returns a new DocumentSort to reorder before current unpack bucket document.
*/
boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
- const DocumentSourceSort& sort) {
+ const DocumentSourceSort& sort,
+ const boost::optional<std::string&> lastpointTimeField = boost::none) {
std::vector<SortPattern::SortPatternPart> updatedPattern;
for (const auto& entry : sort.getSortKeyPattern()) {
- // Repoint sort to use metadata field before renaming.
- auto updatedFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
- if (entry.fieldPath->getPathLength() > 1) {
- updatedFieldPath = updatedFieldPath.concat(entry.fieldPath->tail());
- }
-
updatedPattern.push_back(entry);
- updatedPattern.back().fieldPath = updatedFieldPath;
+
+ if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) {
+ updatedPattern.back().fieldPath =
+ FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get());
+ updatedPattern.push_back(SortPattern::SortPatternPart{
+ entry.isAscending,
+ FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()),
+ nullptr});
+ } else {
+ auto updated = FieldPath(timeseries::kBucketMetaFieldName);
+ if (entry.fieldPath->getPathLength() > 1) {
+ updated = updated.concat(entry.fieldPath->tail());
+ }
+ updatedPattern.back().fieldPath = updated;
+ }
}
boost::optional<uint64_t> maxMemoryUsageBytes;
@@ -646,6 +685,159 @@ bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
_bucketUnpacker.bucketSpec().metaField().get());
}
+void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
+ const Pipeline::SourceContainer::const_iterator unpackIt,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ boost::intrusive_ptr<DocumentSourceGroup> group,
+ const boost::optional<std::string&> lastpointTimeField,
+ bool timeAscending) {
+ mongo::stdx::unordered_set<mongo::NamespaceString> nss;
+ auto&& ns = expCtx->ns;
+ nss.emplace(ns);
+ expCtx->addResolvedNamespaces(nss);
+
+ FieldPath metrics("metrics");
+ auto lookup = DocumentSourceLookUp::createFromBson(
+ BSON(DocumentSourceLookUp::kStageName << BSON(
+ DocumentSourceLookUp::kFromField
+ << ns.coll() << DocumentSourceLookUp::kLocalField << "bucket"
+ << DocumentSourceLookUp::kForeignField << "_id" << DocumentSourceLookUp::kAsField
+ << metrics.fullPath() << DocumentSourceLookUp::kPipelineField
+ << BSON_ARRAY(unpackIt->get()->serializeToBSONForDebug()
+ << BSON(DocumentSourceSort::kStageName << BSON(
+ lastpointTimeField.get() << (timeAscending ? 1 : -1)))
+ << BSON(DocumentSourceLimit::kStageName << 1))))
+ .firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, lookup);
+
+ auto unwind = DocumentSourceUnwind::createFromBson(
+ BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, unwind);
+
+ BSONObjBuilder fields;
+ fields << "_id"
+ << "$_id";
+ for (auto&& accumulator : group->getAccumulatedFields()) {
+ auto&& v = accumulator.expr.argument;
+ if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) {
+ auto&& newPath = metrics.concat(expr->getFieldPath().tail());
+ fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath();
+ }
+ }
+
+ auto replaceWith = DocumentSourceReplaceRoot::createFromBson(
+ BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(),
+ expCtx);
+
+ sources.insert(unpackIt, replaceWith);
+}
+
/**
 * Attempts the "lastpoint" rewrite: a pipeline of the form
 *   [$_internalUnpackBucket, $sort on (meta..., time), $group on a meta subfield with only $first]
 * is replaced by a bucket-level
 *   [$sort, $group, $lookup, $unwind, $replaceWith]
 * that avoids unpacking every bucket. Returns true iff the rewrite was applied, in which case the
 * original three stages have been erased from 'container'.
 *
 * 'itr' must point at this $_internalUnpackBucket stage within 'container'.
 */
bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
                                                           Pipeline::SourceContainer* container) {
    // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order.
    if (std::next(itr, 2) == container->end()) {
        return false;
    }

    auto sortStage = dynamic_cast<DocumentSourceSort*>(std::next(itr)->get());
    auto groupStage = dynamic_cast<DocumentSourceGroup*>(std::next(itr, 2)->get());

    if (!sortStage || !groupStage) {
        return false;
    }

    // Attempt to create a new bucket-level $sort.
    if (sortStage->hasLimit()) {
        // This $sort stage was previously followed by a $limit stage, i.e. it is a top-k sort;
        // reordering it above the unpack stage would change which documents survive the limit.
        return false;
    }

    auto spec = _bucketUnpacker.bucketSpec();
    auto metaField = spec.metaField();
    auto timeField = spec.timeField();

    // The rewrite groups on a meta subfield, so a meta field must exist and must not have been
    // overwritten by a computed projection earlier in the pipeline.
    if (!metaField || haveComputedMetaField()) {
        return false;
    }

    // The sort must be on meta subfields with the time field last ('timeField' binds the optional
    // reference parameter, enabling the lastpoint-specific checks).
    if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) {
        return false;
    }

    auto newSort = createMetadataSortForReorder(*sortStage, timeField);

    // Attempt to create a new bucket-level $group.
    auto groupIdFields = groupStage->getIdFields();
    if (groupIdFields.size() != 1) {
        return false;
    }

    auto groupId = dynamic_cast<ExpressionFieldPath*>(groupIdFields.cbegin()->second.get());
    if (!groupId || groupId->isVariableReference()) {
        return false;
    }

    // The single _id expression must be rooted at the user-level meta field, e.g. "$tags.a".
    const auto fieldPath = groupId->getFieldPath();
    if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) {
        return false;
    }

    // Repoint the group key at the bucket-level "meta" field, preserving any deeper subpath.
    auto newFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
    if (fieldPath.tail().getPathLength() > 1) {
        newFieldPath = newFieldPath.concat(fieldPath.tail().tail());
    }
    auto groupByExpr = ExpressionFieldPath::createPathFromString(
        pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState);

    // Only $first-style accumulators are eligible: the bucket-level sort guarantees the first
    // bucket per group key contains the lastpoint event.
    for (auto&& accumulator : groupStage->getAccumulatedFields()) {
        if (AccumulatorDocumentsNeeded::kFirstDocument !=
            accumulator.makeAccumulator()->documentsNeeded()) {
            return false;
        }
    }
    // The bucket-level $group keeps only the _id of the first bucket per key; the subsequent
    // $lookup re-fetches the event-level fields from that bucket.
    auto newAccum =
        AccumulationStatement::parseAccumulationStatement(pExpCtx.get(),
                                                          BSON("bucket" << BSON("$first"
                                                                                << "$_id"))
                                                              .firstElement(),
                                                          pExpCtx->variablesParseState);
    auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum});

    // std::list::insert places the new stages immediately before the unpack stage at 'itr'.
    container->insert(itr, newSort);
    container->insert(itr, newGroup);

    // Add $lookup, $unwind and $replaceWith stages.
    addStagesToRetrieveEventLevelFields(
        *container,
        itr,
        pExpCtx,
        groupStage,
        timeField,
        isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField));

    // Remove the $sort, $group and $_internalUnpackBucket stages. Each erase returns the iterator
    // to the following stage, so the three erases walk the original [unpack, sort, group] triple.
    tassert(6165401,
            "unexpected stage in lastpoint aggregate, expected $_internalUnpackBucket",
            itr->get()->getSourceName() == kStageNameInternal);
    itr = container->erase(itr);

    tassert(6165402,
            "unexpected stage in lastpoint aggregate, expected $sort",
            itr->get()->getSourceName() == DocumentSourceSort::kStageName);
    itr = container->erase(itr);

    tassert(6165403,
            "unexpected stage in lastpoint aggregate, expected $group",
            itr->get()->getSourceName() == DocumentSourceGroup::kStageName);
    container->erase(itr);

    return true;
}
+
Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
invariant(*itr == this);
@@ -772,6 +964,15 @@ Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimi
}
}
+ // Attempt to optimize last-point type queries.
+ if (feature_flags::gfeatureFlagLastPointQuery.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ optimizeLastpoint(itr, container)) {
+ // If we are able to rewrite the aggregation, give the resulting pipeline a chance to
+ // perform further optimizations.
+ return container->begin();
+ };
+
// Attempt to map predicates on bucketed fields to predicates on the control field.
if (auto nextMatch = dynamic_cast<DocumentSourceMatch*>(std::next(itr)->get());
nextMatch && !_triedBucketLevelFieldsPredicatesPushdown) {