diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2021-10-05 00:35:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-05 00:48:06 +0000 |
commit | bd574911e9d1686922476938920516c37f9083f9 (patch) | |
tree | e77057581b2b0fd6c065d0429506fbeb6a087e3a | |
parent | 6940a21899491ade7aab2fb95bfffc7fcd8d2e5c (diff) | |
download | mongo-bd574911e9d1686922476938920516c37f9083f9.tar.gz |
SERVER-60406 Harden $searchMeta implementation in unsharded collection
-rw-r--r-- | jstests/aggregation/documents.js | 22 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_queue.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_queue.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 21 |
5 files changed, 42 insertions, 16 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/documents.js index f9289a3baf0..5996b5dd4ff 100644 --- a/jstests/aggregation/documents.js +++ b/jstests/aggregation/documents.js @@ -29,7 +29,7 @@ coll.drop(writeConcernOptions); coll.insert({a: 1}, writeConcernOptions); // $documents given an array of objects. -const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray(); +const docs = currDB.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray(); assert.eq(2, docs.length); assert.eq(docs[0], {a1: 1}); @@ -37,7 +37,8 @@ assert.eq(docs[1], {a1: 2}); // $documents evaluates to an array of objects. const docs1 = - coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}], + currDB + .aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}], writeConcernOptions) .toArray(); @@ -48,7 +49,8 @@ for (let i = 0; i < 100; i++) { // $documents evaluates to an array of objects. const docsUnionWith = - coll.aggregate( + currDB + .aggregate( [ {$documents: [{a: 13}]}, { @@ -76,9 +78,9 @@ assert.eq(res[0]["a"], 1); assert.eq(res[1], {xx: 1}); assert.eq(res[2], {xx: 2}); -function assertFails(pipeline, code) { +function assertFails(coll, pipeline, code) { assert.commandFailedWithCode(currDB.runCommand({ - aggregate: coll.getName(), + aggregate: coll, pipeline: pipeline, writeConcern: writeConcernOptions.writeConcern, cursor: {} @@ -87,16 +89,16 @@ function assertFails(pipeline, code) { } // Must fail due to misplaced $document. -assertFails([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], 40602); +assertFails(coll.getName(), [{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], 40602); // $unionWith must fail due to no $document -assertFails([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], 9); +assertFails(coll.getName(), [{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], 9); // Must fail due to $documents producing array of non-objects. -assertFails([{$documents: [1, 2, 3]}], 40228); +assertFails(1, [{$documents: [1, 2, 3]}], 40228); // Must fail due $documents producing non-array. -assertFails([{$documents: {a: 1}}], 5858203); +assertFails(1, [{$documents: {a: 1}}], 5858203); // Must fail due $documents producing array of non-objects. -assertFails([{$documents: {a: [1, 2, 3]}}], 5858203); +assertFails(1, [{$documents: {a: [1, 2, 3]}}], 5858203); })(); diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp index 5567581486f..aba954dac02 100644 --- a/src/mongo/db/pipeline/document_source_queue.cpp +++ b/src/mongo/db/pipeline/document_source_queue.cpp @@ -83,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { ValueArrayStream vals; for (auto elem : _queue) { - vals << elem.getDocument(); + vals << elem.getDocument().getOwned(); } return Value(DOC(kStageName << vals.done())); } diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index f24b0c875e4..dec6f473a15 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -59,7 +59,7 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const override { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, - HostTypeRequirement::kNone, + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, @@ -67,6 +67,7 @@ public: UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; + constraints.isIndependentOfAnyCollection = true; return constraints; } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index bc9f5205cd8..e73764a6a7e 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -277,8 +277,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage)); + if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) { + // There's no need to attach a cursor here. + return pipeline; + } boost::optional<AutoGetCollectionForReadCommand> autoColl; const NamespaceStringOrUUID nsOrUUID = expCtx->uuid diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f4b0b5fb689..9d31dd15b64 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1146,8 +1146,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne auto expCtx = ownedPipeline->getContext(); std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + + if (hasFirstStage) { + // Make sure the first stage isn't already a $mergeCursors, and also check if it is a stage + // which needs to actually get a cursor attached or not. + const auto* firstStage = *hasFirstStage; + invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage)); + // Here we check the hostRequirement because there is at least one stage ($indexStats) which + // does not require input data, but is still expected to fan out and contact remote shards + // nonetheless. + if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource && + (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) { + // There's no need to attach a cursor here - the first stage provides its own data and + // is meant to be run locally (e.g. $documents). + return pipeline; + } + } auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); return shardVersionRetry( |