summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2020-07-09 20:07:55 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-24 16:31:37 +0000
commitb4b35f9cc69412611a198642333bf40daa5ba58c (patch)
tree909673b812a499a60692c46abb53853f7df42b48
parent5e53ee3ca0a90eb98cdab94b298dec810fb46804 (diff)
downloadmongo-b4b35f9cc69412611a198642333bf40daa5ba58c.tar.gz
SERVER-48478 Replace PipelineProxyStage with PlanExecutorPipeline
-rw-r--r--src/mongo/db/SConscript3
-rw-r--r--src/mongo/db/clientcursor.cpp2
-rw-r--r--src/mongo/db/commands/count_cmd.cpp8
-rw-r--r--src/mongo/db/commands/distinct.cpp12
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp18
-rw-r--r--src/mongo/db/commands/find_cmd.cpp4
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp12
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp54
-rw-r--r--src/mongo/db/exec/change_stream_proxy.cpp116
-rw-r--r--src/mongo/db/exec/change_stream_proxy.h97
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp145
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h106
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline.h16
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp206
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.h186
-rw-r--r--src/mongo/db/query/classic_stage_builder.cpp2
-rw-r--r--src/mongo/db/query/explain.cpp353
-rw-r--r--src/mongo/db/query/explain.h73
-rw-r--r--src/mongo/db/query/find.cpp22
-rw-r--r--src/mongo/db/query/plan_executor.h30
-rw-r--r--src/mongo/db/query/plan_executor_factory.cpp10
-rw-r--r--src/mongo/db/query/plan_executor_factory.h9
-rw-r--r--src/mongo/db/query/plan_executor_impl.cpp151
-rw-r--r--src/mongo/db/query/plan_executor_impl.h22
-rw-r--r--src/mongo/db/query/plan_executor_sbe.h13
-rw-r--r--src/mongo/db/query/plan_yield_policy.h5
-rw-r--r--src/mongo/db/query/stage_types.h4
-rw-r--r--src/mongo/db/s/range_deletion_util.cpp2
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp16
-rw-r--r--src/mongo/dbtests/query_stage_multiplan.cpp2
32 files changed, 762 insertions, 957 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 61e3abba115..c5a203c7163 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1079,7 +1079,6 @@ env.Library(
'exec/and_hash.cpp',
'exec/and_sorted.cpp',
'exec/cached_plan.cpp',
- 'exec/change_stream_proxy.cpp',
'exec/collection_scan.cpp',
'exec/count.cpp',
'exec/count_scan.cpp',
@@ -1098,7 +1097,6 @@ env.Library(
'exec/multi_plan.cpp',
'exec/near.cpp',
'exec/or.cpp',
- 'exec/pipeline_proxy.cpp',
'exec/plan_cache_util.cpp',
'exec/plan_stage.cpp',
'exec/projection.cpp',
@@ -1128,6 +1126,7 @@ env.Library(
'pipeline/document_source_cursor.cpp',
'pipeline/document_source_geo_near_cursor.cpp',
'pipeline/pipeline_d.cpp',
+ 'pipeline/plan_executor_pipeline.cpp',
'query/classic_stage_builder.cpp',
'query/explain.cpp',
'query/find.cpp',
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 125c5fa13b6..845c31e48b9 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -92,7 +92,7 @@ ClientCursor::ClientCursor(ClientCursorParams params,
_operationUsingCursor(operationUsingCursor),
_lastUseDate(now),
_createdDate(now),
- _planSummary(Explain::getPlanSummary(_exec.get())),
+ _planSummary(_exec->getPlanSummary()),
_opKey(operationUsingCursor->getOperationKey()) {
invariant(_exec);
invariant(_operationUsingCursor);
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 54b8a808781..6165ef9cdb5 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -256,22 +256,20 @@ public:
auto curOp = CurOp::get(opCtx);
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ curOp->setPlanSummary_inlock(exec->getPlanSummary());
}
exec->executePlan();
PlanSummaryStats summaryStats;
- Explain::getSummaryStats(*exec, &summaryStats);
+ exec->getSummaryStats(&summaryStats);
if (collection) {
CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats);
}
curOp->debug().setPlanSummaryMetrics(summaryStats);
if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
+ curOp->debug().execStats = exec->getStats();
}
// Plan is done executing. We just need to pull the count out of the root stage.
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index d43ab866f4f..fc8977182e8 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -239,8 +239,7 @@ public:
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(
- Explain::getPlanSummary(executor.getValue().get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(executor.getValue()->getPlanSummary());
}
const auto key = cmdObj[ParsedDistinct::kKeyField].valuestrsafe();
@@ -286,8 +285,7 @@ public:
"stats: {stats}",
"Plan executor error during distinct command",
"error"_attr = exception.toStatus(),
- "stats"_attr =
- redact(Explain::getWinningPlanStats(executor.getValue().get())));
+ "stats"_attr = redact(executor.getValue()->getStats()));
exception.addContext("Executor error during distinct command");
throw;
@@ -297,16 +295,14 @@ public:
// Get summary information about the plan.
PlanSummaryStats stats;
- Explain::getSummaryStats(*executor.getValue(), &stats);
+ executor.getValue()->getSummaryStats(&stats);
if (collection) {
CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, stats);
}
curOp->debug().setPlanSummaryMetrics(stats);
if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(executor.getValue().get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
+ curOp->debug().execStats = executor.getValue()->getStats();
}
BSONArrayBuilder valueListBuilder(result.subarrayStart("values"));
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 33b489e2235..143a0d4f593 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -99,7 +99,7 @@ boost::optional<BSONObj> advanceExecutor(OperationContext* opCtx,
"Plan executor error during findAndModify: {error}, stats: {stats}",
"Plan executor error during findAndModify",
"error"_attr = exception.toStatus(),
- "stats"_attr = redact(Explain::getWinningPlanStats(exec)));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("Plan executor error during findAndModify");
throw;
@@ -469,7 +469,7 @@ public:
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
@@ -478,7 +478,7 @@ public:
// multiple times.
PlanSummaryStats summaryStats;
- Explain::getSummaryStats(*exec, &summaryStats);
+ exec->getSummaryStats(&summaryStats);
if (collection) {
CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats);
}
@@ -488,9 +488,7 @@ public:
opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);
if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
+ curOp->debug().execStats = exec->getStats();
}
recordStatsForTopCommand(opCtx);
@@ -550,7 +548,7 @@ public:
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
@@ -559,7 +557,7 @@ public:
// multiple times.
PlanSummaryStats summaryStats;
- Explain::getSummaryStats(*exec, &summaryStats);
+ exec->getSummaryStats(&summaryStats);
if (collection) {
CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats);
}
@@ -567,9 +565,7 @@ public:
opDebug->setPlanSummaryMetrics(summaryStats);
if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
+ curOp->debug().execStats = exec->getStats();
}
recordStatsForTopCommand(opCtx);
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index d333b91b8af..84501a9cbb7 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -418,7 +418,7 @@ public:
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
if (!collection) {
@@ -472,7 +472,7 @@ public:
"stats: {stats}",
"Plan executor error during find command",
"error"_attr = exception.toStatus(),
- "stats"_attr = redact(Explain::getWinningPlanStats(exec.get())));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("Executor error during find command");
throw;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index d76c295f259..33f731e7cde 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -333,7 +333,7 @@ public:
"getMore command executor error: {error}, stats: {stats}",
"getMore command executor error",
"error"_attr = exception.toStatus(),
- "stats"_attr = redact(Explain::getWinningPlanStats(exec)));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("Executor error during getMore");
throw;
@@ -508,7 +508,7 @@ public:
exec->reattachToOperationContext(opCtx);
exec->restoreState();
- auto planSummary = Explain::getPlanSummary(exec);
+ auto planSummary = exec->getPlanSummary();
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(planSummary);
@@ -553,7 +553,7 @@ public:
// obtain these values we need to take a diff of the pre-execution and post-execution
// metrics, as they accumulate over the course of a cursor's lifetime.
PlanSummaryStats preExecutionStats;
- Explain::getSummaryStats(*exec, &preExecutionStats);
+ exec->getSummaryStats(&preExecutionStats);
// Mark this as an AwaitData operation if appropriate.
if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) {
@@ -600,7 +600,7 @@ public:
&numResults);
PlanSummaryStats postExecutionStats;
- Explain::getSummaryStats(*exec, &postExecutionStats);
+ exec->getSummaryStats(&postExecutionStats);
postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined;
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp->debug().setPlanSummaryMetrics(postExecutionStats);
@@ -613,9 +613,7 @@ public:
if (cursorPin->getExecutor()->lockPolicy() !=
PlanExecutor::LockPolicy::kLocksInternally &&
curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec, &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
+ curOp->debug().execStats = exec->getStats();
}
if (shouldSaveCursor) {
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index daa7cae1f95..c414b5ac4ac 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
@@ -54,6 +53,7 @@
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/collection_query_info.h"
@@ -166,9 +166,9 @@ bool handleCursorCommand(OperationContext* opCtx,
invariant(exec);
bool stashedResult = false;
+ // We are careful to avoid ever calling 'getNext()' on the PlanExecutor when the batchSize is
+ // zero to avoid doing any query execution work.
for (int objCount = 0; objCount < batchSize; objCount++) {
- // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
- // do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
BSONObj nextDoc;
@@ -185,7 +185,7 @@ bool handleCursorCommand(OperationContext* opCtx,
"Aggregate command executor error: {error}, stats: {stats}",
"Aggregate command executor error",
"error"_attr = exception.toStatus(),
- "stats"_attr = redact(Explain::getWinningPlanStats(exec)));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("PlanExecutor error during aggregation");
throw;
@@ -445,35 +445,6 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI
return pipelines;
}
-
-/**
- * Create a PlanExecutor to execute the given 'pipeline'.
- */
-std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExecutor(
- OperationContext* opCtx,
- const NamespaceString& nss,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- bool hasChangeStream) {
- boost::intrusive_ptr<ExpressionContext> expCtx(pipeline->getContext());
-
- // Transfer ownership of the Pipeline to the PipelineProxyStage.
- auto ws = std::make_unique<WorkingSet>();
- auto proxy = hasChangeStream
- ? std::make_unique<ChangeStreamProxyStage>(expCtx.get(), std::move(pipeline), ws.get())
- : std::make_unique<PipelineProxyStage>(expCtx.get(), std::move(pipeline), ws.get());
-
- // This PlanExecutor will simply forward requests to the Pipeline, so does not need
- // to yield or to be registered with any collection's CursorManager to receive
- // invalidations. The Pipeline may contain PlanExecutors which *are* yielding
- // PlanExecutors and which *are* registered with their respective collection's
- // CursorManager
- return uassertStatusOK(plan_executor_factory::make(std::move(expCtx),
- std::move(ws),
- std::move(proxy),
- nullptr,
- PlanYieldPolicy::YieldPolicy::NO_YIELD,
- nss));
-}
} // namespace
Status runAggregate(OperationContext* opCtx,
@@ -672,8 +643,13 @@ Status runAggregate(OperationContext* opCtx,
auto pipelines =
createExchangePipelinesIfNeeded(opCtx, expCtx, request, std::move(pipeline), uuid);
for (auto&& pipelineIt : pipelines) {
- execs.emplace_back(createOuterPipelineProxyExecutor(
- opCtx, nss, std::move(pipelineIt), liteParsedPipeline.hasChangeStream()));
+ // There are separate ExpressionContexts for each exchange pipeline, so make sure to
+ // pass the pipeline's ExpressionContext to the plan executor factory.
+ auto pipelineExpCtx = pipelineIt->getContext();
+ execs.emplace_back(
+ plan_executor_factory::make(std::move(pipelineExpCtx),
+ std::move(pipelineIt),
+ liteParsedPipeline.hasChangeStream()));
}
// With the pipelines created, we can relinquish locks as they will manage the locks
@@ -685,7 +661,7 @@ Status runAggregate(OperationContext* opCtx,
}
{
- auto planSummary = Explain::getPlanSummary(execs[0].get());
+ auto planSummary = execs[0]->getPlanSummary();
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp->setPlanSummary_inlock(std::move(planSummary));
}
@@ -735,8 +711,8 @@ Status runAggregate(OperationContext* opCtx,
if (expCtx->explain) {
auto explainExecutor = pins[0]->getExecutor();
auto bodyBuilder = result->getBodyBuilder();
- if (explainExecutor->isPipelineExecutor()) {
- Explain::explainPipelineExecutor(explainExecutor, *(expCtx->explain), &bodyBuilder);
+ if (auto pipelineExec = dynamic_cast<PlanExecutorPipeline*>(explainExecutor)) {
+ Explain::explainPipelineExecutor(pipelineExec, *(expCtx->explain), &bodyBuilder);
} else {
invariant(pins[0]->getExecutor()->lockPolicy() ==
PlanExecutor::LockPolicy::kLockExternally);
@@ -759,7 +735,7 @@ Status runAggregate(OperationContext* opCtx,
}
PlanSummaryStats stats;
- Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats);
+ pins[0].getCursor()->getExecutor()->getSummaryStats(&stats);
curOp->debug().setPlanSummaryMetrics(stats);
curOp->debug().nreturned = stats.nReturned;
// For an optimized away pipeline, signal the cache that a query operation has completed.
diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp
deleted file mode 100644
index c16255a897b..00000000000
--- a/src/mongo/db/exec/change_stream_proxy.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/exec/change_stream_proxy.h"
-
-#include "mongo/db/pipeline/pipeline_d.h"
-#include "mongo/db/pipeline/resume_token.h"
-#include "mongo/db/repl/speculative_majority_read_info.h"
-
-namespace mongo {
-
-const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";
-
-ChangeStreamProxyStage::ChangeStreamProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws)
- : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {
- // Set _postBatchResumeToken to the initial PBRT that was added to the expression context during
- // pipeline construction, and use it to obtain the starting time for _latestOplogTimestamp.
- invariant(!_pipeline->getContext()->initialPostBatchResumeToken.isEmpty());
- _postBatchResumeToken = _pipeline->getContext()->initialPostBatchResumeToken.getOwned();
- _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
-}
-
-boost::optional<Document> ChangeStreamProxyStage::getNext() {
- if (auto next = _pipeline->getNext()) {
- // While we have more results to return, we track both the timestamp and the resume token of
- // the latest event observed in the oplog, the latter via its sort key metadata field.
- _validateResumeToken(*next);
- _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
- _postBatchResumeToken = next->metadata().getSortKey().getDocument().toBson();
- _setSpeculativeReadTimestamp();
- return next;
- }
-
- // We ran out of results to return. Check whether the oplog cursor has moved forward since the
- // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return,
- // if the new time is higher than the last then we are guaranteed not to have already returned
- // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token
- // at the current clusterTime.
- auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
- if (highWaterMark > _latestOplogTimestamp) {
- auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
- _postBatchResumeToken = token.toDocument().toBson();
- _latestOplogTimestamp = highWaterMark;
- _setSpeculativeReadTimestamp();
- }
- return boost::none;
-}
-
-void ChangeStreamProxyStage::_validateResumeToken(const Document& event) const {
- // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
- if (_includeMetaData) {
- return;
- }
- // Confirm that the document _id field matches the original resume token in the sort key field.
- auto eventBSON = event.toBson();
- auto resumeToken = event.metadata().getSortKey();
- auto idField = eventBSON.getObjectField("_id");
- invariant(!resumeToken.missing());
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Encountered an event whose _id field, which contains the resume "
- "token, was modified by the pipeline. Modifying the _id field of an "
- "event makes it impossible to resume the stream from that point. Only "
- "transformations that retain the unmodified _id field are allowed. "
- "Expected: "
- << BSON("_id" << resumeToken) << " but found: "
- << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
- (resumeToken.getType() == BSONType::Object) &&
- idField.binaryEqual(resumeToken.getDocument().toBson()));
-}
-
-void ChangeStreamProxyStage::_setSpeculativeReadTimestamp() {
- repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
- repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx);
- if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) {
- speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
- }
-}
-
-std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
- std::unique_ptr<PlanStageStats> ret =
- std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
- ret->specific = std::make_unique<CollectionScanStats>();
- return ret;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h
deleted file mode 100644
index 6d115b78885..00000000000
--- a/src/mongo/db/exec/change_stream_proxy.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/exec/pipeline_proxy.h"
-
-namespace mongo {
-
-/**
- * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the
- * serialization of change stream pipeline output from Document to BSON. In particular, it is
- * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that
- * are necessary for correct merging on mongoS and, in the latter case, must also be provided to
- * mongoD clients.
- */
-class ChangeStreamProxyStage final : public PipelineProxyStage {
-public:
- static const char* kStageType;
-
- /**
- * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream into
- * the constructor will cause an invariant() to fail.
- */
- ChangeStreamProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws);
-
- /**
- * Returns an empty PlanStageStats object.
- */
- std::unique_ptr<PlanStageStats> getStats() final;
-
- /**
- * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog
- * timestamp in the event that we need to merge on mongoS.
- */
- Timestamp getLatestOplogTimestamp() const {
- return _includeMetaData ? _latestOplogTimestamp : Timestamp();
- }
-
- /**
- * Passes through the most recent resume token from the proxied pipeline.
- */
- BSONObj getPostBatchResumeToken() const {
- return _postBatchResumeToken;
- }
-
- StageType stageType() const final {
- return STAGE_CHANGE_STREAM_PROXY;
- }
-
-protected:
- boost::optional<Document> getNext() final;
-
-private:
- /**
- * Verifies that the docs's resume token has not been modified.
- */
- void _validateResumeToken(const Document& event) const;
-
- /**
- * Set the speculative majority read timestamp if we have scanned up to a certain oplog
- * timestamp.
- */
- void _setSpeculativeReadTimestamp();
-
- Timestamp _latestOplogTimestamp;
- BSONObj _postBatchResumeToken;
-};
-} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
deleted file mode 100644
index c3014ce34a5..00000000000
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/exec/pipeline_proxy.h"
-
-#include <memory>
-
-#include "mongo/db/pipeline/document_source.h"
-#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/pipeline/pipeline_d.h"
-
-namespace mongo {
-
-using boost::intrusive_ptr;
-using std::shared_ptr;
-using std::unique_ptr;
-using std::vector;
-
-const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY";
-
-PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws)
- : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {}
-
-PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws,
- const char* stageTypeName)
- : PlanStage(stageTypeName, expCtx),
- _pipeline(std::move(pipeline)),
- _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger
- _ws(ws) {
- // We take over responsibility for disposing of the Pipeline, since it is required that
- // doDispose() will be called before destruction of this PipelineProxyStage.
- _pipeline.get_deleter().dismissDisposal();
-}
-
-PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) {
- invariant(out);
-
- if (!_stash.empty()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- if (_includeMetaData && _stash.back().metadata()) {
- member->metadata() = _stash.back().metadata();
- }
- member->doc = {SnapshotId(), std::move(_stash.back())};
- _stash.pop_back();
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
-
- if (auto next = getNext()) {
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- if (_includeMetaData && next->metadata()) {
- member->metadata() = next->metadata();
- }
- member->doc = {SnapshotId(), std::move(*next)};
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
-
- return PlanStage::IS_EOF;
-}
-
-bool PipelineProxyStage::isEOF() {
- if (!_stash.empty())
- return false;
-
- if (auto next = getNext()) {
- _stash.emplace_back(*next);
- return false;
- }
-
- return true;
-}
-
-void PipelineProxyStage::doDetachFromOperationContext() {
- _pipeline->detachFromOperationContext();
-}
-
-void PipelineProxyStage::doReattachToOperationContext() {
- _pipeline->reattachToOperationContext(opCtx());
-}
-
-void PipelineProxyStage::doDispose() {
- _pipeline->dispose(opCtx());
-}
-
-unique_ptr<PlanStageStats> PipelineProxyStage::getStats() {
- unique_ptr<PlanStageStats> ret =
- std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_PIPELINE_PROXY);
- ret->specific = std::make_unique<CollectionScanStats>();
- return ret;
-}
-
-boost::optional<Document> PipelineProxyStage::getNext() {
- return _pipeline->getNext();
-}
-
-std::string PipelineProxyStage::getPlanSummaryStr() const {
- return PipelineD::getPlanSummaryStr(_pipeline.get());
-}
-
-void PipelineProxyStage::getPlanSummaryStats(PlanSummaryStats* statsOut) const {
- invariant(statsOut);
- PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut);
- statsOut->nReturned = getCommonStats()->advanced;
-}
-
-vector<Value> PipelineProxyStage::writeExplainOps(ExplainOptions::Verbosity verbosity) const {
- return _pipeline->writeExplainOps(verbosity);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
deleted file mode 100644
index 99f8583f3c0..00000000000
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <boost/intrusive_ptr.hpp>
-#include <boost/optional/optional.hpp>
-
-#include "mongo/db/exec/plan_stage.h"
-#include "mongo/db/exec/plan_stats.h"
-#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/record_id.h"
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-
-/**
- * Stage for pulling results out from an aggregation pipeline.
- */
-class PipelineProxyStage : public PlanStage {
-public:
- PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws);
-
- virtual ~PipelineProxyStage() = default;
-
- PlanStage::StageState doWork(WorkingSetID* out) final;
-
- bool isEOF() final;
-
- //
- // Manage our OperationContext.
- //
- void doDetachFromOperationContext() final;
- void doReattachToOperationContext() final;
-
- // Returns empty PlanStageStats object
- std::unique_ptr<PlanStageStats> getStats() override;
-
- // Not used.
- SpecificStats* getSpecificStats() const final {
- MONGO_UNREACHABLE;
- }
-
- std::string getPlanSummaryStr() const;
- void getPlanSummaryStats(PlanSummaryStats* statsOut) const;
-
- StageType stageType() const override {
- return STAGE_PIPELINE_PROXY;
- }
-
- /**
- * Writes the pipelineProxyStage's operators to a std::vector<Value>, providing the level of
- * detail specified by 'verbosity'.
- */
- std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const;
-
- static const char* kStageType;
-
-protected:
- PipelineProxyStage(ExpressionContext* expCtx,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- WorkingSet* ws,
- const char* stageTypeName);
-
- virtual boost::optional<Document> getNext();
- void doDispose() final;
-
- // Items in the _stash should be returned before pulling items from _pipeline.
- std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
- const bool _includeMetaData;
-
-private:
- std::vector<Document> _stash;
- WorkingSet* _ws;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index c193a6fd7f8..7fa9c49d362 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -671,21 +671,19 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
exec->executePlan();
PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
+ exec->getSummaryStats(&summary);
if (auto coll = collection->getCollection()) {
CollectionQueryInfo::get(coll).notifyOfQuery(opCtx, coll, summary);
}
if (curOp.shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp.debug().execStats = execStatsBob.obj();
+ curOp.debug().execStats = exec->getStats();
}
const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
@@ -911,7 +909,7 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary());
}
exec->executePlan();
@@ -919,16 +917,14 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
curOp.debug().additiveMetrics.ndeleted = n;
PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
+ exec->getSummaryStats(&summary);
if (auto coll = collection.getCollection()) {
CollectionQueryInfo::get(coll).notifyOfQuery(opCtx, coll, summary);
}
curOp.debug().setPlanSummaryMetrics(summary);
if (curOp.shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp.debug().execStats = execStatsBob.obj();
+ curOp.debug().execStats = exec->getStats();
}
LastError::get(opCtx->getClient()).recordDelete(n);
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9c8389594b9..7c530ef0663 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -195,7 +195,7 @@ void DocumentSourceCursor::_updateOplogTimestamp() {
void DocumentSourceCursor::recordPlanSummaryStats() {
invariant(_exec);
- Explain::getSummaryStats(*_exec, &_planSummaryStats);
+ _exec->getSummaryStats(&_planSummaryStats);
}
Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> verbosity) const {
@@ -300,7 +300,7 @@ DocumentSourceCursor::DocumentSourceCursor(
// Later code in the DocumentSourceCursor lifecycle expects that '_exec' is in a saved state.
_exec->saveState();
- _planSummary = Explain::getPlanSummary(_exec.get());
+ _planSummary = _exec->getPlanSummary();
recordPlanSummaryStats();
if (pExpCtx->explain) {
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 8ef13836c62..93663a68abf 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -157,18 +157,14 @@ public:
}
/**
- * Sets the OperationContext of 'pCtx' to nullptr.
- *
- * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any
- * storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
+ * Sets the OperationContext of 'pCtx' to nullptr and calls 'detachFromOperationContext()' on
+ * all underlying DocumentSources.
*/
void detachFromOperationContext();
/**
- * Sets the OperationContext of 'pCtx' to 'opCtx'.
- *
- * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring
- * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
+ * Sets the OperationContext of 'pCtx' to 'opCtx', and reattaches all underlying DocumentSources
+ * to 'opCtx'.
*/
void reattachToOperationContext(OperationContext* opCtx);
@@ -186,6 +182,10 @@ public:
*/
void dispose(OperationContext* opCtx);
+ bool isDisposed() const {
+ return _disposed;
+ }
+
/**
* Checks to see if disk is ever used within the pipeline.
*/
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
new file mode 100644
index 00000000000..7a9f0524446
--- /dev/null
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -0,0 +1,206 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/repl/speculative_majority_read_info.h"
+
+namespace mongo {
+
+PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ bool isChangeStream)
+ : _expCtx(std::move(expCtx)), _pipeline(std::move(pipeline)), _isChangeStream(isChangeStream) {
+ // Pipeline plan executors must always have an ExpressionContext.
+ invariant(_expCtx);
+
+ // The caller is responsible for disposing this plan executor before deleting it, which will in
+ // turn dispose the underlying pipeline. Therefore, there is no need to dispose the pipeline
+ // again when it is destroyed.
+ _pipeline.get_deleter().dismissDisposal();
+
+ if (_isChangeStream) {
+ // Set _postBatchResumeToken to the initial PBRT that was added to the expression context
+ // during pipeline construction, and use it to obtain the starting time for
+ // _latestOplogTimestamp.
+ invariant(!_expCtx->initialPostBatchResumeToken.isEmpty());
+ _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned();
+ _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
+ }
+}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* recordIdOut) {
+ // The pipeline-based execution engine does not track the record ids associated with documents,
+ // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+ // provide a non-null BSONObj pointer for 'objOut'.
+ invariant(!recordIdOut);
+ invariant(objOut);
+
+ if (!_stash.empty()) {
+ *objOut = std::move(_stash.front());
+ _stash.pop();
+ ++_nReturned;
+ return PlanExecutor::ADVANCED;
+ }
+
+ Document docOut;
+ auto execState = getNextDocument(&docOut, nullptr);
+ if (execState == PlanExecutor::ADVANCED) {
+ // Include metadata if the output will be consumed by a merging node.
+ *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson();
+ }
+ return execState;
+}
+
+PlanExecutor::ExecState PlanExecutorPipeline::getNextDocument(Document* docOut,
+ RecordId* recordIdOut) {
+ // The pipeline-based execution engine does not track the record ids associated with documents,
+ // so it is an error for the caller to ask for one. For the same reason, we expect the caller to
+ // provide a non-null pointer for 'docOut'.
+ invariant(!recordIdOut);
+ invariant(docOut);
+
+ // Callers which use 'enqueue()' are not allowed to use 'getNextDocument()', and must instead
+ // use 'getNext()'.
+ invariant(_stash.empty());
+
+ if (auto next = _getNext()) {
+ *docOut = std::move(*next);
+ ++_nReturned;
+ return PlanExecutor::ADVANCED;
+ }
+
+ return PlanExecutor::IS_EOF;
+}
+
+bool PlanExecutorPipeline::isEOF() {
+ if (!_stash.empty()) {
+ return false;
+ }
+
+ return _pipelineIsEof;
+}
+
+boost::optional<Document> PlanExecutorPipeline::_getNext() {
+ auto nextDoc = _pipeline->getNext();
+ if (!nextDoc) {
+ _pipelineIsEof = true;
+ }
+
+ if (_isChangeStream) {
+ _performChangeStreamsAccounting(nextDoc);
+ }
+ return nextDoc;
+}
+
+void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) {
+ invariant(_isChangeStream);
+ if (doc) {
+ // While we have more results to return, we track both the timestamp and the resume token of
+ // the latest event observed in the oplog, the latter via its sort key metadata field.
+ _validateResumeToken(*doc);
+ _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ _postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson();
+ _setSpeculativeReadTimestamp();
+ } else {
+ // We ran out of results to return. Check whether the oplog cursor has moved forward since
+ // the last recorded timestamp. Because we advance _latestOplogTimestamp for every event we
+ // return, if the new time is higher than the last then we are guaranteed not to have
+ // already returned any events at this timestamp. We can set _postBatchResumeToken to a new
+ // high-water-mark token at the current clusterTime.
+ auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
+ if (highWaterMark > _latestOplogTimestamp) {
+ auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
+ _postBatchResumeToken = token.toDocument().toBson();
+ _latestOplogTimestamp = highWaterMark;
+ _setSpeculativeReadTimestamp();
+ }
+ }
+}
+
+void PlanExecutorPipeline::executePlan() {
+ while (_getNext()) {
+ // Run the pipeline to completion, but discard the results.
+ }
+}
+
+std::string PlanExecutorPipeline::getPlanSummary() const {
+ return PipelineD::getPlanSummaryStr(_pipeline.get());
+}
+
+void PlanExecutorPipeline::getSummaryStats(PlanSummaryStats* statsOut) const {
+ invariant(statsOut);
+ PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut);
+ statsOut->nReturned = _nReturned;
+}
+
+void PlanExecutorPipeline::_validateResumeToken(const Document& event) const {
+ // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
+ if (_expCtx->needsMerge) {
+ return;
+ }
+
+ // Confirm that the document _id field matches the original resume token in the sort key field.
+ auto eventBSON = event.toBson();
+ auto resumeToken = event.metadata().getSortKey();
+ auto idField = eventBSON.getObjectField("_id");
+ invariant(!resumeToken.missing());
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "Encountered an event whose _id field, which contains the resume "
+ "token, was modified by the pipeline. Modifying the _id field of an "
+ "event makes it impossible to resume the stream from that point. Only "
+ "transformations that retain the unmodified _id field are allowed. "
+ "Expected: "
+ << BSON("_id" << resumeToken) << " but found: "
+ << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()),
+ (resumeToken.getType() == BSONType::Object) &&
+ idField.binaryEqual(resumeToken.getDocument().toBson()));
+}
+
+void PlanExecutorPipeline::_setSpeculativeReadTimestamp() {
+ repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
+ repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx);
+ if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) {
+ speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
+ }
+}
+
+void PlanExecutorPipeline::markAsKilled(Status killStatus) {
+ invariant(!killStatus.isOK());
+ // If killed multiple times, only retain the first status.
+ if (_killStatus.isOK()) {
+ _killStatus = killStatus;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h
new file mode 100644
index 00000000000..f6ad2a97341
--- /dev/null
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.h
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <queue>
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/query/plan_executor.h"
+
+namespace mongo {
+
+/**
+ * A plan executor which is used to execute a Pipeline of DocumentSources.
+ */
+class PlanExecutorPipeline final : public PlanExecutor {
+public:
+ PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ bool isChangeStream);
+
+ PlanStage* getRootStage() const override {
+ MONGO_UNREACHABLE;
+ }
+
+ CanonicalQuery* getCanonicalQuery() const override {
+ return nullptr;
+ }
+
+ const NamespaceString& nss() const override {
+ return _expCtx->ns;
+ }
+
+ OperationContext* getOpCtx() const override {
+ return _expCtx->opCtx;
+ }
+
+ // Pipeline execution does not support the saveState()/restoreState() interface. Instead, the
+ // underlying data access plan is saved/restored internally in between DocumentSourceCursor
+ // batches, or when the underlying PlanStage tree yields.
+ void saveState() override {}
+ void restoreState() override {}
+
+ void detachFromOperationContext() override {
+ _pipeline->detachFromOperationContext();
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) override {
+ _pipeline->reattachToOperationContext(opCtx);
+ }
+
+ ExecState getNext(BSONObj* objOut, RecordId* recordIdOut) override;
+ ExecState getNextDocument(Document* docOut, RecordId* recordIdOut) override;
+
+ bool isEOF() override;
+
+ void executePlan() override;
+
+ void dispose(OperationContext* opCtx) override {
+ _pipeline->dispose(opCtx);
+ }
+
+ void enqueue(const BSONObj& obj) override {
+ _stash.push(obj.getOwned());
+ }
+
+ void markAsKilled(Status killStatus) override;
+
+ bool isMarkedAsKilled() const override {
+ return !_killStatus.isOK();
+ }
+
+ Status getKillStatus() override {
+ invariant(isMarkedAsKilled());
+ return _killStatus;
+ }
+
+ bool isDisposed() const override {
+ return _pipeline->isDisposed();
+ }
+
+ Timestamp getLatestOplogTimestamp() const override {
+ return _latestOplogTimestamp;
+ }
+
+ BSONObj getPostBatchResumeToken() const override {
+ return _postBatchResumeToken;
+ }
+
+ LockPolicy lockPolicy() const override {
+ return LockPolicy::kLocksInternally;
+ }
+
+ std::string getPlanSummary() const override;
+
+ void getSummaryStats(PlanSummaryStats* statsOut) const override;
+
+ /**
+ * Writes the explain information about the underlying pipeline to a std::vector<Value>,
+ * providing the level of detail specified by 'verbosity'.
+ */
+ std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const {
+ return _pipeline->writeExplainOps(verbosity);
+ }
+
+ BSONObj getStats() const override {
+ // TODO SERVER-49808: Report execution stats for the pipeline.
+ return BSONObj{};
+ }
+
+private:
+ /**
+ * Obtains the next document from the underlying Pipeline, and does change streams-related
+ * accounting if needed.
+ */
+ boost::optional<Document> _getNext();
+
+ /**
+ * If this is a change stream, advance the cluster time and post batch resume token based on the
+ * latest document returned by the underlying pipeline.
+ */
+ void _performChangeStreamsAccounting(const boost::optional<Document>);
+
+ /**
+ * Verifies that the docs's resume token has not been modified.
+ */
+ void _validateResumeToken(const Document& event) const;
+
+ /**
+ * Set the speculative majority read timestamp if we have scanned up to a certain oplog
+ * timestamp.
+ */
+ void _setSpeculativeReadTimestamp();
+
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
+
+ std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
+
+ const bool _isChangeStream;
+
+ std::queue<BSONObj> _stash;
+
+ // If _killStatus has a non-OK value, then we have been killed and the value represents the
+ // reason for the kill.
+ Status _killStatus = Status::OK();
+
+ size_t _nReturned = 0;
+
+ // Set to true once we have received all results from the underlying '_pipeline', and the
+ // pipeline has indicated end-of-stream.
+ bool _pipelineIsEof = false;
+
+ // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the
+ // oplog, as well as the most recent PBRT.
+ Timestamp _latestOplogTimestamp;
+ BSONObj _postBatchResumeToken;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp
index a8ef54c2abe..bf904550f9c 100644
--- a/src/mongo/db/query/classic_stage_builder.cpp
+++ b/src/mongo/db/query/classic_stage_builder.cpp
@@ -350,7 +350,6 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
expCtx, esn->pattern, _ws, std::move(childStage));
}
case STAGE_CACHED_PLAN:
- case STAGE_CHANGE_STREAM_PROXY:
case STAGE_COUNT:
case STAGE_DELETE:
case STAGE_EOF:
@@ -358,7 +357,6 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r
case STAGE_MOCK:
case STAGE_MULTI_ITERATOR:
case STAGE_MULTI_PLAN:
- case STAGE_PIPELINE_PROXY:
case STAGE_QUEUED_DATA:
case STAGE_RECORD_STORE_FAST_COUNT:
case STAGE_SUBPLAN:
diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp
index 4f6ca08cccf..d80405b8aaf 100644
--- a/src/mongo/db/query/explain.cpp
+++ b/src/mongo/db/query/explain.cpp
@@ -40,16 +40,17 @@
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/near.h"
-#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/sort.h"
#include "mongo/db/exec/text.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/keypattern.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/query/canonical_query_encoder.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/query/explain_common.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
+#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_executor_sbe.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner.h"
@@ -81,120 +82,6 @@ void flattenStatsTree(const PlanStageStats* root, vector<const PlanStageStats*>*
}
/**
- * Traverse the tree rooted at 'root', and add all nodes into the list 'flattened'. If a
- * MultiPlanStage is encountered, only add the best plan and its children to 'flattened'.
- */
-void flattenExecTree(const PlanStage* root, vector<const PlanStage*>* flattened) {
- flattened->push_back(root);
-
- if (root->stageType() == STAGE_MULTI_PLAN) {
- // Only add the winning plan from a MultiPlanStage.
- auto mps = static_cast<const MultiPlanStage*>(root);
- const PlanStage* winningStage = mps->getChildren()[mps->bestPlanIdx()].get();
- return flattenExecTree(winningStage, flattened);
- }
-
- const auto& children = root->getChildren();
- for (size_t i = 0; i < children.size(); ++i) {
- flattenExecTree(children[i].get(), flattened);
- }
-}
-
-/**
- * Traverse the stage tree, depth first and return the first stage of a given type.
- */
-PlanStage* findStageOfType(PlanStage* root, StageType desiredStageType) {
- if (root->stageType() == desiredStageType) {
- return root;
- }
-
- for (const auto& child : root->getChildren()) {
- PlanStage* p = findStageOfType(child.get(), desiredStageType);
- if (p) {
- return p;
- }
- }
- return nullptr;
-}
-
-/**
- * Gets a pointer to the MultiPlanStage inside the stage tree rooted at 'root'. Returns nullptr if
- * there is no MPS.
- */
-MultiPlanStage* getMultiPlanStage(PlanStage* root) {
- PlanStage* ps = findStageOfType(root, STAGE_MULTI_PLAN);
- invariant(ps == nullptr || ps->stageType() == STAGE_MULTI_PLAN);
- return static_cast<MultiPlanStage*>(ps);
-}
-
-/**
- * Gets a pointer to the PipelineProxyStage if it is the root of the tree. Returns nullptr if
- * there is no PPS that is root.
- */
-PipelineProxyStage* getPipelineProxyStage(PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY ||
- root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
- return static_cast<PipelineProxyStage*>(root);
- }
-
- return nullptr;
-}
-
-/**
- * Given the SpecificStats object for a stage and the type of the stage, returns the
- * number of index keys examined by the stage.
- *
- * This is used for getting the total number of keys examined by a plan. We need
- * to collect a 'totalKeysExamined' metric for a regular explain (in which case this
- * gets called from Explain::generateSinglePlanExecutionInfo()) or for the slow query log / profiler
- * (in which case this gets called from Explain::getSummaryStats()).
- */
-size_t getKeysExamined(StageType type, const SpecificStats* specific) {
- if (STAGE_IXSCAN == type) {
- const IndexScanStats* spec = static_cast<const IndexScanStats*>(specific);
- return spec->keysExamined;
- } else if (STAGE_IDHACK == type) {
- const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
- return spec->keysExamined;
- } else if (STAGE_COUNT_SCAN == type) {
- const CountScanStats* spec = static_cast<const CountScanStats*>(specific);
- return spec->keysExamined;
- } else if (STAGE_DISTINCT_SCAN == type) {
- const DistinctScanStats* spec = static_cast<const DistinctScanStats*>(specific);
- return spec->keysExamined;
- }
-
- return 0;
-}
-
-/**
- * Given the SpecificStats object for a stage and the type of the stage, returns the
- * number of documents examined by the stage.
- *
- * This is used for getting the total number of documents examined by a plan. We need
- * to collect a 'totalDocsExamined' metric for a regular explain (in which case this
- * gets called from Explain::generateSinglePlanExecutionInfo()) or for the slow query log / profiler
- * (in which case this gets called from Explain::getSummaryStats()).
- */
-size_t getDocsExamined(StageType type, const SpecificStats* specific) {
- if (STAGE_COLLSCAN == type) {
- const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific);
- return spec->docsTested;
- } else if (STAGE_FETCH == type) {
- const FetchStats* spec = static_cast<const FetchStats*>(specific);
- return spec->docsExamined;
- } else if (STAGE_IDHACK == type) {
- const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
- return spec->docsExamined;
- } else if (STAGE_TEXT_OR == type) {
- const TextOrStats* spec = static_cast<const TextOrStats*>(specific);
- return spec->fetches;
- }
-
- return 0;
-}
-
-/**
* Adds to the plan summary string being built by 'sb' for the execution stage 'stage'.
*/
void addStageSummaryStr(const PlanStage* stage, StringBuilder& sb) {
@@ -269,10 +156,10 @@ void appendMultikeyPaths(const BSONObj& keyPattern,
* Gather the PlanStageStats for all of the losing plans. If exec doesn't have a MultiPlanStage
* (or any losing plans), will return an empty vector.
*/
-std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExecutor* exec) {
+std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExecutorImpl* exec) {
// Inspect the tree to see if there is a MultiPlanStage. Plan selection has already happened at
// this point, since we have a PlanExecutor.
- const auto mps = getMultiPlanStage(exec->getRootStage());
+ const auto mps = exec->getMultiPlanStage();
std::vector<std::unique_ptr<PlanStageStats>> res;
// Get the stats from the trial period for all the plans.
@@ -291,8 +178,8 @@ std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExec
/**
* Get PlanExecutor's winning plan stats tree.
*/
-unique_ptr<PlanStageStats> getWinningPlanStatsTree(const PlanExecutor* exec) {
- MultiPlanStage* mps = getMultiPlanStage(exec->getRootStage());
+unique_ptr<PlanStageStats> getWinningPlanStatsTree(const PlanExecutorImpl* exec) {
+ auto mps = exec->getMultiPlanStage();
return mps ? std::move(mps->getStats()->children[mps->bestPlanIdx()])
: std::move(exec->getRootStage()->getStats());
}
@@ -303,7 +190,58 @@ namespace mongo {
using str::stream;
-// static
+void Explain::flattenExecTree(const PlanStage* root, vector<const PlanStage*>* flattened) {
+ flattened->push_back(root);
+
+ if (root->stageType() == STAGE_MULTI_PLAN) {
+ // Only add the winning plan from a MultiPlanStage.
+ auto mps = static_cast<const MultiPlanStage*>(root);
+ const PlanStage* winningStage = mps->getChildren()[mps->bestPlanIdx()].get();
+ return flattenExecTree(winningStage, flattened);
+ }
+
+ const auto& children = root->getChildren();
+ for (size_t i = 0; i < children.size(); ++i) {
+ flattenExecTree(children[i].get(), flattened);
+ }
+}
+
+size_t Explain::getKeysExamined(StageType type, const SpecificStats* specific) {
+ if (STAGE_IXSCAN == type) {
+ const IndexScanStats* spec = static_cast<const IndexScanStats*>(specific);
+ return spec->keysExamined;
+ } else if (STAGE_IDHACK == type) {
+ const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
+ return spec->keysExamined;
+ } else if (STAGE_COUNT_SCAN == type) {
+ const CountScanStats* spec = static_cast<const CountScanStats*>(specific);
+ return spec->keysExamined;
+ } else if (STAGE_DISTINCT_SCAN == type) {
+ const DistinctScanStats* spec = static_cast<const DistinctScanStats*>(specific);
+ return spec->keysExamined;
+ }
+
+ return 0;
+}
+
+size_t Explain::getDocsExamined(StageType type, const SpecificStats* specific) {
+ if (STAGE_COLLSCAN == type) {
+ const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific);
+ return spec->docsTested;
+ } else if (STAGE_FETCH == type) {
+ const FetchStats* spec = static_cast<const FetchStats*>(specific);
+ return spec->docsExamined;
+ } else if (STAGE_IDHACK == type) {
+ const IDHackStats* spec = static_cast<const IDHackStats*>(specific);
+ return spec->docsExamined;
+ } else if (STAGE_TEXT_OR == type) {
+ const TextOrStats* spec = static_cast<const TextOrStats*>(specific);
+ return spec->fetches;
+ }
+
+ return 0;
+}
+
void Explain::statsToBSON(const PlanStageStats& stats,
ExplainOptions::Verbosity verbosity,
BSONObjBuilder* bob,
@@ -621,7 +559,6 @@ void Explain::statsToBSON(const PlanStageStats& stats,
childrenBob.doneFast();
}
-// static
BSONObj Explain::statsToBSON(const PlanStageStats& stats, ExplainOptions::Verbosity verbosity) {
BSONObjBuilder bob;
statsToBSON(stats, &bob, verbosity);
@@ -634,33 +571,21 @@ BSONObj Explain::statsToBSON(const sbe::PlanStageStats& stats,
return bob.obj();
}
-// static
void Explain::statsToBSON(const PlanStageStats& stats,
BSONObjBuilder* bob,
ExplainOptions::Verbosity verbosity) {
statsToBSON(stats, verbosity, bob, bob);
}
-// static
-BSONObj Explain::getWinningPlanStats(const PlanExecutor* exec) {
- BSONObjBuilder bob;
- if (!dynamic_cast<const PlanExecutorSBE*>(exec)) {
- getWinningPlanStats(exec, &bob);
- }
- return bob.obj();
-}
-
-// static
-void Explain::getWinningPlanStats(const PlanExecutor* exec, BSONObjBuilder* bob) {
- unique_ptr<PlanStageStats> winningStats = getWinningPlanStatsTree(exec);
- statsToBSON(*winningStats, ExplainOptions::Verbosity::kExecStats, bob, bob);
-}
-
-// static
void Explain::generatePlannerInfo(PlanExecutor* exec,
const Collection* collection,
BSONObj extraInfo,
BSONObjBuilder* out) {
+ auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec);
+ uassert(4847801,
+ "queryPlanner explain section is only supported for classic PlanStages",
+ planExecImpl);
+
CanonicalQuery* query = exec->getCanonicalQuery();
BSONObjBuilder plannerBob(out->subobjStart("queryPlanner"));
@@ -716,12 +641,13 @@ void Explain::generatePlannerInfo(PlanExecutor* exec,
}
BSONObjBuilder winningPlanBob(plannerBob.subobjStart("winningPlan"));
- const auto winnerStats = getWinningPlanStatsTree(exec);
+ const auto winnerStats = getWinningPlanStatsTree(planExecImpl);
statsToBSON(*winnerStats.get(), &winningPlanBob, ExplainOptions::Verbosity::kQueryPlanner);
winningPlanBob.doneFast();
// Genenerate array of rejected plans.
- const vector<unique_ptr<PlanStageStats>> rejectedStats = getRejectedPlansTrialStats(exec);
+ const vector<unique_ptr<PlanStageStats>> rejectedStats =
+ getRejectedPlansTrialStats(planExecImpl);
BSONArrayBuilder allPlansBob(plannerBob.subarrayStart("rejectedPlans"));
for (size_t i = 0; i < rejectedStats.size(); i++) {
BSONObjBuilder childBob(allPlansBob.subobjStart());
@@ -774,9 +700,14 @@ void Explain::generateSinglePlanExecutionInfo(const PlanStageStats* stats,
}
std::unique_ptr<PlanStageStats> Explain::getWinningPlanTrialStats(PlanExecutor* exec) {
+ auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec);
+ uassert(4847802,
+ "getWinningPlanTrialStats() is only supported for classic PlanStages",
+ planExecImpl);
+
// Inspect the tree to see if there is a MultiPlanStage. Plan selection has already happened at
// this point, since we have a PlanExecutor.
- const auto mps = getMultiPlanStage(exec->getRootStage());
+ const auto mps = planExecImpl->getMultiPlanStage();
if (mps) {
const auto mpsStats = mps->getStats();
@@ -786,15 +717,19 @@ std::unique_ptr<PlanStageStats> Explain::getWinningPlanTrialStats(PlanExecutor*
return nullptr;
}
-// static
void Explain::generateExecutionInfo(PlanExecutor* exec,
ExplainOptions::Verbosity verbosity,
Status executePlanStatus,
PlanStageStats* winningPlanTrialStats,
BSONObjBuilder* out) {
+ auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec);
+ uassert(4847800,
+ "executionStats explain section is only supported for classic PlanStages",
+ planExecImpl);
+
invariant(verbosity >= ExplainOptions::Verbosity::kExecStats);
if (verbosity >= ExplainOptions::Verbosity::kExecAllPlans &&
- findStageOfType(exec->getRootStage(), STAGE_MULTI_PLAN) != nullptr) {
+ planExecImpl->getMultiPlanStage()) {
invariant(winningPlanTrialStats,
"winningPlanTrialStats must be non-null when requesting all execution stats");
}
@@ -811,7 +746,7 @@ void Explain::generateExecutionInfo(PlanExecutor* exec,
// Generate exec stats BSON for the winning plan.
OperationContext* opCtx = exec->getOpCtx();
long long totalTimeMillis = durationCount<Milliseconds>(CurOp::get(opCtx)->elapsedTimeTotal());
- const auto winningExecStats = getWinningPlanStatsTree(exec);
+ const auto winningExecStats = getWinningPlanStatsTree(planExecImpl);
generateSinglePlanExecutionInfo(winningExecStats.get(), verbosity, totalTimeMillis, &execBob);
// Also generate exec stats for all plans, if the verbosity level is high enough.
@@ -831,7 +766,8 @@ void Explain::generateExecutionInfo(PlanExecutor* exec,
planBob.doneFast();
}
- const vector<unique_ptr<PlanStageStats>> rejectedStats = getRejectedPlansTrialStats(exec);
+ const vector<unique_ptr<PlanStageStats>> rejectedStats =
+ getRejectedPlansTrialStats(planExecImpl);
for (size_t i = 0; i < rejectedStats.size(); ++i) {
BSONObjBuilder planBob(allPlansBob.subobjStart());
generateSinglePlanExecutionInfo(
@@ -865,15 +801,12 @@ void Explain::explainStages(PlanExecutor* exec,
}
}
-// static
-void Explain::explainPipelineExecutor(PlanExecutor* exec,
+void Explain::explainPipelineExecutor(PlanExecutorPipeline* exec,
ExplainOptions::Verbosity verbosity,
BSONObjBuilder* out) {
+ invariant(exec);
invariant(out);
- PipelineProxyStage* pps = getPipelineProxyStage(exec->getRootStage());
- invariant(pps, "Expected exec's root stage to be a PipelineProxyStage");
-
// If we need execution stats, this runs the plan in order to gather the stats.
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
// TODO SERVER-32732: An execution error should be reported in explain, but should not
@@ -881,20 +814,19 @@ void Explain::explainPipelineExecutor(PlanExecutor* exec,
exec->executePlan();
}
- *out << "stages" << Value(pps->writeExplainOps(verbosity));
+ *out << "stages" << Value(exec->writeExplainOps(verbosity));
explain_common::generateServerInfo(out);
}
-// static
void Explain::explainStages(PlanExecutor* exec,
const Collection* collection,
ExplainOptions::Verbosity verbosity,
BSONObj extraInfo,
BSONObjBuilder* out) {
uassert(4822877,
- "Explain facility is not supported for SBE plans",
- !dynamic_cast<PlanExecutorSBE*>(exec));
+ "explainStages() is only supported for PlanStage trees",
+ dynamic_cast<PlanExecutorImpl*>(exec));
auto winningPlanTrialStats = Explain::getWinningPlanTrialStats(exec);
@@ -927,24 +859,7 @@ void Explain::explainStages(PlanExecutor* exec,
explain_common::generateServerInfo(out);
}
-// static
-std::string Explain::getPlanSummary(const PlanExecutor* exec) {
- // TODO: Handle planSummary when SBE is enabled.
- if (dynamic_cast<const PlanExecutorSBE*>(exec)) {
- return "unsupported";
- }
-
- return getPlanSummary(exec->getRootStage());
-}
-
-// static
std::string Explain::getPlanSummary(const PlanStage* root) {
- if (root->stageType() == STAGE_PIPELINE_PROXY ||
- root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
- auto pipelineProxy = static_cast<const PipelineProxyStage*>(root);
- return pipelineProxy->getPlanSummaryStr();
- }
-
std::vector<const PlanStage*> stages;
flattenExecTree(root, &stages);
@@ -968,105 +883,11 @@ std::string Explain::getPlanSummary(const PlanStage* root) {
return sb.str();
}
-// static
std::string Explain::getPlanSummary(const sbe::PlanStage* root) {
- // TODO: Handle 'planSummary' when SBE is enabled.
+ // TODO: Support 'planSummary' for SBE plan stage trees.
return "unsupported";
}
-// static
-void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsOut) {
- invariant(nullptr != statsOut);
-
- // TODO: Handle 'getSummaryStats' when SBE is enabled.
- if (dynamic_cast<const PlanExecutorSBE*>(&exec)) {
- return;
- }
-
- PlanStage* root = exec.getRootStage();
- if (root->stageType() == STAGE_PIPELINE_PROXY ||
- root->stageType() == STAGE_CHANGE_STREAM_PROXY) {
- auto pipelineProxy = static_cast<PipelineProxyStage*>(root);
- pipelineProxy->getPlanSummaryStats(statsOut);
- return;
- }
-
- // We can get some of the fields we need from the common stats stored in the
- // root stage of the plan tree.
- const CommonStats* common = root->getCommonStats();
- statsOut->nReturned = common->advanced;
-
- // The other fields are aggregations over the stages in the plan tree. We flatten
- // the tree into a list and then compute these aggregations.
- std::vector<const PlanStage*> stages;
- flattenExecTree(root, &stages);
-
- statsOut->totalKeysExamined = 0;
- statsOut->totalDocsExamined = 0;
-
- for (size_t i = 0; i < stages.size(); i++) {
- statsOut->totalKeysExamined +=
- getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
- statsOut->totalDocsExamined +=
- getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
-
- if (isSortStageType(stages[i]->stageType())) {
- statsOut->hasSortStage = true;
-
- auto sortStage = static_cast<const SortStage*>(stages[i]);
- auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats());
- statsOut->usedDisk = sortStats->wasDiskUsed;
- }
-
- if (STAGE_IXSCAN == stages[i]->stageType()) {
- const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]);
- const IndexScanStats* ixscanStats =
- static_cast<const IndexScanStats*>(ixscan->getSpecificStats());
- statsOut->indexesUsed.insert(ixscanStats->indexName);
- } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) {
- const CountScan* countScan = static_cast<const CountScan*>(stages[i]);
- const CountScanStats* countScanStats =
- static_cast<const CountScanStats*>(countScan->getSpecificStats());
- statsOut->indexesUsed.insert(countScanStats->indexName);
- } else if (STAGE_IDHACK == stages[i]->stageType()) {
- const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]);
- const IDHackStats* idHackStats =
- static_cast<const IDHackStats*>(idHackStage->getSpecificStats());
- statsOut->indexesUsed.insert(idHackStats->indexName);
- } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) {
- const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]);
- const DistinctScanStats* distinctScanStats =
- static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats());
- statsOut->indexesUsed.insert(distinctScanStats->indexName);
- } else if (STAGE_TEXT == stages[i]->stageType()) {
- const TextStage* textStage = static_cast<const TextStage*>(stages[i]);
- const TextStats* textStats =
- static_cast<const TextStats*>(textStage->getSpecificStats());
- statsOut->indexesUsed.insert(textStats->indexName);
- } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() ||
- STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) {
- const NearStage* nearStage = static_cast<const NearStage*>(stages[i]);
- const NearStats* nearStats =
- static_cast<const NearStats*>(nearStage->getSpecificStats());
- statsOut->indexesUsed.insert(nearStats->indexName);
- } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) {
- const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]);
- const CachedPlanStats* cachedStats =
- static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats());
- statsOut->replanReason = cachedStats->replanReason;
- } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) {
- statsOut->fromMultiPlanner = true;
- } else if (STAGE_COLLSCAN == stages[i]->stageType()) {
- statsOut->collectionScans++;
- const auto collScan = static_cast<const CollectionScan*>(stages[i]);
- const auto collScanStats =
- static_cast<const CollectionScanStats*>(collScan->getSpecificStats());
- if (!collScanStats->tailable)
- statsOut->collectionScansNonTailable++;
- }
- }
-}
-
void Explain::planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out) {
BSONObjBuilder shapeBuilder(out->subobjStart("createdFromQuery"));
shapeBuilder.append("query", entry.query);
diff --git a/src/mongo/db/query/explain.h b/src/mongo/db/query/explain.h
index f71c63a0691..3c692082a03 100644
--- a/src/mongo/db/query/explain.h
+++ b/src/mongo/db/query/explain.h
@@ -43,6 +43,7 @@ namespace mongo {
class Collection;
class OperationContext;
+class PlanExecutorPipeline;
struct PlanSummaryStats;
/**
@@ -89,7 +90,7 @@ public:
* - 'winningPlanTrialStats' is the stats of the winning plan during the trial period. May be
* nullptr.
* - 'out' is the builder for the explain output.
- **/
+ */
static void explainStages(PlanExecutor* exec,
const Collection* collection,
ExplainOptions::Verbosity verbosity,
@@ -99,28 +100,16 @@ public:
BSONObjBuilder* out);
/**
- * Gets explain BSON for the document sources contained by 'exec'. Use this function if you
- * have a PlanExecutor whose root is a PipelineProxyStage and want to turn it into a human
- * readable explain format.
+ * Gets explain BSON for the document sources contained by 'exec'. Use this function if you have
+ * a PlanExecutor for a pipeline and want to turn it into a human readable explain format.
*
* The explain information is generated with the level of detail specified by 'verbosity'.
- **/
- static void explainPipelineExecutor(PlanExecutor* exec,
+ */
+ static void explainPipelineExecutor(PlanExecutorPipeline* exec,
ExplainOptions::Verbosity verbosity,
BSONObjBuilder* out);
/**
- * Converts the PlanExecutor's winning plan stats tree to BSON and returns to the caller.
- */
- static BSONObj getWinningPlanStats(const PlanExecutor* exec);
-
- /**
- * Converts the PlanExecutor's winning plan stats tree to BSON and returns the result through
- * the out-parameter 'bob'.
- */
- static void getWinningPlanStats(const PlanExecutor* exec, BSONObjBuilder* bob);
-
- /**
* Converts the stats tree 'stats' into a corresponding BSON object containing
* explain information.
*
@@ -149,26 +138,10 @@ public:
/**
* Returns a short plan summary std::string describing the leaves of the query plan.
*/
- static std::string getPlanSummary(const PlanExecutor* exec);
static std::string getPlanSummary(const PlanStage* root);
static std::string getPlanSummary(const sbe::PlanStage* root);
/**
- * Fills out 'statsOut' with summary stats using the execution tree contained
- * in 'exec'.
- *
- * The summary stats are consumed by debug mechanisms such as the profiler and
- * the slow query log.
- *
- * This is a lightweight alternative for explainStages(...) above which is useful
- * when operations want to request debug information without doing all the work
- * to generate a full explain.
- *
- * Does not take ownership of its arguments.
- */
- static void getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsOut);
-
- /**
* If exec's root stage is a MultiPlanStage, returns the stats for the trial period of of the
* winning plan. Otherwise, returns nullptr.
*
@@ -177,6 +150,32 @@ public:
static std::unique_ptr<PlanStageStats> getWinningPlanTrialStats(PlanExecutor* exec);
/**
+ * Serializes a PlanCacheEntry to the provided BSON object builder. The output format is
+ * intended to be human readable, and useful for debugging query performance problems related to
+ * the plan cache.
+ */
+ static void planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out);
+
+ /**
+ * Traverses the tree rooted at 'root', and adds all nodes into the list 'flattened'. If a
+ * MultiPlanStage is encountered, only adds the best plan and its children to 'flattened'.
+ */
+ static void flattenExecTree(const PlanStage* root, std::vector<const PlanStage*>* flattened);
+
+ /**
+ * Given the SpecificStats object for a stage and the type of the stage, returns the number of
+ * index keys examined by the stage.
+ */
+ static size_t getKeysExamined(StageType type, const SpecificStats* specific);
+
+ /**
+ * Given the SpecificStats object for a stage and the type of the stage, returns the number of
+ * documents examined by the stage.
+ */
+ static size_t getDocsExamined(StageType type, const SpecificStats* specific);
+
+private:
+ /**
* Generates the execution stats section for the stats tree 'stats', adding the resulting BSON
* to 'out'.
*
@@ -193,14 +192,6 @@ public:
BSONObjBuilder* out);
/**
- * Serializes a PlanCacheEntry to the provided BSON object builder. The output format is
- * intended to be human readable, and useful for debugging query performance problems related to
- * the plan cache.
- */
- static void planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out);
-
-private:
- /**
* Adds the 'queryPlanner' explain section to the BSON object being built
* by 'out'.
*
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index c936ea37681..440c3c580d0 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -133,7 +133,7 @@ void endQueryOp(OperationContext* opCtx,
// Fill out CurOp based on explain summary statistics.
PlanSummaryStats summaryStats;
- Explain::getSummaryStats(exec, &summaryStats);
+ exec.getSummaryStats(&summaryStats);
curOp->debug().setPlanSummaryMetrics(summaryStats);
if (collection) {
@@ -141,9 +141,7 @@ void endQueryOp(OperationContext* opCtx,
}
if (curOp->shouldDBProfile()) {
- BSONObjBuilder statsBob;
- Explain::getWinningPlanStats(&exec, &statsBob);
- curOp->debug().execStats = statsBob.obj();
+ curOp->debug().execStats = exec.getStats();
}
}
@@ -185,7 +183,7 @@ void generateBatch(int ntoreturn,
LOGV2_ERROR(20918,
"getMore executor error, stats: {stats}",
"getMore executor error",
- "stats"_attr = redact(Explain::getWinningPlanStats(exec)));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("Executor error during OP_GET_MORE");
throw;
}
@@ -414,7 +412,7 @@ Message getMore(OperationContext* opCtx,
exec->reattachToOperationContext(opCtx);
exec->restoreState();
- auto planSummary = Explain::getPlanSummary(exec);
+ auto planSummary = exec->getPlanSummary();
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
curOp.setPlanSummary_inlock(planSummary);
@@ -450,7 +448,7 @@ Message getMore(OperationContext* opCtx,
// these values we need to take a diff of the pre-execution and post-execution metrics, as they
// accumulate over the course of a cursor's lifetime.
PlanSummaryStats preExecutionStats;
- Explain::getSummaryStats(*exec, &preExecutionStats);
+ exec->getSummaryStats(&preExecutionStats);
if (MONGO_unlikely(waitWithPinnedCursorDuringGetMoreBatch.shouldFail())) {
CurOpFailpointHelpers::waitWhileFailPointEnabled(&waitWithPinnedCursorDuringGetMoreBatch,
opCtx,
@@ -486,7 +484,7 @@ Message getMore(OperationContext* opCtx,
}
PlanSummaryStats postExecutionStats;
- Explain::getSummaryStats(*exec, &postExecutionStats);
+ exec->getSummaryStats(&postExecutionStats);
postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined;
postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined;
curOp.debug().setPlanSummaryMetrics(postExecutionStats);
@@ -498,9 +496,7 @@ Message getMore(OperationContext* opCtx,
// cost.
if (cursorPin->getExecutor()->lockPolicy() != PlanExecutor::LockPolicy::kLocksInternally &&
curOp.shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec, &execStatsBob);
- curOp.debug().execStats = execStatsBob.obj();
+ curOp.debug().execStats = exec->getStats();
}
// Our two possible ClientCursorPin cleanup paths are:
@@ -692,7 +688,7 @@ bool runQuery(OperationContext* opCtx,
// Get summary info about which plan the executor is using.
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp.setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ curOp.setPlanSummary_inlock(exec->getPlanSummary());
}
try {
@@ -726,7 +722,7 @@ bool runQuery(OperationContext* opCtx,
"Plan executor error during find: {error}, stats: {stats}",
"Plan executor error during find",
"error"_attr = redact(exception.toStatus()),
- "stats"_attr = redact(Explain::getWinningPlanStats(exec.get())));
+ "stats"_attr = redact(exec->getStats()));
exception.addContext("Executor error during find");
throw;
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 87a1c5cbd1e..b271902e162 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -33,6 +33,7 @@
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/canonical_query.h"
+#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/plan_yield_policy.h"
namespace mongo {
@@ -282,7 +283,8 @@ public:
/**
* Stash the BSONObj so that it gets returned from the PlanExecutor on a later call to
- * getNext().
+ * getNext(). Implementations should NOT support returning queued BSON objects using
+ * 'getNextDocument()'. Only 'getNext()' should return the queued BSON objects.
*
* Enqueued documents are returned in FIFO order. The queued results are exhausted before
* generating further results from the underlying query plan.
@@ -311,12 +313,30 @@ public:
virtual LockPolicy lockPolicy() const = 0;
/**
- * Returns true if this PlanExecutor proxies to a Pipeline of DocumentSources.
+ * Returns a short string, suitable for the logs, which summarizes the execution plan.
+ */
+ virtual std::string getPlanSummary() const = 0;
+
+ /**
+ * Fills out 'statsOut' with summary stats collected during the execution of the PlanExecutor.
+ * This is a lightweight alternative which is useful when operations want to request a summary
+ * of the available debug information without generating complete explain output.
+ *
+ * The summary stats are consumed by debug mechanisms such as the profiler and the slow query
+ * log.
+ */
+ virtual void getSummaryStats(PlanSummaryStats* statsOut) const = 0;
+
+ /**
+ * Serializes any execution stats tracked by this executor to BSON, for debugging. The format of
+ * these stats are opaque to the caller, and different implementations may choose to provide
+ * different stats.
*
- * TODO SERVER-48478 : Create a new PlanExecutor implementation specifically for executing the
- * Pipeline, and delete PipelineProxyStage.
+ * Implementations must be able to successfully generate and return stats even if the
+ * PlanExecutor has issued a query-fatal exception and the executor cannot be used for further
+ * query execution.
*/
- virtual bool isPipelineExecutor() const = 0;
+ virtual BSONObj getStats() const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor_factory.cpp b/src/mongo/db/query/plan_executor_factory.cpp
index 5c00c01dab6..fa16042eaf0 100644
--- a/src/mongo/db/query/plan_executor_factory.cpp
+++ b/src/mongo/db/query/plan_executor_factory.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/query/plan_executor_impl.h"
#include "mongo/db/query/plan_executor_sbe.h"
#include "mongo/logv2/log.h"
@@ -156,4 +157,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
return {{exec, PlanExecutor::Deleter{opCtx}}};
}
+std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> make(
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ bool isChangeStream) {
+ auto* opCtx = expCtx->opCtx;
+ auto exec = new PlanExecutorPipeline(std::move(expCtx), std::move(pipeline), isChangeStream);
+ return {exec, PlanExecutor::Deleter{opCtx}};
+}
+
} // namespace mongo::plan_executor_factory
diff --git a/src/mongo/db/query/plan_executor_factory.h b/src/mongo/db/query/plan_executor_factory.h
index 3d27507cfd2..8faad454285 100644
--- a/src/mongo/db/query/plan_executor_factory.h
+++ b/src/mongo/db/query/plan_executor_factory.h
@@ -33,6 +33,7 @@
#include "mongo/db/exec/sbe/stages/stages.h"
#include "mongo/db/exec/working_set.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_yield_policy_sbe.h"
#include "mongo/db/query/query_solution.h"
@@ -120,4 +121,12 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make(
std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash,
std::unique_ptr<PlanYieldPolicySBE> yieldPolicy);
+/**
+ * Constructs a plan executor for executing the given 'pipeline'.
+ */
+std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> make(
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ bool isChangeStream);
+
} // namespace mongo::plan_executor_factory
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp
index 4e661a4f2ac..cc3923a7990 100644
--- a/src/mongo/db/query/plan_executor_impl.cpp
+++ b/src/mongo/db/query/plan_executor_impl.cpp
@@ -42,15 +42,21 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/exec/cached_plan.h"
-#include "mongo/db/exec/change_stream_proxy.h"
#include "mongo/db/exec/collection_scan.h"
-#include "mongo/db/exec/multi_plan.h"
+#include "mongo/db/exec/count_scan.h"
+#include "mongo/db/exec/distinct_scan.h"
+#include "mongo/db/exec/idhack.h"
+#include "mongo/db/exec/index_scan.h"
+#include "mongo/db/exec/near.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/plan_stats.h"
+#include "mongo/db/exec/sort.h"
#include "mongo/db/exec/subplan.h"
+#include "mongo/db/exec/text.h"
#include "mongo/db/exec/trial_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_yield_policy_impl.h"
@@ -149,14 +155,11 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx,
invariant(!_expCtx || _expCtx->opCtx == _opCtx);
invariant(!_cq || !_expCtx || _cq->getExpCtx() == _expCtx);
- // Both ChangeStreamProxy and CollectionScan stages can provide oplog tracking info, such as
- // post batch resume token, or latest oplog timestamp. If either of these two stages is present
- // in the execution tree, then cache it for fast retrieval of the oplog info, avoiding the need
- // traverse the tree in runtime.
- if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) {
- _oplogTrackingStage = static_cast<ChangeStreamProxyStage*>(changeStreamProxy);
- } else if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) {
- _oplogTrackingStage = static_cast<CollectionScan*>(collectionScan);
+ // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN stage.
+ // This is used for change streams in order to keep the the latest oplog timestamp and post
+ // batch resume token up to date as the oplog scan progresses.
+ if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) {
+ _collScanStage = static_cast<CollectionScan*>(collectionScan);
}
// We may still need to initialize _nss from either collection or _cq.
@@ -582,41 +585,15 @@ bool PlanExecutorImpl::isDisposed() const {
}
Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const {
- if (!_oplogTrackingStage) {
- return {};
- }
-
- const auto stageType = _oplogTrackingStage->stageType();
- if (stageType == STAGE_COLLSCAN) {
- return static_cast<const CollectionScan*>(_oplogTrackingStage)->getLatestOplogTimestamp();
- } else {
- invariant(stageType == STAGE_CHANGE_STREAM_PROXY);
- return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage)
- ->getLatestOplogTimestamp();
- }
+ return _collScanStage ? _collScanStage->getLatestOplogTimestamp() : Timestamp{};
}
BSONObj PlanExecutorImpl::getPostBatchResumeToken() const {
static const BSONObj kEmptyPBRT;
- if (!_oplogTrackingStage) {
- return kEmptyPBRT;
- }
-
- const auto stageType = _oplogTrackingStage->stageType();
- if (stageType == STAGE_COLLSCAN) {
- return static_cast<const CollectionScan*>(_oplogTrackingStage)->getPostBatchResumeToken();
- } else {
- invariant(stageType == STAGE_CHANGE_STREAM_PROXY);
- return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage)
- ->getPostBatchResumeToken();
- }
+ return _collScanStage ? _collScanStage->getPostBatchResumeToken() : kEmptyPBRT;
}
PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const {
- if (isPipelineExecutor()) {
- return LockPolicy::kLocksInternally;
- }
-
// If this PlanExecutor is simply unspooling queued data, then there is no need to acquire
// locks.
if (_root->stageType() == StageType::STAGE_QUEUED_DATA) {
@@ -626,8 +603,100 @@ PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const {
return LockPolicy::kLockExternally;
}
-bool PlanExecutorImpl::isPipelineExecutor() const {
- return _root->stageType() == StageType::STAGE_PIPELINE_PROXY ||
- _root->stageType() == StageType::STAGE_CHANGE_STREAM_PROXY;
+std::string PlanExecutorImpl::getPlanSummary() const {
+ return Explain::getPlanSummary(_root.get());
+}
+
+void PlanExecutorImpl::getSummaryStats(PlanSummaryStats* statsOut) const {
+ invariant(statsOut);
+
+ // We can get some of the fields we need from the common stats stored in the
+ // root stage of the plan tree.
+ const CommonStats* common = _root->getCommonStats();
+ statsOut->nReturned = common->advanced;
+
+ // The other fields are aggregations over the stages in the plan tree. We flatten
+ // the tree into a list and then compute these aggregations.
+ std::vector<const PlanStage*> stages;
+ Explain::flattenExecTree(_root.get(), &stages);
+
+ statsOut->totalKeysExamined = 0;
+ statsOut->totalDocsExamined = 0;
+
+ for (size_t i = 0; i < stages.size(); i++) {
+ statsOut->totalKeysExamined +=
+ Explain::getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
+ statsOut->totalDocsExamined +=
+ Explain::getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats());
+
+ if (isSortStageType(stages[i]->stageType())) {
+ statsOut->hasSortStage = true;
+
+ auto sortStage = static_cast<const SortStage*>(stages[i]);
+ auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats());
+ statsOut->usedDisk = sortStats->wasDiskUsed;
+ }
+
+ if (STAGE_IXSCAN == stages[i]->stageType()) {
+ const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]);
+ const IndexScanStats* ixscanStats =
+ static_cast<const IndexScanStats*>(ixscan->getSpecificStats());
+ statsOut->indexesUsed.insert(ixscanStats->indexName);
+ } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) {
+ const CountScan* countScan = static_cast<const CountScan*>(stages[i]);
+ const CountScanStats* countScanStats =
+ static_cast<const CountScanStats*>(countScan->getSpecificStats());
+ statsOut->indexesUsed.insert(countScanStats->indexName);
+ } else if (STAGE_IDHACK == stages[i]->stageType()) {
+ const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]);
+ const IDHackStats* idHackStats =
+ static_cast<const IDHackStats*>(idHackStage->getSpecificStats());
+ statsOut->indexesUsed.insert(idHackStats->indexName);
+ } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) {
+ const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]);
+ const DistinctScanStats* distinctScanStats =
+ static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats());
+ statsOut->indexesUsed.insert(distinctScanStats->indexName);
+ } else if (STAGE_TEXT == stages[i]->stageType()) {
+ const TextStage* textStage = static_cast<const TextStage*>(stages[i]);
+ const TextStats* textStats =
+ static_cast<const TextStats*>(textStage->getSpecificStats());
+ statsOut->indexesUsed.insert(textStats->indexName);
+ } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() ||
+ STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) {
+ const NearStage* nearStage = static_cast<const NearStage*>(stages[i]);
+ const NearStats* nearStats =
+ static_cast<const NearStats*>(nearStage->getSpecificStats());
+ statsOut->indexesUsed.insert(nearStats->indexName);
+ } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) {
+ const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]);
+ const CachedPlanStats* cachedStats =
+ static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats());
+ statsOut->replanReason = cachedStats->replanReason;
+ } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) {
+ statsOut->fromMultiPlanner = true;
+ } else if (STAGE_COLLSCAN == stages[i]->stageType()) {
+ statsOut->collectionScans++;
+ const auto collScan = static_cast<const CollectionScan*>(stages[i]);
+ const auto collScanStats =
+ static_cast<const CollectionScanStats*>(collScan->getSpecificStats());
+ if (!collScanStats->tailable)
+ statsOut->collectionScansNonTailable++;
+ }
+ }
+}
+
+BSONObj PlanExecutorImpl::getStats() const {
+ // Serialize all stats from the winning plan.
+ auto mps = getMultiPlanStage();
+ auto winningPlanStats =
+ mps ? std::move(mps->getStats()->children[mps->bestPlanIdx()]) : _root->getStats();
+ return Explain::statsToBSON(*winningPlanStats);
+}
+
+MultiPlanStage* PlanExecutorImpl::getMultiPlanStage() const {
+ PlanStage* ps = getStageByType(_root.get(), StageType::STAGE_MULTI_PLAN);
+ invariant(ps == nullptr || ps->stageType() == StageType::STAGE_MULTI_PLAN);
+ return static_cast<MultiPlanStage*>(ps);
}
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h
index ba0bdd58a9b..ac586aef59b 100644
--- a/src/mongo/db/query/plan_executor_impl.h
+++ b/src/mongo/db/query/plan_executor_impl.h
@@ -32,6 +32,7 @@
#include <boost/optional.hpp>
#include <queue>
+#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/query_solution.h"
@@ -39,6 +40,7 @@
namespace mongo {
class CappedInsertNotifier;
+class CollectionScan;
struct CappedInsertNotifierData;
class PlanExecutorImpl : public PlanExecutor {
@@ -83,7 +85,9 @@ public:
Timestamp getLatestOplogTimestamp() const final;
BSONObj getPostBatchResumeToken() const final;
LockPolicy lockPolicy() const final;
- bool isPipelineExecutor() const final;
+ std::string getPlanSummary() const final;
+ void getSummaryStats(PlanSummaryStats* statsOut) const final;
+ BSONObj getStats() const final;
/**
* Same as restoreState() but without the logic to retry if a WriteConflictException is thrown.
@@ -92,6 +96,11 @@ public:
*/
void restoreStateWithoutRetrying();
+ /**
+ * Return a pointer to this executor's MultiPlanStage, or nullptr if it does not have one.
+ */
+ MultiPlanStage* getMultiPlanStage() const;
+
private:
/**
* Called on construction in order to ensure that when callers receive a new instance of a
@@ -177,12 +186,11 @@ private:
enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable;
- // A pointer either to a ChangeStreamProxy or a CollectionScan stage, if present in the
- // execution tree, or nullptr otherwise. We cache it to avoid the need to traverse the execution
- // tree in runtime when the executor is requested to return the oplog tracking info. Since this
- // info is provided by either of these stages, the executor will simply delegate the request to
- // the cached stage.
- const PlanStage* _oplogTrackingStage{nullptr};
+ // A pointer either to a CollectionScan stage, if present in the execution tree, or nullptr
+ // otherwise. We cache it to avoid the need to traverse the execution tree in runtime when the
+ // executor is requested to return the oplog tracking info. Since this info is provided by
+ // either of these stages, the executor will simply delegate the request to the cached stage.
+ const CollectionScan* _collScanStage{nullptr};
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h
index 776e9d1665c..5ff2f0fe05d 100644
--- a/src/mongo/db/query/plan_executor_sbe.h
+++ b/src/mongo/db/query/plan_executor_sbe.h
@@ -107,8 +107,17 @@ public:
return LockPolicy::kLocksInternally;
}
- bool isPipelineExecutor() const override {
- return false;
+ // TODO: Support 'planSummary' for SBE.
+ std::string getPlanSummary() const override {
+ return "unsupported";
+ }
+
+ // TODO: Support collection of plan summary stats for SBE.
+ void getSummaryStats(PlanSummaryStats* statsOut) const override {}
+
+ // TODO: Support debug stats for SBE.
+ BSONObj getStats() const override {
+ return BSONObj{};
}
private:
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h
index 3c8a85cbbd7..ccd454a3503 100644
--- a/src/mongo/db/query/plan_yield_policy.h
+++ b/src/mongo/db/query/plan_yield_policy.h
@@ -69,8 +69,9 @@ public:
// Can be used in one of the following scenarios:
// - The caller will hold a lock continuously for the lifetime of this PlanExecutor.
// - This PlanExecutor doesn't logically belong to a Collection, and so does not need to be
- // locked during execution. For example, a PlanExecutor containing a PipelineProxyStage
- // which is being used to execute an aggregation pipeline.
+ // locked during execution. For example, this yield policy is used for PlanExecutors
+ // which unspool queued metadata ("virtual collection scans") for listCollections and
+ // listIndexes.
NO_YIELD,
// Will not yield locks or storage engine resources, but will check for interrupt.
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index c45c3e01dd8..d3d48467e03 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -83,10 +83,6 @@ enum StageType {
STAGE_PROJECTION_COVERED,
STAGE_PROJECTION_SIMPLE,
- // Stages for running aggregation pipelines.
- STAGE_CHANGE_STREAM_PROXY,
- STAGE_PIPELINE_PROXY,
-
STAGE_QUEUED_DATA,
STAGE_RECORD_STORE_FAST_COUNT,
STAGE_RETURN_KEY,
diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp
index d32df41ed5a..f6790d26327 100644
--- a/src/mongo/db/s/range_deletion_util.cpp
+++ b/src/mongo/db/s/range_deletion_util.cpp
@@ -219,7 +219,7 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx,
"min"_attr = redact(min),
"max"_attr = redact(max),
"namespace"_attr = nss,
- "stats"_attr = Explain::getWinningPlanStats(exec.get()),
+ "stats"_attr = redact(exec->getStats()),
"error"_attr = redact(ex.toStatus()));
throw;
}
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 4ed8b61720b..075ededc8e9 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
-#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/json.h"
@@ -51,6 +50,7 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/plan_executor_pipeline.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_solution.h"
#include "mongo/dbtests/dbtests.h"
@@ -223,18 +223,8 @@ TEST_F(PlanExecutorTest, DropIndexScanAgg) {
collection, std::move(innerExec), _expCtx, DocumentSourceCursor::CursorType::kRegular);
auto pipeline = Pipeline::create({cursorSource}, _expCtx);
- // Create the output PlanExecutor that pulls results from the pipeline.
- auto ws = std::make_unique<WorkingSet>();
- auto proxy = std::make_unique<PipelineProxyStage>(_expCtx.get(), std::move(pipeline), ws.get());
-
- auto statusWithPlanExecutor =
- plan_executor_factory::make(_expCtx,
- std::move(ws),
- std::move(proxy),
- collection,
- PlanYieldPolicy::YieldPolicy::NO_YIELD);
- ASSERT_OK(statusWithPlanExecutor.getStatus());
- auto outerExec = std::move(statusWithPlanExecutor.getValue());
+ auto outerExec =
+ plan_executor_factory::make(_expCtx, std::move(pipeline), false /* isChangeStream */);
dropCollection();
diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp
index aa9abb89be2..83337ca69ec 100644
--- a/src/mongo/dbtests/query_stage_multiplan.cpp
+++ b/src/mongo/dbtests/query_stage_multiplan.cpp
@@ -556,7 +556,7 @@ TEST_F(QueryStageMultiPlanTest, MPSSummaryStats) {
exec->executePlan();
PlanSummaryStats stats;
- Explain::getSummaryStats(*exec, &stats);
+ exec->getSummaryStats(&stats);
// If only the winning plan's stats are recorded, we should not have examined more than the
// total number of documents/index keys.