summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h3
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp10
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp21
4 files changed, 30 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index 5567581486f..aba954dac02 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -83,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
ValueArrayStream vals;
for (auto elem : _queue) {
- vals << elem.getDocument();
+ vals << elem.getDocument().getOwned();
}
return Value(DOC(kStageName << vals.done()));
}
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index f24b0c875e4..dec6f473a15 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -59,7 +59,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const override {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
- HostTypeRequirement::kNone,
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kAllowed,
@@ -67,6 +67,7 @@ public:
UnionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
+ constraints.isIndependentOfAnyCollection = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index bc9f5205cd8..e73764a6a7e 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -277,8 +277,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline*
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+ invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage));
+ if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) {
+ // There's no need to attach a cursor here.
+ return pipeline;
+ }
boost::optional<AutoGetCollectionForReadCommand> autoColl;
const NamespaceStringOrUUID nsOrUUID = expCtx->uuid
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f4b0b5fb689..9d31dd15b64 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1146,8 +1146,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
auto expCtx = ownedPipeline->getContext();
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+
+ if (hasFirstStage) {
+ // Make sure the first stage isn't already a $mergeCursors, and also check if it is a stage
+ // which needs to actually get a cursor attached or not.
+ const auto* firstStage = *hasFirstStage;
+ invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage));
+ // Here we check the hostRequirement because there is at least one stage ($indexStats) which
+ // does not require input data, but is still expected to fan out and contact remote shards
+ // nonetheless.
+ if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource &&
+ (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) {
+ // There's no need to attach a cursor here - the first stage provides its own data and
+ // is meant to be run locally (e.g. $documents).
+ return pipeline;
+ }
+ }
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
return shardVersionRetry(