summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp105
1 files changed, 67 insertions, 38 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
index 5040fdc2445..c1608ee0142 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/util/make_data_structure.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_options.h"
@@ -162,17 +163,30 @@ bool checkMetadataSortReorder(
*/
boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
const DocumentSourceSort& sort,
- const boost::optional<std::string&> lastpointTimeField = boost::none) {
+ const boost::optional<std::string&> lastpointTimeField = boost::none,
+ bool flipSort = false) {
+ auto sortPattern = flipSort
+ ? SortPattern(
+ QueryPlannerCommon::reverseSortObj(
+ sort.getSortKeyPattern()
+ .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)
+ .toBson()),
+ sort.getContext())
+ : sort.getSortKeyPattern();
std::vector<SortPattern::SortPatternPart> updatedPattern;
- for (const auto& entry : sort.getSortKeyPattern()) {
+ for (const auto& entry : sortPattern) {
updatedPattern.push_back(entry);
if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.get()) {
updatedPattern.back().fieldPath =
- FieldPath(timeseries::kControlMaxFieldNamePrefix + lastpointTimeField.get());
+ FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix
+ : timeseries::kControlMaxFieldNamePrefix) +
+ lastpointTimeField.get());
updatedPattern.push_back(SortPattern::SortPatternPart{
entry.isAscending,
- FieldPath(timeseries::kControlMinFieldNamePrefix + lastpointTimeField.get()),
+ FieldPath((entry.isAscending ? timeseries::kControlMaxFieldNamePrefix
+ : timeseries::kControlMinFieldNamePrefix) +
+ lastpointTimeField.get()),
nullptr});
} else {
auto updated = FieldPath(timeseries::kBucketMetaFieldName);
@@ -192,6 +206,16 @@ boost::intrusive_ptr<DocumentSourceSort> createMetadataSortForReorder(
sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes);
}
+boost::intrusive_ptr<DocumentSourceGroup> createGroupForReorder(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, FieldPath& fieldPath) {
+ auto elem = BSON("bucket" << BSON(AccumulatorFirst::kName << "$_id")).firstElement();
+ auto newAccum = AccumulationStatement::parseAccumulationStatement(
+ expCtx.get(), elem, expCtx->variablesParseState);
+ auto groupByExpr = ExpressionFieldPath::createPathFromString(
+ expCtx.get(), fieldPath.fullPath(), expCtx->variablesParseState);
+ return DocumentSourceGroup::create(expCtx, groupByExpr, {newAccum});
+}
+
// Optimize the section of the pipeline before the $_internalUnpackBucket stage.
void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
auto prefix = Pipeline::SourceContainer(container->begin(), itr);
@@ -709,13 +733,11 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
<< BSON(DocumentSourceLimit::kStageName << 1))))
.firstElement(),
expCtx);
-
sources.insert(unpackIt, lookup);
auto unwind = DocumentSourceUnwind::createFromBson(
BSON(DocumentSourceUnwind::kStageName << metrics.fullPathWithPrefix()).firstElement(),
expCtx);
-
sources.insert(unpackIt, unwind);
BSONObjBuilder fields;
@@ -725,14 +747,17 @@ void addStagesToRetrieveEventLevelFields(Pipeline::SourceContainer& sources,
auto&& v = accumulator.expr.argument;
if (auto expr = dynamic_cast<ExpressionFieldPath*>(v.get())) {
auto&& newPath = metrics.concat(expr->getFieldPath().tail());
- fields << StringData{accumulator.fieldName} << "$" + newPath.fullPath();
+ // This is necessary to preserve $first, $last null-check semantics for handling
+ // nullish fields, e.g. returning missing field paths as null.
+ auto ifNullCheck = BSON(
+ "$ifNull" << BSONArray(BSON("0" << ("$" + newPath.fullPath()) << "1" << BSONNULL)));
+ fields << StringData{accumulator.fieldName} << ifNullCheck;
}
}
- auto replaceWith = DocumentSourceReplaceRoot::createFromBson(
- BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj()).firstElement(),
- expCtx);
-
+ auto replaceWithBson = BSON(DocumentSourceReplaceRoot::kAliasNameReplaceWith << fields.obj());
+ auto replaceWith =
+ DocumentSourceReplaceRoot::createFromBson(replaceWithBson.firstElement(), expCtx);
sources.insert(unpackIt, replaceWith);
}
@@ -750,27 +775,23 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
return false;
}
- // Attempt to create a new bucket-level $sort.
if (sortStage->hasLimit()) {
// This $sort stage was previously followed by a $limit stage.
return false;
}
auto spec = _bucketUnpacker.bucketSpec();
- auto metaField = spec.metaField();
+ auto maybeMetaField = spec.metaField();
auto timeField = spec.timeField();
-
- if (!metaField || haveComputedMetaField()) {
+ if (!maybeMetaField || haveComputedMetaField()) {
return false;
}
- if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField.get(), timeField)) {
+ auto metaField = maybeMetaField.get();
+ if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) {
return false;
}
- auto newSort = createMetadataSortForReorder(*sortStage, timeField);
-
- // Attempt to create a new bucket-level $group.
auto groupIdFields = groupStage->getIdFields();
if (groupIdFields.size() != 1) {
return false;
@@ -782,7 +803,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
}
const auto fieldPath = groupId->getFieldPath();
- if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField.get()) {
+ if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) {
return false;
}
@@ -790,25 +811,33 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
if (fieldPath.tail().getPathLength() > 1) {
newFieldPath = newFieldPath.concat(fieldPath.tail().tail());
}
- auto groupByExpr = ExpressionFieldPath::createPathFromString(
- pExpCtx.get(), newFieldPath.fullPath(), pExpCtx->variablesParseState);
- for (auto&& accumulator : groupStage->getAccumulatedFields()) {
- if (AccumulatorDocumentsNeeded::kFirstDocument !=
- accumulator.makeAccumulator()->documentsNeeded()) {
- return false;
- }
- }
- auto newAccum =
- AccumulationStatement::parseAccumulationStatement(pExpCtx.get(),
- BSON("bucket" << BSON("$first"
- << "$_id"))
- .firstElement(),
- pExpCtx->variablesParseState);
- auto newGroup = DocumentSourceGroup::create(pExpCtx, groupByExpr, {newAccum});
+ // Insert bucket-level $sort and $group stages before we unpack any buckets.
+ boost::intrusive_ptr<DocumentSourceSort> newSort;
+ auto insertBucketLevelSortAndGroup = [&](bool flipSort) {
+ newSort = createMetadataSortForReorder(*sortStage, timeField, flipSort);
+ auto newGroup = createGroupForReorder(pExpCtx, newFieldPath);
+ container->insert(itr, newSort);
+ container->insert(itr, newGroup);
+ };
- container->insert(itr, newSort);
- container->insert(itr, newGroup);
+ auto accumulators = groupStage->getAccumulatedFields();
+ auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) {
+ return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) {
+ return targetAccum == accum.makeAccumulator()->documentsNeeded();
+ });
+ };
+
+ std::string newTimeField;
+ if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kFirstDocument)) {
+ insertBucketLevelSortAndGroup(false);
+ newTimeField = timeseries::kControlMinFieldNamePrefix + timeField;
+ } else if (groupOnlyUsesTargetAccum(AccumulatorDocumentsNeeded::kLastDocument)) {
+ insertBucketLevelSortAndGroup(true);
+ newTimeField = timeseries::kControlMaxFieldNamePrefix + timeField;
+ } else {
+ return false;
+ }
// Add $lookup, $unwind and $replaceWith stages.
addStagesToRetrieveEventLevelFields(
@@ -817,7 +846,7 @@ bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceConta
pExpCtx,
groupStage,
timeField,
- isLastpointSortTimeAscending(sortStage->getSortKeyPattern(), timeField));
+ isLastpointSortTimeAscending(newSort->getSortKeyPattern(), newTimeField));
// Remove the $sort, $group and $_internalUnpackBucket stages.
tassert(6165401,