/** * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" #include #include #include #include #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsontypes.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/matcher/expression_algo.h" #include "mongo/db/matcher/expression_expr.h" #include "mongo/db/matcher/expression_geo.h" #include "mongo/db/matcher/expression_internal_bucket_geo_within.h" #include "mongo/db/matcher/expression_internal_expr_comparison.h" #include "mongo/db/matcher/match_expression_dependencies.h" #include "mongo/db/pipeline/accumulator_multi.h" #include "mongo/db/pipeline/document_source_add_fields.h" #include "mongo/db/pipeline/document_source_geo_near.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_sample.h" #include "mongo/db/pipeline/document_source_sequential_document_cache.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_streaming_group.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/util/make_data_structure.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" #include "mongo/logv2/log.h" #include "mongo/util/duration.h" #include "mongo/util/time_support.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery namespace mongo { /* * $_internalUnpackBucket is an internal stage for materializing time-series measurements from * time-series collections. It should never be used anywhere outside the MongoDB server. 
*/ REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket, LiteParsedDocumentSourceDefault::parse, DocumentSourceInternalUnpackBucket::createFromBsonInternal, AllowedWithApiStrict::kAlways); /* * $_unpackBucket is an alias of $_internalUnpackBucket. It only exposes the "timeField" and the * "metaField" parameters and is only used for special known use cases by other MongoDB products * rather than user applications. */ REGISTER_DOCUMENT_SOURCE(_unpackBucket, LiteParsedDocumentSourceDefault::parse, DocumentSourceInternalUnpackBucket::createFromBsonExternal, AllowedWithApiStrict::kAlways); namespace { /** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. */ bool canInternalizeProjectObj(const BSONObj& projObj) { return std::all_of(projObj.begin(), projObj.end(), [](auto&& e) { return e.isBoolean(); }); } /** * If 'src' represents an inclusion or exclusion $project, return a BSONObj representing it and a * bool indicating its type (true for inclusion, false for exclusion). Else return an empty BSONObj. */ auto getIncludeExcludeProjectAndType(DocumentSource* src) { if (const auto proj = dynamic_cast(src); proj && (proj->getType() == TransformerInterface::TransformerType::kInclusionProjection || proj->getType() == TransformerInterface::TransformerType::kExclusionProjection)) { return std::pair{proj->getTransformer().serializeTransformation(boost::none).toBson(), proj->getType() == TransformerInterface::TransformerType::kInclusionProjection}; } return std::pair{BSONObj{}, false}; } /** * Checks if a sort stage's pattern following our internal unpack bucket is suitable to be reordered * before us. The sort stage must refer exclusively to the meta field or any subfields. * * If this check is being used for lastpoint, the sort stage can also refer to the time field, * which should be the last field in the pattern. 
*/ bool checkMetadataSortReorder( const SortPattern& sortPattern, const StringData& metaFieldStr, const boost::optional lastpointTimeField = boost::none) { auto timeFound = false; for (const auto& sortKey : sortPattern) { if (!sortKey.fieldPath.has_value()) { return false; } if (sortKey.fieldPath->getPathLength() < 1) { return false; } if (sortKey.fieldPath->getFieldName(0) != metaFieldStr) { if (lastpointTimeField && sortKey.fieldPath->fullPath() == lastpointTimeField.value()) { // If we are checking the sort pattern for the lastpoint case, 'time' is allowed. timeFound = true; continue; } return false; } else { if (lastpointTimeField && timeFound) { // The time field was not the last field in the sort pattern. return false; } } } // If we are checking for lastpoint, make sure we encountered the time field. return !lastpointTimeField || timeFound; } /** * Returns a new DocumentSort to reorder before current unpack bucket document. */ boost::intrusive_ptr createMetadataSortForReorder( const DocumentSourceSort& sort, const boost::optional lastpointTimeField = boost::none, const boost::optional groupIdField = boost::none, bool flipSort = false) { auto sortPattern = flipSort ? SortPattern( QueryPlannerCommon::reverseSortObj( sort.getSortKeyPattern() .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization) .toBson()), sort.getContext()) : sort.getSortKeyPattern(); std::vector updatedPattern; for (const auto& entry : sortPattern) { updatedPattern.push_back(entry); if (lastpointTimeField && entry.fieldPath->fullPath() == lastpointTimeField.value()) { updatedPattern.back().fieldPath = FieldPath((entry.isAscending ? timeseries::kControlMinFieldNamePrefix : timeseries::kControlMaxFieldNamePrefix) + lastpointTimeField.value()); updatedPattern.push_back(SortPattern::SortPatternPart{ entry.isAscending, FieldPath((entry.isAscending ? 
timeseries::kControlMaxFieldNamePrefix : timeseries::kControlMinFieldNamePrefix) + lastpointTimeField.value()), nullptr}); } else { auto updated = FieldPath(timeseries::kBucketMetaFieldName); if (entry.fieldPath->getPathLength() > 1) { updated = updated.concat(entry.fieldPath->tail()); } updatedPattern.back().fieldPath = updated; } } // After the modifications of the sortPattern are completed, for the lastPoint // optimizations, the group field needs to be added to the beginning of the sort pattern. // Do note that the modified sort pattern is for sorting within a group (within the bucket) // and the plan is to do the grouping and sort within on go. // If the group field is already in the sortPattern then it needs to moved to the first // position. A flip in the later case is not necessary anymore as the sort order was // already flipped. // Example 1: $group: {a:1}, $sort{b: 1, a: -1} --> modifiedPattern: {a: -1, b: 1} // Example 2: $group: {c:1}, $sort{d: -1, e: 1} --> modifiedPattern: {c: 1, d: -1, e: 1} if (groupIdField) { const auto groupId = FieldPath(groupIdField.value()); SortPattern::SortPatternPart patternPart; patternPart.fieldPath = groupId; const auto pattern = std::find_if(updatedPattern.begin(), updatedPattern.end(), [&groupId](const SortPattern::SortPatternPart& s) -> bool { return s.fieldPath->fullPath().compare(groupId.fullPath()) == 0; }); if (pattern != updatedPattern.end()) { patternPart.isAscending = pattern->isAscending; updatedPattern.erase(pattern); } else { patternPart.isAscending = !flipSort; } updatedPattern.insert(updatedPattern.begin(), patternPart); } boost::optional maxMemoryUsageBytes; if (auto sortStatsPtr = dynamic_cast(sort.getSpecificStats())) { maxMemoryUsageBytes = sortStatsPtr->maxMemoryUsageBytes; } return DocumentSourceSort::create( sort.getContext(), SortPattern{updatedPattern}, 0, maxMemoryUsageBytes); } /** * Returns a new DocumentSourceGroup to add before current unpack bucket document. 
*/ boost::intrusive_ptr createBucketGroupForReorder( const boost::intrusive_ptr& expCtx, const std::vector& fieldsToInclude, const std::string& fieldPath) { auto groupByExpr = ExpressionFieldPath::createPathFromString( expCtx.get(), fieldPath, expCtx->variablesParseState); std::vector accumulators; for (const auto& fieldName : fieldsToInclude) { auto field = BSON(fieldName << BSON(AccumulatorFirst::kName << "$" + fieldName)); accumulators.emplace_back(AccumulationStatement::parseAccumulationStatement( expCtx.get(), field.firstElement(), expCtx->variablesParseState)); }; return DocumentSourceGroup::create(expCtx, groupByExpr, accumulators); } // Optimize the section of the pipeline before the $_internalUnpackBucket stage. void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { auto prefix = Pipeline::SourceContainer(container->begin(), itr); Pipeline::optimizeContainer(&prefix); container->erase(container->begin(), itr); container->splice(itr, prefix); } } // namespace DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr& expCtx, BucketUnpacker bucketUnpacker, int bucketMaxSpanSeconds, bool assumeNoMixedSchemaData) : DocumentSource(kStageNameInternal, expCtx), _assumeNoMixedSchemaData(assumeNoMixedSchemaData), _bucketUnpacker(std::move(bucketUnpacker)), _bucketMaxSpanSeconds{bucketMaxSpanSeconds} {} DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr& expCtx, BucketUnpacker bucketUnpacker, int bucketMaxSpanSeconds, const boost::optional& eventFilterBson, const boost::optional& wholeBucketFilterBson, bool assumeNoMixedSchemaData) : DocumentSourceInternalUnpackBucket( expCtx, std::move(bucketUnpacker), bucketMaxSpanSeconds, assumeNoMixedSchemaData) { if (eventFilterBson) { _eventFilterBson = eventFilterBson->getOwned(); _eventFilter = uassertStatusOK(MatchExpressionParser::parse(_eventFilterBson, pExpCtx, ExtensionsCallbackNoop(), 
Pipeline::kAllowedMatcherFeatures)); _eventFilterDeps = {}; match_expression::addDependencies(_eventFilter.get(), &_eventFilterDeps); } if (wholeBucketFilterBson) { _wholeBucketFilterBson = wholeBucketFilterBson->getOwned(); _wholeBucketFilter = uassertStatusOK(MatchExpressionParser::parse(_wholeBucketFilterBson, pExpCtx, ExtensionsCallbackNoop(), Pipeline::kAllowedMatcherFeatures)); } } boost::intrusive_ptr DocumentSourceInternalUnpackBucket::createFromBsonInternal( BSONElement specElem, const boost::intrusive_ptr& expCtx) { uassert(5346500, str::stream() << "$_internalUnpackBucket specification must be an object, got: " << specElem.type(), specElem.type() == BSONType::Object); // If neither "include" nor "exclude" is specified, the default is "exclude": [] and // if that's the case, no field will be added to 'bucketSpec.fieldSet' in the for-loop below. BucketSpec bucketSpec; // Use extended-range support if any individual collection requires it, even if 'specElem' // doesn't mention this flag. 
if (expCtx->getRequiresTimeseriesExtendedRangeSupport()) { bucketSpec.setUsesExtendedRange(true); } auto hasIncludeExclude = false; auto hasTimeField = false; auto hasBucketMaxSpanSeconds = false; auto bucketMaxSpanSeconds = 0; auto assumeClean = false; std::vector computedMetaProjFields; boost::optional eventFilterBson; boost::optional wholeBucketFilterBson; for (auto&& elem : specElem.embeddedObject()) { auto fieldName = elem.fieldNameStringData(); if (fieldName == kInclude || fieldName == kExclude) { uassert(5408000, "The $_internalUnpackBucket stage expects at most one of include/exclude " "parameters to be specified", !hasIncludeExclude); uassert(5346501, str::stream() << "include or exclude field must be an array, got: " << elem.type(), elem.type() == BSONType::Array); for (auto&& elt : elem.embeddedObject()) { uassert(5346502, str::stream() << "include or exclude field element must be a string, got: " << elt.type(), elt.type() == BSONType::String); auto field = elt.valueStringData(); uassert(5346503, "include or exclude field element must be a single-element field path", field.find('.') == std::string::npos); bucketSpec.addIncludeExcludeField(field); } bucketSpec.setBehavior(fieldName == kInclude ? 
BucketSpec::Behavior::kInclude : BucketSpec::Behavior::kExclude); hasIncludeExclude = true; } else if (fieldName == kAssumeNoMixedSchemaData) { uassert(6067202, str::stream() << "assumeClean field must be a bool, got: " << elem.type(), elem.type() == BSONType::Bool); assumeClean = elem.boolean(); } else if (fieldName == timeseries::kTimeFieldName) { uassert(5346504, str::stream() << "timeField field must be a string, got: " << elem.type(), elem.type() == BSONType::String); bucketSpec.setTimeField(elem.str()); hasTimeField = true; } else if (fieldName == timeseries::kMetaFieldName) { uassert(5346505, str::stream() << "metaField field must be a string, got: " << elem.type(), elem.type() == BSONType::String); auto metaField = elem.str(); uassert(5545700, str::stream() << "metaField field must be a single-element field path", metaField.find('.') == std::string::npos); bucketSpec.setMetaField(std::move(metaField)); } else if (fieldName == kBucketMaxSpanSeconds) { uassert(5510600, str::stream() << "bucketMaxSpanSeconds field must be an integer, got: " << elem.type(), elem.type() == BSONType::NumberInt); uassert(5510601, "bucketMaxSpanSeconds field must be greater than zero", elem._numberInt() > 0); bucketMaxSpanSeconds = elem._numberInt(); hasBucketMaxSpanSeconds = true; } else if (fieldName == "computedMetaProjFields") { uassert(5509900, str::stream() << "computedMetaProjFields field must be an array, got: " << elem.type(), elem.type() == BSONType::Array); for (auto&& elt : elem.embeddedObject()) { uassert(5509901, str::stream() << "computedMetaProjFields field element must be a string, got: " << elt.type(), elt.type() == BSONType::String); auto field = elt.valueStringData(); uassert(5509902, "computedMetaProjFields field element must be a single-element field path", field.find('.') == std::string::npos); bucketSpec.addComputedMetaProjFields(field); } } else if (fieldName == kIncludeMinTimeAsMetadata) { uassert(6460208, str::stream() << kIncludeMinTimeAsMetadata << " 
field must be a bool, got: " << elem.type(), elem.type() == BSONType::Bool); bucketSpec.includeMinTimeAsMetadata = elem.boolean(); } else if (fieldName == kIncludeMaxTimeAsMetadata) { uassert(6460209, str::stream() << kIncludeMaxTimeAsMetadata << " field must be a bool, got: " << elem.type(), elem.type() == BSONType::Bool); bucketSpec.includeMaxTimeAsMetadata = elem.boolean(); } else if (fieldName == kUsesExtendedRange) { uassert(6646901, str::stream() << kUsesExtendedRange << " field must be a bool, got: " << elem.type(), elem.type() == BSONType::Bool); bucketSpec.setUsesExtendedRange(elem.boolean()); } else if (fieldName == kEventFilter) { uassert(7026902, str::stream() << kEventFilter << " field must be an object, got: " << elem.type(), elem.type() == BSONType::Object); eventFilterBson = elem.Obj(); } else if (fieldName == kWholeBucketFilter) { uassert(7026903, str::stream() << kWholeBucketFilter << " field must be an object, got: " << elem.type(), elem.type() == BSONType::Object); wholeBucketFilterBson = elem.Obj(); } else { uasserted(5346506, str::stream() << "unrecognized parameter to $_internalUnpackBucket: " << fieldName); } } uassert( 5346508, "The $_internalUnpackBucket stage requires a timeField parameter", hasTimeField); uassert(5510602, "The $_internalUnpackBucket stage requires a bucketMaxSpanSeconds parameter", hasBucketMaxSpanSeconds); return make_intrusive(expCtx, BucketUnpacker{std::move(bucketSpec)}, bucketMaxSpanSeconds, eventFilterBson, wholeBucketFilterBson, assumeClean); } boost::intrusive_ptr DocumentSourceInternalUnpackBucket::createFromBsonExternal( BSONElement specElem, const boost::intrusive_ptr& expCtx) { uassert(5612400, str::stream() << "$_unpackBucket specification must be an object, got: " << specElem.type(), specElem.type() == BSONType::Object); BucketSpec bucketSpec; auto hasTimeField = false; auto assumeClean = false; for (auto&& elem : specElem.embeddedObject()) { auto fieldName = elem.fieldNameStringData(); // We only expose 
"timeField" and "metaField" as parameters in $_unpackBucket. if (fieldName == timeseries::kTimeFieldName) { uassert(5612401, str::stream() << "timeField field must be a string, got: " << elem.type(), elem.type() == BSONType::String); bucketSpec.setTimeField(elem.str()); hasTimeField = true; } else if (fieldName == timeseries::kMetaFieldName) { uassert(5612402, str::stream() << "metaField field must be a string, got: " << elem.type(), elem.type() == BSONType::String); auto metaField = elem.str(); uassert(5612403, str::stream() << "metaField field must be a single-element field path", metaField.find('.') == std::string::npos); bucketSpec.setMetaField(std::move(metaField)); } else if (fieldName == kAssumeNoMixedSchemaData) { uassert(6067203, str::stream() << "assumeClean field must be a bool, got: " << elem.type(), elem.type() == BSONType::Bool); assumeClean = elem.boolean(); } else { uasserted(5612404, str::stream() << "unrecognized parameter to $_unpackBucket: " << fieldName); } } uassert(5612405, str::stream() << "The $_unpackBucket stage requires a timeField parameter", hasTimeField); return make_intrusive( expCtx, BucketUnpacker{std::move(bucketSpec)}, // Using the bucket span associated with the default granularity seconds. timeseries::getMaxSpanSecondsFromGranularity(BucketGranularityEnum::Seconds), assumeClean); } void DocumentSourceInternalUnpackBucket::serializeToArray(std::vector& array, SerializationOptions opts) const { auto explain = opts.verbosity; MutableDocument out; auto behavior = _bucketUnpacker.behavior() == BucketSpec::Behavior::kInclude ? 
kInclude : kExclude; const auto& spec = _bucketUnpacker.bucketSpec(); std::vector fields; for (auto&& field : spec.fieldSet()) { fields.emplace_back(opts.serializeFieldPathFromString(field)); } if (((_bucketUnpacker.includeMetaField() && _bucketUnpacker.behavior() == BucketSpec::Behavior::kInclude) || (!_bucketUnpacker.includeMetaField() && _bucketUnpacker.behavior() == BucketSpec::Behavior::kExclude && spec.metaField())) && std::find(spec.computedMetaProjFields().cbegin(), spec.computedMetaProjFields().cend(), *spec.metaField()) == spec.computedMetaProjFields().cend()) fields.emplace_back(opts.serializeFieldPathFromString(*spec.metaField())); out.addField(behavior, Value{std::move(fields)}); out.addField(timeseries::kTimeFieldName, Value{opts.serializeFieldPathFromString(spec.timeField())}); if (spec.metaField()) { out.addField(timeseries::kMetaFieldName, Value{opts.serializeFieldPathFromString(*spec.metaField())}); } out.addField(kBucketMaxSpanSeconds, opts.serializeLiteralValue(Value{_bucketMaxSpanSeconds})); if (_assumeNoMixedSchemaData) out.addField(kAssumeNoMixedSchemaData, opts.serializeLiteralValue(Value(_assumeNoMixedSchemaData))); if (spec.usesExtendedRange()) { // Include this flag so that 'explain' is more helpful. // But this is not so useful for communicating from one process to another, // because mongos and/or the primary shard don't know whether any other shard // has extended-range data. 
out.addField(kUsesExtendedRange, opts.serializeLiteralValue(Value{true})); } if (!spec.computedMetaProjFields().empty()) out.addField("computedMetaProjFields", Value{[&] { std::vector compFields; std::transform(spec.computedMetaProjFields().cbegin(), spec.computedMetaProjFields().cend(), std::back_inserter(compFields), [opts](auto&& projString) { return Value{ opts.serializeFieldPathFromString(projString)}; }); return compFields; }()}); if (_bucketUnpacker.includeMinTimeAsMetadata()) { out.addField(kIncludeMinTimeAsMetadata, opts.serializeLiteralValue(Value{_bucketUnpacker.includeMinTimeAsMetadata()})); } if (_bucketUnpacker.includeMaxTimeAsMetadata()) { out.addField(kIncludeMaxTimeAsMetadata, opts.serializeLiteralValue(Value{_bucketUnpacker.includeMaxTimeAsMetadata()})); } if (_wholeBucketFilter) { out.addField(kWholeBucketFilter, Value{_wholeBucketFilter->serialize(opts)}); } if (_eventFilter) { out.addField(kEventFilter, Value{_eventFilter->serialize(opts)}); } if (!explain) { array.push_back(Value(DOC(getSourceName() << out.freeze()))); if (_sampleSize) { auto sampleSrc = DocumentSourceSample::create(pExpCtx, *_sampleSize); sampleSrc->serializeToArray(array, opts); } } else { if (_sampleSize) { out.addField("sample", opts.serializeLiteralValue(Value{static_cast(*_sampleSize)})); out.addField("bucketMaxCount", opts.serializeLiteralValue(Value{_bucketMaxCount})); } array.push_back(Value(DOC(getSourceName() << out.freeze()))); } } boost::optional DocumentSourceInternalUnpackBucket::getNextMatchingMeasure() { while (_bucketUnpacker.hasNext()) { if (_eventFilter) { if (_unpackToBson) { auto measure = _bucketUnpacker.getNextBson(); if (_bucketUnpacker.bucketMatchedQuery() || _eventFilter->matchesBSON(measure)) { return Document(measure); } } else { auto measure = _bucketUnpacker.getNext(); // MatchExpression only takes BSON documents, so we have to make one. As an // optimization, only serialize the fields we need to do the match. 
BSONObj measureBson = _eventFilterDeps.needWholeDocument ? measure.toBson() : document_path_support::documentToBsonWithPaths(measure, _eventFilterDeps.fields); if (_bucketUnpacker.bucketMatchedQuery() || _eventFilter->matchesBSON(measureBson)) { return measure; } } } else { return _bucketUnpacker.getNext(); } } return {}; } DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { tassert(5521502, "calling doGetNext() when '_sampleSize' is set is disallowed", !_sampleSize); // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is // exhausted. if (auto measure = getNextMatchingMeasure()) { return GetNextResult(std::move(*measure)); } auto nextResult = pSource->getNext(); while (nextResult.isAdvanced()) { auto bucket = nextResult.getDocument().toBson(); auto bucketMatchedQuery = _wholeBucketFilter && _wholeBucketFilter->matchesBSON(bucket); _bucketUnpacker.reset(std::move(bucket), bucketMatchedQuery); uassert(5346509, str::stream() << "A bucket with _id " << _bucketUnpacker.bucket()[timeseries::kBucketIdFieldName].toString() << " contains an empty data region", _bucketUnpacker.hasNext()); if (auto measure = getNextMatchingMeasure()) { return GetNextResult(std::move(*measure)); } nextResult = pSource->getNext(); } return nextResult; } bool DocumentSourceInternalUnpackBucket::pushDownComputedMetaProjection( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { bool nextStageWasRemoved = false; if (std::next(itr) == container->end()) { return nextStageWasRemoved; } if (!_bucketUnpacker.getMetaField() || !_bucketUnpacker.includeMetaField()) { return nextStageWasRemoved; } if (auto nextTransform = dynamic_cast(std::next(itr)->get()); nextTransform && (nextTransform->getType() == TransformerInterface::TransformerType::kInclusionProjection || nextTransform->getType() == TransformerInterface::TransformerType::kComputedProjection)) { auto& metaName = 
_bucketUnpacker.bucketSpec().metaField().value(); auto [addFieldsSpec, deleteStage] = nextTransform->extractComputedProjections(metaName, timeseries::kBucketMetaFieldName.toString(), BucketUnpacker::reservedBucketFieldNames); nextStageWasRemoved = deleteStage; if (!addFieldsSpec.isEmpty()) { // Extend bucket specification of this stage to include the computed meta projections // that are passed through. std::vector computedMetaProjFields; for (auto&& elem : addFieldsSpec) { computedMetaProjFields.emplace_back(elem.fieldName()); } _bucketUnpacker.addComputedMetaProjFields(computedMetaProjFields); // Insert extracted computed projections before the $_internalUnpackBucket. container->insert( itr, DocumentSourceAddFields::createFromBson( BSON("$addFields" << addFieldsSpec).firstElement(), getContext())); // Remove the next stage if it became empty after the field extraction. if (deleteStage) { container->erase(std::next(itr)); } } } return nextStageWasRemoved; } void DocumentSourceInternalUnpackBucket::internalizeProject(const BSONObj& project, bool isInclusion) { // 'fields' are the top-level fields to be included/excluded by the unpacker. We handle the // special case of _id, which may be excluded in an inclusion $project (or vice versa), here. auto fields = project.getFieldNames>(); if (auto elt = project.getField("_id"); (elt.isBoolean() && elt.Bool() != isInclusion) || (elt.isNumber() && (elt.Int() == 1) != isInclusion)) { fields.erase("_id"); } // Update '_bucketUnpacker' state with the new fields and behavior. auto spec = _bucketUnpacker.bucketSpec(); spec.setFieldSet(fields); spec.setBehavior(isInclusion ? 
// (Tail of a definition that begins before this chunk; left unchanged.)
BucketSpec::Behavior::kInclude : BucketSpec::Behavior::kExclude);
    _bucketUnpacker.setBucketSpec(std::move(spec));
}

// NOTE(review): the angle-bracketed template-argument lists throughout this chunk (e.g. the
// targets of every dynamic_cast, the element types of std::pair/std::vector/boost::optional, and
// the parameter list of the extractFromAcc template) appear to have been stripped by a text
// extraction step. They must be restored from the upstream source before this translation unit
// can compile; the code tokens are otherwise preserved as found.

// Decides what projection, if any, this unpack stage should internalize. Tries, in order:
// (1) an explicit inclusion $project immediately following this stage, (2) an inclusion
// projection synthesized from the dependency set of the rest of the pipeline, and (3) an
// explicit exclusion $project. Returns {projection, isInclusion}; an empty BSONObj means
// nothing could be internalized. When an existing $project is absorbed, it is erased from
// 'container'.
std::pair DocumentSourceInternalUnpackBucket::extractOrBuildProjectToInternalize(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) const {
    if (std::next(itr) == container->end() || !_bucketUnpacker.bucketSpec().fieldSet().empty()) {
        // There is no project to internalize or there are already fields being included/excluded.
        return {BSONObj{}, false};
    }

    // Check for a viable inclusion $project after the $_internalUnpackBucket.
    auto [existingProj, isInclusion] = getIncludeExcludeProjectAndType(std::next(itr)->get());
    if (!_eventFilter && isInclusion && !existingProj.isEmpty() &&
        canInternalizeProjectObj(existingProj)) {
        container->erase(std::next(itr));
        return {existingProj, isInclusion};
    }

    // Attempt to get an inclusion $project representing the root-level dependencies of the pipeline
    // after the $_internalUnpackBucket. If this $project is not empty, then the dependency set was
    // finite.
    auto deps = getRestPipelineDependencies(itr, container, true /* includeEventFilter */);
    if (auto dependencyProj =
            deps.toProjectionWithoutMetadata(DepsTracker::TruncateToRootLevel::yes);
        !dependencyProj.isEmpty()) {
        return {dependencyProj, true};
    }

    // Check for a viable exclusion $project after the $_internalUnpackBucket.
    if (!_eventFilter && !existingProj.isEmpty() && canInternalizeProjectObj(existingProj)) {
        container->erase(std::next(itr));
        return {existingProj, isInclusion};
    }

    return {BSONObj{}, false};
}

// Thin wrapper over BucketSpec::createPredicatesOnBucketLevelField: maps a measurement-level
// match expression to predicates on the bucket-level control/meta fields, using this stage's
// unpacker configuration. Ineligible predicates are silently ignored (kIgnore policy).
BucketSpec::BucketPredicate DocumentSourceInternalUnpackBucket::createPredicatesOnBucketLevelField(
    const MatchExpression* matchExpr) const {
    return BucketSpec::createPredicatesOnBucketLevelField(
        matchExpr,
        _bucketUnpacker.bucketSpec(),
        _bucketMaxSpanSeconds,
        pExpCtx,
        haveComputedMetaField(),
        _bucketUnpacker.includeMetaField(),
        _assumeNoMixedSchemaData,
        BucketSpec::IneligiblePredicatePolicy::kIgnore);
}

// If 'src' is an exclusion projection and this collection has a metaField, splits out the part
// of the projection that touches only the metaField, renamed to the bucket-level 'meta' field so
// it can run before unpacking. Returns {projection, deleteRemainder}; empty BSONObj when no part
// can be pushed down.
std::pair DocumentSourceInternalUnpackBucket::extractProjectForPushDown(
    DocumentSource* src) const {
    if (auto nextProject = dynamic_cast(src);
        _bucketUnpacker.bucketSpec().metaField() && nextProject &&
        nextProject->getType() == TransformerInterface::TransformerType::kExclusionProjection) {
        return nextProject->extractProjectOnFieldAndRename(
            _bucketUnpacker.bucketSpec().metaField().value(), timeseries::kBucketMetaFieldName);
    }
    return {BSONObj{}, false};
}

// Rewrites "unpack + $group{_id: <metaField path>, $min/$max accumulators}" into a single
// bucket-level $group that reads the control.min/control.max summaries (and the bucket 'meta'
// field), eliminating the unpack stage entirely. Returns {true, iterator-to-continue-from} on
// success, or a default-constructed pair when any precondition fails. Note the rewrite bails out
// for the time field (control.min.time is rounded down, so it is not an exact min).
std::pair
DocumentSourceInternalUnpackBucket::rewriteGroupByMinMax(Pipeline::SourceContainer::iterator itr,
                                                         Pipeline::SourceContainer* container) {
    const auto* groupPtr = dynamic_cast(std::next(itr)->get());
    if (groupPtr == nullptr) {
        return {};
    }

    if (!_bucketUnpacker.bucketSpec().metaField()) {
        return {};
    }
    const auto& metaField = *_bucketUnpacker.bucketSpec().metaField();

    const auto& idFields = groupPtr->getIdFields();
    if (idFields.size() != 1) {
        return {};
    }

    const auto& exprId = idFields.cbegin()->second;
    // TODO: SERVER-68811. Allow rewrites if expression is constant.
    const auto* exprIdPath = dynamic_cast(exprId.get());
    if (exprIdPath == nullptr) {
        return {};
    }

    // The group key must be a path rooted at the metaField (field name index 1 is the first
    // component after the leading '$' variable root).
    const auto& idPath = exprIdPath->getFieldPath();
    if (idPath.getPathLength() < 2 || idPath.getFieldName(1) != metaField) {
        return {};
    }

    std::vector accumulationStatements;
    for (const AccumulationStatement& stmt : groupPtr->getAccumulatedFields()) {
        const auto* exprArg = stmt.expr.argument.get();
        if (const auto* exprArgPath = dynamic_cast(exprArg)) {
            const auto& path = exprArgPath->getFieldPath();
            if (path.getPathLength() <= 1) {
                return {};
            }

            const auto& rootFieldName = path.getFieldName(1);
            if (rootFieldName == _bucketUnpacker.bucketSpec().timeField()) {
                // Rewrite not valid for time field. We want to eliminate the bucket
                // unpack stage here.
                return {};
            }

            std::ostringstream os;
            if (rootFieldName == metaField) {
                // Update aggregates to reference the meta field.
                os << timeseries::kBucketMetaFieldName;
                for (size_t index = 2; index < path.getPathLength(); index++) {
                    os << "." << path.getFieldName(index);
                }
            } else {
                // Update aggregates to reference the control field.
                const auto op = stmt.expr.name;
                if (op == "$min") {
                    os << timeseries::kControlMinFieldNamePrefix;
                } else if (op == "$max") {
                    os << timeseries::kControlMaxFieldNamePrefix;
                } else {
                    // Rewrite is valid only for min and max aggregates.
                    return {};
                }

                for (size_t index = 1; index < path.getPathLength(); index++) {
                    if (index > 1) {
                        os << ".";
                    }
                    os << path.getFieldName(index);
                }
            }

            const auto& newExpr = ExpressionFieldPath::createPathFromString(
                pExpCtx.get(), os.str(), pExpCtx->variablesParseState);

            AccumulationExpression accExpr = stmt.expr;
            accExpr.argument = newExpr;
            accumulationStatements.emplace_back(stmt.fieldName, std::move(accExpr));
        } else {
            return {};
        }
    }

    // Rebuild the _id path against the bucket-level 'meta' field.
    std::ostringstream os;
    os << timeseries::kBucketMetaFieldName;
    for (size_t index = 2; index < idPath.getPathLength(); index++) {
        os << "." << idPath.getFieldName(index);
    }
    auto exprId1 = ExpressionFieldPath::createPathFromString(
        pExpCtx.get(), os.str(), pExpCtx->variablesParseState);

    auto newGroup = DocumentSourceGroup::create(pExpCtx,
                                                std::move(exprId1),
                                                std::move(accumulationStatements),
                                                groupPtr->getMaxMemoryUsageBytes());

    // Erase current stage and following group stage, and replace with updated group.
    container->erase(std::next(itr));
    *itr = std::move(newGroup);

    if (itr == container->begin()) {
        // Optimize group stage.
        return {true, itr};
    } else {
        // Give chance of the previous stage to optimize against group stage.
        return {true, std::prev(itr)};
    }
}

// True when the metaField exists but has been overwritten by an earlier computed projection,
// in which case meta-based rewrites are unsafe.
bool DocumentSourceInternalUnpackBucket::haveComputedMetaField() const {
    return _bucketUnpacker.bucketSpec().metaField() &&
        _bucketUnpacker.bucketSpec().fieldIsComputed(
            _bucketUnpacker.bucketSpec().metaField().value());
}

// Scans the pipeline after this stage for a $group that is preceded by a $sort on the time field
// (with no intervening stage that reorders documents or modifies the time field). If at least one
// of the group's _id sub-expressions is monotonic in time, replaces the $group with a
// DocumentSourceStreamingGroup, which can emit finished batches without holding the whole input.
// Returns whether the replacement happened.
bool DocumentSourceInternalUnpackBucket::enableStreamingGroupIfPossible(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    // skip unpack stage
    itr = std::next(itr);

    FieldPath timeField = _bucketUnpacker.bucketSpec().timeField();

    DocumentSourceGroup* groupStage = nullptr;
    bool isSortedOnTime = false;
    for (; itr != container->end(); ++itr) {
        if (auto groupStagePtr = dynamic_cast(itr->get())) {
            groupStage = groupStagePtr;
            break;
        }
        if (auto sortStagePtr = dynamic_cast(itr->get())) {
            // Only the leading sort key matters for "sorted on time".
            isSortedOnTime = sortStagePtr->getSortKeyPattern().front().fieldPath == timeField;
        } else if (!itr->get()->constraints().preservesOrderAndMetadata) {
            // If this is after the sort, the sort is invalidated. If it's before the sort, there's
            // no harm in keeping the boolean false.
            isSortedOnTime = false;
        }
        // We modify time field, so we can't proceed with optimization. It may be possible to
        // proceed in some cases if the modification happens before the sort, but we won't worry
        // about or bother with those - in large part because it is risky that it will change the
        // type away from a date into something with more difficult/subtle semantics.
        if (itr->get()->getModifiedPaths().canModify(timeField)) {
            return false;
        }
    }
    if (groupStage == nullptr || !isSortedOnTime) {
        return false;
    }

    const auto& idFields = groupStage->getMutableIdFields();
    std::vector monotonicIdFields;
    for (size_t i = 0; i < idFields.size(); ++i) {
        // To enable streaming, we need id field expression to be clustered, so that all documents
        // with the same value of this id field are in a single contiguous cluster. However this
        // property is hard to check for, so we check for monotonicity instead, which is stronger.
        idFields[i]->optimize();  // We optimize here to make use of constant folding.
        auto monotonicState = idFields[i]->getMonotonicState(timeField);
        // We don't add monotonic::State::Constant id fields, because they are useless when
        // determining if a group batch is finished.
        if (monotonicState == monotonic::State::Increasing ||
            monotonicState == monotonic::State::Decreasing) {
            monotonicIdFields.push_back(i);
        }
    }
    if (monotonicIdFields.empty()) {
        return false;
    }

    // Replace the $group in place; 'itr' still points at it after the loop's break above.
    *itr = DocumentSourceStreamingGroup::create(
        pExpCtx,
        groupStage->getIdExpression(),
        std::move(monotonicIdFields),
        std::move(groupStage->getMutableAccumulatedFields()),
        groupStage->getMaxMemoryUsageBytes());
    return true;
}

// Helper for the lastpoint rewrite: given a $top/$bottom/$topN/$bottomN accumulator, produces
// (a) a BSON sort pattern equivalent to the accumulator's internal sort and (b) a replacement
// $first/$last accumulator over $$ROOT. For the N-variants (single == false), succeeds only when
// the 'n' initializer is the constant 1. Returns false when the rewrite does not apply.
// NOTE(review): the template parameter list (sense, single) was stripped by extraction.
template
bool extractFromAcc(const AccumulatorN* acc,
                    const boost::intrusive_ptr& init,
                    boost::optional& outputAccumulator,
                    boost::optional& outputSortPattern) {
    // If this accumulator will not return a single document then we cannot rewrite this query to
    // use a $sort + a $group with $first or $last.
    if constexpr (!single) {
        // We may have a $topN or a $bottomN with n = 1; in this case, we may still be able to
        // perform the lastpoint rewrite.
        if (auto constInit = dynamic_cast(init.get()); constInit) {
            // Since this is a $const expression, the input to evaluate() should not matter.
            auto constVal = constInit->evaluate(Document(), nullptr);
            if (!constVal.numeric() || (constVal.coerceToLong() != 1)) {
                return false;
            }
        } else {
            return false;
        }
    }

    // Retrieve sort pattern for an equivalent $sort.
    const auto multiAc = dynamic_cast*>(acc);
    invariant(multiAc);
    outputSortPattern = multiAc->getSortPattern()
                            .serialize(SortPattern::SortKeySerialization::kForPipelineSerialization)
                            .toBson();

    // Retrieve equivalent accumulator statement using $first/$last for retrieving the entire
    // document.
    constexpr auto accumulator =
        (sense == TopBottomSense::kTop) ? AccumulatorFirst::kName : AccumulatorLast::kName;
    // Note: we don't need to preserve what the $top/$bottom accumulator outputs here. We only need
    // a $group stage with the appropriate accumulator that retrieves the bucket in some way. For
    // the rewrite we preserve the original group and insert an $group that returns all the data in
    // the $first bucket selected for each _id.
    outputAccumulator = BSON("bucket" << BSON(accumulator << "$$ROOT"));

    return true;
}

// Dispatches to the extractFromAcc specialization matching the accumulator's runtime type.
// Returns false for any accumulator that is not top/topN/bottom/bottomN.
bool extractFromAccIfTopBottomN(const AccumulatorN* multiAcc,
                                const boost::intrusive_ptr& init,
                                boost::optional& outputAccumulator,
                                boost::optional& outputSortPattern) {
    const auto accType = multiAcc->getAccumulatorType();
    if (accType == AccumulatorN::kTopN) {
        return extractFromAcc(multiAcc, init, outputAccumulator, outputSortPattern);
    } else if (accType == AccumulatorN::kTop) {
        return extractFromAcc(multiAcc, init, outputAccumulator, outputSortPattern);
    } else if (accType == AccumulatorN::kBottomN) {
        return extractFromAcc(multiAcc, init, outputAccumulator, outputSortPattern);
    } else if (accType == AccumulatorN::kBottom) {
        return extractFromAcc(multiAcc, init, outputAccumulator, outputSortPattern);
    }
    // This isn't a topN/bottomN/top/bottom accumulator.
    return false;
}

// Attempts to express a single-accumulator $group (with $top/$bottom/$topN/$bottomN, N == 1) as
// an equivalent {$sort, $group-with-$first/$last} pair. Returns {nullptr, nullptr} when the
// group does not match; the returned stages are NOT inserted into the pipeline by this function.
std::pair, boost::intrusive_ptr>
tryRewriteGroupAsSortGroup(boost::intrusive_ptr expCtx,
                           Pipeline::SourceContainer::iterator itr,
                           Pipeline::SourceContainer* container,
                           DocumentSourceGroup* groupStage) {
    const auto accumulators = groupStage->getAccumulatedFields();
    if (accumulators.size() != 1) {
        // If we have multiple accumulators, we fail to optimize for a lastpoint query.
        return {nullptr, nullptr};
    }

    const auto init = accumulators[0].expr.initializer;
    const auto accState = accumulators[0].makeAccumulator();
    const AccumulatorN* multiAcc = dynamic_cast(accState.get());
    if (!multiAcc) {
        return {nullptr, nullptr};
    }

    boost::optional maybeAcc;
    boost::optional maybeSortPattern;
    if (!extractFromAccIfTopBottomN(multiAcc, init, maybeAcc, maybeSortPattern)) {
        // This isn't a topN/bottomN/top/bottom accumulator or N != 1.
        return {nullptr, nullptr};
    }

    tassert(6165600,
            "sort pattern and accumulator must be initialized if cast of $top or $bottom succeeds",
            maybeSortPattern && maybeAcc);

    auto newSortStage = DocumentSourceSort::create(expCtx, SortPattern(*maybeSortPattern, expCtx));
    auto newAccState = AccumulationStatement::parseAccumulationStatement(
        expCtx.get(), maybeAcc->firstElement(), expCtx->variablesParseState);
    auto newGroupStage =
        DocumentSourceGroup::create(expCtx, groupStage->getIdExpression(), {newAccState});
    return {newSortStage, newGroupStage};
}

// Lastpoint rewrite: for pipelines of the form unpack + [$sort +] $group-on-metaField that pick
// the latest (or earliest, with restrictions) measurement per series, inserts a bucket-level
// $sort and $group BEFORE the unpack stage so that only one bucket per series needs unpacking.
// The original stages are intentionally left in place (see the comment near the insertion below).
// Returns whether the bucket-level stages were inserted.
bool DocumentSourceInternalUnpackBucket::optimizeLastpoint(Pipeline::SourceContainer::iterator itr,
                                                           Pipeline::SourceContainer* container) {
    // A lastpoint-type aggregation must contain both a $sort and a $group stage, in that order, or
    // only a $group stage with a $top, $topN, $bottom, or $bottomN accumulator. This means we need
    // at least one stage after $_internalUnpackBucket.
    if (std::next(itr) == container->end()) {
        return false;
    }

    // If we only have one stage after $_internalUnpackBucket, it must be a $group for the
    // lastpoint rewrite to happen.
    DocumentSourceSort* sortStage = nullptr;
    auto groupStage = dynamic_cast(std::next(itr)->get());

    // If we don't have a $sort + $group lastpoint query, we will need to replace the $group with
    // equivalent $sort + $group stages for the rewrite.
    boost::intrusive_ptr sortStagePtr;
    boost::intrusive_ptr groupStagePtr;

    if (!groupStage && (std::next(itr, 2) != container->end())) {
        // If the first stage is not a $group, we may have a $sort + $group lastpoint query.
        sortStage = dynamic_cast(std::next(itr)->get());
        groupStage = dynamic_cast(std::next(itr, 2)->get());
    } else if (groupStage) {
        // Try to rewrite the $group to a $sort+$group-style lastpoint query before proceeding with
        // the optimization.
        std::tie(sortStagePtr, groupStagePtr) =
            tryRewriteGroupAsSortGroup(pExpCtx, itr, container, groupStage);

        // Both these stages should be discarded once we exit this function; either because the
        // rewrite failed validation checks, or because we created updated versions of these stages
        // in 'tryInsertBucketLevelSortAndGroup' below (which will be inserted into the pipeline).
        // The intrusive_ptrs above handle this deletion gracefully.
        sortStage = sortStagePtr.get();
        groupStage = groupStagePtr.get();
    }

    if (!sortStage || !groupStage) {
        return false;
    }

    if (sortStage->hasLimit()) {
        // This $sort stage was previously followed by a $limit stage.
        return false;
    }

    auto spec = _bucketUnpacker.bucketSpec();
    auto maybeMetaField = spec.metaField();
    auto timeField = spec.timeField();
    if (!maybeMetaField || haveComputedMetaField()) {
        return false;
    }

    auto metaField = maybeMetaField.value();
    if (!checkMetadataSortReorder(sortStage->getSortKeyPattern(), metaField, timeField)) {
        return false;
    }

    auto groupIdFields = groupStage->getIdFields();
    if (groupIdFields.size() != 1) {
        return false;
    }

    auto groupId = dynamic_cast(groupIdFields.cbegin()->second.get());
    if (!groupId || groupId->isVariableReference()) {
        return false;
    }

    // The group key must be a path rooted at the metaField.
    const auto fieldPath = groupId->getFieldPath();
    if (fieldPath.getPathLength() <= 1 || fieldPath.tail().getFieldName(0) != metaField) {
        return false;
    }

    // Rewrite the user-visible metaField path to the bucket-level 'meta' field.
    auto rewrittenFieldPath = FieldPath(timeseries::kBucketMetaFieldName);
    if (fieldPath.tail().getPathLength() > 1) {
        rewrittenFieldPath = rewrittenFieldPath.concat(fieldPath.tail().tail());
    }
    auto newFieldPath = rewrittenFieldPath.fullPath();

    // Check to see if $group uses only the specified accumulator.
    auto accumulators = groupStage->getAccumulatedFields();
    auto groupOnlyUsesTargetAccum = [&](AccumulatorDocumentsNeeded targetAccum) {
        return std::all_of(accumulators.begin(), accumulators.end(), [&](auto&& accum) {
            return targetAccum == accum.makeAccumulator()->documentsNeeded();
        });
    };

    // We disallow firstpoint queries from participating in this rewrite due to the fact that we
    // round down control.min.time for buckets, which means that if we are given two buckets with
    // the same min time, we won't know which bucket contains the earliest time until we unpack
    // them and sort their measurements. Hence, this rewrite could give us an incorrect firstpoint.
    auto isSortValidForGroup = [&](AccumulatorDocumentsNeeded targetAccum) {
        bool firstpointTimeIsAscending =
            (targetAccum == AccumulatorDocumentsNeeded::kFirstDocument);
        for (const auto& entry : sortStage->getSortKeyPattern()) {
            auto isTimeField = entry.fieldPath->fullPath() == timeField;
            if (isTimeField && (entry.isAscending == firstpointTimeIsAscending)) {
                // This is a first-point query, which is disallowed.
                return false;
            } else if (!isTimeField &&
                       (entry.fieldPath->fullPath() != fieldPath.tail().fullPath())) {
                // This sorts on a metaField which does not match the $group's _id field.
                return false;
            }
        }
        return true;
    };

    // Try to insert bucket-level $sort and $group stages before we unpack any buckets. We ensure
    // that the generated $group preserves all bucket fields, so that the $_internalUnpackBucket
    // stage and the original $group stage can read them.
    std::vector fieldsToInclude{timeseries::kBucketMetaFieldName.toString(),
                                timeseries::kBucketControlFieldName.toString(),
                                timeseries::kBucketDataFieldName.toString()};
    const auto& computedMetaProjFields = _bucketUnpacker.bucketSpec().computedMetaProjFields();
    std::copy(computedMetaProjFields.begin(),
              computedMetaProjFields.end(),
              std::back_inserter(fieldsToInclude));

    auto tryInsertBucketLevelSortAndGroup = [&](AccumulatorDocumentsNeeded accum) {
        if (!groupOnlyUsesTargetAccum(accum) || !isSortValidForGroup(accum)) {
            return false;
        }

        bool flipSort = (accum == AccumulatorDocumentsNeeded::kLastDocument);
        auto newSort = createMetadataSortForReorder(*sortStage, timeField, newFieldPath, flipSort);
        auto newGroup = createBucketGroupForReorder(pExpCtx, fieldsToInclude, newFieldPath);

        // Note that we don't erase any of the original stages for this rewrite. This allows us to
        // preserve the particular semantics of the original group (e.g. $top behaves differently
        // than topN with n = 1, $group accumulators have a special way of dealing with nulls, etc.)
        // without constructing a specialized projection to exactly match what the original query
        // would have returned.
        container->insert(itr, newSort);
        container->insert(itr, newGroup);
        return true;
    };

    return tryInsertBucketLevelSortAndGroup(AccumulatorDocumentsNeeded::kFirstDocument) ||
        tryInsertBucketLevelSortAndGroup(AccumulatorDocumentsNeeded::kLastDocument);
}

// Returns true if any stage in [start, end) is a DocumentSourceSequentialDocumentCache.
bool findSequentialDocumentCache(Pipeline::SourceContainer::iterator start,
                                 Pipeline::SourceContainer::iterator end) {
    while (start != end && !dynamic_cast(start->get())) {
        start = std::next(start);
    }
    return start != end;
}

// Computes the dependency set of everything after this stage in the pipeline, optionally folding
// in the dependencies of the internalized event filter.
DepsTracker DocumentSourceInternalUnpackBucket::getRestPipelineDependencies(
    Pipeline::SourceContainer::iterator itr,
    Pipeline::SourceContainer* container,
    bool includeEventFilter) const {
    auto deps = Pipeline::getDependenciesForContainer(
        pExpCtx, Pipeline::SourceContainer{std::next(itr), container->end()}, boost::none);
    if (_eventFilter && includeEventFilter) {
        match_expression::addDependencies(_eventFilter.get(), &deps);
    }
    return deps;
}

// The main optimization driver for this stage. Each rewrite either returns an iterator telling
// the pipeline optimizer where to resume, or falls through to the next rewrite. The order of
// rewrites below is deliberate; many later rewrites assume earlier ones have already had a chance
// to run (e.g. end-of-pipeline optimization happens exactly once, guarded by
// '_optimizedEndOfPipeline').
Pipeline::SourceContainer::iterator DocumentSourceInternalUnpackBucket::doOptimizeAt(
    Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
    invariant(*itr == this);

    if (std::next(itr) == container->end()) {
        return container->end();
    }

    // Some optimizations may not be safe to do if we have computed the metaField via an $addFields
    // or a computed $project. We won't do those optimizations if 'haveComputedMetaField' is true.
    bool haveComputedMetaField = this->haveComputedMetaField();

    // Before any other rewrites for the current stage, consider reordering with $sort.
    if (auto sortPtr = dynamic_cast(std::next(itr)->get());
        sortPtr && !_eventFilter) {
        if (auto metaField = _bucketUnpacker.bucketSpec().metaField();
            metaField && !haveComputedMetaField) {
            if (checkMetadataSortReorder(sortPtr->getSortKeyPattern(), metaField.value())) {
                // We have a sort on metadata field following this stage. Reorder the two stages
                // and return a pointer to the preceding stage.
                auto sortForReorder = createMetadataSortForReorder(*sortPtr);

                // If the original sort had a limit, we will not preserve that in the swapped sort.
                // Instead we will add a $limit to the end of the pipeline to keep the number of
                // expected results.
                if (auto limit = sortPtr->getLimit(); limit && *limit != 0) {
                    container->push_back(DocumentSourceLimit::create(pExpCtx, *limit));
                }

                // Reorder sort and current doc.
                *std::next(itr) = std::move(*itr);
                *itr = std::move(sortForReorder);

                if (itr == container->begin()) {
                    // Try to optimize the current stage again.
                    return std::next(itr);
                } else {
                    // Try to optimize the previous stage against $sort.
                    return std::prev(itr);
                }
            }
        }
    }

    // Attempt to push geoNear on the metaField past $_internalUnpackBucket.
    if (auto nextNear = dynamic_cast(std::next(itr)->get());
        nextNear && !_eventFilter) {
        // Currently we only support geo indexes on the meta field, and we enforce this by
        // requiring the key field to be set so we can check before we try to look up indexes.
        auto keyField = nextNear->getKeyField();
        uassert(5892921,
                "Must specify 'key' option for $geoNear on a time-series collection",
                keyField);

        // Currently we do not support query for $geoNear on a bucket
        uassert(
            1938439,
            "Must not specify 'query' for $geoNear on a time-series collection; use $match instead",
            nextNear->getQuery().binaryEqual(BSONObj()));

        auto metaField = _bucketUnpacker.bucketSpec().metaField();
        if (metaField && *metaField == keyField->front()) {
            // Make sure we actually re-write the key field for the buckets collection so we can
            // locate the index.
            static const FieldPath baseMetaFieldPath{timeseries::kBucketMetaFieldName};
            nextNear->setKeyField(keyField->getPathLength() > 1
                                      ? baseMetaFieldPath.concat(keyField->tail())
                                      : baseMetaFieldPath);

            // Save the source, remove it, and then push it down.
            auto source = *std::next(itr);
            container->erase(std::next(itr));
            container->insert(itr, source);
            return std::prev(itr) == container->begin() ? std::prev(itr)
                                                        : std::prev(std::prev(itr));
        } else {
            // Don't push down query on measurements.
        }
    }

    // Optimize the pipeline after this stage to merge $match stages and push them forward, and to
    // take advantage of $expr rewrite optimizations.
    if (!_optimizedEndOfPipeline) {
        _optimizedEndOfPipeline = true;

        if (std::next(itr) == container->end()) {
            return container->end();
        }
        if (auto nextStage = dynamic_cast(std::next(itr)->get())) {
            // If the end of the pipeline starts with a $geoNear stage, make sure it gets optimized
            // in a context where it knows there are other stages before it. It will split itself
            // up into separate $match and $sort stages. But it doesn't split itself up when it's
            // the first stage, because it expects to use a special DocumentSourceGeoNearCursor
            // plan.
            nextStage->optimizeAt(std::next(itr), container);
        }

        auto cacheFound = findSequentialDocumentCache(itr, container->end());
        if (cacheFound) {
            // optimizeAt() is responsible for reordering stages, and optimize() is responsible for
            // simplifying individual stages. $sequentialCache's optimizeAt() places the stage where
            // it can cache as big a prefix of the pipeline as possible. To do so correctly, it
            // needs to look at dependencies: a stage that depends on a let-variable cannot be
            // cached. But optimize() can inline variables. Therefore, we want to avoid calling
            // optimize() before $sequentialCache has a chance to run optimizeAt().
            return Pipeline::optimizeAtEndOfPipeline(itr, container);
        } else {
            Pipeline::optimizeEndOfPipeline(itr, container);
        }

        if (std::next(itr) == container->end()) {
            return container->end();
        } else {
            // Kick back out to optimizing this stage again a level up, so any matches that were
            // moved to directly after this stage can be moved before it if possible.
            return itr;
        }
    }

    {
        // Check if we can avoid unpacking if we have a group stage with min/max aggregates.
        auto [success, result] = rewriteGroupByMinMax(itr, container);
        if (success) {
            return result;
        }
    }

    {
        // Check if the rest of the pipeline needs any fields. For example we might only be
        // interested in $count.
        auto deps = getRestPipelineDependencies(itr, container, true /* includeEventFilter */);
        if (deps.hasNoRequirements()) {
            // Nothing downstream reads measurement fields: unpack with an empty inclusion set.
            _bucketUnpacker.setBucketSpec({_bucketUnpacker.bucketSpec().timeField(),
                                           _bucketUnpacker.bucketSpec().metaField(),
                                           {},
                                           BucketSpec::Behavior::kInclude});

            // Keep going for next optimization.
        }

        if (deps.getNeedsMetadata(DocumentMetadataFields::MetaType::kTimeseriesBucketMinTime)) {
            _bucketUnpacker.setIncludeMinTimeAsMetadata();
        }

        if (deps.getNeedsMetadata(DocumentMetadataFields::MetaType::kTimeseriesBucketMaxTime)) {
            _bucketUnpacker.setIncludeMaxTimeAsMetadata();
        }
    }

    // Attempt to optimize last-point type queries.
    if (!_triedLastpointRewrite && !_eventFilter && optimizeLastpoint(itr, container)) {
        _triedLastpointRewrite = true;

        // If we are able to rewrite the aggregation, give the resulting pipeline a chance to
        // perform further optimizations.
        return container->begin();
    };

    // Attempt to map predicates on bucketed fields to the predicates on the control field.
    if (auto nextMatch = dynamic_cast(std::next(itr)->get())) {
        // Merge multiple following $match stages.
        auto itrToMatch = std::next(itr);
        while (std::next(itrToMatch) != container->end() &&
               dynamic_cast(std::next(itrToMatch)->get())) {
            nextMatch->doOptimizeAt(itrToMatch, container);
        }

        auto predicates = createPredicatesOnBucketLevelField(nextMatch->getMatchExpression());

        // Try to create a tight bucket predicate to perform bucket level matching.
        if (predicates.tightPredicate) {
            _wholeBucketFilterBson = predicates.tightPredicate->serialize();
            _wholeBucketFilter =
                uassertStatusOK(MatchExpressionParser::parse(_wholeBucketFilterBson,
                                                             pExpCtx,
                                                             ExtensionsCallbackNoop(),
                                                             Pipeline::kAllowedMatcherFeatures));
            _wholeBucketFilter = MatchExpression::optimize(std::move(_wholeBucketFilter));
        }

        // Push the original event predicate into the unpacking stage.
        _eventFilterBson = nextMatch->getQuery().getOwned();
        _eventFilter =
            uassertStatusOK(MatchExpressionParser::parse(_eventFilterBson,
                                                         pExpCtx,
                                                         ExtensionsCallbackNoop(),
                                                         Pipeline::kAllowedMatcherFeatures));
        _eventFilter = MatchExpression::optimize(std::move(_eventFilter));
        _eventFilterDeps = {};
        match_expression::addDependencies(_eventFilter.get(), &_eventFilterDeps);
        container->erase(std::next(itr));

        // If the $match is not followed by other stages referencing fields (e.g. $count), we can
        // unpack directly to BSON so that data doesn't need to be materialized to Document.
        auto deps = getRestPipelineDependencies(itr, container, false /* includeEventFilter */);
        if (deps.fields.empty()) {
            _unpackToBson = true;
        }

        // Create a loose bucket predicate and push it before the unpacking stage.
        if (predicates.loosePredicate) {
            container->insert(
                itr, DocumentSourceMatch::create(predicates.loosePredicate->serialize(), pExpCtx));

            // Give other stages a chance to optimize with the new $match.
            return std::prev(itr) == container->begin() ? std::prev(itr)
                                                        : std::prev(std::prev(itr));
        }

        // We have removed a $match after this stage, so we try to optimize this stage again.
        return itr;
    }

    // Attempt to push down a $project on the metaField past $_internalUnpackBucket.
    if (!_eventFilter && !haveComputedMetaField) {
        if (auto [metaProject, deleteRemainder] = extractProjectForPushDown(std::next(itr)->get());
            !metaProject.isEmpty()) {
            container->insert(itr,
                              DocumentSourceProject::createFromBson(
                                  BSON("$project" << metaProject).firstElement(), getContext()));

            if (deleteRemainder) {
                // We have pushed down the entire $project. Remove the old $project from the
                // pipeline, then attempt to optimize this stage again.
                container->erase(std::next(itr));
                return std::prev(itr) == container->begin() ? std::prev(itr)
                                                            : std::prev(std::prev(itr));
            }
        }
    }

    // Attempt to extract computed meta projections from subsequent $project, $addFields, or $set
    // and push them before the $_internalUnpackBucket.
    if (!_eventFilter && pushDownComputedMetaProjection(itr, container)) {
        // We've pushed down and removed a stage after this one. Try to optimize the new stage.
        return std::prev(itr) == container->begin() ? std::prev(itr) : std::prev(std::prev(itr));
    }

    // Attempt to build a $project based on dependency analysis or extract one from the pipeline. We
    // can internalize the result so we can handle projections during unpacking.
    if (!_triedInternalizeProject) {
        if (auto [project, isInclusion] = extractOrBuildProjectToInternalize(itr, container);
            !project.isEmpty()) {
            _triedInternalizeProject = true;
            internalizeProject(project, isInclusion);

            // We may have removed a $project after this stage, so we try to optimize this stage
            // again.
            return itr;
        }
    }

    enableStreamingGroupIfPossible(itr, container);

    return container->end();
}

// Reports which paths this stage modifies. When the metaField is materialized, everything except
// the bucket-level-to-user-level meta rename is modified; otherwise all paths are modified.
DocumentSource::GetModPathsReturn DocumentSourceInternalUnpackBucket::getModifiedPaths() const {
    if (_bucketUnpacker.includeMetaField()) {
        StringMap renames;
        renames.emplace(*_bucketUnpacker.bucketSpec().metaField(),
                        timeseries::kBucketMetaFieldName);
        return {GetModPathsReturn::Type::kAllExcept, OrderedPathSet{}, std::move(renames)};
    }
    return {GetModPathsReturn::Type::kAllPaths, OrderedPathSet{}, {}};
}

}  // namespace mongo