diff options
author | Matt Boros <matt.boros@mongodb.com> | 2022-06-10 19:34:42 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-10 20:22:54 +0000 |
commit | 00d2a56763b2b0da941a41684d20e7080da5058e (patch) | |
tree | ca0e92620a220e1581632e3846e5f14790c78f8f /src/mongo/db/pipeline/pipeline_d.cpp | |
parent | 5b0d49742e1dd9462262593fe844bf3e65bca1d7 (diff) | |
download | mongo-00d2a56763b2b0da941a41684d20e7080da5058e.tar.gz |
SERVER-64994 Extend the planner to allow soft hints about index traversal direction
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 77 |
1 files changed, 63 insertions, 14 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 3169859560f..7bd2179d97d 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -34,6 +34,7 @@ #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/base/exact_cast.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/simple_bsonobj_comparator.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" @@ -84,9 +85,11 @@ #include "mongo/db/query/query_feature_flags_gen.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" +#include "mongo/db/query/query_planner_params.h" #include "mongo/db/query/sort_pattern.h" #include "mongo/db/query/stage_types.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" @@ -244,7 +247,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe SkipThenLimit skipThenLimit, boost::optional<std::string> groupIdForDistinctScan, const AggregateCommandRequest* aggRequest, - const size_t plannerOpts, + const QueryPlannerParams& plannerOpts, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, Pipeline* pipeline) { auto findCommand = std::make_unique<FindCommandRequest>(nss); @@ -313,7 +316,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe // index would produce one result for '1' and another for '2', which would be incorrect. auto distinctExecutor = getExecutorDistinct(&collections.getMainCollection(), - plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY, + plannerOpts.options | QueryPlannerParams::STRICT_DISTINCT_ONLY, &parsedDistinct); if (!distinctExecutor.isOK()) { return distinctExecutor.getStatus().withContext( @@ -1111,6 +1114,41 @@ bool PipelineD::sortAndKeyPatternPartAgreeAndOnMeta(const BucketUnpacker& bucket return (keyPatternFieldPath.tail() == sortFieldPath.tail()); } +boost::optional<TraversalPreference> createTimeSeriesTraversalPreference( + DocumentSourceInternalUnpackBucket* unpack, DocumentSourceSort* sort) { + const auto metaField = unpack->bucketUnpacker().getMetaField(); + BSONObjBuilder builder; + // Reverse the sort pattern so we can look for indexes that match. + for (const auto& sortPart : sort->getSortKeyPattern()) { + if (!sortPart.fieldPath) { + return boost::none; + } + const int reversedDirection = sortPart.isAscending ? -1 : 1; + const auto& path = sortPart.fieldPath->fullPath(); + if (metaField.has_value() && + (expression::isPathPrefixOf(*metaField, path) || *metaField == path)) { + std::string rewrittenField = + std::string{timeseries::kBucketMetaFieldName} + path.substr(metaField->size()); + builder.append(rewrittenField, reversedDirection); + } else if (path == unpack->bucketUnpacker().getTimeField()) { + if (reversedDirection == 1) { + builder.append(unpack->bucketUnpacker().getMinField(path), reversedDirection); + } else { + builder.append(unpack->bucketUnpacker().getMaxField(path), reversedDirection); + } + } else { + // The field wasn't meta or time, so no direction preference should be made. + return boost::none; + } + } + + TraversalPreference traversalPreference; + traversalPreference.sortPattern = builder.obj(); + traversalPreference.clusterField = unpack->getMinTimeField(); + traversalPreference.direction = -1; + return traversalPreference; +} + std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& collections, const NamespaceString& nss, @@ -1166,6 +1204,19 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll ? DepsTracker::kDefaultUnavailableMetadata & ~DepsTracker::kOnlyTextScore : DepsTracker::kDefaultUnavailableMetadata; + // If this is a query on a time-series collection then it may be eligible for a post-planning + // sort optimization. We check eligibility and perform the rewrite here. + auto [unpack, sort] = findUnpackThenSort(pipeline->_sources); + QueryPlannerParams plannerOpts; + if (serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( + multiversion::FeatureCompatibilityVersion::kVersion_6_0) && + feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled( + serverGlobalParams.featureCompatibility) && + unpack && sort) { + plannerOpts.traversalPreference = createTimeSeriesTraversalPreference(unpack, sort); + } + // Create the PlanExecutor. bool shouldProduceEmptyDocs = false; auto exec = uassertStatusOK(prepareExecutor(expCtx, @@ -1179,11 +1230,11 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll skipThenLimit, aggRequest, Pipeline::kAllowedMatcherFeatures, - &shouldProduceEmptyDocs)); + &shouldProduceEmptyDocs, + std::move(plannerOpts))); // If this is a query on a time-series collection then it may be eligible for a post-planning // sort optimization. We check eligibility and perform the rewrite here. - auto [unpack, sort] = findUnpackThenSort(pipeline->_sources); if (serverGlobalParams.featureCompatibility.isVersionInitialized() && serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( multiversion::FeatureCompatibilityVersion::kVersion_6_0) && @@ -1513,24 +1564,22 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep SkipThenLimit skipThenLimit, const AggregateCommandRequest* aggRequest, const MatchExpressionParser::AllowedFeatureSet& matcherFeatures, - bool* hasNoRequirements) { + bool* hasNoRequirements, + QueryPlannerParams plannerOpts) { invariant(hasNoRequirements); - // Any data returned from the inner executor must be owned. - size_t plannerOpts = QueryPlannerParams::DEFAULT; - bool isChangeStream = pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage(); if (isChangeStream) { invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); - plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | - QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); + plannerOpts.options |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | + QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } // The $_requestReshardingResumeToken parameter is only valid for an oplog scan. if (aggRequest && aggRequest->getRequestReshardingResumeToken()) { - plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | - QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); + plannerOpts.options |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS | + QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG); } // If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The @@ -1570,7 +1619,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep if (*hasNoRequirements) { // This query might be eligible for count optimizations, since the remaining stages in the // pipeline don't actually need to read any data produced by the query execution layer. - plannerOpts |= QueryPlannerParams::IS_COUNT; + plannerOpts.options |= QueryPlannerParams::IS_COUNT; } else { // Build a BSONObj representing a projection eligible for pushdown. If there is an inclusion // projection at the front of the pipeline, it will be removed and handled by the PlanStage @@ -1588,7 +1637,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // top-k sort, which both sorts and limits.) bool allowExpressions = !sortStage && !skipThenLimit.getSkip() && !skipThenLimit.getLimit(); projObj = buildProjectionForPushdown(deps, pipeline, allowExpressions); - plannerOpts |= QueryPlannerParams::RETURN_OWNED_DATA; + plannerOpts.options |= QueryPlannerParams::RETURN_OWNED_DATA; } if (rewrittenGroupStage) { |