diff options
36 files changed, 739 insertions, 814 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index a115b0da1dd..10e1afc525c 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1117,6 +1117,7 @@ env.Library( 'query/internal_plans.cpp', 'query/plan_executor_impl.cpp', 'query/plan_executor_sbe.cpp', + 'query/plan_executor_factory.cpp', 'query/plan_ranker.cpp', 'query/plan_yield_policy_impl.cpp', 'query/plan_yield_policy_sbe.cpp', diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index 9efabad72bc..bc5b78af2f1 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -173,16 +173,17 @@ void cloneCollectionAsCapped(OperationContext* opCtx, PlanYieldPolicy::YieldPolicy::WRITE_CONFLICT_RETRY_ONLY, InternalPlanner::FORWARD); - Snapshotted<BSONObj> objToClone; + BSONObj objToClone; RecordId loc; DisableDocumentValidation validationDisabler(opCtx); int retries = 0; // non-zero when retrying our last document. while (true) { + auto beforeGetNextSnapshotId = opCtx->recoveryUnit()->getSnapshotId(); PlanExecutor::ExecState state = PlanExecutor::IS_EOF; if (!retries) { - state = exec->getNextSnapshotted(&objToClone, &loc); + state = exec->getNext(&objToClone, &loc); } switch (state) { @@ -191,7 +192,7 @@ void cloneCollectionAsCapped(OperationContext* opCtx, case PlanExecutor::ADVANCED: { if (excessSize > 0) { // 4x is for padding, power of 2, etc... - excessSize -= (4 * objToClone.value().objsize()); + excessSize -= (4 * objToClone.objsize()); continue; } break; @@ -199,18 +200,25 @@ void cloneCollectionAsCapped(OperationContext* opCtx, } try { - // Make sure we are working with the latest version of the document. - if (objToClone.snapshotId() != opCtx->recoveryUnit()->getSnapshotId() && - !fromCollection->findDoc(opCtx, loc, &objToClone)) { - // doc was deleted so don't clone it. - retries = 0; - continue; + // If the snapshot id changed while using the 'PlanExecutor' to retrieve the next + // document from the collection scan, then it's possible that the document retrieved + // from the scan may have since been deleted or modified in our current snapshot. + if (beforeGetNextSnapshotId != opCtx->recoveryUnit()->getSnapshotId()) { + // The snapshot has changed. Fetch the document again from the collection in order + // to check whether it has been deleted. + Snapshotted<BSONObj> snapshottedObj; + if (!fromCollection->findDoc(opCtx, loc, &snapshottedObj)) { + // Doc was deleted so don't clone it. + retries = 0; + continue; + } + objToClone = std::move(snapshottedObj.value()); } WriteUnitOfWork wunit(opCtx); OpDebug* const nullOpDebug = nullptr; uassertStatusOK(toCollection->insertDocument( - opCtx, InsertStatement(objToClone.value()), nullOpDebug, true)); + opCtx, InsertStatement(objToClone), nullOpDebug, true)); wunit.commit(); // Go to the next document diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index f7264afade1..125c5fa13b6 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -88,7 +88,6 @@ ClientCursor::ClientCursor(ClientCursorParams params, _originatingCommand(params.originatingCommandObj), _originatingPrivileges(std::move(params.originatingPrivileges)), _queryOptions(params.queryOptions), - _needsMerge(params.needsMerge), _exec(std::move(params.exec)), _operationUsingCursor(operationUsingCursor), _lastUseDate(now), diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index a9665b878c2..1536921d8e1 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -61,8 +61,7 @@ struct ClientCursorParams { WriteConcernOptions writeConcernOptions, repl::ReadConcernArgs readConcernArgs, BSONObj originatingCommandObj, - PrivilegeVector originatingPrivileges, - bool needsMerge) + PrivilegeVector originatingPrivileges) : exec(std::move(planExecutor)), nss(std::move(nss)), writeConcernOptions(std::move(writeConcernOptions)), @@ -71,8 +70,7 @@ struct ClientCursorParams { ? exec->getCanonicalQuery()->getQueryRequest().getOptions() : 0), originatingCommandObj(originatingCommandObj.getOwned()), - originatingPrivileges(std::move(originatingPrivileges)), - needsMerge(needsMerge) { + originatingPrivileges(std::move(originatingPrivileges)) { while (authenticatedUsersIter.more()) { authenticatedUsers.emplace_back(authenticatedUsersIter.next()); } @@ -100,7 +98,6 @@ struct ClientCursorParams { int queryOptions = 0; BSONObj originatingCommandObj; PrivilegeVector originatingPrivileges; - const bool needsMerge; }; /** @@ -152,10 +149,6 @@ public: return _writeConcernOptions; } - bool needsMerge() const { - return _needsMerge; - } - /** * Returns a pointer to the underlying query plan executor. All cursors manage a PlanExecutor, * so this method never returns a null pointer. @@ -387,13 +380,6 @@ private: // See the QueryOptions enum in dbclientinterface.h. const int _queryOptions = 0; - // The value of a flag specified on the originating command which indicates whether the result - // of this cursor will be consumed by a merging node (mongos or a mongod selected to perform a - // merge). Note that this flag is only set for aggregate() commands, and not for find() - // commands. It is therefore possible that 'needsMerge' is false when in fact there will be a - // merge performed. - const bool _needsMerge; - // Unused maxTime budget for this cursor. Microseconds _leftoverMaxTimeMicros = Microseconds::max(); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 2a65847d865..d333b91b8af 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -443,16 +443,15 @@ public: options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); } CursorResponseBuilder firstBatch(result, options); - Document doc; + BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; std::uint64_t numResults = 0; try { while (!FindCommon::enoughForFirstBatch(originalQR, numResults) && - PlanExecutor::ADVANCED == (state = exec->getNext(&doc, nullptr))) { + PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) { // If we can't fit this result inside the current batch, then we stash it for // later. - BSONObj obj = doc.toBson(); if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) { exec->enqueue(obj); break; @@ -490,8 +489,7 @@ public: opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), _request.body, - {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}, - expCtx->needsMerge}); + {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}}); cursorId = pinnedCursor.getCursor()->cursorid(); invariant(!exec); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index ccbca7e7b78..d76c295f259 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -302,15 +302,11 @@ public: // If an awaitData getMore is killed during this process due to our max time expiring at // an interrupt point, we just continue as normal and return rather than reporting a // timeout to the user. - Document doc; + BSONObj obj; PlanExecutor::ExecState state; try { while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) && - PlanExecutor::ADVANCED == (state = exec->getNext(&doc, nullptr))) { - // Note that "needsMerge" implies a find or aggregate operation, which should - // always have a non-NULL 'expCtx' value. - BSONObj obj = cursor->needsMerge() ? doc.toBsonWithMetaData() : doc.toBson(); - + PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) { // If adding this object will cause us to exceed the message size limit, then we // stash it for later. if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) { diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index dc590c7f972..ffe9956f205 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -55,6 +55,7 @@ #include "mongo/db/query/cursor_request.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/storage_engine.h" @@ -364,15 +365,16 @@ public: } } - exec = uassertStatusOK(PlanExecutor::make(expCtx, - std::move(ws), - std::move(root), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - cursorNss)); + exec = + uassertStatusOK(plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + nullptr, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + cursorNss)); for (long long objCount = 0; objCount < batchSize; objCount++) { - Document nextDoc; + BSONObj nextDoc; PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); if (state == PlanExecutor::IS_EOF) { break; @@ -380,13 +382,12 @@ public: invariant(state == PlanExecutor::ADVANCED); // If we can't fit this result inside the current batch, then we stash it for later. - BSONObj next = nextDoc.toBson(); - if (!FindCommon::haveSpaceForNext(next, objCount, firstBatch.len())) { - exec->enqueue(next); + if (!FindCommon::haveSpaceForNext(nextDoc, objCount, firstBatch.len())) { + exec->enqueue(nextDoc); break; } - firstBatch.append(next); + firstBatch.append(nextDoc); } if (exec->isEOF()) { appendCursorResponseObject(0LL, cursorNss.ns(), firstBatch.arr(), &result); @@ -398,17 +399,14 @@ public: auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, - { - std::move(exec), - cursorNss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->getWriteConcern(), - repl::ReadConcernArgs::get(opCtx), - jsobj, - uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) - ->checkAuthorizedToListCollections(dbname, jsobj)), - false // needsMerge always 'false' for listCollections. - }); + {std::move(exec), + cursorNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), + repl::ReadConcernArgs::get(opCtx), + jsobj, + uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) + ->checkAuthorizedToListCollections(dbname, jsobj))}); appendCursorResponseObject( pinnedCursor.getCursor()->cursorid(), cursorNss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index a8df5c10eab..219cd2926f6 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -47,6 +47,7 @@ #include "mongo/db/query/cursor_request.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/storage_engine.h" @@ -173,29 +174,29 @@ public: root->pushBack(id); } - exec = uassertStatusOK(PlanExecutor::make(expCtx, - std::move(ws), - std::move(root), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - nss)); + exec = + uassertStatusOK(plan_executor_factory::make(expCtx, + std::move(ws), + std::move(root), + nullptr, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + nss)); for (long long objCount = 0; objCount < batchSize; objCount++) { - Document nextDoc; + BSONObj nextDoc; PlanExecutor::ExecState state = exec->getNext(&nextDoc, nullptr); if (state == PlanExecutor::IS_EOF) { break; } invariant(state == PlanExecutor::ADVANCED); - BSONObj next = nextDoc.toBson(); // If we can't fit this result inside the current batch, then we stash it for later. - if (!FindCommon::haveSpaceForNext(next, objCount, firstBatch.len())) { - exec->enqueue(next); + if (!FindCommon::haveSpaceForNext(nextDoc, objCount, firstBatch.len())) { + exec->enqueue(nextDoc); break; } - firstBatch.append(next); + firstBatch.append(nextDoc); } if (exec->isEOF()) { @@ -210,16 +211,13 @@ public: const auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, - { - std::move(exec), - nss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->getWriteConcern(), - repl::ReadConcernArgs::get(opCtx), - cmdObj, - {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}, - false // needsMerge always 'false' for listIndexes. - }); + {std::move(exec), + nss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), + repl::ReadConcernArgs::get(opCtx), + cmdObj, + {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}}); appendCursorResponseObject( pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 38e9b713853..daa7cae1f95 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -60,6 +60,7 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/read_concern.h" @@ -169,7 +170,7 @@ bool handleCursorCommand(OperationContext* opCtx, // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; - Document nextDoc; + BSONObj nextDoc; try { state = exec->getNext(&nextDoc, nullptr); @@ -204,8 +205,7 @@ bool handleCursorCommand(OperationContext* opCtx, // If adding this object will cause us to exceed the message size limit, then we stash it // for later. - BSONObj next = expCtx->needsMerge ? nextDoc.toBsonWithMetaData() : nextDoc.toBson(); - if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) { + if (!FindCommon::haveSpaceForNext(nextDoc, objCount, responseBuilder.bytesUsed())) { exec->enqueue(nextDoc); stashedResult = true; break; @@ -213,7 +213,7 @@ bool handleCursorCommand(OperationContext* opCtx, // If this executor produces a postBatchResumeToken, add it to the cursor response. responseBuilder.setPostBatchResumeToken(exec->getPostBatchResumeToken()); - responseBuilder.append(next); + responseBuilder.append(nextDoc); } if (cursor) { @@ -467,12 +467,12 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe // invalidations. The Pipeline may contain PlanExecutors which *are* yielding // PlanExecutors and which *are* registered with their respective collection's // CursorManager - return uassertStatusOK(PlanExecutor::make(std::move(expCtx), - std::move(ws), - std::move(proxy), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - nss)); + return uassertStatusOK(plan_executor_factory::make(std::move(expCtx), + std::move(ws), + std::move(proxy), + nullptr, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + nss)); } } // namespace @@ -713,8 +713,7 @@ Status runAggregate(OperationContext* opCtx, opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), cmdObj, - privileges, - expCtx->needsMerge); + privileges); if (expCtx->tailableMode == TailableModeEnum::kTailable) { cursorParams.setTailable(true); } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { @@ -741,7 +740,6 @@ Status runAggregate(OperationContext* opCtx, } else { invariant(pins[0]->getExecutor()->lockPolicy() == PlanExecutor::LockPolicy::kLockExternally); - invariant(!explainExecutor->isDetached()); invariant(explainExecutor->getOpCtx() == opCtx); // The explainStages() function for a non-pipeline executor expects to be called with // the appropriate collection lock already held. Make sure it has not been released yet. diff --git a/src/mongo/db/exec/sbe_cmd.cpp b/src/mongo/db/exec/sbe_cmd.cpp index d29988e7b1b..d88039b3d22 100644 --- a/src/mongo/db/exec/sbe_cmd.cpp +++ b/src/mongo/db/exec/sbe_cmd.cpp @@ -38,6 +38,7 @@ #include "mongo/db/query/cursor_request.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" +#include "mongo/db/query/plan_executor_factory.h" namespace mongo { /** @@ -77,7 +78,7 @@ public: NamespaceString nss{dbname}; - exec = uassertStatusOK(PlanExecutor::make( + exec = uassertStatusOK(plan_executor_factory::make( opCtx, nullptr, {std::move(root), stage_builder::PlanStageData{resultSlot, recordIdSlot}}, @@ -110,16 +111,13 @@ public: exec->detachFromOperationContext(); const auto pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, - { - std::move(exec), - nss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->getWriteConcern(), - repl::ReadConcernArgs::get(opCtx), - cmdObj, - {}, - false // needsMerge always 'false' for sbe. - }); + {std::move(exec), + nss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), + repl::ReadConcernArgs::get(opCtx), + cmdObj, + {}}); appendCursorResponseObject( pinnedCursor.getCursor()->cursorid(), nss.ns(), firstBatch.arr(), &result); diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index 4442f69a597..7966f34aa2d 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -62,7 +62,7 @@ #include "mongo/db/matcher/expression_text_base.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/logv2/log.h" namespace mongo { @@ -182,11 +182,12 @@ public: unique_ptr<PlanStage> rootFetch = std::make_unique<FetchStage>( expCtx.get(), ws.get(), std::move(userRoot), nullptr, collection); - auto statusWithPlanExecutor = PlanExecutor::make(expCtx, - std::move(ws), - std::move(rootFetch), - collection, - PlanYieldPolicy::YieldPolicy::YIELD_AUTO); + auto statusWithPlanExecutor = + plan_executor_factory::make(expCtx, + std::move(ws), + std::move(rootFetch), + collection, + PlanYieldPolicy::YieldPolicy::YIELD_AUTO); fassert(28536, statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 4aa7ce75d11..9c8389594b9 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -147,7 +147,7 @@ void DocumentSourceCursor::loadBatch() { try { ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); - while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { + while ((state = _exec->getNextDocument(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { _currentBatch.enqueue(transformDoc(std::move(resultObj))); // As long as we're waiting for inserts, we shouldn't do any batching at this level we @@ -244,7 +244,8 @@ Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity> } void DocumentSourceCursor::detachFromOperationContext() { - if (_exec && !_exec->isDetached()) { + // Only detach the underlying executor if it hasn't been detached already. + if (_exec && _exec->getOpCtx()) { _exec->detachFromOperationContext(); } } diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index bece5cefcea..e436fc29334 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -70,6 +70,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/sort_pattern.h" @@ -169,7 +170,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx minWorkAdvancedRatio); } - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::move(root), coll, PlanYieldPolicy::YieldPolicy::YIELD_AUTO); } diff --git a/src/mongo/db/query/explain.h b/src/mongo/db/query/explain.h index 2cc16c85b0a..f71c63a0691 100644 --- a/src/mongo/db/query/explain.h +++ b/src/mongo/db/query/explain.h @@ -31,6 +31,8 @@ #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" +#include "mongo/db/exec/sbe/stages/plan_stats.h" +#include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/plan_executor.h" diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 49169ef4ddc..c936ea37681 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -165,10 +165,9 @@ void generateBatch(int ntoreturn, PlanExecutor* exec = cursor->getExecutor(); try { - Document doc; + BSONObj obj; while (!FindCommon::enoughForGetMore(ntoreturn, *numResults) && - PlanExecutor::ADVANCED == (*state = exec->getNext(&doc, nullptr))) { - BSONObj obj = doc.toBson(); + PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, nullptr))) { // If we can't fit this result inside the current batch, then we stash it for later. if (!FindCommon::haveSpaceForNext(obj, *numResults, bb->len())) { @@ -697,10 +696,7 @@ bool runQuery(OperationContext* opCtx, } try { - Document doc; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&doc, nullptr))) { - obj = doc.toBson(); - + while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) { // If we can't fit this result inside the current batch, then we stash it for later. if (!FindCommon::haveSpaceForNext(obj, numResults, bb.len())) { exec->enqueue(obj); @@ -749,16 +745,13 @@ bool runQuery(OperationContext* opCtx, // Allocate a new ClientCursor and register it with the cursor manager. ClientCursorPin pinnedCursor = CursorManager::get(opCtx)->registerCursor( opCtx, - { - std::move(exec), - nss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - opCtx->getWriteConcern(), - readConcernArgs, - upconvertedQuery, - {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}, - false // needsMerge always 'false' for find(). - }); + {std::move(exec), + nss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + opCtx->getWriteConcern(), + readConcernArgs, + upconvertedQuery, + {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::find)}}); ccId = pinnedCursor.getCursor()->cursorid(); LOGV2_DEBUG( diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 3d2f9bb5563..f72ae1f898a 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -71,7 +71,7 @@ #include "mongo/db/query/index_bounds_builder.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/plan_cache.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/planner_access.h" #include "mongo/db/query/planner_analysis.h" #include "mongo/db/query/planner_ixselect.h" @@ -971,13 +971,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getClassicExecu invariant(root); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. - return PlanExecutor::make(std::move(canonicalQuery), - std::move(ws), - std::move(root), - collection, - yieldPolicy, - {}, - result->solution()); + return plan_executor_factory::make(std::move(canonicalQuery), + std::move(ws), + std::move(root), + collection, + yieldPolicy, + {}, + result->solution()); } /** @@ -1067,16 +1067,16 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getSlotBasedExe plannerOptions)) { // Do the runtime planning and pick the best candidate plan. auto plan = planner->plan(std::move(solutions), std::move(roots)); - return PlanExecutor::make(opCtx, - std::move(cq), - {std::move(plan.root), std::move(plan.data)}, - {}, - std::move(plan.results), - std::move(yieldPolicy)); + return plan_executor_factory::make(opCtx, + std::move(cq), + {std::move(plan.root), std::move(plan.data)}, + {}, + std::move(plan.results), + std::move(yieldPolicy)); } // No need for runtime planning, just use the constructed plan stage tree. invariant(roots.size() == 1); - return PlanExecutor::make( + return plan_executor_factory::make( opCtx, std::move(cq), std::move(roots[0]), {}, std::move(yieldPolicy)); } } // namespace @@ -1237,7 +1237,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele "Collection does not exist. Using EOF stage", "namespace"_attr = nss.ns(), "query"_attr = redact(request->getQuery())); - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::make_unique<EOFStage>(expCtx.get()), nullptr, policy, nss); } @@ -1280,7 +1280,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele ws.get(), collection, idHackStage.release()); - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::move(root), collection, policy); } } @@ -1337,13 +1337,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. - return PlanExecutor::make(std::move(cq), - std::move(ws), - std::move(root), - collection, - policy, - NamespaceString(), - std::move(querySolution)); + return plan_executor_factory::make(std::move(cq), + std::move(ws), + std::move(root), + collection, + policy, + NamespaceString(), + std::move(querySolution)); } // @@ -1410,7 +1410,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpda "Collection does not exist. Using EOF stage", "namespace"_attr = nss.ns(), "query"_attr = redact(request->getQuery())); - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::make_unique<EOFStage>(expCtx.get()), nullptr, policy, nss); } @@ -1507,13 +1507,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorUpda // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be null. Takes ownership of all args other than 'collection' and 'opCtx' - return PlanExecutor::make(std::move(cq), - std::move(ws), - std::move(root), - collection, - policy, - NamespaceString(), - std::move(querySolution)); + return plan_executor_factory::make(std::move(cq), + std::move(ws), + std::move(root), + collection, + policy, + NamespaceString(), + std::move(querySolution)); } // @@ -1696,7 +1696,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCoun // this case we put a CountStage on top of an EOFStage. std::unique_ptr<PlanStage> root = std::make_unique<CountStage>( expCtx.get(), collection, limit, skip, ws.get(), new EOFStage(expCtx.get())); - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::move(root), nullptr, yieldPolicy, nss); } @@ -1712,7 +1712,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCoun if (useRecordStoreCount) { std::unique_ptr<PlanStage> root = std::make_unique<RecordStoreFastCountStage>(expCtx.get(), collection, skip, limit); - return PlanExecutor::make( + return plan_executor_factory::make( expCtx, std::move(ws), std::move(root), nullptr, yieldPolicy, nss); } @@ -1737,13 +1737,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCoun expCtx.get(), collection, limit, skip, ws.get(), root.release()); // We must have a tree of stages in order to have a valid plan executor, but the query // solution may be NULL. Takes ownership of all args other than 'collection' and 'opCtx' - return PlanExecutor::make(std::move(cq), - std::move(ws), - std::move(root), - collection, - yieldPolicy, - NamespaceString(), - std::move(querySolution)); + return plan_executor_factory::make(std::move(cq), + std::move(ws), + std::move(root), + collection, + yieldPolicy, + NamespaceString(), + std::move(querySolution)); } // @@ -2057,13 +2057,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorForS "query"_attr = redact(parsedDistinct->getQuery()->toStringShort()), "planSummary"_attr = Explain::getPlanSummary(root.get())); - return PlanExecutor::make(parsedDistinct->releaseQuery(), - std::move(ws), - std::move(root), - collection, - yieldPolicy, - NamespaceString(), - std::move(soln)); + return plan_executor_factory::make(parsedDistinct->releaseQuery(), + std::move(ws), + std::move(root), + collection, + yieldPolicy, + NamespaceString(), + std::move(soln)); } // Checks each solution in the 'solutions' std::vector to see if one includes an IXSCAN that can be @@ -2102,13 +2102,13 @@ getExecutorDistinctFromIndexSolutions(OperationContext* opCtx, "query"_attr = redact(parsedDistinct->getQuery()->toStringShort()), "planSummary"_attr = Explain::getPlanSummary(root.get())); - return PlanExecutor::make(parsedDistinct->releaseQuery(), - std::move(ws), - std::move(root), - collection, - yieldPolicy, - NamespaceString(), - std::move(currentSolution)); + return plan_executor_factory::make(parsedDistinct->releaseQuery(), + std::move(ws), + std::move(root), + collection, + yieldPolicy, + NamespaceString(), + std::move(currentSolution)); } } @@ -2152,11 +2152,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDist if (!collection) { // Treat collections that do not exist as empty collections. - return PlanExecutor::make(parsedDistinct->releaseQuery(), - std::make_unique<WorkingSet>(), - std::make_unique<EOFStage>(expCtx.get()), - collection, - yieldPolicy); + return plan_executor_factory::make(parsedDistinct->releaseQuery(), + std::make_unique<WorkingSet>(), + std::make_unique<EOFStage>(expCtx.get()), + collection, + yieldPolicy); } // TODO: check for idhack here? diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 33d2a3d1489..7d932c8efcd 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -44,6 +44,7 @@ #include "mongo/db/exec/update_stage.h" #include "mongo/db/exec/upsert_stage.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/plan_executor_factory.h" namespace mongo { @@ -61,7 +62,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection if (nullptr == collection) { auto eof = std::make_unique<EOFStage>(expCtx.get()); // Takes ownership of 'ws' and 'eof'. - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( expCtx, std::move(ws), std::move(eof), nullptr, yieldPolicy, NamespaceString(ns)); invariant(statusWithPlanExecutor.isOK()); return std::move(statusWithPlanExecutor.getValue()); @@ -73,7 +74,7 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection // Takes ownership of 'ws' and 'cs'. auto statusWithPlanExecutor = - PlanExecutor::make(expCtx, std::move(ws), std::move(cs), collection, yieldPolicy); + plan_executor_factory::make(expCtx, std::move(ws), std::move(cs), collection, yieldPolicy); invariant(statusWithPlanExecutor.isOK()); return std::move(statusWithPlanExecutor.getValue()); } @@ -95,8 +96,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith root = std::make_unique<DeleteStage>( expCtx.get(), std::move(params), ws.get(), collection, root.release()); - auto executor = - PlanExecutor::make(expCtx, std::move(ws), std::move(root), collection, yieldPolicy); + auto executor = plan_executor_factory::make( + expCtx, std::move(ws), std::move(root), collection, yieldPolicy); invariant(executor.getStatus()); return std::move(executor.getValue()); } @@ -127,8 +128,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::indexScan( direction, options); - auto executor = - PlanExecutor::make(expCtx, std::move(ws), std::move(root), collection, yieldPolicy); + auto executor = plan_executor_factory::make( + expCtx, std::move(ws), std::move(root), collection, yieldPolicy); invariant(executor.getStatus()); return std::move(executor.getValue()); } @@ -162,8 +163,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith root = std::make_unique<DeleteStage>( expCtx.get(), std::move(params), ws.get(), collection, root.release()); - auto executor = - PlanExecutor::make(expCtx, std::move(ws), std::move(root), collection, yieldPolicy); + auto executor = plan_executor_factory::make( + expCtx, std::move(ws), std::move(root), collection, yieldPolicy); invariant(executor.getStatus()); return std::move(executor.getValue()); } @@ -190,8 +191,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::updateWith : std::make_unique<UpdateStage>( expCtx.get(), params, ws.get(), collection, idHackStage.release())); - auto executor = - PlanExecutor::make(expCtx, std::move(ws), std::move(root), collection, yieldPolicy); + auto executor = plan_executor_factory::make( + expCtx, std::move(ws), std::move(root), collection, yieldPolicy); invariant(executor.getStatus()); return std::move(executor.getValue()); } diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index 05e357fadd1..a7d91590554 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -31,6 +31,7 @@ #include "mongo/base/string_data.h" #include "mongo/db/exec/delete.h" +#include "mongo/db/query/index_bounds.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/record_id.h" diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 40a8e4fee18..87a1c5cbd1e 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -29,33 +29,17 @@ #pragma once -#include <boost/optional.hpp> -#include <queue> - #include "mongo/base/status.h" -#include "mongo/db/catalog/util/partitioned.h" #include "mongo/db/exec/plan_stats.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/canonical_query.h" #include "mongo/db/query/plan_yield_policy.h" -#include "mongo/db/query/plan_yield_policy_sbe.h" -#include "mongo/db/query/query_solution.h" -#include "mongo/db/query/sbe_stage_builder.h" -#include "mongo/db/storage/snapshot.h" -#include "mongo/stdx/unordered_set.h" namespace mongo { class BSONObj; -class CappedInsertNotifier; -struct CappedInsertNotifierData; -class Collection; -class PlanExecutor; class PlanStage; class RecordId; -class WorkingSet; - -namespace sbe { -class PlanStage; -} // namespace sbe /** * If a getMore command specified a lastKnownCommittedOpTime (as secondaries do), we want to stop @@ -149,86 +133,18 @@ public: bool _dismissed = false; }; - // - // Factory methods. - // - // On success, return a new PlanExecutor, owned by the caller. - // - // Passing YIELD_AUTO to any of these factories will construct a yielding executor which may - // yield in the following circumstances: - // - During plan selection inside the call to make(). - // - On any call to getNext(). - // - On any call to restoreState(). - // - While executing the plan inside executePlan(). - // - // If auto-yielding is enabled, a yield during make() may result in the PlanExecutor being - // killed, in which case this method will return a non-OK status. - // - // All callers of these factory methods should provide either a non-null value for 'collection' - // or a non-empty 'nss' NamespaceString but not both. - // - - /** - * Note that the PlanExecutor will use the ExpressionContext associated with 'cq' and the - * OperationContext associated with that ExpressionContext. - */ - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - std::unique_ptr<CanonicalQuery> cq, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - const Collection* collection, - PlanYieldPolicy::YieldPolicy yieldPolicy, - NamespaceString nss = NamespaceString(), - std::unique_ptr<QuerySolution> qs = nullptr); - - /** - * This overload is provided for executors that do not need a CanonicalQuery. For example, the - * outer plan executor for an aggregate command does not have a CanonicalQuery. - * - * Note that the PlanExecutor will use the OperationContext associated with the 'expCtx' - * ExpressionContext. - */ - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - const Collection* collection, - PlanYieldPolicy::YieldPolicy yieldPolicy, - NamespaceString nss = NamespaceString(), - std::unique_ptr<QuerySolution> qs = nullptr); - /** - * These overloads are for SBE. + * Helper method to aid in displaying an ExecState for debug or other recreational purposes. */ - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - OperationContext* opCtx, - std::unique_ptr<CanonicalQuery> cq, - std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, - NamespaceString nss, - std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - OperationContext* opCtx, - std::unique_ptr<CanonicalQuery> cq, - std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, - NamespaceString nss, - std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash, - std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); + static std::string statestr(ExecState s); /** * A PlanExecutor must be disposed before destruction. In most cases, this will happen * automatically through a PlanExecutor::Deleter or a ClientCursor. */ PlanExecutor() = default; - virtual ~PlanExecutor() = default; - - // - // Accessors - // - /** - * Get the working set used by this executor, without transferring ownership. - */ - virtual WorkingSet* getWorkingSet() const = 0; + virtual ~PlanExecutor() = default; /** * Get the stage tree wrapped by this executor, without transferring ownership. @@ -241,7 +157,11 @@ public: virtual CanonicalQuery* getCanonicalQuery() const = 0; /** - * Return the NS that the query is running over. + * Return the namespace that the query is running over. + * + * WARNING: In general, a query execution plan can involve multiple collections, and therefore + * there is not a single namespace associated with a PlanExecutor. This method is here for + * legacy reasons, and new call sites should not be added. */ virtual const NamespaceString& nss() const = 0; @@ -251,15 +171,6 @@ public: virtual OperationContext* getOpCtx() const = 0; /** - * Return the ExpressionContext that the plan is currently executing with. - */ - virtual const boost::intrusive_ptr<ExpressionContext>& getExpCtx() const = 0; - - // - // Methods that just pass down to the PlanStage tree. - // - - /** * Save any state required to recover from changes to the underlying collection's data. * * While in the "saved" state, it is only legal to call restoreState, @@ -299,33 +210,6 @@ public: virtual void reattachToOperationContext(OperationContext* opCtx) = 0; /** - * Same as restoreState but without the logic to retry if a WriteConflictException is - * thrown. - * - * This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE. - */ - virtual void restoreStateWithoutRetrying() = 0; - - // - // Running Support - // - - /** - * Return the next result from the underlying execution tree. - * - * For read operations, objOut or dlOut are populated with another query result. - * - * For write operations, the return depends on the particulars of the write stage. - * - * If a YIELD_AUTO policy is set, then this method may yield. - * - * The Documents returned by this method may not be owned. If the caller wants to ensure a - * returned Document is preserved across a yield, getOwned() should be called. - */ - virtual ExecState getNextSnapshotted(Snapshotted<Document>* objOut, RecordId* dlOut) = 0; - virtual ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) = 0; - - /** * Produces the next document from the query execution plan. The caller can request that the * executor returns documents by passing a non-null pointer for the 'objOut' output parameter, * and similarly can request the RecordId by passing a non-null pointer for 'dlOut'. @@ -341,12 +225,16 @@ public: * during yield recovery, an exception can be thrown while locks are not held. Callers cannot * expect locks to be held when this method throws an exception. */ - virtual ExecState getNext(Document* objOut, RecordId* dlOut) = 0; + virtual ExecState getNext(BSONObj* out, RecordId* dlOut) = 0; /** - * Will perform the Document -> BSON conversion for the caller. + * Similar to 'getNext()', but returns a Document rather than a BSONObj. + * + * Callers should generally prefer the BSONObj variant, since not all implementations of + * PlanExecutor use Document/Value as their runtime value format. These implementations will + * typically just convert the BSON to Document on behalf of the caller. */ - virtual ExecState getNext(BSONObj* out, RecordId* dlOut) = 0; + virtual ExecState getNextDocument(Document* objOut, RecordId* dlOut) = 0; /** * Returns 'true' if the plan is done producing results (or writing), 'false' otherwise. @@ -393,11 +281,6 @@ public: virtual void dispose(OperationContext* opCtx) = 0; /** - * Helper method to aid in displaying an ExecState for debug or other recreational purposes. - */ - static std::string statestr(ExecState s); - - /** * Stash the BSONObj so that it gets returned from the PlanExecutor on a later call to * getNext(). * @@ -405,18 +288,13 @@ public: * generating further results from the underlying query plan. * * Subsequent calls to getNext() must request the BSONObj and *not* the RecordId. - * - * If used in combination with getNextSnapshotted(), then the SnapshotId associated with - * 'obj' will be null when 'obj' is dequeued. */ - virtual void enqueue(const Document& obj) = 0; virtual void enqueue(const BSONObj& obj) = 0; virtual bool isMarkedAsKilled() const = 0; virtual Status getKillStatus() = 0; virtual bool isDisposed() const = 0; - virtual bool isDetached() const = 0; /** * If the last oplog timestamp is being tracked for this PlanExecutor, return it. diff --git a/src/mongo/db/query/plan_executor_factory.cpp b/src/mongo/db/query/plan_executor_factory.cpp new file mode 100644 index 00000000000..5c00c01dab6 --- /dev/null +++ b/src/mongo/db/query/plan_executor_factory.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/plan_executor_factory.h" + +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/query/plan_executor_impl.h" +#include "mongo/db/query/plan_executor_sbe.h" +#include "mongo/logv2/log.h" + +namespace mongo::plan_executor_factory { + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + std::unique_ptr<CanonicalQuery> cq, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + const Collection* collection, + PlanYieldPolicy::YieldPolicy yieldPolicy, + NamespaceString nss, + std::unique_ptr<QuerySolution> qs) { + auto expCtx = cq->getExpCtx(); + return make(expCtx->opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + expCtx, + collection, + nss, + yieldPolicy); +} + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + const Collection* collection, + PlanYieldPolicy::YieldPolicy yieldPolicy, + NamespaceString nss, + std::unique_ptr<QuerySolution> qs) { + return make(expCtx->opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + nullptr, + expCtx, + collection, + nss, + yieldPolicy); +} + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Collection* collection, + NamespaceString nss, + PlanYieldPolicy::YieldPolicy yieldPolicy) { + + try { + auto execImpl = new PlanExecutorImpl(opCtx, + std::move(ws), + std::move(rt), + std::move(qs), + std::move(cq), + expCtx, + collection, + std::move(nss), + yieldPolicy); + PlanExecutor::Deleter planDeleter(opCtx); + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec(execImpl, std::move(planDeleter)); + return {std::move(exec)}; + } catch (...) { + return {exceptionToStatus()}; + } +} + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<CanonicalQuery> cq, + std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, + NamespaceString nss, + std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) { + + auto&& [rootStage, data] = root; + + LOGV2_DEBUG(4822860, + 5, + "SBE plan", + "slots"_attr = data.debugString(), + "stages"_attr = sbe::DebugPrinter{}.print(rootStage.get())); + + rootStage->prepare(data.ctx); + + auto exec = new PlanExecutorSBE(opCtx, + std::move(cq), + std::move(root), + std::move(nss), + false, + boost::none, + std::move(yieldPolicy)); + return {{exec, PlanExecutor::Deleter{opCtx}}}; +} + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<CanonicalQuery> cq, + std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, + NamespaceString nss, + std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash, + std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) { + + auto&& [rootStage, data] = root; + + LOGV2_DEBUG(4822861, + 5, + "SBE plan", + "slots"_attr = data.debugString(), + "stages"_attr = sbe::DebugPrinter{}.print(rootStage.get())); + + auto exec = new PlanExecutorSBE( + opCtx, std::move(cq), std::move(root), std::move(nss), true, stash, std::move(yieldPolicy)); + return {{exec, PlanExecutor::Deleter{opCtx}}}; +} + +} // namespace mongo::plan_executor_factory diff --git a/src/mongo/db/query/plan_executor_factory.h b/src/mongo/db/query/plan_executor_factory.h new file mode 100644 index 00000000000..3d27507cfd2 --- /dev/null +++ b/src/mongo/db/query/plan_executor_factory.h @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <queue> + +#include "mongo/db/exec/sbe/stages/stages.h" +#include "mongo/db/exec/working_set.h" +#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_yield_policy_sbe.h" +#include "mongo/db/query/query_solution.h" +#include "mongo/db/query/sbe_stage_builder.h" + +namespace mongo::plan_executor_factory { + +/** + * Creates a new 'PlanExecutor' capable of executing the query 'cq', or a non-OK status if a + * plan executor could not be created. + * + * Passing YIELD_AUTO will construct a yielding executor which may yield in the following + * circumstances: + * - During plan selection inside the call to make(). + * - On any call to getNext(). + * - On any call to restoreState(). + * - While executing the plan inside executePlan(). + * + * If auto-yielding is enabled, a yield during make() may result in the PlanExecutorImpl being + * killed, in which case this method will return a non-OK status. + * + * The caller must provide either a non-null value for 'collection, or a non-empty 'nss' + * NamespaceString but not both. + * + * Note that the PlanExecutor will use the ExpressionContext associated with 'cq' and the + * OperationContext associated with that ExpressionContext. + */ +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + std::unique_ptr<CanonicalQuery> cq, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + const Collection* collection, + PlanYieldPolicy::YieldPolicy yieldPolicy, + NamespaceString nss = NamespaceString(), + std::unique_ptr<QuerySolution> qs = nullptr); + +/** + * This overload is provided for executors that do not need a CanonicalQuery. For example, the + * outer plan executor for an aggregate command does not have a CanonicalQuery. + * + * Note that the PlanExecutor will use the OperationContext associated with the 'expCtx' + * ExpressionContext. + */ +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + const Collection* collection, + PlanYieldPolicy::YieldPolicy yieldPolicy, + NamespaceString nss = NamespaceString(), + std::unique_ptr<QuerySolution> qs = nullptr); + +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Collection* collection, + NamespaceString nss, + PlanYieldPolicy::YieldPolicy yieldPolicy); + +/** + * Constructs a PlanExecutor for the query 'cq' which will execute the SBE plan 'root'. A yield + * policy can optionally be provided if the plan should automatically yield during execution. + */ +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<CanonicalQuery> cq, + std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, + NamespaceString nss, + std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); + +/** + * Similar to the factory function above in that it also constructs an executor for the SBE plan + * 'root'. This overload allows callers to pass a pre-existing queue ('stash') of BSON objects or + * record ids to return to the caller. + */ +StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( + OperationContext* opCtx, + std::unique_ptr<CanonicalQuery> cq, + std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, + NamespaceString nss, + std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash, + std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); + +} // namespace mongo::plan_executor_factory diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 2f2a6bbc192..4e661a4f2ac 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -85,7 +85,7 @@ MONGO_FAIL_POINT_DEFINE(planExecutorHangWhileYieldedInWaitForInserts); /** * Constructs a PlanYieldPolicy based on 'policy'. */ -std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutor* exec, +std::unique_ptr<PlanYieldPolicy> makeYieldPolicy(PlanExecutorImpl* exec, PlanYieldPolicy::YieldPolicy policy) { switch (policy) { case PlanYieldPolicy::YieldPolicy::YIELD_AUTO: @@ -127,77 +127,6 @@ PlanStage* getStageByType(PlanStage* root, StageType type) { } } // namespace -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - std::unique_ptr<CanonicalQuery> cq, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - const Collection* collection, - PlanYieldPolicy::YieldPolicy yieldPolicy, - NamespaceString nss, - std::unique_ptr<QuerySolution> qs) { - auto expCtx = cq->getExpCtx(); - return PlanExecutorImpl::make(expCtx->opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - expCtx, - collection, - nss, - yieldPolicy); -} - -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - const Collection* collection, - PlanYieldPolicy::YieldPolicy yieldPolicy, - NamespaceString nss, - std::unique_ptr<QuerySolution> qs) { - return PlanExecutorImpl::make(expCtx->opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - nullptr, - expCtx, - collection, - nss, - yieldPolicy); -} - -StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutorImpl::make( - OperationContext* opCtx, - unique_ptr<WorkingSet> ws, - unique_ptr<PlanStage> rt, - unique_ptr<QuerySolution> qs, - unique_ptr<CanonicalQuery> cq, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const Collection* collection, - NamespaceString nss, - PlanYieldPolicy::YieldPolicy yieldPolicy) { - - auto execImpl = new PlanExecutorImpl(opCtx, - std::move(ws), - std::move(rt), - std::move(qs), - std::move(cq), - expCtx, - collection, - std::move(nss), - yieldPolicy); - PlanExecutor::Deleter planDeleter(opCtx); - std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec(execImpl, std::move(planDeleter)); - - // Perform plan selection, if necessary. - Status status = execImpl->_pickBestPlan(); - if (!status.isOK()) { - return status; - } - - return std::move(exec); -} - PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, unique_ptr<WorkingSet> ws, unique_ptr<PlanStage> rt, @@ -241,6 +170,8 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, invariant(_cq); _nss = _cq->getQueryRequest().nss(); } + + uassertStatusOK(_pickBestPlan()); } Status PlanExecutorImpl::_pickBestPlan() { @@ -296,10 +227,6 @@ std::string PlanExecutor::statestr(ExecState execState) { MONGO_UNREACHABLE; } -WorkingSet* PlanExecutorImpl::getWorkingSet() const { - return _workingSet.get(); -} - PlanStage* PlanExecutorImpl::getRootStage() const { return _root.get(); } @@ -316,10 +243,6 @@ OperationContext* PlanExecutorImpl::getOpCtx() const { return _opCtx; } -const boost::intrusive_ptr<ExpressionContext>& PlanExecutorImpl::getExpCtx() const { - return _expCtx; -} - void PlanExecutorImpl::saveState() { invariant(_currentState == kUsable || _currentState == kSaved); @@ -360,7 +283,6 @@ void PlanExecutorImpl::detachFromOperationContext() { _expCtx->opCtx = nullptr; } _currentState = kDetached; - _everDetachedFromOperationContext = true; } void PlanExecutorImpl::reattachToOperationContext(OperationContext* opCtx) { @@ -379,14 +301,15 @@ void PlanExecutorImpl::reattachToOperationContext(OperationContext* opCtx) { } PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) { - const auto state = getNext(&_docOutput, dlOut); + const auto state = getNextDocument(&_docOutput, dlOut); if (objOut) { - *objOut = _docOutput.toBson(); + const bool includeMetadata = _expCtx && _expCtx->needsMerge; + *objOut = includeMetadata ? _docOutput.toBsonWithMetaData() : _docOutput.toBson(); } return state; } -PlanExecutor::ExecState PlanExecutorImpl::getNext(Document* objOut, RecordId* dlOut) { +PlanExecutor::ExecState PlanExecutorImpl::getNextDocument(Document* objOut, RecordId* dlOut) { Snapshotted<Document> snapshotted; if (objOut) { snapshotted.value() = std::move(*objOut); @@ -400,27 +323,6 @@ PlanExecutor::ExecState PlanExecutorImpl::getNext(Document* objOut, RecordId* dl return state; } -PlanExecutor::ExecState PlanExecutorImpl::getNextSnapshotted(Snapshotted<Document>* objOut, - RecordId* dlOut) { - // Detaching from the OperationContext means that the returned snapshot ids could be invalid. - invariant(!_everDetachedFromOperationContext); - return _getNextImpl(objOut, dlOut); -} - -PlanExecutor::ExecState PlanExecutorImpl::getNextSnapshotted(Snapshotted<BSONObj>* objOut, - RecordId* dlOut) { - // Detaching from the OperationContext means that the returned snapshot ids could be invalid. - invariant(!_everDetachedFromOperationContext); - Snapshotted<Document> docOut; - docOut.value() = std::move(_docOutput); - const auto status = _getNextImpl(&docOut, dlOut); - if (objOut) { - *objOut = {docOut.snapshotId(), docOut.value().toBson()}; - } - _docOutput = std::move(docOut.value()); - return status; -} - bool PlanExecutorImpl::_shouldListenForInserts() { return _cq && _cq->getQueryRequest().isTailableAndAwaitData() && awaitDataState(_opCtx).shouldWaitForInserts && _opCtx->checkForInterruptNoAssert().isOK() && @@ -651,7 +553,7 @@ void PlanExecutorImpl::executePlan() { Document obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; while (PlanExecutor::ADVANCED == state) { - state = this->getNext(&obj, nullptr); + state = this->getNextDocument(&obj, nullptr); } if (isMarkedAsKilled()) { @@ -662,12 +564,8 @@ void PlanExecutorImpl::executePlan() { invariant(PlanExecutor::IS_EOF == state); } -void PlanExecutorImpl::enqueue(const Document& obj) { - _stash.push(obj.getOwned()); -} - void PlanExecutorImpl::enqueue(const BSONObj& obj) { - enqueue(Document{obj}); + _stash.push(Document{obj.getOwned()}); } bool PlanExecutorImpl::isMarkedAsKilled() const { @@ -683,10 +581,6 @@ bool PlanExecutorImpl::isDisposed() const { return _currentState == kDisposed; } -bool PlanExecutorImpl::isDetached() const { - return _currentState == kDetached; -} - Timestamp PlanExecutorImpl::getLatestOplogTimestamp() const { if (!_oplogTrackingStage) { return {}; diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h index ac1bcd8e43b..ba0bdd58a9b 100644 --- a/src/mongo/db/query/plan_executor_impl.h +++ b/src/mongo/db/query/plan_executor_impl.h @@ -32,79 +32,70 @@ #include <boost/optional.hpp> #include <queue> +#include "mongo/db/exec/working_set.h" #include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/query_solution.h" namespace mongo { +class CappedInsertNotifier; +struct CappedInsertNotifierData; + class PlanExecutorImpl : public PlanExecutor { PlanExecutorImpl(const PlanExecutorImpl&) = delete; PlanExecutorImpl& operator=(const PlanExecutorImpl&) = delete; public: /** - * Public factory methods delegate to this impl factory to do their work. + * Callers should obtain PlanExecutorImpl instances uses the 'plan_executor_factory' methods, in + * order to avoid depending directly on this concrete implementation of the PlanExecutor + * interface. */ - static StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> make( - OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const Collection* collection, - NamespaceString nss, - PlanYieldPolicy::YieldPolicy yieldPolicy); + PlanExecutorImpl(OperationContext* opCtx, + std::unique_ptr<WorkingSet> ws, + std::unique_ptr<PlanStage> rt, + std::unique_ptr<QuerySolution> qs, + std::unique_ptr<CanonicalQuery> cq, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const Collection* collection, + NamespaceString nss, + PlanYieldPolicy::YieldPolicy yieldPolicy); virtual ~PlanExecutorImpl(); - WorkingSet* getWorkingSet() const final; PlanStage* getRootStage() const final; CanonicalQuery* getCanonicalQuery() const final; const NamespaceString& nss() const final; OperationContext* getOpCtx() const final; - const boost::intrusive_ptr<ExpressionContext>& getExpCtx() const final; void saveState() final; void restoreState() final; void detachFromOperationContext() final; void reattachToOperationContext(OperationContext* opCtx) final; - void restoreStateWithoutRetrying() final; - ExecState getNextSnapshotted(Snapshotted<Document>* objOut, RecordId* dlOut) final; - ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) final; - ExecState getNext(Document* objOut, RecordId* dlOut) final; + ExecState getNextDocument(Document* objOut, RecordId* dlOut) final; ExecState getNext(BSONObj* out, RecordId* dlOut) final; bool isEOF() final; void executePlan() final; void markAsKilled(Status killStatus) final; void dispose(OperationContext* opCtx) final; - void enqueue(const Document& obj) final; void enqueue(const BSONObj& obj) final; bool isMarkedAsKilled() const final; Status getKillStatus() final; bool isDisposed() const final; - bool isDetached() const final; Timestamp getLatestOplogTimestamp() const final; BSONObj getPostBatchResumeToken() const final; LockPolicy lockPolicy() const final; bool isPipelineExecutor() const final; -private: /** - * New PlanExecutor instances are created with the static make() method above. + * Same as restoreState() but without the logic to retry if a WriteConflictException is thrown. + * + * This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE. */ - PlanExecutorImpl(OperationContext* opCtx, - std::unique_ptr<WorkingSet> ws, - std::unique_ptr<PlanStage> rt, - std::unique_ptr<QuerySolution> qs, - std::unique_ptr<CanonicalQuery> cq, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const Collection* collection, - NamespaceString nss, - PlanYieldPolicy::YieldPolicy yieldPolicy); + void restoreStateWithoutRetrying(); +private: /** - * Clients of PlanExecutor expect that on receiving a new instance from one of the make() - * factory methods, plan selection has already been completed. In order to enforce this - * property, this function is called to do plan selection prior to returning the new - * PlanExecutor. + * Called on construction in order to ensure that when callers receive a new instance of a + * 'PlanExecutorImpl', plan selection has already been completed. * * If the tree contains plan selection stages, such as MultiPlanStage or SubplanStage, * this calls into their underlying plan selection facilities. Otherwise, does nothing. @@ -144,9 +135,6 @@ private: */ void _waitForInserts(CappedInsertNotifierData* notifierData); - /** - * Common implementation for getNext() and getNextSnapshotted(). - */ ExecState _getNextImpl(Snapshotted<Document>* objOut, RecordId* dlOut); // The OperationContext that we're executing within. This can be updated if necessary by using @@ -189,8 +177,6 @@ private: enum { kUsable, kSaved, kDetached, kDisposed } _currentState = kUsable; - bool _everDetachedFromOperationContext = false; - // A pointer either to a ChangeStreamProxy or a CollectionScan stage, if present in the // execution tree, or nullptr otherwise. We cache it to avoid the need to traverse the execution // tree in runtime when the executor is requested to return the oplog tracking info. Since this diff --git a/src/mongo/db/query/plan_executor_sbe.cpp b/src/mongo/db/query/plan_executor_sbe.cpp index 187837a1157..5d9375f7ed4 100644 --- a/src/mongo/db/query/plan_executor_sbe.cpp +++ b/src/mongo/db/query/plan_executor_sbe.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/db/query/plan_executor_sbe.h" @@ -36,57 +34,8 @@ #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/values/bson.h" #include "mongo/db/query/sbe_stage_builder.h" -#include "mongo/logv2/log.h" namespace mongo { -StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - std::unique_ptr<CanonicalQuery> cq, - std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, - NamespaceString nss, - std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) { - - auto&& [rootStage, data] = root; - - LOGV2_DEBUG(4822860, - 5, - "SBE plan", - "slots"_attr = data.debugString(), - "stages"_attr = sbe::DebugPrinter{}.print(rootStage.get())); - - rootStage->prepare(data.ctx); - - auto exec = new PlanExecutorSBE(opCtx, - std::move(cq), - std::move(root), - std::move(nss), - false, - boost::none, - std::move(yieldPolicy)); - return {{exec, PlanExecutor::Deleter{opCtx}}}; -} - -StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PlanExecutor::make( - OperationContext* opCtx, - std::unique_ptr<CanonicalQuery> cq, - std::pair<std::unique_ptr<sbe::PlanStage>, stage_builder::PlanStageData> root, - NamespaceString nss, - std::queue<std::pair<BSONObj, boost::optional<RecordId>>> stash, - std::unique_ptr<PlanYieldPolicySBE> yieldPolicy) { - - auto&& [rootStage, data] = root; - - LOGV2_DEBUG(4822861, - 5, - "SBE plan", - "slots"_attr = data.debugString(), - "stages"_attr = sbe::DebugPrinter{}.print(rootStage.get())); - - auto exec = new PlanExecutorSBE( - opCtx, std::move(cq), std::move(root), std::move(nss), true, stash, std::move(yieldPolicy)); - return {{exec, PlanExecutor::Deleter{opCtx}}}; -} - PlanExecutorSBE::PlanExecutorSBE( OperationContext* opCtx, std::unique_ptr<CanonicalQuery> cq, @@ -178,16 +127,12 @@ void PlanExecutorSBE::dispose(OperationContext* opCtx) { _root.reset(); } -void PlanExecutorSBE::enqueue(const Document& obj) { - enqueue(obj.toBson()); -} - void PlanExecutorSBE::enqueue(const BSONObj& obj) { invariant(_state == State::kOpened); _stash.push({obj.getOwned(), boost::none}); } -PlanExecutor::ExecState PlanExecutorSBE::getNext(Document* objOut, RecordId* dlOut) { +PlanExecutor::ExecState PlanExecutorSBE::getNextDocument(Document* objOut, RecordId* dlOut) { invariant(_root); BSONObj obj; diff --git a/src/mongo/db/query/plan_executor_sbe.h b/src/mongo/db/query/plan_executor_sbe.h index 7d3c169cc84..776e9d1665c 100644 --- a/src/mongo/db/query/plan_executor_sbe.h +++ b/src/mongo/db/query/plan_executor_sbe.h @@ -34,6 +34,7 @@ #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_yield_policy_sbe.h" +#include "mongo/db/query/sbe_stage_builder.h" namespace mongo { class PlanExecutorSBE final : public PlanExecutor { @@ -47,10 +48,6 @@ public: boost::optional<std::queue<std::pair<BSONObj, boost::optional<RecordId>>>> stash, std::unique_ptr<PlanYieldPolicySBE> yieldPolicy); - WorkingSet* getWorkingSet() const override { - MONGO_UNREACHABLE; - } - PlanStage* getRootStage() const override { return nullptr; } @@ -67,31 +64,14 @@ public: return _opCtx; } - const boost::intrusive_ptr<ExpressionContext>& getExpCtx() const override { - static boost::intrusive_ptr<ExpressionContext> unused; - return unused; - } - void saveState(); void restoreState(); void detachFromOperationContext(); void reattachToOperationContext(OperationContext* opCtx); - void restoreStateWithoutRetrying() override { - MONGO_UNREACHABLE; - } - - ExecState getNextSnapshotted(Snapshotted<Document>* objOut, RecordId* dlOut) override { - MONGO_UNREACHABLE; - } - - ExecState getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) override { - MONGO_UNREACHABLE; - } - - ExecState getNext(Document* objOut, RecordId* dlOut) override; ExecState getNext(BSONObj* out, RecordId* dlOut) override; + ExecState getNextDocument(Document* objOut, RecordId* dlOut) override; bool isEOF() override { return _state == State::kClosed; @@ -105,7 +85,6 @@ public: void dispose(OperationContext* opCtx); - void enqueue(const Document& obj); void enqueue(const BSONObj& obj); bool isMarkedAsKilled() const override { @@ -121,10 +100,6 @@ public: return !_root; } - bool isDetached() const override { - return !_opCtx; - } - Timestamp getLatestOplogTimestamp() const override; BSONObj getPostBatchResumeToken() const override; diff --git a/src/mongo/db/query/plan_yield_policy_impl.cpp b/src/mongo/db/query/plan_yield_policy_impl.cpp index 3f6a0b0f9ff..808f9464b50 100644 --- a/src/mongo/db/query/plan_yield_policy_impl.cpp +++ b/src/mongo/db/query/plan_yield_policy_impl.cpp @@ -44,7 +44,8 @@ namespace { MONGO_FAIL_POINT_DEFINE(setInterruptOnlyPlansCheckForInterruptHang); } // namespace -PlanYieldPolicyImpl::PlanYieldPolicyImpl(PlanExecutor* exec, PlanYieldPolicy::YieldPolicy policy) +PlanYieldPolicyImpl::PlanYieldPolicyImpl(PlanExecutorImpl* exec, + PlanYieldPolicy::YieldPolicy policy) : PlanYieldPolicy(exec->getOpCtx()->lockState()->isGlobalLockedRecursively() ? PlanYieldPolicy::YieldPolicy::NO_YIELD : policy, diff --git a/src/mongo/db/query/plan_yield_policy_impl.h b/src/mongo/db/query/plan_yield_policy_impl.h index fcc92669fe7..be4163eedec 100644 --- a/src/mongo/db/query/plan_yield_policy_impl.h +++ b/src/mongo/db/query/plan_yield_policy_impl.h @@ -29,14 +29,14 @@ #pragma once -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_impl.h" #include "mongo/db/query/plan_yield_policy.h" namespace mongo { class PlanYieldPolicyImpl final : public PlanYieldPolicy { public: - PlanYieldPolicyImpl(PlanExecutor* exec, PlanYieldPolicy::YieldPolicy policy); + PlanYieldPolicyImpl(PlanExecutorImpl* exec, PlanYieldPolicy::YieldPolicy policy); private: Status yield(OperationContext* opCtx, std::function<void()> whileYieldingFn = nullptr) override; @@ -57,7 +57,7 @@ private: // The plan executor which this yield policy is responsible for yielding. Must not outlive the // plan executor. - PlanExecutor* const _planYielding; + PlanExecutorImpl* const _planYielding; }; } // namespace mongo diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index 500ada8ea8e..b8a841b56a3 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -42,7 +42,7 @@ #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/operation_context.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/db/repl/read_concern_level.h" #include "mongo/dbtests/dbtests.h" @@ -75,12 +75,13 @@ public: auto workingSet = std::make_unique<WorkingSet>(); auto queuedDataStage = std::make_unique<QueuedDataStage>(expCtx.get(), workingSet.get()); - return unittest::assertGet(PlanExecutor::make(expCtx, - std::move(workingSet), - std::move(queuedDataStage), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - kTestNss)); + return unittest::assertGet( + plan_executor_factory::make(expCtx, + std::move(workingSet), + std::move(queuedDataStage), + nullptr, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + kTestNss)); } ClientCursorParams makeParams(OperationContext* opCtx) { @@ -92,7 +93,6 @@ public: repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), BSONObj(), PrivilegeVector(), - false // needsMerge }; } @@ -137,16 +137,13 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursor) { auto cursorPin = cursorManager->registerCursor( pinningOpCtx, - { - makeFakePlanExecutor(), - kTestNss, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + kTestNss, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); auto cursorId = cursorPin.getCursor()->cursorid(); ASSERT_OK(cursorManager->killCursor(_opCtx.get(), cursorId, shouldAudit)); @@ -166,16 +163,13 @@ TEST_F(CursorManagerTest, ShouldBeAbleToKillPinnedCursorMultiClient) { // Pin the cursor from one client. auto cursorPin = cursorManager->registerCursor( pinningOpCtx, - { - makeFakePlanExecutor(), - kTestNss, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + kTestNss, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); auto cursorId = cursorPin.getCursor()->cursorid(); @@ -204,18 +198,14 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { CursorManager* cursorManager = useCursorManager(); auto clock = useClock(); - cursorManager->registerCursor( - _opCtx.get(), - { - makeFakePlanExecutor(), - NamespaceString{"test.collection"}, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t())); @@ -223,18 +213,14 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(0UL, cursorManager->numCursors()); - cursorManager->registerCursor( - _opCtx.get(), - { - makeFakePlanExecutor(), - NamespaceString{"test.collection"}, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t::max())); ASSERT_EQ(0UL, cursorManager->numCursors()); } @@ -248,16 +234,13 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), - { - makeFakePlanExecutor(), - NamespaceString{"test.collection"}, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); // The pin is still in scope, so it should not time out. clock->advance(getDefaultCursorTimeoutMillis()); @@ -275,16 +258,13 @@ TEST_F(CursorManagerTest, MarkedAsKilledCursorsShouldBeDeletedOnCursorPin) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), - { - makeFakePlanExecutor(), - NamespaceString{"test.collection"}, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); auto cursorId = cursorPin->cursorid(); // A cursor will stay alive, but be marked as killed, if it is interrupted with a code other @@ -311,16 +291,13 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) { auto cursorPin = cursorManager->registerCursor( _opCtx.get(), - { - makeFakePlanExecutor(), - NamespaceString{"test.collection"}, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + NamespaceString{"test.collection"}, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); // A cursor will stay alive, but be marked as killed, if it is interrupted with a code other // than ErrorCodes::Interrupted or ErrorCodes::CursorKilled and then unpinned. @@ -346,33 +323,26 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { // Register a cursor which we will look at again. auto cursorPin = cursorManager->registerCursor( _opCtx.get(), - { - makeFakePlanExecutor(), - kTestNss, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + kTestNss, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); auto usedCursorId = cursorPin.getCursor()->cursorid(); cursorPin.release(); // Register a cursor to immediately forget about, to make sure it will time out on a normal // schedule. - cursorManager->registerCursor( - _opCtx.get(), - { - makeFakePlanExecutor(), - kTestNss, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + cursorManager->registerCursor(_opCtx.get(), + {makeFakePlanExecutor(), + kTestNss, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); // Advance the clock to simulate time passing. clock->advance(Milliseconds(1)); @@ -403,16 +373,13 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing // Register a cursor which we will look at again. auto cursorPin = cursorManager->registerCursor( _opCtx.get(), - { - makeFakePlanExecutor(), - kTestNss, - {}, - {}, - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), - BSONObj(), - PrivilegeVector(), - false // needsMerge - }); + {makeFakePlanExecutor(), + kTestNss, + {}, + {}, + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern), + BSONObj(), + PrivilegeVector()}); // Advance the clock to simulate time passing. clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1)); diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index a779fe0224d..71e5ac4927d 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -48,7 +48,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/mock_yield_policies.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/stage_builder.h" #include "mongo/dbtests/dbtests.h" @@ -315,11 +315,11 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterTimeout) auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = - uassertStatusOK(PlanExecutor::make(std::move(canonicalQuery), - std::move(workingSet), - std::move(collectionScan), - readLock.getCollection(), - PlanYieldPolicy::YieldPolicy::ALWAYS_TIME_OUT)); + uassertStatusOK(plan_executor_factory::make(std::move(canonicalQuery), + std::move(workingSet), + std::move(collectionScan), + readLock.getCollection(), + PlanYieldPolicy::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; @@ -356,11 +356,11 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = - uassertStatusOK(PlanExecutor::make(std::move(canonicalQuery), - std::move(workingSet), - std::move(collectionScan), - readLock.getCollection(), - PlanYieldPolicy::YieldPolicy::ALWAYS_TIME_OUT)); + uassertStatusOK(plan_executor_factory::make(std::move(canonicalQuery), + std::move(workingSet), + std::move(collectionScan), + readLock.getCollection(), + PlanYieldPolicy::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. ctx()->tailableMode = TailableModeEnum::kNormal; @@ -405,12 +405,12 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil queryRequest->setTailableMode(TailableModeEnum::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); - auto planExecutor = - uassertStatusOK(PlanExecutor::make(std::move(canonicalQuery), - std::move(workingSet), - std::move(collectionScan), - readLock.getCollection(), - PlanYieldPolicy::YieldPolicy::ALWAYS_MARK_KILLED)); + auto planExecutor = uassertStatusOK( + plan_executor_factory::make(std::move(canonicalQuery), + std::move(workingSet), + std::move(collectionScan), + readLock.getCollection(), + PlanYieldPolicy::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. ctx()->tailableMode = TailableModeEnum::kTailableAndAwaitData; @@ -445,12 +445,12 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { queryRequest->setFilter(filter); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); - auto planExecutor = - uassertStatusOK(PlanExecutor::make(std::move(canonicalQuery), - std::move(workingSet), - std::move(collectionScan), - readLock.getCollection(), - PlanYieldPolicy::YieldPolicy::ALWAYS_MARK_KILLED)); + auto planExecutor = uassertStatusOK( + plan_executor_factory::make(std::move(canonicalQuery), + std::move(workingSet), + std::move(collectionScan), + readLock.getCollection(), + PlanYieldPolicy::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. ctx()->tailableMode = TailableModeEnum::kNormal; diff --git a/src/mongo/dbtests/plan_executor_invalidation_test.cpp b/src/mongo/dbtests/plan_executor_invalidation_test.cpp index 1bf6edc5a93..8ff34319036 100644 --- a/src/mongo/dbtests/plan_executor_invalidation_test.cpp +++ b/src/mongo/dbtests/plan_executor_invalidation_test.cpp @@ -42,7 +42,7 @@ #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/query/internal_plans.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/service_context.h" #include "mongo/dbtests/dbtests.h" #include "mongo/unittest/unittest.h" @@ -87,7 +87,7 @@ public: std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); // Takes ownership of 'ws', 'scan', and 'cq'. - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( std::move(cq), std::move(ws), std::move(scan), diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 10961cb1018..4ed8b61720b 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -51,7 +51,7 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_solution.h" #include "mongo/dbtests/dbtests.h" @@ -119,8 +119,8 @@ public: new CollectionScan(cq->getExpCtxRaw(), coll, csparams, ws.get(), cq.get()->root())); // Hand the plan off to the executor. - auto statusWithPlanExecutor = - PlanExecutor::make(std::move(cq), std::move(ws), std::move(root), coll, yieldPolicy); + auto statusWithPlanExecutor = plan_executor_factory::make( + std::move(cq), std::move(ws), std::move(root), coll, yieldPolicy); ASSERT_OK(statusWithPlanExecutor.getStatus()); return std::move(statusWithPlanExecutor.getValue()); } @@ -165,11 +165,11 @@ public: // Hand the plan off to the executor. auto statusWithPlanExecutor = - PlanExecutor::make(std::move(cq), - std::move(ws), - std::move(root), - coll, - PlanYieldPolicy::YieldPolicy::YIELD_MANUAL); + plan_executor_factory::make(std::move(cq), + std::move(ws), + std::move(root), + coll, + PlanYieldPolicy::YieldPolicy::YIELD_MANUAL); ASSERT_OK(statusWithPlanExecutor.getStatus()); return std::move(statusWithPlanExecutor.getValue()); } @@ -227,11 +227,12 @@ TEST_F(PlanExecutorTest, DropIndexScanAgg) { auto ws = std::make_unique<WorkingSet>(); auto proxy = std::make_unique<PipelineProxyStage>(_expCtx.get(), std::move(pipeline), ws.get()); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(proxy), - collection, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(proxy), + collection, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto outerExec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index 27fd43dd268..04bfe282571 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -46,7 +46,7 @@ #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/storage/record_store.h" #include "mongo/dbtests/dbtests.h" #include "mongo/unittest/unittest.h" @@ -104,11 +104,12 @@ public: unique_ptr<PlanStage> ps = std::make_unique<CollectionScan>( _expCtx.get(), collection, params, ws.get(), filterExpr.get()); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(ps), - collection, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(ps), + collection, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -197,7 +198,7 @@ TEST_F(QueryStageCollectionScanTest, QueryStageCollscanObjectsInOrderForward) { unique_ptr<PlanStage> ps = std::make_unique<CollectionScan>(_expCtx.get(), collection, params, ws.get(), nullptr); - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( _expCtx, std::move(ws), std::move(ps), collection, PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -226,7 +227,7 @@ TEST_F(QueryStageCollectionScanTest, QueryStageCollscanObjectsInOrderBackward) { unique_ptr<PlanStage> ps = std::make_unique<CollectionScan>(_expCtx.get(), collection, params, ws.get(), nullptr); - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( _expCtx, std::move(ws), std::move(ps), collection, PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -379,7 +380,7 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanResumeAfterRecordIdSeekSuc ASSERT_EQUALS(PlanStage::NEED_TIME, ps->work(&id)); // Run the rest of the scan and verify the results. - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( _expCtx, std::move(ws), std::move(ps), collection, PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index ec91fcc4607..f645dc88d07 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -45,7 +45,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/json.h" #include "mongo/db/query/collation/collator_interface_mock.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/dbtests/dbtests.h" /** @@ -186,11 +186,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); // Must fetch if we want to easily pull out an obj. - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -254,11 +255,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -322,11 +324,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -396,11 +399,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -466,11 +470,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -523,11 +528,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -815,11 +821,12 @@ public: auto fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); // Must fetch if we want to easily pull out an obj. - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -887,11 +894,12 @@ public: unique_ptr<FetchStage> fetchStage = make_unique<FetchStage>(_expCtx.get(), ws.get(), std::move(ms), nullptr, coll); // Must fetch if we want to easily pull out an obj. - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/dbtests/query_stage_multiplan.cpp b/src/mongo/dbtests/query_stage_multiplan.cpp index ca75cc3052b..aa9abb89be2 100644 --- a/src/mongo/dbtests/query_stage_multiplan.cpp +++ b/src/mongo/dbtests/query_stage_multiplan.cpp @@ -49,7 +49,7 @@ #include "mongo/db/query/collection_query_info.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/mock_yield_policies.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/query/query_planner.h" @@ -255,11 +255,12 @@ TEST_F(QueryStageMultiPlanTest, MPSCollectionScanVsHighlySelectiveIXScan) { ASSERT_EQUALS(0, mps->bestPlanIdx()); // Takes ownership of arguments other than 'collection'. - auto statusWithPlanExecutor = PlanExecutor::make(std::move(cq), - std::move(sharedWs), - std::move(mps), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(std::move(cq), + std::move(sharedWs), + std::move(mps), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -494,11 +495,12 @@ TEST_F(QueryStageMultiPlanTest, MPSExplainAllPlans) { mps->addPlan(std::make_unique<QuerySolution>(), std::move(secondPlan), ws.get()); // Making a PlanExecutor chooses the best plan. - auto exec = uassertStatusOK(PlanExecutor::make(_expCtx, - std::move(ws), - std::move(mps), - ctx.getCollection(), - PlanYieldPolicy::YieldPolicy::NO_YIELD)); + auto exec = + uassertStatusOK(plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(mps), + ctx.getCollection(), + PlanYieldPolicy::YieldPolicy::NO_YIELD)); auto root = static_cast<MultiPlanStage*>(exec->getRootStage()); ASSERT_TRUE(root->bestPlanChosen()); diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index 612771d6425..4e14c38fec9 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -43,7 +43,7 @@ #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/exec/sort.h" #include "mongo/db/json.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/dbtests/dbtests.h" /** @@ -113,6 +113,7 @@ public: Collection* coll) { // Build the mock scan stage which feeds the data. auto ws = std::make_unique<WorkingSet>(); + _workingSet = ws.get(); auto queuedDataStage = std::make_unique<QueuedDataStage>(_expCtx.get(), ws.get()); insertVarietyOfObjects(ws.get(), queuedDataStage.get(), coll); @@ -130,7 +131,7 @@ public: // The PlanExecutor will be automatically registered on construction due to the auto // yield policy, so it can receive invalidations when we remove documents later. - auto statusWithPlanExecutor = PlanExecutor::make( + auto statusWithPlanExecutor = plan_executor_factory::make( _expCtx, std::move(ws), std::move(ss), coll, PlanYieldPolicy::YieldPolicy::YIELD_AUTO); invariant(statusWithPlanExecutor.isOK()); return std::move(statusWithPlanExecutor.getValue()); @@ -173,11 +174,12 @@ public: _expCtx.get(), ws.get(), std::move(sortStage), nullptr, coll); // Must fetch so we can look at the doc as a BSONObj. - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); @@ -242,6 +244,7 @@ protected: boost::intrusive_ptr<ExpressionContext> _expCtx = new ExpressionContext(&_opCtx, nullptr, nss()); DBDirectClient _client; + WorkingSet* _workingSet = nullptr; }; @@ -417,7 +420,7 @@ public: if (PlanStage::ADVANCED != status) { continue; } - WorkingSetMember* member = exec->getWorkingSet()->get(id); + WorkingSetMember* member = _workingSet->get(id); ASSERT(member->hasObj()); if (member->doc.value().getField("_id").getOid() == updatedId) { ASSERT(idBeforeUpdate == member->doc.snapshotId()); @@ -510,7 +513,7 @@ public: if (PlanStage::ADVANCED != status) { continue; } - WorkingSetMember* member = exec->getWorkingSet()->get(id); + WorkingSetMember* member = _workingSet->get(id); ASSERT(member->hasObj()); ++count; } @@ -590,11 +593,12 @@ public: _expCtx.get(), ws.get(), std::move(sortStage), nullptr, coll); // We don't get results back since we're sorting some parallel arrays. - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(fetchStage), - coll, - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(fetchStage), + coll, + PlanYieldPolicy::YieldPolicy::NO_YIELD); auto exec = std::move(statusWithPlanExecutor.getValue()); ASSERT_THROWS_CODE(exec->getNext(static_cast<BSONObj*>(nullptr), nullptr), diff --git a/src/mongo/dbtests/query_stage_tests.cpp b/src/mongo/dbtests/query_stage_tests.cpp index e51f95f830b..1b6701a9de7 100644 --- a/src/mongo/dbtests/query_stage_tests.cpp +++ b/src/mongo/dbtests/query_stage_tests.cpp @@ -43,7 +43,7 @@ #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/query/plan_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/dbtests/dbtests.h" /** @@ -92,11 +92,12 @@ public: unique_ptr<IndexScan> ix = std::make_unique<IndexScan>( _expCtx.get(), ctx.getCollection(), params, ws.get(), filterExpr.get()); - auto statusWithPlanExecutor = PlanExecutor::make(_expCtx, - std::move(ws), - std::move(ix), - ctx.getCollection(), - PlanYieldPolicy::YieldPolicy::NO_YIELD); + auto statusWithPlanExecutor = + plan_executor_factory::make(_expCtx, + std::move(ws), + std::move(ix), + ctx.getCollection(), + PlanYieldPolicy::YieldPolicy::NO_YIELD); ASSERT_OK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); |