diff options
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 24 |
1 files changed, 11 insertions, 13 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 38e9b713853..daa7cae1f95 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -60,6 +60,7 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/db/read_concern.h" @@ -169,7 +170,7 @@ bool handleCursorCommand(OperationContext* opCtx, // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; - Document nextDoc; + BSONObj nextDoc; try { state = exec->getNext(&nextDoc, nullptr); @@ -204,8 +205,7 @@ bool handleCursorCommand(OperationContext* opCtx, // If adding this object will cause us to exceed the message size limit, then we stash it // for later. - BSONObj next = expCtx->needsMerge ? nextDoc.toBsonWithMetaData() : nextDoc.toBson(); - if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) { + if (!FindCommon::haveSpaceForNext(nextDoc, objCount, responseBuilder.bytesUsed())) { exec->enqueue(nextDoc); stashedResult = true; break; @@ -213,7 +213,7 @@ bool handleCursorCommand(OperationContext* opCtx, // If this executor produces a postBatchResumeToken, add it to the cursor response. responseBuilder.setPostBatchResumeToken(exec->getPostBatchResumeToken()); - responseBuilder.append(next); + responseBuilder.append(nextDoc); } if (cursor) { @@ -467,12 +467,12 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe // invalidations. The Pipeline may contain PlanExecutors which *are* yielding // PlanExecutors and which *are* registered with their respective collection's // CursorManager - return uassertStatusOK(PlanExecutor::make(std::move(expCtx), - std::move(ws), - std::move(proxy), - nullptr, - PlanYieldPolicy::YieldPolicy::NO_YIELD, - nss)); + return uassertStatusOK(plan_executor_factory::make(std::move(expCtx), + std::move(ws), + std::move(proxy), + nullptr, + PlanYieldPolicy::YieldPolicy::NO_YIELD, + nss)); } } // namespace @@ -713,8 +713,7 @@ Status runAggregate(OperationContext* opCtx, opCtx->getWriteConcern(), repl::ReadConcernArgs::get(opCtx), cmdObj, - privileges, - expCtx->needsMerge); + privileges); if (expCtx->tailableMode == TailableModeEnum::kTailable) { cursorParams.setTailable(true); } else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) { @@ -741,7 +740,6 @@ Status runAggregate(OperationContext* opCtx, } else { invariant(pins[0]->getExecutor()->lockPolicy() == PlanExecutor::LockPolicy::kLockExternally); - invariant(!explainExecutor->isDetached()); invariant(explainExecutor->getOpCtx() == opCtx); // The explainStages() function for a non-pipeline executor expects to be called with // the appropriate collection lock already held. Make sure it has not been released yet. |