diff options
Diffstat (limited to 'src/mongo')
4 files changed, 30 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp index 5567581486f..aba954dac02 100644 --- a/src/mongo/db/pipeline/document_source_queue.cpp +++ b/src/mongo/db/pipeline/document_source_queue.cpp @@ -83,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { ValueArrayStream vals; for (auto elem : _queue) { - vals << elem.getDocument(); + vals << elem.getDocument().getOwned(); } return Value(DOC(kStageName << vals.done())); } diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index f24b0c875e4..dec6f473a15 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -59,7 +59,7 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const override { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, - HostTypeRequirement::kNone, + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, @@ -67,6 +67,7 @@ public: UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; + constraints.isIndependentOfAnyCollection = true; return constraints; } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index bc9f5205cd8..e73764a6a7e 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -277,8 +277,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage)); + if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) { + // There's no need to attach a cursor here. + return pipeline; + } boost::optional<AutoGetCollectionForReadCommand> autoColl; const NamespaceStringOrUUID nsOrUUID = expCtx->uuid diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f4b0b5fb689..9d31dd15b64 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1146,8 +1146,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne auto expCtx = ownedPipeline->getContext(); std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + + if (hasFirstStage) { + // Make sure the first stage isn't already a $mergeCursors, and also check if it is a stage + // which needs to actually get a cursor attached or not. + const auto* firstStage = *hasFirstStage; + invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage)); + // Here we check the hostRequirement because there is at least one stage ($indexStats) which + // does not require input data, but is still expected to fan out and contact remote shards + // nonetheless. + if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource && + (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) { + // There's no need to attach a cursor here - the first stage provides its own data and + // is meant to be run locally (e.g. $documents). + return pipeline; + } + } auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); return shardVersionRetry( |