diff options
author | David Storch <david.storch@mongodb.com> | 2020-07-09 20:07:55 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-24 16:31:37 +0000 |
commit | b4b35f9cc69412611a198642333bf40daa5ba58c (patch) | |
tree | 909673b812a499a60692c46abb53853f7df42b48 /src/mongo/db/query/plan_executor_impl.cpp | |
parent | 5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff) | |
download | mongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz |
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
Diffstat (limited to 'src/mongo/db/query/plan_executor_impl.cpp')
-rw-r--r-- | src/mongo/db/query/plan_executor_impl.cpp | 151 |
1 files changed, 110 insertions, 41 deletions
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 4e661a4f2ac..cc3923a7990 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -42,15 +42,21 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" -#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/collection_scan.h" -#include "mongo/db/exec/multi_plan.h" +#include "mongo/db/exec/count_scan.h" +#include "mongo/db/exec/distinct_scan.h" +#include "mongo/db/exec/idhack.h" +#include "mongo/db/exec/index_scan.h" +#include "mongo/db/exec/near.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" +#include "mongo/db/exec/sort.h" #include "mongo/db/exec/subplan.h" +#include "mongo/db/exec/text.h" #include "mongo/db/exec/trial_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/explain.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy_impl.h" @@ -149,14 +155,11 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, invariant(!_expCtx || _expCtx->opCtx == _opCtx); invariant(!_cq || !_expCtx || _cq->getExpCtx() == _expCtx); - // Both ChangeStreamProxy and CollectionScan stages can provide oplog tracking info, such as - // post batch resume token, or latest oplog timestamp. If either of these two stages is present - // in the execution tree, then cache it for fast retrieval of the oplog info, avoiding the need - // traverse the tree in runtime. 
- if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) { - _oplogTrackingStage = static_cast<ChangeStreamProxyStage*>(changeStreamProxy); - } else if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { - _oplogTrackingStage = static_cast<CollectionScan*>(collectionScan); + // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN stage. + // This is used for change streams in order to keep the latest oplog timestamp and post + // batch resume token up to date as the oplog scan progresses. + if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { + _collScanStage = static_cast<CollectionScan*>(collectionScan); } // We may still need to initialize _nss from either collection or _cq. @@ -582,41 +585,15 @@ bool PlanExecutorImpl::isDisposed() const { } Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const { - if (!_oplogTrackingStage) { - return {}; - } - - const auto stageType = _oplogTrackingStage->stageType(); - if (stageType == STAGE_COLLSCAN) { - return static_cast<const CollectionScan*>(_oplogTrackingStage)->getLatestOplogTimestamp(); - } else { - invariant(stageType == STAGE_CHANGE_STREAM_PROXY); - return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage) - ->getLatestOplogTimestamp(); - } + return _collScanStage ? _collScanStage->getLatestOplogTimestamp() : Timestamp{}; } BSONObj PlanExecutorImpl::getPostBatchResumeToken() const { static const BSONObj kEmptyPBRT; - if (!_oplogTrackingStage) { - return kEmptyPBRT; - } - - const auto stageType = _oplogTrackingStage->stageType(); - if (stageType == STAGE_COLLSCAN) { - return static_cast<const CollectionScan*>(_oplogTrackingStage)->getPostBatchResumeToken(); - } else { - invariant(stageType == STAGE_CHANGE_STREAM_PROXY); - return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage) - ->getPostBatchResumeToken(); - } + return _collScanStage ? 
_collScanStage->getPostBatchResumeToken() : kEmptyPBRT; } PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const { - if (isPipelineExecutor()) { - return LockPolicy::kLocksInternally; - } - // If this PlanExecutor is simply unspooling queued data, then there is no need to acquire // locks. if (_root->stageType() == StageType::STAGE_QUEUED_DATA) { @@ -626,8 +603,100 @@ PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const { return LockPolicy::kLockExternally; } -bool PlanExecutorImpl::isPipelineExecutor() const { - return _root->stageType() == StageType::STAGE_PIPELINE_PROXY || - _root->stageType() == StageType::STAGE_CHANGE_STREAM_PROXY; +std::string PlanExecutorImpl::getPlanSummary() const { + return Explain::getPlanSummary(_root.get()); +} + +void PlanExecutorImpl::getSummaryStats(PlanSummaryStats* statsOut) const { + invariant(statsOut); + + // We can get some of the fields we need from the common stats stored in the + // root stage of the plan tree. + const CommonStats* common = _root->getCommonStats(); + statsOut->nReturned = common->advanced; + + // The other fields are aggregations over the stages in the plan tree. We flatten + // the tree into a list and then compute these aggregations. 
+ std::vector<const PlanStage*> stages; + Explain::flattenExecTree(_root.get(), &stages); + + statsOut->totalKeysExamined = 0; + statsOut->totalDocsExamined = 0; + + for (size_t i = 0; i < stages.size(); i++) { + statsOut->totalKeysExamined += + Explain::getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); + statsOut->totalDocsExamined += + Explain::getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); + + if (isSortStageType(stages[i]->stageType())) { + statsOut->hasSortStage = true; + + auto sortStage = static_cast<const SortStage*>(stages[i]); + auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats()); + statsOut->usedDisk = sortStats->wasDiskUsed; + } + + if (STAGE_IXSCAN == stages[i]->stageType()) { + const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]); + const IndexScanStats* ixscanStats = + static_cast<const IndexScanStats*>(ixscan->getSpecificStats()); + statsOut->indexesUsed.insert(ixscanStats->indexName); + } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) { + const CountScan* countScan = static_cast<const CountScan*>(stages[i]); + const CountScanStats* countScanStats = + static_cast<const CountScanStats*>(countScan->getSpecificStats()); + statsOut->indexesUsed.insert(countScanStats->indexName); + } else if (STAGE_IDHACK == stages[i]->stageType()) { + const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]); + const IDHackStats* idHackStats = + static_cast<const IDHackStats*>(idHackStage->getSpecificStats()); + statsOut->indexesUsed.insert(idHackStats->indexName); + } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) { + const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]); + const DistinctScanStats* distinctScanStats = + static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats()); + statsOut->indexesUsed.insert(distinctScanStats->indexName); + } else if (STAGE_TEXT == stages[i]->stageType()) { + const TextStage* 
textStage = static_cast<const TextStage*>(stages[i]); + const TextStats* textStats = + static_cast<const TextStats*>(textStage->getSpecificStats()); + statsOut->indexesUsed.insert(textStats->indexName); + } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() || + STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) { + const NearStage* nearStage = static_cast<const NearStage*>(stages[i]); + const NearStats* nearStats = + static_cast<const NearStats*>(nearStage->getSpecificStats()); + statsOut->indexesUsed.insert(nearStats->indexName); + } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) { + const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]); + const CachedPlanStats* cachedStats = + static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats()); + statsOut->replanReason = cachedStats->replanReason; + } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) { + statsOut->fromMultiPlanner = true; + } else if (STAGE_COLLSCAN == stages[i]->stageType()) { + statsOut->collectionScans++; + const auto collScan = static_cast<const CollectionScan*>(stages[i]); + const auto collScanStats = + static_cast<const CollectionScanStats*>(collScan->getSpecificStats()); + if (!collScanStats->tailable) + statsOut->collectionScansNonTailable++; + } + } +} + +BSONObj PlanExecutorImpl::getStats() const { + // Serialize all stats from the winning plan. + auto mps = getMultiPlanStage(); + auto winningPlanStats = + mps ? std::move(mps->getStats()->children[mps->bestPlanIdx()]) : _root->getStats(); + return Explain::statsToBSON(*winningPlanStats); +} + +MultiPlanStage* PlanExecutorImpl::getMultiPlanStage() const { + PlanStage* ps = getStageByType(_root.get(), StageType::STAGE_MULTI_PLAN); + invariant(ps == nullptr || ps->stageType() == StageType::STAGE_MULTI_PLAN); + return static_cast<MultiPlanStage*>(ps); } } // namespace mongo |