author     samontea <merciers.merciers@gmail.com>           2022-04-07 19:12:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-28 20:05:38 +0000
commit     d253f3aa9644ad82a1fc6afa9732267ff45e160e (patch)
tree       7fbaf2e75152b052a028f04036e3cdba47183a1d /src/mongo/db/pipeline/pipeline_d.cpp
parent     5063c61f2e2ea9eee9501b0f83ed8674fc7f6787 (diff)
download   mongo-d253f3aa9644ad82a1fc6afa9732267ff45e160e.tar.gz
SERVER-64349 Add heuristic-based planning support for bucket unpacking with sort
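The change targets time-series pipelines that unpack buckets and then sort on the time field. Because every bucket covers at most `bucketMaxSpanSeconds`, a plan that returns buckets ordered by their minimum time lets the sort emit events incrementally instead of buffering the whole collection. Below is a minimal standalone sketch of that bounded-sort idea, assuming buckets arrive ordered by minimum time; `Bucket`, `eventTimes`, and the sample timestamps are illustrative, not the server's types.

```cpp
// Toy model of a bounded sort over bucketed events (not server code).
// Assumes buckets arrive ordered by their minimum timestamp and that no
// bucket spans more than bucketMaxSpanSeconds.
#include <functional>
#include <iostream>
#include <limits>
#include <queue>
#include <vector>

struct Bucket {
    long long minTime;                  // analogous to control.min.time
    std::vector<long long> eventTimes;  // unpacked measurements, any order
};

int main() {
    std::vector<Bucket> buckets = {
        {0, {5, 1, 3}}, {2, {2, 6, 4}}, {7, {9, 7, 8}}};  // sorted by minTime

    std::priority_queue<long long, std::vector<long long>, std::greater<>> heap;
    for (size_t i = 0; i < buckets.size(); ++i) {
        for (long long t : buckets[i].eventTimes)
            heap.push(t);
        // Every event in a later bucket is >= that bucket's minTime, so any
        // buffered event below the next bucket's minTime is safe to emit now.
        long long bound = (i + 1 < buckets.size())
            ? buckets[i + 1].minTime
            : std::numeric_limits<long long>::max();
        while (!heap.empty() && heap.top() < bound) {
            std::cout << heap.top() << ' ';
            heap.pop();
        }
    }
    std::cout << '\n';  // prints: 1 2 3 4 5 6 7 8 9
}
```

The patch wires this idea into planning: it detects the unpack-then-sort pattern, checks that the underlying plan can deliver buckets in the required order, and swaps the blocking $sort for a bounded sort.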
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp  191

1 file changed, 191 insertions, 0 deletions
```diff
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 0719e235bb9..8347e6ed458 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -28,6 +28,7 @@
  */
 
 #include "mongo/db/query/projection_parser.h"
+#include "mongo/db/server_options.h"
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
 
 #include "mongo/platform/basic.h"
@@ -53,6 +54,7 @@
 #include "mongo/db/exec/unpack_timeseries_bucket.h"
 #include "mongo/db/exec/working_set.h"
 #include "mongo/db/index/index_access_method.h"
+#include "mongo/db/matcher/expression_expr.h"
 #include "mongo/db/matcher/extensions_callback_real.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/ops/write_ops_exec.h"
@@ -77,6 +79,7 @@
 #include "mongo/db/query/collation/collator_interface.h"
 #include "mongo/db/query/get_executor.h"
 #include "mongo/db/query/plan_executor_factory.h"
+#include "mongo/db/query/plan_executor_impl.h"
 #include "mongo/db/query/plan_summary_stats.h"
 #include "mongo/db/query/query_feature_flags_gen.h"
 #include "mongo/db/query/query_knobs_gen.h"
@@ -406,6 +409,35 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam
 
     return std::pair{sampleStage, unpackStage};
 }
+
+std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort(
+    const Pipeline::SourceContainer& sources) {
+    DocumentSourceSort* sortStage = nullptr;
+    DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
+
+    auto sourcesIt = sources.begin();
+    while (sourcesIt != sources.end()) {
+        if (!sortStage) {
+            sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get());
+
+            if (sortStage) {
+                // Do not double optimize
+                if (sortStage->isBoundedSortStage()) {
+                    return {nullptr, nullptr};
+                }
+
+                return {unpackStage, sortStage};
+            }
+        }
+
+        if (!unpackStage) {
+            unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
+        }
+        ++sourcesIt;
+    }
+
+    return {unpackStage, sortStage};
+}
 }  // namespace
 
 StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor(
@@ -944,6 +976,165 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
                                                Pipeline::kAllowedMatcherFeatures,
                                                &shouldProduceEmptyDocs));
 
+    // If this is a query on a time-series collection then it may be eligible for a post-planning
+    // sort optimization. We check eligibility and perform the rewrite here.
```
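`findUnpackThenSort` stops at the first `$sort` it sees, bails out if that sort is already a bounded sort (to avoid rewriting twice), and reports the `$_internalUnpackBucket` stage only when one precedes the sort. A standalone sketch of the same scan, with stand-in stage types (`Stage`, `UnpackStage`, `SortStage`, `MatchStage` are illustrative, not the real `DocumentSource` hierarchy):

```cpp
// Minimal stand-alone model of the unpack-then-sort scan above.
#include <iostream>
#include <list>
#include <memory>
#include <utility>

struct Stage { virtual ~Stage() = default; };
struct UnpackStage : Stage {};
struct SortStage : Stage { bool bounded = false; };
struct MatchStage : Stage {};

std::pair<UnpackStage*, SortStage*> findUnpackThenSort(
    const std::list<std::unique_ptr<Stage>>& sources) {
    UnpackStage* unpack = nullptr;
    for (const auto& s : sources) {
        if (auto* sort = dynamic_cast<SortStage*>(s.get())) {
            if (sort->bounded)      // already rewritten: do not double optimize
                return {nullptr, nullptr};
            return {unpack, sort};  // stop at the first sort either way
        }
        if (!unpack)
            unpack = dynamic_cast<UnpackStage*>(s.get());
    }
    return {unpack, nullptr};
}

int main() {
    std::list<std::unique_ptr<Stage>> pipeline;
    pipeline.push_back(std::make_unique<UnpackStage>());
    pipeline.push_back(std::make_unique<MatchStage>());
    pipeline.push_back(std::make_unique<SortStage>());
    auto [unpack, sort] = findUnpackThenSort(pipeline);
    std::cout << std::boolalpha << (unpack && sort) << '\n';  // true
}
```

The rest of the hunk checks eligibility against the chosen plan and performs the rewrite itself: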
```diff
+    auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
+    if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+        serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
+            multiversion::FeatureCompatibilityVersion::kVersion_6_1) &&
+        feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled(
+            serverGlobalParams.featureCompatibility) &&
+        unpack && sort) {
+        auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+        if (execImpl) {
+
+            // Get source stage
+            PlanStage* rootStage = execImpl->getRootStage();
+            while (rootStage && rootStage->getChildren().size() == 1) {
+                switch (rootStage->stageType()) {
+                    case STAGE_FETCH:
+                        rootStage = rootStage->child().get();
+                        break;
+                    case STAGE_SHARDING_FILTER:
+                        rootStage = rootStage->child().get();
+                        break;
+                    default:
+                        rootStage = nullptr;
+                }
+            }
+
+            if (rootStage && rootStage->getChildren().size() != 0) {
+                rootStage = nullptr;
+            }
+
+            const auto& sortPattern = sort->getSortKeyPattern();
+            if (auto agree = unpack->supportsSort(rootStage, sortPattern)) {
+                // Scan the pipeline to check if it's compatible with the optimization.
+                bool badStage = false;
+                bool seenSort = false;
+                std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter =
+                    pipeline->_sources.begin();
+                std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter =
+                    pipeline->_sources.end();
+                for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) {
+                    if (dynamic_cast<const DocumentSourceSort*>(iter->get())) {
+                        seenSort = true;
+                    } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) {
+                        // do nothing
+                    } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>(
+                                   iter->get())) {
+                        unpackIter = iter;
+                    } else if (auto projection =
+                                   dynamic_cast<const DocumentSourceSingleDocumentTransformation*>(
+                                       iter->get())) {
+                        auto modPaths = projection->getModifiedPaths();
+
+                        // Check to see if the sort paths are modified.
+                        for (auto sortIter = sortPattern.begin();
+                             !badStage && sortIter != sortPattern.end();
+                             ++sortIter) {
+
+                            auto fieldPath = sortIter->fieldPath;
+                            // If they are, escape the loop and don't optimize.
+                            if (!fieldPath || modPaths.canModify(*fieldPath)) {
+                                badStage = true;
+                            }
+                        }
+
+                    } else {
+                        badStage = true;
+                    }
+                }
+                if (!badStage && seenSort) {
+                    auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree;
+                    // This is safe because we have seen a sort so we must have at least one stage
+                    // to the left of the current iterator position.
+                    --iter;
+
+                    if (indexOrderedByMinTime) {
+                        unpack->setIncludeMinTimeAsMetadata();
+                    } else {
+                        unpack->setIncludeMaxTimeAsMetadata();
+                    }
+
+                    if (indexSortOrderAgree) {
+                        pipeline->_sources.insert(
+                            iter,
+                            DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(),
+                                                                  (indexOrderedByMinTime
+                                                                       ? DocumentSourceSort::kMin
+                                                                       : DocumentSourceSort::kMax),
+                                                                  0,
+                                                                  sort->getLimit(),
+                                                                  expCtx));
+                    } else {
+                        // Since the sortPattern and the direction of the index don't agree we must
+                        // use the offset to get an estimate on the bounds of the bucket.
+                        pipeline->_sources.insert(
+                            iter,
+                            DocumentSourceSort::createBoundedSort(
+                                sort->getSortKeyPattern(),
+                                (indexOrderedByMinTime ? DocumentSourceSort::kMin
+                                                       : DocumentSourceSort::kMax),
+                                ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds()
+                                                         : -unpack->getBucketMaxSpanSeconds()) *
+                                    1000,
+                                sort->getLimit(),
+                                expCtx));
+
+                        /**
+                         * We wish to create the following predicate to avoid returning incorrect
+                         * results in the unlikely event bucketMaxSpanSeconds changes under us.
+                         *
+                         * {$expr:
+                         *     {$lte: [
+                         *         {$subtract: [$control.max.timeField, $control.min.timeField]},
+                         *         {$const: bucketMaxSpanSeconds, in milliseconds}
+                         *     ]}}
+                         */
+                        auto minTime = unpack->getMinTimeField();
+                        auto maxTime = unpack->getMaxTimeField();
+                        auto match = std::make_unique<ExprMatchExpression>(
+                            // This produces {$lte: ... }
+                            make_intrusive<ExpressionCompare>(
+                                expCtx.get(),
+                                ExpressionCompare::CmpOp::LTE,
+                                // This produces [...]
+                                makeVector<boost::intrusive_ptr<Expression>>(
+                                    // This produces {$subtract: ... }
+                                    make_intrusive<ExpressionSubtract>(
+                                        expCtx.get(),
+                                        // This produces [...]
+                                        makeVector<boost::intrusive_ptr<Expression>>(
+                                            // This produces "$control.max.timeField"
+                                            ExpressionFieldPath::createPathFromString(
+                                                expCtx.get(), maxTime, expCtx->variablesParseState),
+                                            // This produces "$control.min.timeField"
+                                            ExpressionFieldPath::createPathFromString(
+                                                expCtx.get(),
+                                                minTime,
+                                                expCtx->variablesParseState))),
+                                    // This produces {$const: maxBucketSpanSeconds}
+                                    make_intrusive<ExpressionConstant>(
+                                        expCtx.get(),
+                                        Value{unpack->getBucketMaxSpanSeconds() * 1000}))),
+                            expCtx);
+                        pipeline->_sources.insert(
+                            unpackIter,
+                            make_intrusive<DocumentSourceMatch>(std::move(match), expCtx));
+                    }
+                    // Ensure we're erasing the sort source.
+                    tassert(6434901,
+                            "we must erase a $sort stage and replace it with a bounded sort stage",
+                            strcmp((*iter)->getSourceName(),
+                                   DocumentSourceSort::kStageName.rawData()) == 0);
+                    pipeline->_sources.erase(iter);
+                    pipeline->stitch();
+                }
+            }
+        }
+    }
+
     const auto cursorType = shouldProduceEmptyDocs
         ? DocumentSourceCursor::CursorType::kEmptyDocuments
         : DocumentSourceCursor::CursorType::kRegular;
```
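Two details of the rewrite are easy to miss. When the index order and the requested sort order disagree on direction, the bounded sort is created with an offset of a full bucket span (in milliseconds) rather than zero, and a `$match` is inserted before the unpack stage to discard any bucket wider than `bucketMaxSpanSeconds`, which would otherwise make the bound unsound. A standalone sketch of both calculations, with illustrative field names and values:

```cpp
// Standalone sketch of the two numeric pieces of the rewrite (values and
// names are illustrative, not the server's API).
#include <cstdint>
#include <iostream>

// Offset (in milliseconds) applied to the sort bound: zero when the index
// order and the sort order agree, otherwise a full bucket span, signed by
// whether the index is ordered by the bucket minimum or maximum time.
int64_t boundOffsetMillis(bool indexSortOrderAgree,
                          bool indexOrderedByMinTime,
                          int64_t bucketMaxSpanSeconds) {
    if (indexSortOrderAgree)
        return 0;
    return (indexOrderedByMinTime ? bucketMaxSpanSeconds : -bucketMaxSpanSeconds) * 1000;
}

// The inserted $match is equivalent to:
//   {$expr: {$lte: [{$subtract: ["$control.max.time", "$control.min.time"]},
//                   bucketMaxSpanSeconds * 1000]}}
// i.e. drop any bucket wider than the span the bound calculation assumed.
bool bucketWithinSpan(int64_t controlMinMillis,
                      int64_t controlMaxMillis,
                      int64_t bucketMaxSpanSeconds) {
    return controlMaxMillis - controlMinMillis <= bucketMaxSpanSeconds * 1000;
}

int main() {
    std::cout << boundOffsetMillis(false, true, 3600) << '\n';  // 3600000
    std::cout << bucketWithinSpan(0, 3'600'000, 3600) << '\n';  // 1 (kept)
    std::cout << bucketWithinSpan(0, 3'600'001, 3600) << '\n';  // 0 (filtered)
}
```

The guard matters because `bucketMaxSpanSeconds` is a collection option: if it changed after older, wider buckets were written, an offset computed from the current value would not cover them, and filtering such buckets keeps the bounded sort's output correct, as the patch comment notes.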