summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp24
1 files changed, 11 insertions, 13 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 38e9b713853..daa7cae1f95 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
+#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/read_concern.h"
@@ -169,7 +170,7 @@ bool handleCursorCommand(OperationContext* opCtx,
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
- Document nextDoc;
+ BSONObj nextDoc;
try {
state = exec->getNext(&nextDoc, nullptr);
@@ -204,8 +205,7 @@ bool handleCursorCommand(OperationContext* opCtx,
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
- BSONObj next = expCtx->needsMerge ? nextDoc.toBsonWithMetaData() : nextDoc.toBson();
- if (!FindCommon::haveSpaceForNext(next, objCount, responseBuilder.bytesUsed())) {
+ if (!FindCommon::haveSpaceForNext(nextDoc, objCount, responseBuilder.bytesUsed())) {
exec->enqueue(nextDoc);
stashedResult = true;
break;
@@ -213,7 +213,7 @@ bool handleCursorCommand(OperationContext* opCtx,
// If this executor produces a postBatchResumeToken, add it to the cursor response.
responseBuilder.setPostBatchResumeToken(exec->getPostBatchResumeToken());
- responseBuilder.append(next);
+ responseBuilder.append(nextDoc);
}
if (cursor) {
@@ -467,12 +467,12 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createOuterPipelineProxyExe
// invalidations. The Pipeline may contain PlanExecutors which *are* yielding
// PlanExecutors and which *are* registered with their respective collection's
// CursorManager
- return uassertStatusOK(PlanExecutor::make(std::move(expCtx),
- std::move(ws),
- std::move(proxy),
- nullptr,
- PlanYieldPolicy::YieldPolicy::NO_YIELD,
- nss));
+ return uassertStatusOK(plan_executor_factory::make(std::move(expCtx),
+ std::move(ws),
+ std::move(proxy),
+ nullptr,
+ PlanYieldPolicy::YieldPolicy::NO_YIELD,
+ nss));
}
} // namespace
@@ -713,8 +713,7 @@ Status runAggregate(OperationContext* opCtx,
opCtx->getWriteConcern(),
repl::ReadConcernArgs::get(opCtx),
cmdObj,
- privileges,
- expCtx->needsMerge);
+ privileges);
if (expCtx->tailableMode == TailableModeEnum::kTailable) {
cursorParams.setTailable(true);
} else if (expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData) {
@@ -741,7 +740,6 @@ Status runAggregate(OperationContext* opCtx,
} else {
invariant(pins[0]->getExecutor()->lockPolicy() ==
PlanExecutor::LockPolicy::kLockExternally);
- invariant(!explainExecutor->isDetached());
invariant(explainExecutor->getOpCtx() == opCtx);
// The explainStages() function for a non-pipeline executor expects to be called with
// the appropriate collection lock already held. Make sure it has not been released yet.