summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/plan_executor_impl.cpp
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2020-07-09 20:07:55 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-24 16:31:37 +0000
commitb4b35f9cc69412611a198642333bf40daa5ba58c (patch)
tree909673b812a499a60692c46abb53853f7df42b48 /src/mongo/db/query/plan_executor_impl.cpp
parent5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff)
downloadmongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
Diffstat (limited to 'src/mongo/db/query/plan_executor_impl.cpp')
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp151
1 files changed, 110 insertions, 41 deletions
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 4e661a4f2ac..cc3923a7990 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -42,15 +42,21 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
-#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/collection_scan.h"
-#include "mongo/db/exec/multi_plan.h"
+#include "mongo/db/exec/count_scan.h"
+#include "mongo/db/exec/distinct_scan.h"
+#include "mongo/db/exec/idhack.h"
+#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/near.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
+#include "mongo/db/exec/sort.h"
#include "mongo/db/exec/subplan.h"
+#include "mongo/db/exec/text.h"
#include "mongo/db/exec/trial_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy_impl.h"
@@ -149,14 +155,11 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx,
invariant(!_expCtx || _expCtx->opCtx == _opCtx);
invariant(!_cq || !_expCtx || _cq->getExpCtx() == _expCtx);
- // Both ChangeStreamProxy and CollectionScan stages can provide oplog tracking info, such as
- // post batch resume token, or latest oplog timestamp. If either of these two stages is present
- // in the execution tree, then cache it for fast retrieval of the oplog info, avoiding the need
- // traverse the tree in runtime.
- if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) {
- _oplogTrackingStage = static_cast<ChangeStreamProxyStage*>(changeStreamProxy);
- } else if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) {
- _oplogTrackingStage = static_cast<CollectionScan*>(collectionScan);
+ // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN stage.
+ // This is used for change streams in order to keep the the latest oplog timestamp and post
+ // batch resume token up to date as the oplog scan progresses.
+ if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) {
+ _collScanStage = static_cast<CollectionScan*>(collectionScan);
}
// We may still need to initialize _nss from either collection or _cq.
@@ -582,41 +585,15 @@ bool PlanExecutorImpl::isDisposed() const {
}
Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const {
- if (!_oplogTrackingStage) {
- return {};
- }
-
- const auto stageType = _oplogTrackingStage->stageType();
- if (stageType == STAGE_COLLSCAN) {
- return static_cast<const CollectionScan*>(_oplogTrackingStage)->getLatestOplogTimestamp();
- } else {
- invariant(stageType == STAGE_CHANGE_STREAM_PROXY);
- return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage)
- ->getLatestOplogTimestamp();
- }
+ return _collScanStage ? _collScanStage->getLatestOplogTimestamp() : Timestamp{};
}
BSONObj PlanExecutorImpl::getPostBatchResumeToken() const {
static const BSONObj kEmptyPBRT;
- if (!_oplogTrackingStage) {
- return kEmptyPBRT;
- }
-
- const auto stageType = _oplogTrackingStage->stageType();
- if (stageType == STAGE_COLLSCAN) {
- return static_cast<const CollectionScan*>(_oplogTrackingStage)->getPostBatchResumeToken();
- } else {
- invariant(stageType == STAGE_CHANGE_STREAM_PROXY);
- return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage)
- ->getPostBatchResumeToken();
- }
+ return _collScanStage ? _collScanStage->getPostBatchResumeToken() : kEmptyPBRT;
}
PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const {
- if (isPipelineExecutor()) {
- return LockPolicy::kLocksInternally;
- }
-
// If this PlanExecutor is simply unspooling queued data, then there is no need to acquire
// locks.
if (_root->stageType() == StageType::STAGE_QUEUED_DATA) {
@@ -626,8 +603,100 @@ PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const {
return LockPolicy::kLockExternally;
}
-bool PlanExecutorImpl::isPipelineExecutor() const {
- return _root->stageType() == StageType::STAGE_PIPELINE_PROXY ||
- _root->stageType() == StageType::STAGE_CHANGE_STREAM_PROXY;
+std::string PlanExecutorImpl::getPlanSummary() const {
+ return Explain::getPlanSummary(_root.get());
+}
+
+void PlanExecutorImpl::getSummaryStats(PlanSummaryStats* statsOut) const {
+ invariant(statsOut);
+
+ // We can get some of the fields we need from the common stats stored in the
+ // root stage of the plan tree.
+ const CommonStats* common = _root->getCommonStats();
+ statsOut->nReturned = common->advanced;
+
+ // The other fields are aggregations over the stages in the plan tree. We flatten
+ // the tree into a list and then compute these aggregations.
+ std::vector<const PlanStage*> stages;
+ Explain::flattenExecTree(_root.get(), &stages);
+
+ statsOut->totalKeysExamined = 0;
+ statsOut->totalDocsExamined = 0;
+
+ for (size_t i = 0; i < stages.size(); i++) {
+ statsOut->totalKeysExamined +=
+ Explain::getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
+ statsOut->totalDocsExamined +=
+ Explain::getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
+
+ if (isSortStageType(stages[i]->stageType())) {
+ statsOut->hasSortStage = true;
+
+ auto sortStage = static_cast<const SortStage*>(stages[i]);
+ auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats());
+ statsOut->usedDisk = sortStats->wasDiskUsed;
+ }
+
+ if (STAGE_IXSCAN == stages[i]->stageType()) {
+ const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]);
+ const IndexScanStats* ixscanStats =
+ static_cast<const IndexScanStats*>(ixscan->getSpecificStats());
+ statsOut->indexesUsed.insert(ixscanStats->indexName);
+ } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) {
+ const CountScan* countScan = static_cast<const CountScan*>(stages[i]);
+ const CountScanStats* countScanStats =
+ static_cast<const CountScanStats*>(countScan->getSpecificStats());
+ statsOut->indexesUsed.insert(countScanStats->indexName);
+ } else if (STAGE_IDHACK == stages[i]->stageType()) {
+ const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]);
+ const IDHackStats* idHackStats =
+ static_cast<const IDHackStats*>(idHackStage->getSpecificStats());
+ statsOut->indexesUsed.insert(idHackStats->indexName);
+ } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) {
+ const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]);
+ const DistinctScanStats* distinctScanStats =
+ static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats());
+ statsOut->indexesUsed.insert(distinctScanStats->indexName);
+ } else if (STAGE_TEXT == stages[i]->stageType()) {
+ const TextStage* textStage = static_cast<const TextStage*>(stages[i]);
+ const TextStats* textStats =
+ static_cast<const TextStats*>(textStage->getSpecificStats());
+ statsOut->indexesUsed.insert(textStats->indexName);
+ } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() ||
+ STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) {
+ const NearStage* nearStage = static_cast<const NearStage*>(stages[i]);
+ const NearStats* nearStats =
+ static_cast<const NearStats*>(nearStage->getSpecificStats());
+ statsOut->indexesUsed.insert(nearStats->indexName);
+ } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) {
+ const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]);
+ const CachedPlanStats* cachedStats =
+ static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats());
+ statsOut->replanReason = cachedStats->replanReason;
+ } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) {
+ statsOut->fromMultiPlanner = true;
+ } else if (STAGE_COLLSCAN == stages[i]->stageType()) {
+ statsOut->collectionScans++;
+ const auto collScan = static_cast<const CollectionScan*>(stages[i]);
+ const auto collScanStats =
+ static_cast<const CollectionScanStats*>(collScan->getSpecificStats());
+ if (!collScanStats->tailable)
+ statsOut->collectionScansNonTailable++;
+ }
+ }
+}
+
+BSONObj PlanExecutorImpl::getStats() const {
+ // Serialize all stats from the winning plan.
+ auto mps = getMultiPlanStage();
+ auto winningPlanStats =
+ mps ? std::move(mps->getStats()->children[mps->bestPlanIdx()]) : _root->getStats();
+ return Explain::statsToBSON(*winningPlanStats);
+}
+
+MultiPlanStage* PlanExecutorImpl::getMultiPlanStage() const {
+ PlanStage* ps = getStageByType(_root.get(), StageType::STAGE_MULTI_PLAN);
+ invariant(ps == nullptr || ps->stageType() == StageType::STAGE_MULTI_PLAN);
+ return static_cast<MultiPlanStage*>(ps);
}
} // namespace mongo