Diffstat (limited to 'src/mongo')
32 files changed, 762 insertions, 957 deletions
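The hunks below share one recurring call-site change: plan summaries and execution stats move off the static Explain helpers and onto the PlanExecutor interface itself. A condensed before/after sketch of that pattern, built only from names that appear in this diff, as it repeats through clientcursor.cpp, count_cmd.cpp, distinct.cpp, find_and_modify.cpp, getmore_cmd.cpp, write_ops_exec.cpp, and the other command files:

    // Before: call sites routed through static Explain helpers.
    curOp->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
    BSONObjBuilder execStatsBob;
    Explain::getWinningPlanStats(exec.get(), &execStatsBob);
    curOp->debug().execStats = execStatsBob.obj();

    // After: the PlanExecutor interface exposes the same information directly.
    curOp->setPlanSummary_inlock(exec->getPlanSummary());
    curOp->debug().execStats = exec->getStats();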
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 61e3abba115..c5a203c7163 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1079,7 +1079,6 @@ env.Library( 'exec/and_hash.cpp', 'exec/and_sorted.cpp', 'exec/cached_plan.cpp', - 'exec/change_stream_proxy.cpp', 'exec/collection_scan.cpp', 'exec/count.cpp', 'exec/count_scan.cpp', @@ -1098,7 +1097,6 @@ env.Library( 'exec/multi_plan.cpp', 'exec/near.cpp', 'exec/or.cpp', - 'exec/pipeline_proxy.cpp', 'exec/plan_cache_util.cpp', 'exec/plan_stage.cpp', 'exec/projection.cpp', @@ -1128,6 +1126,7 @@ env.Library( 'pipeline/document_source_cursor.cpp', 'pipeline/document_source_geo_near_cursor.cpp', 'pipeline/pipeline_d.cpp', + 'pipeline/plan_executor_pipeline.cpp', 'query/classic_stage_builder.cpp', 'query/explain.cpp', 'query/find.cpp', diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 125c5fa13b6..845c31e48b9 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -92,7 +92,7 @@ ClientCursor::ClientCursor(ClientCursorParams params, _operationUsingCursor(operationUsingCursor), _lastUseDate(now), _createdDate(now), - _planSummary(Explain::getPlanSummary(_exec.get())), + _planSummary(_exec->getPlanSummary()), _opKey(operationUsingCursor->getOperationKey()) { invariant(_exec); invariant(_operationUsingCursor); diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 54b8a808781..6165ef9cdb5 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -256,22 +256,20 @@ public: auto curOp = CurOp::get(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + curOp->setPlanSummary_inlock(exec->getPlanSummary()); } exec->executePlan(); PlanSummaryStats summaryStats; - Explain::getSummaryStats(*exec, &summaryStats); + exec->getSummaryStats(&summaryStats); if (collection) { CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats); } curOp->debug().setPlanSummaryMetrics(summaryStats); if (curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); + curOp->debug().execStats = exec->getStats(); } // Plan is done executing. We just need to pull the count out of the root stage. diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index d43ab866f4f..fc8977182e8 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -239,8 +239,7 @@ public: { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock( - Explain::getPlanSummary(executor.getValue().get())); + CurOp::get(opCtx)->setPlanSummary_inlock(executor.getValue()->getPlanSummary()); } const auto key = cmdObj[ParsedDistinct::kKeyField].valuestrsafe(); @@ -286,8 +285,7 @@ public: "stats: {stats}", "Plan executor error during distinct command", "error"_attr = exception.toStatus(), - "stats"_attr = - redact(Explain::getWinningPlanStats(executor.getValue().get()))); + "stats"_attr = redact(executor.getValue()->getStats())); exception.addContext("Executor error during distinct command"); throw; @@ -297,16 +295,14 @@ public: // Get summary information about the plan. 
PlanSummaryStats stats; - Explain::getSummaryStats(*executor.getValue(), &stats); + executor.getValue()->getSummaryStats(&stats); if (collection) { CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, stats); } curOp->debug().setPlanSummaryMetrics(stats); if (curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(executor.getValue().get(), &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); + curOp->debug().execStats = executor.getValue()->getStats(); } BSONArrayBuilder valueListBuilder(result.subarrayStart("values")); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 33b489e2235..143a0d4f593 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -99,7 +99,7 @@ boost::optional<BSONObj> advanceExecutor(OperationContext* opCtx, "Plan executor error during findAndModify: {error}, stats: {stats}", "Plan executor error during findAndModify", "error"_attr = exception.toStatus(), - "stats"_attr = redact(Explain::getWinningPlanStats(exec))); + "stats"_attr = redact(exec->getStats())); exception.addContext("Plan executor error during findAndModify"); throw; @@ -469,7 +469,7 @@ public: { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary()); } auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove()); @@ -478,7 +478,7 @@ public: // multiple times. PlanSummaryStats summaryStats; - Explain::getSummaryStats(*exec, &summaryStats); + exec->getSummaryStats(&summaryStats); if (collection) { CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats); } @@ -488,9 +488,7 @@ public: opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec); if (curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); + curOp->debug().execStats = exec->getStats(); } recordStatsForTopCommand(opCtx); @@ -550,7 +548,7 @@ public: { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary()); } auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove()); @@ -559,7 +557,7 @@ public: // multiple times. 
PlanSummaryStats summaryStats; - Explain::getSummaryStats(*exec, &summaryStats); + exec->getSummaryStats(&summaryStats); if (collection) { CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, collection, summaryStats); } @@ -567,9 +565,7 @@ public: opDebug->setPlanSummaryMetrics(summaryStats); if (curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); + curOp->debug().execStats = exec->getStats(); } recordStatsForTopCommand(opCtx); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index d333b91b8af..84501a9cbb7 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -418,7 +418,7 @@ public: { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary()); } if (!collection) { @@ -472,7 +472,7 @@ public: "stats: {stats}", "Plan executor error during find command", "error"_attr = exception.toStatus(), - "stats"_attr = redact(Explain::getWinningPlanStats(exec.get()))); + "stats"_attr = redact(exec->getStats())); exception.addContext("Executor error during find command"); throw; diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index d76c295f259..33f731e7cde 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -333,7 +333,7 @@ public: "getMore command executor error: {error}, stats: {stats}", "getMore command executor error", "error"_attr = exception.toStatus(), - "stats"_attr = redact(Explain::getWinningPlanStats(exec))); + "stats"_attr = redact(exec->getStats())); exception.addContext("Executor error during getMore"); throw; @@ -508,7 +508,7 @@ public: exec->reattachToOperationContext(opCtx); exec->restoreState(); - auto planSummary = Explain::getPlanSummary(exec); + auto planSummary = exec->getPlanSummary(); { stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(planSummary); @@ -553,7 +553,7 @@ public: // obtain these values we need to take a diff of the pre-execution and post-execution // metrics, as they accumulate over the course of a cursor's lifetime. PlanSummaryStats preExecutionStats; - Explain::getSummaryStats(*exec, &preExecutionStats); + exec->getSummaryStats(&preExecutionStats); // Mark this as an AwaitData operation if appropriate. 
if (cursorPin->isAwaitData() && !disableAwaitDataFailpointActive) { @@ -600,7 +600,7 @@ public: &numResults); PlanSummaryStats postExecutionStats; - Explain::getSummaryStats(*exec, &postExecutionStats); + exec->getSummaryStats(&postExecutionStats); postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; curOp->debug().setPlanSummaryMetrics(postExecutionStats); @@ -613,9 +613,7 @@ public: if (cursorPin->getExecutor()->lockPolicy() != PlanExecutor::LockPolicy::kLocksInternally && curOp->shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec, &execStatsBob); - curOp->debug().execStats = execStatsBob.obj(); + curOp->debug().execStats = exec->getStats(); } if (shouldSaveCursor) { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index daa7cae1f95..c414b5ac4ac 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -42,7 +42,6 @@ #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" @@ -54,6 +53,7 @@ #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/pipeline/plan_executor_pipeline.h" #include "mongo/db/pipeline/process_interface/mongo_process_interface.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/collection_query_info.h" @@ -166,9 +166,9 @@ bool handleCursorCommand(OperationContext* opCtx, invariant(exec); bool stashedResult = false; + // We are careful to avoid ever calling 'getNext()' on the PlanExecutor when the batchSize is + // zero to avoid doing any query execution work. for (int objCount = 0; objCount < batchSize; objCount++) { - // The initial getNext() on a PipelineProxyStage may be very expensive so we don't - // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; BSONObj nextDoc; @@ -185,7 +185,7 @@ bool handleCursorCommand(OperationContext* opCtx, "Aggregate command executor error: {error}, stats: {stats}", "Aggregate command executor error", "error"_attr = exception.toStatus(), - "stats"_attr = redact(Explain::getWinningPlanStats(exec))); + "stats"_attr = redact(exec->getStats())); exception.addContext("PlanExecutor error during aggregation"); throw; @@ -445,35 +445,6 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI return pipelines; } - -/** - * Create a PlanExecutor to execute the given 'pipeline'. - */ -std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExecutor( - OperationContext* opCtx, - const NamespaceString& nss, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - bool hasChangeStream) { - boost::intrusive_ptr<ExpressionContext> expCtx(pipeline->getContext()); - - // Transfer ownership of the Pipeline to the PipelineProxyStage. - auto ws = std::make_unique<WorkingSet>(); - auto proxy = hasChangeStream - ? 
std::make_unique<ChangeStreamProxyStage>(expCtx.get(), std::move(pipeline), ws.get()) - : std::make_unique<PipelineProxyStage>(expCtx.get(), std::move(pipeline), ws.get()); - - // This PlanExecutor will simply forward requests to the Pipeline, so does not need - // to yield or to be registered with any collection's CursorManager to receive - // invalidations. The Pipeline may contain PlanExecutors which *are* yielding - // PlanExecutors and which *are* registered with their respective collection's - // CursorManager - return uassertStatusOK(plan_executor_factory::make(std::move(expCtx), - std::move(ws), - std::move(proxy), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - nss)); -} } // namespace Status runAggregate(OperationContext* opCtx, @@ -672,8 +643,13 @@ Status runAggregate(OperationContext* opCtx, auto pipelines = createExchangePipelinesIfNeeded(opCtx, expCtx, request, std::move(pipeline), uuid); for (auto&& pipelineIt : pipelines) { - execs.emplace_back(createOuterPipelineProxyExecutor( - opCtx, nss, std::move(pipelineIt), liteParsedPipeline.hasChangeStream())); + // There are separate ExpressionContexts for each exchange pipeline, so make sure to + // pass the pipeline's ExpressionContext to the plan executor factory. + auto pipelineExpCtx = pipelineIt->getContext(); + execs.emplace_back( + plan_executor_factory::make(std::move(pipelineExpCtx), + std::move(pipelineIt), + liteParsedPipeline.hasChangeStream())); } // With the pipelines created, we can relinquish locks as they will manage the locks @@ -685,7 +661,7 @@ Status runAggregate(OperationContext* opCtx, } { - auto planSummary = Explain::getPlanSummary(execs[0].get()); + auto planSummary = execs[0]->getPlanSummary(); stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp->setPlanSummary_inlock(std::move(planSummary)); } @@ -735,8 +711,8 @@ Status runAggregate(OperationContext* opCtx, if (expCtx->explain) { auto explainExecutor = pins[0]->getExecutor(); auto bodyBuilder = result->getBodyBuilder(); - if (explainExecutor->isPipelineExecutor()) { - Explain::explainPipelineExecutor(explainExecutor, *(expCtx->explain), &bodyBuilder); + if (auto pipelineExec = dynamic_cast<PlanExecutorPipeline*>(explainExecutor)) { + Explain::explainPipelineExecutor(pipelineExec, *(expCtx->explain), &bodyBuilder); } else { invariant(pins[0]->getExecutor()->lockPolicy() == PlanExecutor::LockPolicy::kLockExternally); @@ -759,7 +735,7 @@ Status runAggregate(OperationContext* opCtx, } PlanSummaryStats stats; - Explain::getSummaryStats(*(pins[0].getCursor()->getExecutor()), &stats); + pins[0].getCursor()->getExecutor()->getSummaryStats(&stats); curOp->debug().setPlanSummaryMetrics(stats); curOp->debug().nreturned = stats.nReturned; // For an optimized away pipeline, signal the cache that a query operation has completed. diff --git a/src/mongo/db/exec/change_stream_proxy.cpp b/src/mongo/db/exec/change_stream_proxy.cpp deleted file mode 100644 index c16255a897b..00000000000 --- a/src/mongo/db/exec/change_stream_proxy.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. 
- * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/exec/change_stream_proxy.h" - -#include "mongo/db/pipeline/pipeline_d.h" -#include "mongo/db/pipeline/resume_token.h" -#include "mongo/db/repl/speculative_majority_read_info.h" - -namespace mongo { - -const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY"; - -ChangeStreamProxyStage::ChangeStreamProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws) - : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) { - // Set _postBatchResumeToken to the initial PBRT that was added to the expression context during - // pipeline construction, and use it to obtain the starting time for _latestOplogTimestamp. - invariant(!_pipeline->getContext()->initialPostBatchResumeToken.isEmpty()); - _postBatchResumeToken = _pipeline->getContext()->initialPostBatchResumeToken.getOwned(); - _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime; -} - -boost::optional<Document> ChangeStreamProxyStage::getNext() { - if (auto next = _pipeline->getNext()) { - // While we have more results to return, we track both the timestamp and the resume token of - // the latest event observed in the oplog, the latter via its sort key metadata field. - _validateResumeToken(*next); - _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); - _postBatchResumeToken = next->metadata().getSortKey().getDocument().toBson(); - _setSpeculativeReadTimestamp(); - return next; - } - - // We ran out of results to return. Check whether the oplog cursor has moved forward since the - // last recorded timestamp. Because we advance _latestOplogTimestamp for every event we return, - // if the new time is higher than the last then we are guaranteed not to have already returned - // any events at this timestamp. We can set _postBatchResumeToken to a new high-water-mark token - // at the current clusterTime. - auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); - if (highWaterMark > _latestOplogTimestamp) { - auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark); - _postBatchResumeToken = token.toDocument().toBson(); - _latestOplogTimestamp = highWaterMark; - _setSpeculativeReadTimestamp(); - } - return boost::none; -} - -void ChangeStreamProxyStage::_validateResumeToken(const Document& event) const { - // If we are producing output to be merged on mongoS, then no stages can have modified the _id. 
- if (_includeMetaData) { - return; - } - // Confirm that the document _id field matches the original resume token in the sort key field. - auto eventBSON = event.toBson(); - auto resumeToken = event.metadata().getSortKey(); - auto idField = eventBSON.getObjectField("_id"); - invariant(!resumeToken.missing()); - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "Encountered an event whose _id field, which contains the resume " - "token, was modified by the pipeline. Modifying the _id field of an " - "event makes it impossible to resume the stream from that point. Only " - "transformations that retain the unmodified _id field are allowed. " - "Expected: " - << BSON("_id" << resumeToken) << " but found: " - << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), - (resumeToken.getType() == BSONType::Object) && - idField.binaryEqual(resumeToken.getDocument().toBson())); -} - -void ChangeStreamProxyStage::_setSpeculativeReadTimestamp() { - repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo = - repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx); - if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) { - speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp); - } -} - -std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() { - std::unique_ptr<PlanStageStats> ret = - std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY); - ret->specific = std::make_unique<CollectionScanStats>(); - return ret; -} - -} // namespace mongo diff --git a/src/mongo/db/exec/change_stream_proxy.h b/src/mongo/db/exec/change_stream_proxy.h deleted file mode 100644 index 6d115b78885..00000000000 --- a/src/mongo/db/exec/change_stream_proxy.h +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#pragma once - -#include "mongo/db/exec/pipeline_proxy.h" - -namespace mongo { - -/** - * ChangeStreamProxyStage is a drop-in replacement for PipelineProxyStage, intended to manage the - * serialization of change stream pipeline output from Document to BSON. In particular, it is - * additionally responsible for tracking the latestOplogTimestamps and postBatchResumeTokens that - * are necessary for correct merging on mongoS and, in the latter case, must also be provided to - * mongoD clients. - */ -class ChangeStreamProxyStage final : public PipelineProxyStage { -public: - static const char* kStageType; - - /** - * The 'pipeline' argument must be a $changeStream pipeline. Passing a non-$changeStream into - * the constructor will cause an invariant() to fail. - */ - ChangeStreamProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws); - - /** - * Returns an empty PlanStageStats object. - */ - std::unique_ptr<PlanStageStats> getStats() final; - - /** - * Passes through the latest oplog timestamp from the proxied pipeline. We only expose the oplog - * timestamp in the event that we need to merge on mongoS. - */ - Timestamp getLatestOplogTimestamp() const { - return _includeMetaData ? _latestOplogTimestamp : Timestamp(); - } - - /** - * Passes through the most recent resume token from the proxied pipeline. - */ - BSONObj getPostBatchResumeToken() const { - return _postBatchResumeToken; - } - - StageType stageType() const final { - return STAGE_CHANGE_STREAM_PROXY; - } - -protected: - boost::optional<Document> getNext() final; - -private: - /** - * Verifies that the docs's resume token has not been modified. - */ - void _validateResumeToken(const Document& event) const; - - /** - * Set the speculative majority read timestamp if we have scanned up to a certain oplog - * timestamp. - */ - void _setSpeculativeReadTimestamp(); - - Timestamp _latestOplogTimestamp; - BSONObj _postBatchResumeToken; -}; -} // namespace mongo diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp deleted file mode 100644 index c3014ce34a5..00000000000 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. 
If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/exec/pipeline_proxy.h" - -#include <memory> - -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/pipeline_d.h" - -namespace mongo { - -using boost::intrusive_ptr; -using std::shared_ptr; -using std::unique_ptr; -using std::vector; - -const char* PipelineProxyStage::kStageType = "PIPELINE_PROXY"; - -PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws) - : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {} - -PipelineProxyStage::PipelineProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws, - const char* stageTypeName) - : PlanStage(stageTypeName, expCtx), - _pipeline(std::move(pipeline)), - _includeMetaData(_pipeline->getContext()->needsMerge), // send metadata to merger - _ws(ws) { - // We take over responsibility for disposing of the Pipeline, since it is required that - // doDispose() will be called before destruction of this PipelineProxyStage. - _pipeline.get_deleter().dismissDisposal(); -} - -PlanStage::StageState PipelineProxyStage::doWork(WorkingSetID* out) { - invariant(out); - - if (!_stash.empty()) { - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - if (_includeMetaData && _stash.back().metadata()) { - member->metadata() = _stash.back().metadata(); - } - member->doc = {SnapshotId(), std::move(_stash.back())}; - _stash.pop_back(); - member->transitionToOwnedObj(); - return PlanStage::ADVANCED; - } - - if (auto next = getNext()) { - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - if (_includeMetaData && next->metadata()) { - member->metadata() = next->metadata(); - } - member->doc = {SnapshotId(), std::move(*next)}; - member->transitionToOwnedObj(); - return PlanStage::ADVANCED; - } - - return PlanStage::IS_EOF; -} - -bool PipelineProxyStage::isEOF() { - if (!_stash.empty()) - return false; - - if (auto next = getNext()) { - _stash.emplace_back(*next); - return false; - } - - return true; -} - -void PipelineProxyStage::doDetachFromOperationContext() { - _pipeline->detachFromOperationContext(); -} - -void PipelineProxyStage::doReattachToOperationContext() { - _pipeline->reattachToOperationContext(opCtx()); -} - -void PipelineProxyStage::doDispose() { - _pipeline->dispose(opCtx()); -} - -unique_ptr<PlanStageStats> PipelineProxyStage::getStats() { - unique_ptr<PlanStageStats> ret = - std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_PIPELINE_PROXY); - ret->specific = std::make_unique<CollectionScanStats>(); - return ret; -} - -boost::optional<Document> PipelineProxyStage::getNext() { - return _pipeline->getNext(); -} - -std::string PipelineProxyStage::getPlanSummaryStr() const { - return PipelineD::getPlanSummaryStr(_pipeline.get()); -} - -void PipelineProxyStage::getPlanSummaryStats(PlanSummaryStats* statsOut) const { - invariant(statsOut); - PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut); - statsOut->nReturned = getCommonStats()->advanced; -} - -vector<Value> PipelineProxyStage::writeExplainOps(ExplainOptions::Verbosity verbosity) const { - return _pipeline->writeExplainOps(verbosity); -} - -} // namespace mongo 
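Before the matching header deletion below, one behavioral nuance is worth noting between the proxy stage removed above and the PlanExecutorPipeline added later in this diff: the stage's isEOF() had to peek by pulling a result out of the pipeline and stashing it, whereas the new executor simply records end-of-stream the first time the pipeline returns boost::none. Condensed from the two implementations in this diff:

    // Deleted PipelineProxyStage: peek-and-stash to answer isEOF().
    bool PipelineProxyStage::isEOF() {
        if (!_stash.empty())
            return false;
        if (auto next = getNext()) {
            _stash.emplace_back(*next);  // pulls a result early, stashes it for doWork()
            return false;
        }
        return true;
    }

    // New PlanExecutorPipeline: remembers EOF instead of pulling ahead.
    bool PlanExecutorPipeline::isEOF() {
        if (!_stash.empty())
            return false;
        return _pipelineIsEof;  // set once _pipeline->getNext() returns boost::none
    }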
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h deleted file mode 100644 index 99f8583f3c0..00000000000 --- a/src/mongo/db/exec/pipeline_proxy.h +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/intrusive_ptr.hpp> -#include <boost/optional/optional.hpp> - -#include "mongo/db/exec/plan_stage.h" -#include "mongo/db/exec/plan_stats.h" -#include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/record_id.h" -#include "mongo/util/assert_util.h" - -namespace mongo { - -/** - * Stage for pulling results out from an aggregation pipeline. - */ -class PipelineProxyStage : public PlanStage { -public: - PipelineProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws); - - virtual ~PipelineProxyStage() = default; - - PlanStage::StageState doWork(WorkingSetID* out) final; - - bool isEOF() final; - - // - // Manage our OperationContext. - // - void doDetachFromOperationContext() final; - void doReattachToOperationContext() final; - - // Returns empty PlanStageStats object - std::unique_ptr<PlanStageStats> getStats() override; - - // Not used. - SpecificStats* getSpecificStats() const final { - MONGO_UNREACHABLE; - } - - std::string getPlanSummaryStr() const; - void getPlanSummaryStats(PlanSummaryStats* statsOut) const; - - StageType stageType() const override { - return STAGE_PIPELINE_PROXY; - } - - /** - * Writes the pipelineProxyStage's operators to a std::vector<Value>, providing the level of - * detail specified by 'verbosity'. - */ - std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const; - - static const char* kStageType; - -protected: - PipelineProxyStage(ExpressionContext* expCtx, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - WorkingSet* ws, - const char* stageTypeName); - - virtual boost::optional<Document> getNext(); - void doDispose() final; - - // Items in the _stash should be returned before pulling items from _pipeline. 
- std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; - const bool _includeMetaData; - -private: - std::vector<Document> _stash; - WorkingSet* _ws; -}; - -} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index c193a6fd7f8..7fa9c49d362 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -671,21 +671,19 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary()); } exec->executePlan(); PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); + exec->getSummaryStats(&summary); if (auto coll = collection->getCollection()) { CollectionQueryInfo::get(coll).notifyOfQuery(opCtx, coll, summary); } if (curOp.shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp.debug().execStats = execStatsBob.obj(); + curOp.debug().execStats = exec->getStats(); } const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); @@ -911,7 +909,7 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, { stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + CurOp::get(opCtx)->setPlanSummary_inlock(exec->getPlanSummary()); } exec->executePlan(); @@ -919,16 +917,14 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, curOp.debug().additiveMetrics.ndeleted = n; PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); + exec->getSummaryStats(&summary); if (auto coll = collection.getCollection()) { CollectionQueryInfo::get(coll).notifyOfQuery(opCtx, coll, summary); } curOp.debug().setPlanSummaryMetrics(summary); if (curOp.shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec.get(), &execStatsBob); - curOp.debug().execStats = execStatsBob.obj(); + curOp.debug().execStats = exec->getStats(); } LastError::get(opCtx->getClient()).recordDelete(n); diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 9c8389594b9..7c530ef0663 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -195,7 +195,7 @@ void DocumentSourceCursor::_updateOplogTimestamp() { void DocumentSourceCursor::recordPlanSummaryStats() { invariant(_exec); - Explain::getSummaryStats(*_exec, &_planSummaryStats); + _exec->getSummaryStats(&_planSummaryStats); } Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> verbosity) const { @@ -300,7 +300,7 @@ DocumentSourceCursor::DocumentSourceCursor( // Later code in the DocumentSourceCursor lifecycle expects that '_exec' is in a saved state. _exec->saveState(); - _planSummary = Explain::getPlanSummary(_exec.get()); + _planSummary = _exec->getPlanSummary(); recordPlanSummaryStats(); if (pExpCtx->explain) { diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 8ef13836c62..93663a68abf 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -157,18 +157,14 @@ public: } /** - * Sets the OperationContext of 'pCtx' to nullptr. 
- * - * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any - * storage-engine state of the DocumentSourceCursor that may be present in '_sources'. + * Sets the OperationContext of 'pCtx' to nullptr and calls 'detachFromOperationContext()' on + * all underlying DocumentSources. */ void detachFromOperationContext(); /** - * Sets the OperationContext of 'pCtx' to 'opCtx'. - * - * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring - * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'. + * Sets the OperationContext of 'pCtx' to 'opCtx', and reattaches all underlying DocumentSources + * to 'opCtx'. */ void reattachToOperationContext(OperationContext* opCtx); @@ -186,6 +182,10 @@ public: */ void dispose(OperationContext* opCtx); + bool isDisposed() const { + return _disposed; + } + /** * Checks to see if disk is ever used within the pipeline. */ diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp new file mode 100644 index 00000000000..7a9f0524446 --- /dev/null +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -0,0 +1,206 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/plan_executor_pipeline.h" + +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/pipeline/resume_token.h" +#include "mongo/db/repl/speculative_majority_read_info.h" + +namespace mongo { + +PlanExecutorPipeline::PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + bool isChangeStream) + : _expCtx(std::move(expCtx)), _pipeline(std::move(pipeline)), _isChangeStream(isChangeStream) { + // Pipeline plan executors must always have an ExpressionContext. + invariant(_expCtx); + + // The caller is responsible for disposing this plan executor before deleting it, which will in + // turn dispose the underlying pipeline. Therefore, there is no need to dispose the pipeline + // again when it is destroyed. 
+ _pipeline.get_deleter().dismissDisposal(); + + if (_isChangeStream) { + // Set _postBatchResumeToken to the initial PBRT that was added to the expression context + // during pipeline construction, and use it to obtain the starting time for + // _latestOplogTimestamp. + invariant(!_expCtx->initialPostBatchResumeToken.isEmpty()); + _postBatchResumeToken = _expCtx->initialPostBatchResumeToken.getOwned(); + _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime; + } +} + +PlanExecutor::ExecState PlanExecutorPipeline::getNext(BSONObj* objOut, RecordId* recordIdOut) { + // The pipeline-based execution engine does not track the record ids associated with documents, + // so it is an error for the caller to ask for one. For the same reason, we expect the caller to + // provide a non-null BSONObj pointer for 'objOut'. + invariant(!recordIdOut); + invariant(objOut); + + if (!_stash.empty()) { + *objOut = std::move(_stash.front()); + _stash.pop(); + ++_nReturned; + return PlanExecutor::ADVANCED; + } + + Document docOut; + auto execState = getNextDocument(&docOut, nullptr); + if (execState == PlanExecutor::ADVANCED) { + // Include metadata if the output will be consumed by a merging node. + *objOut = _expCtx->needsMerge ? docOut.toBsonWithMetaData() : docOut.toBson(); + } + return execState; +} + +PlanExecutor::ExecState PlanExecutorPipeline::getNextDocument(Document* docOut, + RecordId* recordIdOut) { + // The pipeline-based execution engine does not track the record ids associated with documents, + // so it is an error for the caller to ask for one. For the same reason, we expect the caller to + // provide a non-null pointer for 'docOut'. + invariant(!recordIdOut); + invariant(docOut); + + // Callers which use 'enqueue()' are not allowed to use 'getNextDocument()', and must instead + // use 'getNext()'. + invariant(_stash.empty()); + + if (auto next = _getNext()) { + *docOut = std::move(*next); + ++_nReturned; + return PlanExecutor::ADVANCED; + } + + return PlanExecutor::IS_EOF; +} + +bool PlanExecutorPipeline::isEOF() { + if (!_stash.empty()) { + return false; + } + + return _pipelineIsEof; +} + +boost::optional<Document> PlanExecutorPipeline::_getNext() { + auto nextDoc = _pipeline->getNext(); + if (!nextDoc) { + _pipelineIsEof = true; + } + + if (_isChangeStream) { + _performChangeStreamsAccounting(nextDoc); + } + return nextDoc; +} + +void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional<Document> doc) { + invariant(_isChangeStream); + if (doc) { + // While we have more results to return, we track both the timestamp and the resume token of + // the latest event observed in the oplog, the latter via its sort key metadata field. + _validateResumeToken(*doc); + _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + _postBatchResumeToken = doc->metadata().getSortKey().getDocument().toBson(); + _setSpeculativeReadTimestamp(); + } else { + // We ran out of results to return. Check whether the oplog cursor has moved forward since + // the last recorded timestamp. Because we advance _latestOplogTimestamp for every event we + // return, if the new time is higher than the last then we are guaranteed not to have + // already returned any events at this timestamp. We can set _postBatchResumeToken to a new + // high-water-mark token at the current clusterTime. 
+ auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); + if (highWaterMark > _latestOplogTimestamp) { + auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark); + _postBatchResumeToken = token.toDocument().toBson(); + _latestOplogTimestamp = highWaterMark; + _setSpeculativeReadTimestamp(); + } + } +} + +void PlanExecutorPipeline::executePlan() { + while (_getNext()) { + // Run the pipeline to completion, but discard the results. + } +} + +std::string PlanExecutorPipeline::getPlanSummary() const { + return PipelineD::getPlanSummaryStr(_pipeline.get()); +} + +void PlanExecutorPipeline::getSummaryStats(PlanSummaryStats* statsOut) const { + invariant(statsOut); + PipelineD::getPlanSummaryStats(_pipeline.get(), statsOut); + statsOut->nReturned = _nReturned; +} + +void PlanExecutorPipeline::_validateResumeToken(const Document& event) const { + // If we are producing output to be merged on mongoS, then no stages can have modified the _id. + if (_expCtx->needsMerge) { + return; + } + + // Confirm that the document _id field matches the original resume token in the sort key field. + auto eventBSON = event.toBson(); + auto resumeToken = event.metadata().getSortKey(); + auto idField = eventBSON.getObjectField("_id"); + invariant(!resumeToken.missing()); + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "Encountered an event whose _id field, which contains the resume " + "token, was modified by the pipeline. Modifying the _id field of an " + "event makes it impossible to resume the stream from that point. Only " + "transformations that retain the unmodified _id field are allowed. " + "Expected: " + << BSON("_id" << resumeToken) << " but found: " + << (eventBSON["_id"] ? BSON("_id" << eventBSON["_id"]) : BSONObj()), + (resumeToken.getType() == BSONType::Object) && + idField.binaryEqual(resumeToken.getDocument().toBson())); +} + +void PlanExecutorPipeline::_setSpeculativeReadTimestamp() { + repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo = + repl::SpeculativeMajorityReadInfo::get(_expCtx->opCtx); + if (speculativeMajorityReadInfo.isSpeculativeRead() && !_latestOplogTimestamp.isNull()) { + speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp); + } +} + +void PlanExecutorPipeline::markAsKilled(Status killStatus) { + invariant(!killStatus.isOK()); + // If killed multiple times, only retain the first status. + if (_killStatus.isOK()) { + _killStatus = killStatus; + } +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.h b/src/mongo/db/pipeline/plan_executor_pipeline.h new file mode 100644 index 00000000000..f6ad2a97341 --- /dev/null +++ b/src/mongo/db/pipeline/plan_executor_pipeline.h @@ -0,0 +1,186 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <queue> + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/query/plan_executor.h" + +namespace mongo { + +/** + * A plan executor which is used to execute a Pipeline of DocumentSources. + */ +class PlanExecutorPipeline final : public PlanExecutor { +public: + PlanExecutorPipeline(boost::intrusive_ptr<ExpressionContext> expCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + bool isChangeStream); + + PlanStage* getRootStage() const override { + MONGO_UNREACHABLE; + } + + CanonicalQuery* getCanonicalQuery() const override { + return nullptr; + } + + const NamespaceString& nss() const override { + return _expCtx->ns; + } + + OperationContext* getOpCtx() const override { + return _expCtx->opCtx; + } + + // Pipeline execution does not support the saveState()/restoreState() interface. Instead, the + // underlying data access plan is saved/restored internally in between DocumentSourceCursor + // batches, or when the underlying PlanStage tree yields. + void saveState() override {} + void restoreState() override {} + + void detachFromOperationContext() override { + _pipeline->detachFromOperationContext(); + } + + void reattachToOperationContext(OperationContext* opCtx) override { + _pipeline->reattachToOperationContext(opCtx); + } + + ExecState getNext(BSONObj* objOut, RecordId* recordIdOut) override; + ExecState getNextDocument(Document* docOut, RecordId* recordIdOut) override; + + bool isEOF() override; + + void executePlan() override; + + void dispose(OperationContext* opCtx) override { + _pipeline->dispose(opCtx); + } + + void enqueue(const BSONObj& obj) override { + _stash.push(obj.getOwned()); + } + + void markAsKilled(Status killStatus) override; + + bool isMarkedAsKilled() const override { + return !_killStatus.isOK(); + } + + Status getKillStatus() override { + invariant(isMarkedAsKilled()); + return _killStatus; + } + + bool isDisposed() const override { + return _pipeline->isDisposed(); + } + + Timestamp getLatestOplogTimestamp() const override { + return _latestOplogTimestamp; + } + + BSONObj getPostBatchResumeToken() const override { + return _postBatchResumeToken; + } + + LockPolicy lockPolicy() const override { + return LockPolicy::kLocksInternally; + } + + std::string getPlanSummary() const override; + + void getSummaryStats(PlanSummaryStats* statsOut) const override; + + /** + * Writes the explain information about the underlying pipeline to a std::vector<Value>, + * providing the level of detail specified by 'verbosity'. 
+ */ + std::vector<Value> writeExplainOps(ExplainOptions::Verbosity verbosity) const { + return _pipeline->writeExplainOps(verbosity); + } + + BSONObj getStats() const override { + // TODO SERVER-49808: Report execution stats for the pipeline. + return BSONObj{}; + } + +private: + /** + * Obtains the next document from the underlying Pipeline, and does change streams-related + * accounting if needed. + */ + boost::optional<Document> _getNext(); + + /** + * If this is a change stream, advance the cluster time and post batch resume token based on the + * latest document returned by the underlying pipeline. + */ + void _performChangeStreamsAccounting(const boost::optional<Document>); + + /** + * Verifies that the docs's resume token has not been modified. + */ + void _validateResumeToken(const Document& event) const; + + /** + * Set the speculative majority read timestamp if we have scanned up to a certain oplog + * timestamp. + */ + void _setSpeculativeReadTimestamp(); + + boost::intrusive_ptr<ExpressionContext> _expCtx; + + std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; + + const bool _isChangeStream; + + std::queue<BSONObj> _stash; + + // If _killStatus has a non-OK value, then we have been killed and the value represents the + // reason for the kill. + Status _killStatus = Status::OK(); + + size_t _nReturned = 0; + + // Set to true once we have received all results from the underlying '_pipeline', and the + // pipeline has indicated end-of-stream. + bool _pipelineIsEof = false; + + // If '_pipeline' is a change stream, these track the latest timestamp seen while scanning the + // oplog, as well as the most recent PBRT. + Timestamp _latestOplogTimestamp; + BSONObj _postBatchResumeToken; +}; + +} // namespace mongo diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp index a8ef54c2abe..bf904550f9c 100644 --- a/src/mongo/db/query/classic_stage_builder.cpp +++ b/src/mongo/db/query/classic_stage_builder.cpp @@ -350,7 +350,6 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r expCtx, esn->pattern, _ws, std::move(childStage)); } case STAGE_CACHED_PLAN: - case STAGE_CHANGE_STREAM_PROXY: case STAGE_COUNT: case STAGE_DELETE: case STAGE_EOF: @@ -358,7 +357,6 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r case STAGE_MOCK: case STAGE_MULTI_ITERATOR: case STAGE_MULTI_PLAN: - case STAGE_PIPELINE_PROXY: case STAGE_QUEUED_DATA: case STAGE_RECORD_STORE_FAST_COUNT: case STAGE_SUBPLAN: diff --git a/src/mongo/db/query/explain.cpp b/src/mongo/db/query/explain.cpp index 4f6ca08cccf..d80405b8aaf 100644 --- a/src/mongo/db/query/explain.cpp +++ b/src/mongo/db/query/explain.cpp @@ -40,16 +40,17 @@ #include "mongo/db/exec/index_scan.h" #include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/near.h" -#include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/sort.h" #include "mongo/db/exec/text.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/keypattern.h" +#include "mongo/db/pipeline/plan_executor_pipeline.h" #include "mongo/db/query/canonical_query_encoder.h" #include "mongo/db/query/collection_query_info.h" #include "mongo/db/query/explain_common.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_executor_sbe.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" @@ -81,120 +82,6 @@ void flattenStatsTree(const 
PlanStageStats* root, vector<const PlanStageStats*>* } /** - * Traverse the tree rooted at 'root', and add all nodes into the list 'flattened'. If a - * MultiPlanStage is encountered, only add the best plan and its children to 'flattened'. - */ -void flattenExecTree(const PlanStage* root, vector<const PlanStage*>* flattened) { - flattened->push_back(root); - - if (root->stageType() == STAGE_MULTI_PLAN) { - // Only add the winning plan from a MultiPlanStage. - auto mps = static_cast<const MultiPlanStage*>(root); - const PlanStage* winningStage = mps->getChildren()[mps->bestPlanIdx()].get(); - return flattenExecTree(winningStage, flattened); - } - - const auto& children = root->getChildren(); - for (size_t i = 0; i < children.size(); ++i) { - flattenExecTree(children[i].get(), flattened); - } -} - -/** - * Traverse the stage tree, depth first and return the first stage of a given type. - */ -PlanStage* findStageOfType(PlanStage* root, StageType desiredStageType) { - if (root->stageType() == desiredStageType) { - return root; - } - - for (const auto& child : root->getChildren()) { - PlanStage* p = findStageOfType(child.get(), desiredStageType); - if (p) { - return p; - } - } - return nullptr; -} - -/** - * Gets a pointer to the MultiPlanStage inside the stage tree rooted at 'root'. Returns nullptr if - * there is no MPS. - */ -MultiPlanStage* getMultiPlanStage(PlanStage* root) { - PlanStage* ps = findStageOfType(root, STAGE_MULTI_PLAN); - invariant(ps == nullptr || ps->stageType() == STAGE_MULTI_PLAN); - return static_cast<MultiPlanStage*>(ps); -} - -/** - * Gets a pointer to the PipelineProxyStage if it is the root of the tree. Returns nullptr if - * there is no PPS that is root. - */ -PipelineProxyStage* getPipelineProxyStage(PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY || - root->stageType() == STAGE_CHANGE_STREAM_PROXY) { - return static_cast<PipelineProxyStage*>(root); - } - - return nullptr; -} - -/** - * Given the SpecificStats object for a stage and the type of the stage, returns the - * number of index keys examined by the stage. - * - * This is used for getting the total number of keys examined by a plan. We need - * to collect a 'totalKeysExamined' metric for a regular explain (in which case this - * gets called from Explain::generateSinglePlanExecutionInfo()) or for the slow query log / profiler - * (in which case this gets called from Explain::getSummaryStats()). - */ -size_t getKeysExamined(StageType type, const SpecificStats* specific) { - if (STAGE_IXSCAN == type) { - const IndexScanStats* spec = static_cast<const IndexScanStats*>(specific); - return spec->keysExamined; - } else if (STAGE_IDHACK == type) { - const IDHackStats* spec = static_cast<const IDHackStats*>(specific); - return spec->keysExamined; - } else if (STAGE_COUNT_SCAN == type) { - const CountScanStats* spec = static_cast<const CountScanStats*>(specific); - return spec->keysExamined; - } else if (STAGE_DISTINCT_SCAN == type) { - const DistinctScanStats* spec = static_cast<const DistinctScanStats*>(specific); - return spec->keysExamined; - } - - return 0; -} - -/** - * Given the SpecificStats object for a stage and the type of the stage, returns the - * number of documents examined by the stage. - * - * This is used for getting the total number of documents examined by a plan. 
We need - * to collect a 'totalDocsExamined' metric for a regular explain (in which case this - * gets called from Explain::generateSinglePlanExecutionInfo()) or for the slow query log / profiler - * (in which case this gets called from Explain::getSummaryStats()). - */ -size_t getDocsExamined(StageType type, const SpecificStats* specific) { - if (STAGE_COLLSCAN == type) { - const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific); - return spec->docsTested; - } else if (STAGE_FETCH == type) { - const FetchStats* spec = static_cast<const FetchStats*>(specific); - return spec->docsExamined; - } else if (STAGE_IDHACK == type) { - const IDHackStats* spec = static_cast<const IDHackStats*>(specific); - return spec->docsExamined; - } else if (STAGE_TEXT_OR == type) { - const TextOrStats* spec = static_cast<const TextOrStats*>(specific); - return spec->fetches; - } - - return 0; -} - -/** * Adds to the plan summary string being built by 'sb' for the execution stage 'stage'. */ void addStageSummaryStr(const PlanStage* stage, StringBuilder& sb) { @@ -269,10 +156,10 @@ void appendMultikeyPaths(const BSONObj& keyPattern, * Gather the PlanStageStats for all of the losing plans. If exec doesn't have a MultiPlanStage * (or any losing plans), will return an empty vector. */ -std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExecutor* exec) { +std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExecutorImpl* exec) { // Inspect the tree to see if there is a MultiPlanStage. Plan selection has already happened at // this point, since we have a PlanExecutor. - const auto mps = getMultiPlanStage(exec->getRootStage()); + const auto mps = exec->getMultiPlanStage(); std::vector<std::unique_ptr<PlanStageStats>> res; // Get the stats from the trial period for all the plans. @@ -291,8 +178,8 @@ std::vector<std::unique_ptr<PlanStageStats>> getRejectedPlansTrialStats(PlanExec /** * Get PlanExecutor's winning plan stats tree. */ -unique_ptr<PlanStageStats> getWinningPlanStatsTree(const PlanExecutor* exec) { - MultiPlanStage* mps = getMultiPlanStage(exec->getRootStage()); +unique_ptr<PlanStageStats> getWinningPlanStatsTree(const PlanExecutorImpl* exec) { + auto mps = exec->getMultiPlanStage(); return mps ? std::move(mps->getStats()->children[mps->bestPlanIdx()]) : std::move(exec->getRootStage()->getStats()); } @@ -303,7 +190,58 @@ namespace mongo { using str::stream; -// static +void Explain::flattenExecTree(const PlanStage* root, vector<const PlanStage*>* flattened) { + flattened->push_back(root); + + if (root->stageType() == STAGE_MULTI_PLAN) { + // Only add the winning plan from a MultiPlanStage. 
+ auto mps = static_cast<const MultiPlanStage*>(root); + const PlanStage* winningStage = mps->getChildren()[mps->bestPlanIdx()].get(); + return flattenExecTree(winningStage, flattened); + } + + const auto& children = root->getChildren(); + for (size_t i = 0; i < children.size(); ++i) { + flattenExecTree(children[i].get(), flattened); + } +} + +size_t Explain::getKeysExamined(StageType type, const SpecificStats* specific) { + if (STAGE_IXSCAN == type) { + const IndexScanStats* spec = static_cast<const IndexScanStats*>(specific); + return spec->keysExamined; + } else if (STAGE_IDHACK == type) { + const IDHackStats* spec = static_cast<const IDHackStats*>(specific); + return spec->keysExamined; + } else if (STAGE_COUNT_SCAN == type) { + const CountScanStats* spec = static_cast<const CountScanStats*>(specific); + return spec->keysExamined; + } else if (STAGE_DISTINCT_SCAN == type) { + const DistinctScanStats* spec = static_cast<const DistinctScanStats*>(specific); + return spec->keysExamined; + } + + return 0; +} + +size_t Explain::getDocsExamined(StageType type, const SpecificStats* specific) { + if (STAGE_COLLSCAN == type) { + const CollectionScanStats* spec = static_cast<const CollectionScanStats*>(specific); + return spec->docsTested; + } else if (STAGE_FETCH == type) { + const FetchStats* spec = static_cast<const FetchStats*>(specific); + return spec->docsExamined; + } else if (STAGE_IDHACK == type) { + const IDHackStats* spec = static_cast<const IDHackStats*>(specific); + return spec->docsExamined; + } else if (STAGE_TEXT_OR == type) { + const TextOrStats* spec = static_cast<const TextOrStats*>(specific); + return spec->fetches; + } + + return 0; +} + void Explain::statsToBSON(const PlanStageStats& stats, ExplainOptions::Verbosity verbosity, BSONObjBuilder* bob, @@ -621,7 +559,6 @@ void Explain::statsToBSON(const PlanStageStats& stats, childrenBob.doneFast(); } -// static BSONObj Explain::statsToBSON(const PlanStageStats& stats, ExplainOptions::Verbosity verbosity) { BSONObjBuilder bob; statsToBSON(stats, &bob, verbosity); @@ -634,33 +571,21 @@ BSONObj Explain::statsToBSON(const sbe::PlanStageStats& stats, return bob.obj(); } -// static void Explain::statsToBSON(const PlanStageStats& stats, BSONObjBuilder* bob, ExplainOptions::Verbosity verbosity) { statsToBSON(stats, verbosity, bob, bob); } -// static -BSONObj Explain::getWinningPlanStats(const PlanExecutor* exec) { - BSONObjBuilder bob; - if (!dynamic_cast<const PlanExecutorSBE*>(exec)) { - getWinningPlanStats(exec, &bob); - } - return bob.obj(); -} - -// static -void Explain::getWinningPlanStats(const PlanExecutor* exec, BSONObjBuilder* bob) { - unique_ptr<PlanStageStats> winningStats = getWinningPlanStatsTree(exec); - statsToBSON(*winningStats, ExplainOptions::Verbosity::kExecStats, bob, bob); -} - -// static void Explain::generatePlannerInfo(PlanExecutor* exec, const Collection* collection, BSONObj extraInfo, BSONObjBuilder* out) { + auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec); + uassert(4847801, + "queryPlanner explain section is only supported for classic PlanStages", + planExecImpl); + CanonicalQuery* query = exec->getCanonicalQuery(); BSONObjBuilder plannerBob(out->subobjStart("queryPlanner")); @@ -716,12 +641,13 @@ void Explain::generatePlannerInfo(PlanExecutor* exec, } BSONObjBuilder winningPlanBob(plannerBob.subobjStart("winningPlan")); - const auto winnerStats = getWinningPlanStatsTree(exec); + const auto winnerStats = getWinningPlanStatsTree(planExecImpl); statsToBSON(*winnerStats.get(), &winningPlanBob, 
ExplainOptions::Verbosity::kQueryPlanner); winningPlanBob.doneFast(); // Generate array of rejected plans. - const vector<unique_ptr<PlanStageStats>> rejectedStats = getRejectedPlansTrialStats(exec); + const vector<unique_ptr<PlanStageStats>> rejectedStats = + getRejectedPlansTrialStats(planExecImpl); BSONArrayBuilder allPlansBob(plannerBob.subarrayStart("rejectedPlans")); for (size_t i = 0; i < rejectedStats.size(); i++) { BSONObjBuilder childBob(allPlansBob.subobjStart()); @@ -774,9 +700,14 @@ void Explain::generateSinglePlanExecutionInfo(const PlanStageStats* stats, } std::unique_ptr<PlanStageStats> Explain::getWinningPlanTrialStats(PlanExecutor* exec) { + auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec); + uassert(4847802, + "getWinningPlanTrialStats() is only supported for classic PlanStages", + planExecImpl); + // Inspect the tree to see if there is a MultiPlanStage. Plan selection has already happened at // this point, since we have a PlanExecutor. - const auto mps = getMultiPlanStage(exec->getRootStage()); + const auto mps = planExecImpl->getMultiPlanStage(); if (mps) { const auto mpsStats = mps->getStats(); @@ -786,15 +717,19 @@ std::unique_ptr<PlanStageStats> Explain::getWinningPlanTrialStats(PlanExecutor* return nullptr; } -// static void Explain::generateExecutionInfo(PlanExecutor* exec, ExplainOptions::Verbosity verbosity, Status executePlanStatus, PlanStageStats* winningPlanTrialStats, BSONObjBuilder* out) { + auto planExecImpl = dynamic_cast<PlanExecutorImpl*>(exec); + uassert(4847800, + "executionStats explain section is only supported for classic PlanStages", + planExecImpl); + invariant(verbosity >= ExplainOptions::Verbosity::kExecStats); if (verbosity >= ExplainOptions::Verbosity::kExecAllPlans && - findStageOfType(exec->getRootStage(), STAGE_MULTI_PLAN) != nullptr) { + planExecImpl->getMultiPlanStage()) { invariant(winningPlanTrialStats, "winningPlanTrialStats must be non-null when requesting all execution stats"); } @@ -811,7 +746,7 @@ void Explain::generateExecutionInfo(PlanExecutor* exec, // Generate exec stats BSON for the winning plan. OperationContext* opCtx = exec->getOpCtx(); long long totalTimeMillis = durationCount<Milliseconds>(CurOp::get(opCtx)->elapsedTimeTotal()); - const auto winningExecStats = getWinningPlanStatsTree(exec); + const auto winningExecStats = getWinningPlanStatsTree(planExecImpl); generateSinglePlanExecutionInfo(winningExecStats.get(), verbosity, totalTimeMillis, &execBob); // Also generate exec stats for all plans, if the verbosity level is high enough. @@ -831,7 +766,8 @@ void Explain::generateExecutionInfo(PlanExecutor* exec, planBob.doneFast(); } - const vector<unique_ptr<PlanStageStats>> rejectedStats = getRejectedPlansTrialStats(exec); + const vector<unique_ptr<PlanStageStats>> rejectedStats = + getRejectedPlansTrialStats(planExecImpl); for (size_t i = 0; i < rejectedStats.size(); ++i) { BSONObjBuilder planBob(allPlansBob.subobjStart()); generateSinglePlanExecutionInfo( @@ -865,15 +801,12 @@ void Explain::explainStages(PlanExecutor* exec, } } -// static -void Explain::explainPipelineExecutor(PlanExecutor* exec, +void Explain::explainPipelineExecutor(PlanExecutorPipeline* exec, ExplainOptions::Verbosity verbosity, BSONObjBuilder* out) { + invariant(exec); invariant(out); - PipelineProxyStage* pps = getPipelineProxyStage(exec->getRootStage()); - invariant(pps, "Expected exec's root stage to be a PipelineProxyStage"); - // If we need execution stats, this runs the plan in order to gather the stats.
if (verbosity >= ExplainOptions::Verbosity::kExecStats) { // TODO SERVER-32732: An execution error should be reported in explain, but should not @@ -881,20 +814,19 @@ void Explain::explainPipelineExecutor(PlanExecutor* exec, exec->executePlan(); } - *out << "stages" << Value(pps->writeExplainOps(verbosity)); + *out << "stages" << Value(exec->writeExplainOps(verbosity)); explain_common::generateServerInfo(out); } -// static void Explain::explainStages(PlanExecutor* exec, const Collection* collection, ExplainOptions::Verbosity verbosity, BSONObj extraInfo, BSONObjBuilder* out) { uassert(4822877, - "Explain facility is not supported for SBE plans", - !dynamic_cast<PlanExecutorSBE*>(exec)); + "explainStages() is only supported for PlanStage trees", + dynamic_cast<PlanExecutorImpl*>(exec)); auto winningPlanTrialStats = Explain::getWinningPlanTrialStats(exec); @@ -927,24 +859,7 @@ void Explain::explainStages(PlanExecutor* exec, explain_common::generateServerInfo(out); } -// static -std::string Explain::getPlanSummary(const PlanExecutor* exec) { - // TODO: Handle planSummary when SBE is enabled. - if (dynamic_cast<const PlanExecutorSBE*>(exec)) { - return "unsupported"; - } - - return getPlanSummary(exec->getRootStage()); -} - -// static std::string Explain::getPlanSummary(const PlanStage* root) { - if (root->stageType() == STAGE_PIPELINE_PROXY || - root->stageType() == STAGE_CHANGE_STREAM_PROXY) { - auto pipelineProxy = static_cast<const PipelineProxyStage*>(root); - return pipelineProxy->getPlanSummaryStr(); - } - std::vector<const PlanStage*> stages; flattenExecTree(root, &stages); @@ -968,105 +883,11 @@ std::string Explain::getPlanSummary(const PlanStage* root) { return sb.str(); } -// static std::string Explain::getPlanSummary(const sbe::PlanStage* root) { - // TODO: Handle 'planSummary' when SBE is enabled. + // TODO: Support 'planSummary' for SBE plan stage trees. return "unsupported"; } -// static -void Explain::getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsOut) { - invariant(nullptr != statsOut); - - // TODO: Handle 'getSummaryStats' when SBE is enabled. - if (dynamic_cast<const PlanExecutorSBE*>(&exec)) { - return; - } - - PlanStage* root = exec.getRootStage(); - if (root->stageType() == STAGE_PIPELINE_PROXY || - root->stageType() == STAGE_CHANGE_STREAM_PROXY) { - auto pipelineProxy = static_cast<PipelineProxyStage*>(root); - pipelineProxy->getPlanSummaryStats(statsOut); - return; - } - - // We can get some of the fields we need from the common stats stored in the - // root stage of the plan tree. - const CommonStats* common = root->getCommonStats(); - statsOut->nReturned = common->advanced; - - // The other fields are aggregations over the stages in the plan tree. We flatten - // the tree into a list and then compute these aggregations. 
- std::vector<const PlanStage*> stages; - flattenExecTree(root, &stages); - - statsOut->totalKeysExamined = 0; - statsOut->totalDocsExamined = 0; - - for (size_t i = 0; i < stages.size(); i++) { - statsOut->totalKeysExamined += - getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); - statsOut->totalDocsExamined += - getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); - - if (isSortStageType(stages[i]->stageType())) { - statsOut->hasSortStage = true; - - auto sortStage = static_cast<const SortStage*>(stages[i]); - auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats()); - statsOut->usedDisk = sortStats->wasDiskUsed; - } - - if (STAGE_IXSCAN == stages[i]->stageType()) { - const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]); - const IndexScanStats* ixscanStats = - static_cast<const IndexScanStats*>(ixscan->getSpecificStats()); - statsOut->indexesUsed.insert(ixscanStats->indexName); - } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) { - const CountScan* countScan = static_cast<const CountScan*>(stages[i]); - const CountScanStats* countScanStats = - static_cast<const CountScanStats*>(countScan->getSpecificStats()); - statsOut->indexesUsed.insert(countScanStats->indexName); - } else if (STAGE_IDHACK == stages[i]->stageType()) { - const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]); - const IDHackStats* idHackStats = - static_cast<const IDHackStats*>(idHackStage->getSpecificStats()); - statsOut->indexesUsed.insert(idHackStats->indexName); - } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) { - const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]); - const DistinctScanStats* distinctScanStats = - static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats()); - statsOut->indexesUsed.insert(distinctScanStats->indexName); - } else if (STAGE_TEXT == stages[i]->stageType()) { - const TextStage* textStage = static_cast<const TextStage*>(stages[i]); - const TextStats* textStats = - static_cast<const TextStats*>(textStage->getSpecificStats()); - statsOut->indexesUsed.insert(textStats->indexName); - } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() || - STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) { - const NearStage* nearStage = static_cast<const NearStage*>(stages[i]); - const NearStats* nearStats = - static_cast<const NearStats*>(nearStage->getSpecificStats()); - statsOut->indexesUsed.insert(nearStats->indexName); - } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) { - const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]); - const CachedPlanStats* cachedStats = - static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats()); - statsOut->replanReason = cachedStats->replanReason; - } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) { - statsOut->fromMultiPlanner = true; - } else if (STAGE_COLLSCAN == stages[i]->stageType()) { - statsOut->collectionScans++; - const auto collScan = static_cast<const CollectionScan*>(stages[i]); - const auto collScanStats = - static_cast<const CollectionScanStats*>(collScan->getSpecificStats()); - if (!collScanStats->tailable) - statsOut->collectionScansNonTailable++; - } - } -} - void Explain::planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out) { BSONObjBuilder shapeBuilder(out->subobjStart("createdFromQuery")); shapeBuilder.append("query", entry.query); diff --git a/src/mongo/db/query/explain.h b/src/mongo/db/query/explain.h index 
f71c63a0691..3c692082a03 100644 --- a/src/mongo/db/query/explain.h +++ b/src/mongo/db/query/explain.h @@ -43,6 +43,7 @@ namespace mongo { class Collection; class OperationContext; +class PlanExecutorPipeline; struct PlanSummaryStats; /** @@ -89,7 +90,7 @@ public: * - 'winningPlanTrialStats' is the stats of the winning plan during the trial period. May be * nullptr. * - 'out' is the builder for the explain output. - **/ + */ static void explainStages(PlanExecutor* exec, const Collection* collection, ExplainOptions::Verbosity verbosity, @@ -99,28 +100,16 @@ public: BSONObjBuilder* out); /** - * Gets explain BSON for the document sources contained by 'exec'. Use this function if you - * have a PlanExecutor whose root is a PipelineProxyStage and want to turn it into a human - * readable explain format. + * Gets explain BSON for the document sources contained by 'exec'. Use this function if you have + * a PlanExecutor for a pipeline and want to turn it into a human readable explain format. * * The explain information is generated with the level of detail specified by 'verbosity'. - **/ - static void explainPipelineExecutor(PlanExecutor* exec, + */ + static void explainPipelineExecutor(PlanExecutorPipeline* exec, ExplainOptions::Verbosity verbosity, BSONObjBuilder* out); /** - * Converts the PlanExecutor's winning plan stats tree to BSON and returns to the caller. - */ - static BSONObj getWinningPlanStats(const PlanExecutor* exec); - - /** - * Converts the PlanExecutor's winning plan stats tree to BSON and returns the result through - * the out-parameter 'bob'. - */ - static void getWinningPlanStats(const PlanExecutor* exec, BSONObjBuilder* bob); - - /** * Converts the stats tree 'stats' into a corresponding BSON object containing * explain information. * @@ -149,26 +138,10 @@ public: /** * Returns a short plan summary std::string describing the leaves of the query plan. */ - static std::string getPlanSummary(const PlanExecutor* exec); static std::string getPlanSummary(const PlanStage* root); static std::string getPlanSummary(const sbe::PlanStage* root); /** - * Fills out 'statsOut' with summary stats using the execution tree contained - * in 'exec'. - * - * The summary stats are consumed by debug mechanisms such as the profiler and - * the slow query log. - * - * This is a lightweight alternative for explainStages(...) above which is useful - * when operations want to request debug information without doing all the work - * to generate a full explain. - * - * Does not take ownership of its arguments. - */ - static void getSummaryStats(const PlanExecutor& exec, PlanSummaryStats* statsOut); - - /** * If exec's root stage is a MultiPlanStage, returns the stats for the trial period of the * winning plan. Otherwise, returns nullptr. * @@ -177,6 +150,32 @@ public: static std::unique_ptr<PlanStageStats> getWinningPlanTrialStats(PlanExecutor* exec); /** + * Serializes a PlanCacheEntry to the provided BSON object builder. The output format is + * intended to be human readable, and useful for debugging query performance problems related to + * the plan cache. + */ + static void planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out); + + /** + * Traverses the tree rooted at 'root', and adds all nodes into the list 'flattened'. If a + * MultiPlanStage is encountered, only adds the best plan and its children to 'flattened'.
+ */ + static void flattenExecTree(const PlanStage* root, std::vector<const PlanStage*>* flattened); + + /** + * Given the SpecificStats object for a stage and the type of the stage, returns the number of + * index keys examined by the stage. + */ + static size_t getKeysExamined(StageType type, const SpecificStats* specific); + + /** + * Given the SpecificStats object for a stage and the type of the stage, returns the number of + * documents examined by the stage. + */ + static size_t getDocsExamined(StageType type, const SpecificStats* specific); + +private: + /** * Generates the execution stats section for the stats tree 'stats', adding the resulting BSON * to 'out'. * @@ -193,14 +192,6 @@ public: BSONObjBuilder* out); /** - * Serializes a PlanCacheEntry to the provided BSON object builder. The output format is - * intended to be human readable, and useful for debugging query performance problems related to - * the plan cache. - */ - static void planCacheEntryToBSON(const PlanCacheEntry& entry, BSONObjBuilder* out); - -private: - /** * Adds the 'queryPlanner' explain section to the BSON object being built * by 'out'. * diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index c936ea37681..440c3c580d0 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -133,7 +133,7 @@ void endQueryOp(OperationContext* opCtx, // Fill out CurOp based on explain summary statistics. PlanSummaryStats summaryStats; - Explain::getSummaryStats(exec, &summaryStats); + exec.getSummaryStats(&summaryStats); curOp->debug().setPlanSummaryMetrics(summaryStats); if (collection) { @@ -141,9 +141,7 @@ void endQueryOp(OperationContext* opCtx, } if (curOp->shouldDBProfile()) { - BSONObjBuilder statsBob; - Explain::getWinningPlanStats(&exec, &statsBob); - curOp->debug().execStats = statsBob.obj(); + curOp->debug().execStats = exec.getStats(); } } @@ -185,7 +183,7 @@ void generateBatch(int ntoreturn, LOGV2_ERROR(20918, "getMore executor error, stats: {stats}", "getMore executor error", - "stats"_attr = redact(Explain::getWinningPlanStats(exec))); + "stats"_attr = redact(exec->getStats())); exception.addContext("Executor error during OP_GET_MORE"); throw; } @@ -414,7 +412,7 @@ Message getMore(OperationContext* opCtx, exec->reattachToOperationContext(opCtx); exec->restoreState(); - auto planSummary = Explain::getPlanSummary(exec); + auto planSummary = exec->getPlanSummary(); { stdx::lock_guard<Client> lk(*opCtx->getClient()); curOp.setPlanSummary_inlock(planSummary); @@ -450,7 +448,7 @@ Message getMore(OperationContext* opCtx, // these values we need to take a diff of the pre-execution and post-execution metrics, as they // accumulate over the course of a cursor's lifetime. PlanSummaryStats preExecutionStats; - Explain::getSummaryStats(*exec, &preExecutionStats); + exec->getSummaryStats(&preExecutionStats); if (MONGO_unlikely(waitWithPinnedCursorDuringGetMoreBatch.shouldFail())) { CurOpFailpointHelpers::waitWhileFailPointEnabled(&waitWithPinnedCursorDuringGetMoreBatch, opCtx, @@ -486,7 +484,7 @@ Message getMore(OperationContext* opCtx, } PlanSummaryStats postExecutionStats; - Explain::getSummaryStats(*exec, &postExecutionStats); + exec->getSummaryStats(&postExecutionStats); postExecutionStats.totalKeysExamined -= preExecutionStats.totalKeysExamined; postExecutionStats.totalDocsExamined -= preExecutionStats.totalDocsExamined; curOp.debug().setPlanSummaryMetrics(postExecutionStats); @@ -498,9 +496,7 @@ Message getMore(OperationContext* opCtx, // cost. 
if (cursorPin->getExecutor()->lockPolicy() != PlanExecutor::LockPolicy::kLocksInternally && curOp.shouldDBProfile()) { - BSONObjBuilder execStatsBob; - Explain::getWinningPlanStats(exec, &execStatsBob); - curOp.debug().execStats = execStatsBob.obj(); + curOp.debug().execStats = exec->getStats(); } // Our two possible ClientCursorPin cleanup paths are: @@ -692,7 +688,7 @@ bool runQuery(OperationContext* opCtx, // Get summary info about which plan the executor is using. { stdx::lock_guard<Client> lk(*opCtx->getClient()); - curOp.setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + curOp.setPlanSummary_inlock(exec->getPlanSummary()); } try { @@ -726,7 +722,7 @@ bool runQuery(OperationContext* opCtx, "Plan executor error during find: {error}, stats: {stats}", "Plan executor error during find", "error"_attr = redact(exception.toStatus()), - "stats"_attr = redact(Explain::getWinningPlanStats(exec.get()))); + "stats"_attr = redact(exec->getStats())); exception.addContext("Executor error during find"); throw; diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 87a1c5cbd1e..b271902e162 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -33,6 +33,7 @@ #include "mongo/db/exec/plan_stats.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" +#include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/plan_yield_policy.h" namespace mongo { @@ -282,7 +283,8 @@ public: /** * Stash the BSONObj so that it gets returned from the PlanExecutor on a later call to - * getNext(). + * getNext(). Implementations should NOT support returning queued BSON objects using + * 'getNextDocument()'. Only 'getNext()' should return the queued BSON objects. * * Enqueued documents are returned in FIFO order. The queued results are exhausted before * generating further results from the underlying query plan. @@ -311,12 +313,30 @@ public: virtual LockPolicy lockPolicy() const = 0; /** - * Returns true if this PlanExecutor proxies to a Pipeline of DocumentSources. + * Returns a short string, suitable for the logs, which summarizes the execution plan. + */ + virtual std::string getPlanSummary() const = 0; + + /** + * Fills out 'statsOut' with summary stats collected during the execution of the PlanExecutor. + * This is a lightweight alternative which is useful when operations want to request a summary + * of the available debug information without generating complete explain output. + * + * The summary stats are consumed by debug mechanisms such as the profiler and the slow query + * log. + */ + virtual void getSummaryStats(PlanSummaryStats* statsOut) const = 0; + + /** + * Serializes any execution stats tracked by this executor to BSON, for debugging. The format of + * these stats is opaque to the caller, and different implementations may choose to provide + * different stats. * - * TODO SERVER-48478 : Create a new PlanExecutor implementation specifically for executing the - * Pipeline, and delete PipelineProxyStage. + * Implementations must be able to successfully generate and return stats even if the + * PlanExecutor has issued a query-fatal exception and the executor cannot be used for further + * query execution.
*/ - virtual bool isPipelineExecutor() const = 0; + virtual BSONObj getStats() const = 0; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_executor_factory.cpp b/src/mongo/db/query/plan_executor_factory.cpp index 5c00c01dab6..fa16042eaf0 100644 --- a/src/mongo/db/query/plan_executor_factory.cpp +++ b/src/mongo/db/query/plan_executor_factory.cpp @@ -34,6 +34,7 @@ #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/exec/plan_stage.h" +#include "mongo/db/pipeline/plan_executor_pipeline.h" #include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_executor_sbe.h" #include "mongo/logv2/log.h" @@ -156,4 +157,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( return {{exec, PlanExecutor::Deleter{opCtx}}}; } +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> make( + boost::intrusive_ptr<ExpressionContext> expCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + bool isChangeStream) { + auto* opCtx = expCtx->opCtx; + auto exec = new PlanExecutorPipeline(std::move(expCtx), std::move(pipeline), isChangeStream); + return {exec, PlanExecutor::Deleter{opCtx}}; +} + } // namespace mongo::plan_executor_factory diff --git a/src/mongo/db/query/plan_executor_factory.h b/src/mongo/db/query/plan_executor_factory.h index 3d27507cfd2..8faad454285 100644 --- a/src/mongo/db/query/plan_executor_factory.h +++ b/src/mongo/db/query/plan_executor_factory.h @@ -33,6 +33,7 @@ #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/exec/working_set.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_yield_policy_sbe.h" #include "mongo/db/query/query_solution.h" @@ -120,4 +121,12 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash, std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); +/** + * Constructs a plan executor for executing the given 'pipeline'. 
+ */ +std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> make( + boost::intrusive_ptr<ExpressionContext> expCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + bool isChangeStream); + } // namespace mongo::plan_executor_factory diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 4e661a4f2ac..cc3923a7990 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -42,15 +42,21 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/exec/cached_plan.h" -#include "mongo/db/exec/change_stream_proxy.h" #include "mongo/db/exec/collection_scan.h" -#include "mongo/db/exec/multi_plan.h" +#include "mongo/db/exec/count_scan.h" +#include "mongo/db/exec/distinct_scan.h" +#include "mongo/db/exec/idhack.h" +#include "mongo/db/exec/index_scan.h" +#include "mongo/db/exec/near.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" +#include "mongo/db/exec/sort.h" #include "mongo/db/exec/subplan.h" +#include "mongo/db/exec/text.h" #include "mongo/db/exec/trial_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/query/explain.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_yield_policy_impl.h" @@ -149,14 +155,11 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, invariant(!_expCtx || _expCtx->opCtx == _opCtx); invariant(!_cq || !_expCtx || _cq->getExpCtx() == _expCtx); - // Both ChangeStreamProxy and CollectionScan stages can provide oplog tracking info, such as - // post batch resume token, or latest oplog timestamp. If either of these two stages is present - // in the execution tree, then cache it for fast retrieval of the oplog info, avoiding the need - // traverse the tree in runtime. - if (auto changeStreamProxy = getStageByType(_root.get(), STAGE_CHANGE_STREAM_PROXY)) { - _oplogTrackingStage = static_cast<ChangeStreamProxyStage*>(changeStreamProxy); - } else if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { - _oplogTrackingStage = static_cast<CollectionScan*>(collectionScan); + // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN stage. + // This is used for change streams in order to keep the latest oplog timestamp and post + // batch resume token up to date as the oplog scan progresses. + if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { + _collScanStage = static_cast<CollectionScan*>(collectionScan); } // We may still need to initialize _nss from either collection or _cq. @@ -582,41 +585,15 @@ bool PlanExecutorImpl::isDisposed() const { } Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const { - if (!_oplogTrackingStage) { - return {}; - } - - const auto stageType = _oplogTrackingStage->stageType(); - if (stageType == STAGE_COLLSCAN) { - return static_cast<const CollectionScan*>(_oplogTrackingStage)->getLatestOplogTimestamp(); - } else { - invariant(stageType == STAGE_CHANGE_STREAM_PROXY); - return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage) - ->getLatestOplogTimestamp(); - } + return _collScanStage ?
_collScanStage->getLatestOplogTimestamp() : Timestamp{}; } BSONObj PlanExecutorImpl::getPostBatchResumeToken() const { static const BSONObj kEmptyPBRT; - if (!_oplogTrackingStage) { - return kEmptyPBRT; - } - - const auto stageType = _oplogTrackingStage->stageType(); - if (stageType == STAGE_COLLSCAN) { - return static_cast<const CollectionScan*>(_oplogTrackingStage)->getPostBatchResumeToken(); - } else { - invariant(stageType == STAGE_CHANGE_STREAM_PROXY); - return static_cast<const ChangeStreamProxyStage*>(_oplogTrackingStage) - ->getPostBatchResumeToken(); - } + return _collScanStage ? _collScanStage->getPostBatchResumeToken() : kEmptyPBRT; } PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const { - if (isPipelineExecutor()) { - return LockPolicy::kLocksInternally; - } - // If this PlanExecutor is simply unspooling queued data, then there is no need to acquire // locks. if (_root->stageType() == StageType::STAGE_QUEUED_DATA) { @@ -626,8 +603,100 @@ PlanExecutor::LockPolicy PlanExecutorImpl::lockPolicy() const { return LockPolicy::kLockExternally; } -bool PlanExecutorImpl::isPipelineExecutor() const { - return _root->stageType() == StageType::STAGE_PIPELINE_PROXY || - _root->stageType() == StageType::STAGE_CHANGE_STREAM_PROXY; +std::string PlanExecutorImpl::getPlanSummary() const { + return Explain::getPlanSummary(_root.get()); +} + +void PlanExecutorImpl::getSummaryStats(PlanSummaryStats* statsOut) const { + invariant(statsOut); + + // We can get some of the fields we need from the common stats stored in the + // root stage of the plan tree. + const CommonStats* common = _root->getCommonStats(); + statsOut->nReturned = common->advanced; + + // The other fields are aggregations over the stages in the plan tree. We flatten + // the tree into a list and then compute these aggregations. 
+ std::vector<const PlanStage*> stages; + Explain::flattenExecTree(_root.get(), &stages); + + statsOut->totalKeysExamined = 0; + statsOut->totalDocsExamined = 0; + + for (size_t i = 0; i < stages.size(); i++) { + statsOut->totalKeysExamined += + Explain::getKeysExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); + statsOut->totalDocsExamined += + Explain::getDocsExamined(stages[i]->stageType(), stages[i]->getSpecificStats()); + + if (isSortStageType(stages[i]->stageType())) { + statsOut->hasSortStage = true; + + auto sortStage = static_cast<const SortStage*>(stages[i]); + auto sortStats = static_cast<const SortStats*>(sortStage->getSpecificStats()); + statsOut->usedDisk = sortStats->wasDiskUsed; + } + + if (STAGE_IXSCAN == stages[i]->stageType()) { + const IndexScan* ixscan = static_cast<const IndexScan*>(stages[i]); + const IndexScanStats* ixscanStats = + static_cast<const IndexScanStats*>(ixscan->getSpecificStats()); + statsOut->indexesUsed.insert(ixscanStats->indexName); + } else if (STAGE_COUNT_SCAN == stages[i]->stageType()) { + const CountScan* countScan = static_cast<const CountScan*>(stages[i]); + const CountScanStats* countScanStats = + static_cast<const CountScanStats*>(countScan->getSpecificStats()); + statsOut->indexesUsed.insert(countScanStats->indexName); + } else if (STAGE_IDHACK == stages[i]->stageType()) { + const IDHackStage* idHackStage = static_cast<const IDHackStage*>(stages[i]); + const IDHackStats* idHackStats = + static_cast<const IDHackStats*>(idHackStage->getSpecificStats()); + statsOut->indexesUsed.insert(idHackStats->indexName); + } else if (STAGE_DISTINCT_SCAN == stages[i]->stageType()) { + const DistinctScan* distinctScan = static_cast<const DistinctScan*>(stages[i]); + const DistinctScanStats* distinctScanStats = + static_cast<const DistinctScanStats*>(distinctScan->getSpecificStats()); + statsOut->indexesUsed.insert(distinctScanStats->indexName); + } else if (STAGE_TEXT == stages[i]->stageType()) { + const TextStage* textStage = static_cast<const TextStage*>(stages[i]); + const TextStats* textStats = + static_cast<const TextStats*>(textStage->getSpecificStats()); + statsOut->indexesUsed.insert(textStats->indexName); + } else if (STAGE_GEO_NEAR_2D == stages[i]->stageType() || + STAGE_GEO_NEAR_2DSPHERE == stages[i]->stageType()) { + const NearStage* nearStage = static_cast<const NearStage*>(stages[i]); + const NearStats* nearStats = + static_cast<const NearStats*>(nearStage->getSpecificStats()); + statsOut->indexesUsed.insert(nearStats->indexName); + } else if (STAGE_CACHED_PLAN == stages[i]->stageType()) { + const CachedPlanStage* cachedPlan = static_cast<const CachedPlanStage*>(stages[i]); + const CachedPlanStats* cachedStats = + static_cast<const CachedPlanStats*>(cachedPlan->getSpecificStats()); + statsOut->replanReason = cachedStats->replanReason; + } else if (STAGE_MULTI_PLAN == stages[i]->stageType()) { + statsOut->fromMultiPlanner = true; + } else if (STAGE_COLLSCAN == stages[i]->stageType()) { + statsOut->collectionScans++; + const auto collScan = static_cast<const CollectionScan*>(stages[i]); + const auto collScanStats = + static_cast<const CollectionScanStats*>(collScan->getSpecificStats()); + if (!collScanStats->tailable) + statsOut->collectionScansNonTailable++; + } + } +} + +BSONObj PlanExecutorImpl::getStats() const { + // Serialize all stats from the winning plan. + auto mps = getMultiPlanStage(); + auto winningPlanStats = + mps ? 
std::move(mps->getStats()->children[mps->bestPlanIdx()]) : _root->getStats(); + return Explain::statsToBSON(*winningPlanStats); } + +MultiPlanStage* PlanExecutorImpl::getMultiPlanStage() const { + PlanStage* ps = getStageByType(_root.get(), StageType::STAGE_MULTI_PLAN); + invariant(ps == nullptr || ps->stageType() == StageType::STAGE_MULTI_PLAN); + return static_cast<MultiPlanStage*>(ps); } } // namespace mongo diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h index ba0bdd58a9b..ac586aef59b 100644 --- a/src/mongo/db/query/plan_executor_impl.h +++ b/src/mongo/db/query/plan_executor_impl.h @@ -32,6 +32,7 @@ #include <boost/optional.hpp> #include <queue> +#include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_solution.h" @@ -39,6 +40,7 @@ namespace mongo { class CappedInsertNotifier; +class CollectionScan; struct CappedInsertNotifierData; class PlanExecutorImpl : public PlanExecutor { @@ -83,7 +85,9 @@ public: Timestamp getLatestOplogTimestamp() const final; BSONObj getPostBatchResumeToken() const final; LockPolicy lockPolicy() const final; - bool isPipelineExecutor() const final; + std::string getPlanSummary() const final; + void getSummaryStats(PlanSummaryStats* statsOut) const final; + BSONObj getStats() const final; /** * Same as restoreState() but without the logic to retry if a WriteConflictException is thrown. @@ -92,6 +96,11 @@ public: */ void restoreStateWithoutRetrying(); + /** + * Return a pointer to this executor's MultiPlanStage, or nullptr if it does not have one. + */ + MultiPlanStage* getMultiPlanStage() const; + private: /** * Called on construction in order to ensure that when callers receive a new instance of a @@ -177,12 +186,11 @@ private: enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; - // A pointer either to a ChangeStreamProxy or a CollectionScan stage, if present in the - // execution tree, or nullptr otherwise. We cache it to avoid the need to traverse the execution - // tree in runtime when the executor is requested to return the oplog tracking info. Since this - // info is provided by either of these stages, the executor will simply delegate the request to - // the cached stage. - const PlanStage* _oplogTrackingStage{nullptr}; + // A pointer to a CollectionScan stage, if present in the execution tree, or nullptr + // otherwise. We cache it to avoid the need to traverse the execution tree at runtime when the + // executor is requested to return the oplog tracking info. Since this info is provided by + // that stage, the executor will simply delegate the request to the cached stage. + const CollectionScan* _collScanStage{nullptr}; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index 776e9d1665c..5ff2f0fe05d 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -107,8 +107,17 @@ public: return LockPolicy::kLocksInternally; } - bool isPipelineExecutor() const override { - return false; + // TODO: Support 'planSummary' for SBE. + std::string getPlanSummary() const override { + return "unsupported"; + } + + // TODO: Support collection of plan summary stats for SBE. + void getSummaryStats(PlanSummaryStats* statsOut) const override {} + + // TODO: Support debug stats for SBE.
+ BSONObj getStats() const override { + return BSONObj{}; } private: diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index 3c8a85cbbd7..ccd454a3503 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -69,8 +69,9 @@ public: // Can be used in one of the following scenarios: // - The caller will hold a lock continuously for the lifetime of this PlanExecutor. // - This PlanExecutor doesn't logically belong to a Collection, and so does not need to be - // locked during execution. For example, a PlanExecutor containing a PipelineProxyStage - // which is being used to execute an aggregation pipeline. + // locked during execution. For example, this yield policy is used for PlanExecutors + // which unspool queued metadata ("virtual collection scans") for listCollections and + // listIndexes. NO_YIELD, // Will not yield locks or storage engine resources, but will check for interrupt. diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index c45c3e01dd8..d3d48467e03 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -83,10 +83,6 @@ enum StageType { STAGE_PROJECTION_COVERED, STAGE_PROJECTION_SIMPLE, - // Stages for running aggregation pipelines. - STAGE_CHANGE_STREAM_PROXY, - STAGE_PIPELINE_PROXY, - STAGE_QUEUED_DATA, STAGE_RECORD_STORE_FAST_COUNT, STAGE_RETURN_KEY, diff --git a/src/mongo/db/s/range_deletion_util.cpp b/src/mongo/db/s/range_deletion_util.cpp index d32df41ed5a..f6790d26327 100644 --- a/src/mongo/db/s/range_deletion_util.cpp +++ b/src/mongo/db/s/range_deletion_util.cpp @@ -219,7 +219,7 @@ StatusWith<int> deleteNextBatch(OperationContext* opCtx, "min"_attr = redact(min), "max"_attr = redact(max), "namespace"_attr = nss, - "stats"_attr = Explain::getWinningPlanStats(exec.get()), + "stats"_attr = redact(exec->getStats()), "error"_attr = redact(ex.toStatus())); throw; } diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 4ed8b61720b..075ededc8e9 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -42,7 +42,6 @@ #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" -#include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/json.h" @@ -51,6 +50,7 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/plan_executor_pipeline.h" #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_solution.h" #include "mongo/dbtests/dbtests.h" @@ -223,18 +223,8 @@ TEST_F(PlanExecutorTest, DropIndexScanAgg) { collection, std::move(innerExec), _expCtx, DocumentSourceCursor::CursorType::kRegular); auto pipeline = Pipeline::create({cursorSource}, _expCtx); - // Create the output PlanExecutor that pulls results from the pipeline. 
- auto ws = std::make_unique<WorkingSet>(); - auto proxy = std::make_unique<PipelineProxyStage>(_expCtx.get(), std::move(pipeline), ws.get()); - - auto statusWithPlanExecutor = - plan_executor_factory::make(_expCtx, - std::move(ws), - std::move(proxy), - collection, - PlanYieldPolicy::YieldPolicy::NO_YIELD); - ASSERT_OK(statusWithPlanExecutor.getStatus()); - auto outerExec = std::move(statusWithPlanExecutor.getValue()); + auto outerExec = + plan_executor_factory::make(_expCtx, std::move(pipeline), false /* isChangeStream */); dropCollection(); diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp index aa9abb89be2..83337ca69ec 100644 --- a/src/mongo/dbtests/query_stage_multiplan.cpp +++ b/src/mongo/dbtests/query_stage_multiplan.cpp @@ -556,7 +556,7 @@ TEST_F(QueryStageMultiPlanTest, MPSSummaryStats) { exec->executePlan(); PlanSummaryStats stats; - Explain::getSummaryStats(*exec, &stats); + exec->getSummaryStats(&stats); // If only the winning plan's stats are recorded, we should not have examined more than the // total number of documents/index keys.
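
Taken together, these changes move plan introspection from static Explain:: helpers onto the PlanExecutor interface itself: getPlanSummary(), getSummaryStats(), and getStats() are now virtual methods with classic (PlanExecutorImpl), SBE (PlanExecutorSBE), and pipeline (PlanExecutorPipeline) implementations. Below is a minimal sketch of the resulting caller-side pattern; it mirrors the updated call sites in find.cpp and count_cmd.cpp, but the helper function itself is hypothetical and not part of this patch.

#include "mongo/db/curop.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_summary_stats.h"

namespace mongo {

// Hypothetical helper, not part of this patch: records an executor's
// diagnostics on the current operation. Because the stats API is now part of
// the PlanExecutor interface, the same code path serves classic PlanStage
// trees, SBE plans, and aggregation pipelines.
void recordExecutorDiagnostics(OperationContext* opCtx, PlanExecutor* exec) {
    auto curOp = CurOp::get(opCtx);

    {
        // The executor produces its own plan summary string; callers no longer
        // reach into the stage tree via Explain::getPlanSummary().
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        curOp->setPlanSummary_inlock(exec->getPlanSummary());
    }

    // Lightweight summary metrics for the profiler and the slow query log.
    PlanSummaryStats summaryStats;
    exec->getSummaryStats(&summaryStats);
    curOp->debug().setPlanSummaryMetrics(summaryStats);

    // Full execution stats are serialized on demand; the BSON format is opaque
    // to the caller and implementation-defined.
    if (curOp->shouldDBProfile()) {
        curOp->debug().execStats = exec->getStats();
    }
}

}  // namespace mongo

For aggregation, an executor implementing this same interface comes from the new factory overload, plan_executor_factory::make(expCtx, std::move(pipeline), isChangeStream), as exercised in the query_plan_executor.cpp test above.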