summary refs log tree commit diff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
author Matt Boros <matt.boros@mongodb.com> 2022-06-10 19:34:42 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-06-13 23:50:25 +0000
commit 0c75db7b1167d4544254724735535505bb6b4a70 (patch)
tree d8448f6756c92be59db555821b6de7d5ef0979cd /src/mongo/db/pipeline
parent 8971b20d7b836a2641d7d74f4fe4e41c907811e7 (diff)
download mongo-0c75db7b1167d4544254724735535505bb6b4a70.tar.gz
SERVER-64994 Extend the planner to allow soft hints about index traversal direction (tag: r6.0.0-rc10)
(cherry picked from commit 00d2a56763b2b0da941a41684d20e7080da5058e)
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.cpp | 77
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.h   |  5
2 files changed, 67 insertions, 15 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b57dbab5fdc..619bd431584 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/base/exact_cast.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
@@ -87,9 +88,11 @@
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/query/query_planner.h"
+#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/query/sort_pattern.h"
#include "mongo/db/query/stage_types.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
@@ -244,7 +247,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
SkipThenLimit skipThenLimit,
boost::optional<std::string> groupIdForDistinctScan,
const AggregateCommandRequest* aggRequest,
- const size_t plannerOpts,
+ const QueryPlannerParams& plannerOpts,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
Pipeline* pipeline) {
auto findCommand = std::make_unique<FindCommandRequest>(nss);
@@ -313,7 +316,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
// index would produce one result for '1' and another for '2', which would be incorrect.
auto distinctExecutor =
getExecutorDistinct(&collections.getMainCollection(),
- plannerOpts | QueryPlannerParams::STRICT_DISTINCT_ONLY,
+ plannerOpts.options | QueryPlannerParams::STRICT_DISTINCT_ONLY,
&parsedDistinct);
if (!distinctExecutor.isOK()) {
return distinctExecutor.getStatus().withContext(
@@ -1111,6 +1114,41 @@ bool PipelineD::sortAndKeyPatternPartAgreeAndOnMeta(const BucketUnpacker& bucket
return (keyPatternFieldPath.tail() == sortFieldPath.tail());
}
+boost::optional<TraversalPreference> createTimeSeriesTraversalPreference(
+ DocumentSourceInternalUnpackBucket* unpack, DocumentSourceSort* sort) {
+ const auto metaField = unpack->bucketUnpacker().getMetaField();
+ BSONObjBuilder builder;
+ // Reverse the sort pattern so we can look for indexes that match.
+ for (const auto& sortPart : sort->getSortKeyPattern()) {
+ if (!sortPart.fieldPath) {
+ return boost::none;
+ }
+ const int reversedDirection = sortPart.isAscending ? -1 : 1;
+ const auto& path = sortPart.fieldPath->fullPath();
+ if (metaField.has_value() &&
+ (expression::isPathPrefixOf(*metaField, path) || *metaField == path)) {
+ std::string rewrittenField =
+ std::string{timeseries::kBucketMetaFieldName} + path.substr(metaField->size());
+ builder.append(rewrittenField, reversedDirection);
+ } else if (path == unpack->bucketUnpacker().getTimeField()) {
+ if (reversedDirection == 1) {
+ builder.append(unpack->bucketUnpacker().getMinField(path), reversedDirection);
+ } else {
+ builder.append(unpack->bucketUnpacker().getMaxField(path), reversedDirection);
+ }
+ } else {
+ // The field wasn't meta or time, so no direction preference should be made.
+ return boost::none;
+ }
+ }
+
+ TraversalPreference traversalPreference;
+ traversalPreference.sortPattern = builder.obj();
+ traversalPreference.clusterField = unpack->getMinTimeField();
+ traversalPreference.direction = -1;
+ return traversalPreference;
+}
+
std::pair<PipelineD::AttachExecutorCallback, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>>
PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& collections,
const NamespaceString& nss,
@@ -1166,6 +1204,19 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
? DepsTracker::kDefaultUnavailableMetadata & ~DepsTracker::kOnlyTextScore
: DepsTracker::kDefaultUnavailableMetadata;
+ // If this is a query on a time-series collection then it may be eligible for a post-planning
+ // sort optimization. We check eligibility and perform the rewrite here.
+ auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
+ QueryPlannerParams plannerOpts;
+ if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
+ multiversion::FeatureCompatibilityVersion::kVersion_6_0) &&
+ feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ unpack && sort) {
+ plannerOpts.traversalPreference = createTimeSeriesTraversalPreference(unpack, sort);
+ }
+
// Create the PlanExecutor.
bool shouldProduceEmptyDocs = false;
auto exec = uassertStatusOK(prepareExecutor(expCtx,
@@ -1179,11 +1230,11 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
skipThenLimit,
aggRequest,
Pipeline::kAllowedMatcherFeatures,
- &shouldProduceEmptyDocs));
+ &shouldProduceEmptyDocs,
+ std::move(plannerOpts)));
// If this is a query on a time-series collection then it may be eligible for a post-planning
// sort optimization. We check eligibility and perform the rewrite here.
- auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
multiversion::FeatureCompatibilityVersion::kVersion_6_0) &&
@@ -1513,24 +1564,22 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
SkipThenLimit skipThenLimit,
const AggregateCommandRequest* aggRequest,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
- bool* hasNoRequirements) {
+ bool* hasNoRequirements,
+ QueryPlannerParams plannerOpts) {
invariant(hasNoRequirements);
- // Any data returned from the inner executor must be owned.
- size_t plannerOpts = QueryPlannerParams::DEFAULT;
-
bool isChangeStream =
pipeline->peekFront() && pipeline->peekFront()->constraints().isChangeStreamStage();
if (isChangeStream) {
invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
- plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
- QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
+ plannerOpts.options |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+ QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
// The $_requestReshardingResumeToken parameter is only valid for an oplog scan.
if (aggRequest && aggRequest->getRequestReshardingResumeToken()) {
- plannerOpts |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
- QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
+ plannerOpts.options |= (QueryPlannerParams::TRACK_LATEST_OPLOG_TS |
+ QueryPlannerParams::ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG);
}
// If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The
@@ -1570,7 +1619,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
if (*hasNoRequirements) {
// This query might be eligible for count optimizations, since the remaining stages in the
// pipeline don't actually need to read any data produced by the query execution layer.
- plannerOpts |= QueryPlannerParams::IS_COUNT;
+ plannerOpts.options |= QueryPlannerParams::IS_COUNT;
} else {
// Build a BSONObj representing a projection eligible for pushdown. If there is an inclusion
// projection at the front of the pipeline, it will be removed and handled by the PlanStage
@@ -1588,7 +1637,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// top-k sort, which both sorts and limits.)
bool allowExpressions = !sortStage && !skipThenLimit.getSkip() && !skipThenLimit.getLimit();
projObj = buildProjectionForPushdown(deps, pipeline, allowExpressions);
- plannerOpts |= QueryPlannerParams::RETURN_OWNED_DATA;
+ plannerOpts.options |= QueryPlannerParams::RETURN_OWNED_DATA;
}
if (rewrittenGroupStage) {
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index cd40bc33b8b..c109e75b1b8 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/query/query_planner_params.h"
#include <boost/intrusive_ptr.hpp>
#include <memory>
@@ -44,6 +45,7 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/multiple_collection_accessor.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/query/query_planner.h"
namespace mongo {
class Collection;
@@ -202,7 +204,8 @@ private:
SkipThenLimit skipThenLimit,
const AggregateCommandRequest* aggRequest,
const MatchExpressionParser::AllowedFeatureSet& matcherFeatures,
- bool* hasNoRequirements);
+ bool* hasNoRequirements,
+ QueryPlannerParams plannerOpts = QueryPlannerParams{});
/**
* Build a PlanExecutor and prepare a callback to create a special DocumentSourceGeoNearCursor