author     samontea <merciers.merciers@gmail.com>  2022-04-07 19:12:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-28 20:05:38 +0000
commit     d253f3aa9644ad82a1fc6afa9732267ff45e160e (patch)
tree       7fbaf2e75152b052a028f04036e3cdba47183a1d /src/mongo/db/pipeline/pipeline_d.cpp
parent     5063c61f2e2ea9eee9501b0f83ed8674fc7f6787 (diff)
download   mongo-d253f3aa9644ad82a1fc6afa9732267ff45e160e.tar.gz
SERVER-64349 Add heuristic-based planning support for bucket unpacking with sort
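[Editor's sketch, not part of the commit: conceptually, the patch replaces a blocking $sort over unpacked time-series events with a bounded sort whenever an index already orders the buckets by time. The program below only prints illustrative before/after pipeline shapes; the stage spellings and specs are approximations, not taken from this diff.]

#include <iostream>

int main() {
    // Before the rewrite: every bucket is unpacked and all events are
    // buffered by a blocking $sort.
    std::cout << R"(before: {$_internalUnpackBucket: {...}}, {$sort: {t: 1}})" << '\n';
    // After the rewrite (shape approximate): the sort becomes bounded, so it
    // only buffers events that can still be displaced, i.e. those within one
    // bucketMaxSpanSeconds window.
    std::cout << R"(after:  {$_internalUnpackBucket: {...}}, {$_internalBoundedSort: {t: 1}})" << '\n';
}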
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--   src/mongo/db/pipeline/pipeline_d.cpp   191
1 file changed, 191 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 0719e235bb9..8347e6ed458 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/query/projection_parser.h"
+#include "mongo/db/server_options.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -53,6 +54,7 @@
#include "mongo/db/exec/unpack_timeseries_bucket.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
+#include "mongo/db/matcher/expression_expr.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops_exec.h"
@@ -77,6 +79,7 @@
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor_factory.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/db/query/query_knobs_gen.h"
@@ -406,6 +409,35 @@ std::pair<DocumentSourceSample*, DocumentSourceInternalUnpackBucket*> extractSam
return std::pair{sampleStage, unpackStage};
}
+
+std::tuple<DocumentSourceInternalUnpackBucket*, DocumentSourceSort*> findUnpackThenSort(
+ const Pipeline::SourceContainer& sources) {
+ DocumentSourceSort* sortStage = nullptr;
+ DocumentSourceInternalUnpackBucket* unpackStage = nullptr;
+
+ auto sourcesIt = sources.begin();
+ while (sourcesIt != sources.end()) {
+ if (!sortStage) {
+ sortStage = dynamic_cast<DocumentSourceSort*>(sourcesIt->get());
+
+ if (sortStage) {
+ // Do not double-optimize: this sort has already been rewritten into a bounded sort.
+ if (sortStage->isBoundedSortStage()) {
+ return {nullptr, nullptr};
+ }
+
+ return {unpackStage, sortStage};
+ }
+ }
+
+ if (!unpackStage) {
+ unpackStage = dynamic_cast<DocumentSourceInternalUnpackBucket*>(sourcesIt->get());
+ }
+ ++sourcesIt;
+ }
+
+ return {unpackStage, sortStage};
+}
} // namespace
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::createRandomCursorExecutor(
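[Editor's sketch: the helper above scans the pipeline once with dynamic_cast, remembering the first unpack stage and stopping at the first sort. A minimal self-contained rendition of that pattern, using hypothetical Stage types rather than MongoDB's DocumentSource classes:]

#include <list>
#include <memory>
#include <tuple>

struct Stage { virtual ~Stage() = default; };
struct UnpackStage : Stage {};
struct SortStage : Stage { bool bounded = false; };

std::tuple<UnpackStage*, SortStage*> findUnpackThenSortSketch(
        const std::list<std::unique_ptr<Stage>>& stages) {
    UnpackStage* unpack = nullptr;
    for (const auto& stage : stages) {
        if (auto* sort = dynamic_cast<SortStage*>(stage.get())) {
            // A sort that is already bounded means the rewrite already ran;
            // return nulls so we do not optimize twice.
            if (sort->bounded)
                return {nullptr, nullptr};
            return {unpack, sort};
        }
        if (!unpack)
            unpack = dynamic_cast<UnpackStage*>(stage.get());
    }
    return {unpack, nullptr};  // no eligible sort found
}

int main() {
    std::list<std::unique_ptr<Stage>> stages;
    stages.push_back(std::make_unique<UnpackStage>());
    stages.push_back(std::make_unique<SortStage>());
    auto [unpack, sort] = findUnpackThenSortSketch(stages);
    return (unpack && sort) ? 0 : 1;
}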
@@ -944,6 +976,165 @@ PipelineD::buildInnerQueryExecutorGeneric(const MultipleCollectionAccessor& coll
Pipeline::kAllowedMatcherFeatures,
&shouldProduceEmptyDocs));
+ // If this is a query on a time-series collection, it may be eligible for a
+ // post-planning sort optimization. We check eligibility and perform the
+ // rewrite here.
+ auto [unpack, sort] = findUnpackThenSort(pipeline->_sources);
+ if (serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
+ multiversion::FeatureCompatibilityVersion::kVersion_6_1) &&
+ feature_flags::gFeatureFlagBucketUnpackWithSort.isEnabled(
+ serverGlobalParams.featureCompatibility) &&
+ unpack && sort) {
+ auto execImpl = dynamic_cast<PlanExecutorImpl*>(exec.get());
+ if (execImpl) {
+
+ // Walk down the plan tree toward the stage that produces the buckets;
+ // only fetch and shard-filter stages are unwrapped, anything else ends
+ // the walk.
+ PlanStage* rootStage = execImpl->getRootStage();
+ while (rootStage && rootStage->getChildren().size() == 1) {
+ switch (rootStage->stageType()) {
+ case STAGE_FETCH:
+ rootStage = rootStage->child().get();
+ break;
+ case STAGE_SHARDING_FILTER:
+ rootStage = rootStage->child().get();
+ break;
+ default:
+ rootStage = nullptr;
+ }
+ }
+
+ if (rootStage && rootStage->getChildren().size() != 0) {
+ rootStage = nullptr;
+ }
+
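+ // supportsSort() inspects the scan beneath these stages; when the rewrite
+ // applies, it reports whether the index direction agrees with the requested
+ // sort and whether buckets are ordered by minimum or maximum time.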
+ const auto& sortPattern = sort->getSortKeyPattern();
+ if (auto agree = unpack->supportsSort(rootStage, sortPattern)) {
+ // Scan the pipeline to check if it's compatible with the optimization.
+ bool badStage = false;
+ bool seenSort = false;
+ std::list<boost::intrusive_ptr<DocumentSource>>::iterator iter =
+ pipeline->_sources.begin();
+ std::list<boost::intrusive_ptr<DocumentSource>>::iterator unpackIter =
+ pipeline->_sources.end();
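+ // Only $match, the unpack stage itself, and transformations that leave the
+ // sort paths untouched may appear before the $sort; any other stage
+ // disqualifies the rewrite.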
+ for (; !badStage && iter != pipeline->_sources.end() && !seenSort; ++iter) {
+ if (dynamic_cast<const DocumentSourceSort*>(iter->get())) {
+ seenSort = true;
+ } else if (dynamic_cast<const DocumentSourceMatch*>(iter->get())) {
+ // A $match never modifies the sort paths, so it is compatible; skip it.
+ } else if (dynamic_cast<const DocumentSourceInternalUnpackBucket*>(
+ iter->get())) {
+ unpackIter = iter;
+ } else if (auto projection =
+ dynamic_cast<const DocumentSourceSingleDocumentTransformation*>(
+ iter->get())) {
+ auto modPaths = projection->getModifiedPaths();
+
+ // Check to see if the sort paths are modified.
+ for (auto sortIter = sortPattern.begin();
+ !badStage && sortIter != sortPattern.end();
+ ++sortIter) {
+
+ auto fieldPath = sortIter->fieldPath;
+ // If a sort path is missing or can be modified, leave the loop and do not optimize.
+ if (!fieldPath || modPaths.canModify(*fieldPath)) {
+ badStage = true;
+ }
+ }
+
+ } else {
+ badStage = true;
+ }
+ }
+ if (!badStage && seenSort) {
+ auto [indexSortOrderAgree, indexOrderedByMinTime] = *agree;
+ // This is safe because we have seen a sort so we must have at least one stage
+ // to the left of the current iterator position.
+ --iter;
+
+ if (indexOrderedByMinTime) {
+ unpack->setIncludeMinTimeAsMetadata();
+ } else {
+ unpack->setIncludeMaxTimeAsMetadata();
+ }
+
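+ // When the index order and the sort order agree, a bound offset of zero
+ // suffices: an event never sorts before the minimum (or after the maximum)
+ // time of its own bucket.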
+ if (indexSortOrderAgree) {
+ pipeline->_sources.insert(
+ iter,
+ DocumentSourceSort::createBoundedSort(sort->getSortKeyPattern(),
+ (indexOrderedByMinTime
+ ? DocumentSourceSort::kMin
+ : DocumentSourceSort::kMax),
+ 0,
+ sort->getLimit(),
+ expCtx));
+ } else {
+ // Since the sort pattern and the direction of the index don't agree, we
+ // must pad the bound with an offset of one maximum bucket span to get a
+ // conservative estimate of each bucket's event range.
+ pipeline->_sources.insert(
+ iter,
+ DocumentSourceSort::createBoundedSort(
+ sort->getSortKeyPattern(),
+ (indexOrderedByMinTime ? DocumentSourceSort::kMin
+ : DocumentSourceSort::kMax),
+ ((indexOrderedByMinTime) ? unpack->getBucketMaxSpanSeconds()
+ : -unpack->getBucketMaxSpanSeconds()) *
+ 1000,
+ sort->getLimit(),
+ expCtx));
+
+ /**
+ * We wish to create the following predicate to avoid returning incorrect
+ * results in the unlikely event bucketMaxSpanSeconds changes under us.
+ *
+ * {$expr:
+ * {$lte: [
+ * {$subtract: [$control.max.timeField, $control.min.timeField]},
+ * {$const: bucketMaxSpanSeconds, in milliseconds}
+ * ]}}
+ */
+ auto minTime = unpack->getMinTimeField();
+ auto maxTime = unpack->getMaxTimeField();
+ auto match = std::make_unique<ExprMatchExpression>(
+ // This produces {$lte: ... }
+ make_intrusive<ExpressionCompare>(
+ expCtx.get(),
+ ExpressionCompare::CmpOp::LTE,
+ // This produces [...]
+ makeVector<boost::intrusive_ptr<Expression>>(
+ // This produces {$subtract: ... }
+ make_intrusive<ExpressionSubtract>(
+ expCtx.get(),
+ // This produces [...]
+ makeVector<boost::intrusive_ptr<Expression>>(
+ // This produces "$control.max.timeField"
+ ExpressionFieldPath::createPathFromString(
+ expCtx.get(), maxTime, expCtx->variablesParseState),
+ // This produces "$control.min.timeField"
+ ExpressionFieldPath::createPathFromString(
+ expCtx.get(),
+ minTime,
+ expCtx->variablesParseState))),
+ // This produces {$const: bucketMaxSpanSeconds, in milliseconds}
+ make_intrusive<ExpressionConstant>(
+ expCtx.get(),
+ Value{unpack->getBucketMaxSpanSeconds() * 1000}))),
+ expCtx);
+ pipeline->_sources.insert(
+ unpackIter,
+ make_intrusive<DocumentSourceMatch>(std::move(match), expCtx));
+ }
+ // Ensure we're erasing the sort source.
+ tassert(6434901,
+ "we must erase a $sort stage and replace it with a bounded sort stage",
+ strcmp((*iter)->getSourceName(),
+ DocumentSourceSort::kStageName.rawData()) == 0);
+ pipeline->_sources.erase(iter);
+ pipeline->stitch();
+ }
+ }
+ }
+ }
+
const auto cursorType = shouldProduceEmptyDocs
? DocumentSourceCursor::CursorType::kEmptyDocuments
: DocumentSourceCursor::CursorType::kRegular;
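
[Editor's sketch: two numeric details of the fallback path above, rendered as plain C++ with illustrative names (this is not MongoDB API). boundOffsetMillis() mirrors the offset handed to the bounded sort when the index direction and sort direction disagree, and guardPredicate() prints the $expr filter inserted before the unpack stage to reject buckets wider than the assumed span.]

#include <iostream>
#include <sstream>
#include <string>

// Offset given to the bounded sort: one full bucket span, in milliseconds,
// negated when buckets are ordered by their maximum time.
long long boundOffsetMillis(bool indexOrderedByMinTime, long long bucketMaxSpanSeconds) {
    return (indexOrderedByMinTime ? bucketMaxSpanSeconds : -bucketMaxSpanSeconds) * 1000;
}

// Guard predicate: reject any bucket whose event range exceeds the span the
// rewrite assumed, in case bucketMaxSpanSeconds changed after buckets were
// written.
std::string guardPredicate(const std::string& timeField, long long bucketMaxSpanSeconds) {
    std::ostringstream os;
    os << "{$expr: {$lte: [{$subtract: [\"$control.max." << timeField
       << "\", \"$control.min." << timeField << "\"]}, {$const: "
       << bucketMaxSpanSeconds * 1000 << "}]}}";
    return os.str();
}

int main() {
    std::cout << boundOffsetMillis(true, 3600) << '\n'   // prints 3600000
              << guardPredicate("t", 3600) << '\n';
}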