author     Charlie Swanson <charlie.swanson@mongodb.com>     2021-10-05 00:35:26 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-10-05 00:48:06 +0000
commit     bd574911e9d1686922476938920516c37f9083f9 (patch)
tree       e77057581b2b0fd6c065d0429506fbeb6a087e3a
parent     6940a21899491ade7aab2fb95bfffc7fcd8d2e5c (diff)
SERVER-60406 Harden $searchMeta implementation in unsharded collection
-rw-r--r--  jstests/aggregation/documents.js                                              | 22
-rw-r--r--  src/mongo/db/pipeline/document_source_queue.cpp                               |  2
-rw-r--r--  src/mongo/db/pipeline/document_source_queue.h                                 |  3
-rw-r--r--  src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp  | 10
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                                 | 21
5 files changed, 42 insertions(+), 16 deletions(-)
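
The changes below let a pipeline whose first stage supplies its own input (the queue-backed stage that $documents desugars to, and per the commit title the $searchMeta result on an unsharded collection) skip cursor attachment entirely. As a minimal shell sketch of the behavior being hardened, assuming a running mongod and any writable database:

    // Sketch only, not part of this commit: $documents aggregates at the
    // database level, so no collection cursor should be attached for it.
    const res = db.aggregate([{$documents: [{x: 1}, {x: 2}]}]).toArray();
    assert.eq(2, res.length);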
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/documents.js
index f9289a3baf0..5996b5dd4ff 100644
--- a/jstests/aggregation/documents.js
+++ b/jstests/aggregation/documents.js
@@ -29,7 +29,7 @@ coll.drop(writeConcernOptions);
coll.insert({a: 1}, writeConcernOptions);
// $documents given an array of objects.
-const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray();
+const docs = currDB.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray();
assert.eq(2, docs.length);
assert.eq(docs[0], {a1: 1});
@@ -37,7 +37,8 @@ assert.eq(docs[1], {a1: 2});
// $documents evaluates to an array of objects.
const docs1 =
- coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}],
+ currDB
+ .aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}],
writeConcernOptions)
.toArray();
@@ -48,7 +49,8 @@ for (let i = 0; i < 100; i++) {
// $documents evaluates to an array of objects.
const docsUnionWith =
- coll.aggregate(
+ currDB
+ .aggregate(
[
{$documents: [{a: 13}]},
{
@@ -76,9 +78,9 @@ assert.eq(res[0]["a"], 1);
assert.eq(res[1], {xx: 1});
assert.eq(res[2], {xx: 2});
-function assertFails(pipeline, code) {
+function assertFails(coll, pipeline, code) {
assert.commandFailedWithCode(currDB.runCommand({
- aggregate: coll.getName(),
+ aggregate: coll,
pipeline: pipeline,
writeConcern: writeConcernOptions.writeConcern,
cursor: {}
@@ -87,16 +89,16 @@ function assertFails(pipeline, code) {
}
// Must fail due to a misplaced $documents.
-assertFails([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], 40602);
+assertFails(coll.getName(), [{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], 40602);
// $unionWith must fail due to a missing $documents.
-assertFails([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], 9);
+assertFails(coll.getName(), [{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], 9);
// Must fail due to $documents producing array of non-objects.
-assertFails([{$documents: [1, 2, 3]}], 40228);
+assertFails(1, [{$documents: [1, 2, 3]}], 40228);
// Must fail due to $documents producing a non-array.
-assertFails([{$documents: {a: 1}}], 5858203);
+assertFails(1, [{$documents: {a: 1}}], 5858203);
// Must fail due to $documents producing an array of non-objects.
-assertFails([{$documents: {a: [1, 2, 3]}}], 5858203);
+assertFails(1, [{$documents: {a: [1, 2, 3]}}], 5858203);
})();
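
In the updated assertFails helper, passing 1 as the namespace exercises the database-level form of the command. A hedged sketch of the equivalent raw command, assuming the shell's current database:

    // Sketch only: {aggregate: 1} is the collection-less, database-level form
    // of the command, the form on which a leading $documents is accepted.
    db.runCommand({aggregate: 1, pipeline: [{$documents: [{a: 1}]}], cursor: {}});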
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index 5567581486f..aba954dac02 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -83,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
ValueArrayStream vals;
for (auto elem : _queue) {
- vals << elem.getDocument();
+ vals << elem.getDocument().getOwned();
}
return Value(DOC(kStageName << vals.done()));
}
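
The added getOwned() deep-copies each queued document so the serialized Value owns its storage rather than referencing memory the queue may release. Explain is one caller of serialize(); a hedged shell sketch that exercises this path:

    // Sketch only: explaining a database-level $documents pipeline forces the
    // desugared queue stage to serialize, the path patched above.
    db.runCommand({explain: {aggregate: 1, pipeline: [{$documents: [{a: 1}]}], cursor: {}}});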
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index f24b0c875e4..dec6f473a15 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -59,7 +59,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const override {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
- HostTypeRequirement::kNone,
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kAllowed,
@@ -67,6 +67,7 @@ public:
UnionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
+ constraints.isIndependentOfAnyCollection = true;
return constraints;
}
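
kLocalOnly keeps the queue stage on the host that created it, and isIndependentOfAnyCollection frees it from needing a namespace. A hedged shell sketch of the visible effect, with a hypothetical database name:

    // Sketch only: a collection-independent first stage works even on a
    // database that contains no collections at all.
    const emptyDB = db.getSiblingDB("hypothetical_empty_db");
    assert.eq(1, emptyDB.aggregate([{$documents: [{ok: true}]}]).toArray().length);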
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index bc9f5205cd8..e73764a6a7e 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -277,8 +277,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline*
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+ invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage));
+ if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) {
+ // There's no need to attach a cursor here.
+ return pipeline;
+ }
boost::optional<AutoGetCollectionForReadCommand> autoColl;
const NamespaceStringOrUUID nsOrUUID = expCtx->uuid
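
This local-read early return also covers sub-pipelines attached on a mongod, for example a $unionWith whose inner pipeline begins with $documents. A hedged sketch, assuming an unsharded collection named coll:

    // Sketch only: the inner pipeline supplies its own input, so the early
    // return above skips acquiring a collection for it.
    db.coll.aggregate([{$unionWith: {pipeline: [{$documents: [{k: 1}]}]}}]);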
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f4b0b5fb689..9d31dd15b64 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1146,8 +1146,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
auto expCtx = ownedPipeline->getContext();
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+
+ if (hasFirstStage) {
+ // Make sure the first stage isn't already a $mergeCursors, and check whether it is a stage
+ // that actually needs a cursor attached at all.
+ const auto* firstStage = *hasFirstStage;
+ invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage));
+ // We check the hostRequirement because at least one stage ($indexStats) does not require
+ // input data but is still expected to fan out and contact remote shards.
+ if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource &&
+ (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) {
+ // There's no need to attach a cursor here - the first stage provides its own data and
+ // is meant to be run locally (e.g. $documents).
+ return pipeline;
+ }
+ }
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
return shardVersionRetry(
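
The net effect in a sharded deployment: a first stage that is both input-free and local-only stays on the merging host, while an input-free stage like $indexStats still targets shards. A hedged sketch from a mongos shell, with an illustrative collection name:

    // Sketch only: $documents meets both conditions above, so no shard
    // cursors should be established for it...
    db.aggregate([{$documents: [{local: true}]}]);
    // ...whereas $indexStats also needs no input cursor, but must still fan
    // out to the shards that own the collection.
    db.someColl.aggregate([{$indexStats: {}}]);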