summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2020-01-09 12:34:47 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-13 03:13:43 +0000
commit70258e4babfcfa3725a4bf9cf06e853632917e57 (patch)
treec077f63382b148844d88805888b2c87c2d2676af /src
parent66002c604a9a2cd9c419bad318db0252f576dbd8 (diff)
downloadmongo-70258e4babfcfa3725a4bf9cf06e853632917e57.tar.gz
SERVER-45465 Add support for storing $unionWith in a view (unsharded)
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h1
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp122
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_union_with_test.cpp17
-rw-r--r--src/mongo/db/pipeline/expression_context.h11
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp17
-rw-r--r--src/mongo/db/pipeline/pipeline.h20
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp25
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h22
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp19
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h5
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp20
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h5
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h9
21 files changed, 139 insertions, 239 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 2e951f66970..ac57422ea33 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -302,9 +302,8 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames
// If 'involvedNs' refers to a view namespace, then we resolve its definition.
auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs);
if (!resolvedView.isOK()) {
- return {ErrorCodes::FailedToParse,
- str::stream() << "Failed to resolve view '" << involvedNs.ns()
- << "': " << resolvedView.getStatus().toString()};
+ return resolvedView.getStatus().withContext(
+ str::stream() << "Failed to resolve view '" << involvedNs.ns());
}
resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(),
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 4ba697ae599..3305b2b108b 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -327,7 +327,7 @@ void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
// Look up the first document in the oplog and compare it with the resume token's clusterTime.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
+ auto pipeline = Pipeline::makePipeline({matchSpec}, firstEntryExpCtx);
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
// If the first entry in the oplog is the replset initialization, then it doesn't matter
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index f2319bc17a3..eace0a24f3d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -601,23 +601,6 @@ public:
return false;
}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false);
- }
-
- return pipeline;
- }
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* ownedPipeline,
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index 4c2eb0d013d..7b76bfc9d26 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -214,13 +214,12 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = *matchStage;
- MongoProcessInterface::MakePipelineOptions pipelineOpts;
+ MakePipelineOptions pipelineOpts;
pipelineOpts.optimize = true;
pipelineOpts.attachCursorSource = true;
// By default, $graphLookup doesn't support a sharded 'from' collection.
pipelineOpts.allowTargetingShards = internalQueryAllowShardedLookup.load();
- auto pipeline = pExpCtx->mongoProcessInterface->makePipeline(
- _fromPipeline, _fromExpCtx, pipelineOpts);
+ auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts);
while (auto next = pipeline->getNext()) {
uassert(40271,
str::stream()
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index a3996c47480..084b71ac99b 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -62,24 +62,6 @@ public:
MockMongoInterface(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(
- expCtx, pipeline.release(), false /* allowTargetingShards */);
- }
-
- return pipeline;
- }
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* ownedPipeline,
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 1c5d8c49319..f998c3d66a8 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -50,8 +50,6 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
-constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth;
-
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
std::string as,
const boost::intrusive_ptr<ExpressionContext>& expCtx)
@@ -63,13 +61,7 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs);
_resolvedNs = resolvedNamespace.ns;
_resolvedPipeline = resolvedNamespace.pipeline;
- _fromExpCtx = expCtx->copyWith(_resolvedNs);
-
- _fromExpCtx->subPipelineDepth += 1;
- uassert(ErrorCodes::MaxSubPipelineDepthExceeded,
- str::stream() << "Maximum number of nested $lookup sub-pipelines exceeded. Limit is "
- << kMaxSubPipelineDepth,
- _fromExpCtx->subPipelineDepth <= kMaxSubPipelineDepth);
+ _fromExpCtx = expCtx->copyForSubPipeline(resolvedNamespace.ns);
}
DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
@@ -318,24 +310,20 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
- MongoProcessInterface::MakePipelineOptions pipelineOpts;
+ MakePipelineOptions pipelineOpts;
pipelineOpts.optimize = true;
pipelineOpts.attachCursorSource = true;
// By default, $lookup doesnt support sharded 'from' collections.
pipelineOpts.allowTargetingShards = internalQueryAllowShardedLookup.load();
- return pExpCtx->mongoProcessInterface->makePipeline(
- _resolvedPipeline, _fromExpCtx, pipelineOpts);
+ return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
}
- // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
- // cursor source.
- MongoProcessInterface::MakePipelineOptions pipelineOpts;
+ // Construct the basic pipeline without a cache stage. Avoid optimizing here since we need to
+ // add the cache first, as detailed below.
+ MakePipelineOptions pipelineOpts;
pipelineOpts.optimize = false;
pipelineOpts.attachCursorSource = false;
-
- // Construct the basic pipeline without a cache stage.
- auto pipeline =
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
+ auto pipeline = Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
// Add the cache stage at the end and optimize. During the optimization process, the cache will
// either move itself to the correct position in the pipeline, or will abandon itself if no
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index b437ecce2a2..7138db9748f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -48,7 +48,6 @@ namespace mongo {
*/
class DocumentSourceLookUp final : public DocumentSource {
public:
- static constexpr size_t kMaxSubPipelineDepth = 20;
static constexpr StringData kStageName = "$lookup"_sd;
struct LetVariable {
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index ae0f329e14f..e6c4e9e93f5 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -220,7 +220,7 @@ TEST_F(DocumentSourceLookUpTest, RejectLookupWhenDepthLimitIsExceeded) {
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
- expCtx->subPipelineDepth = DocumentSourceLookUp::kMaxSubPipelineDepth;
+ expCtx->subPipelineDepth = ExpressionContext::kMaxSubPipelineViewDepth;
ASSERT_THROWS_CODE(
DocumentSourceLookUp::createFromBson(
@@ -493,23 +493,6 @@ public:
return false;
}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false);
- }
-
- return pipeline;
- }
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* ownedPipeline,
@@ -540,31 +523,31 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) {
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
- // Set up the $lookup stage.
- auto lookupSpec = Document{{"$lookup",
- Document{{"from", fromNs.coll()},
- {"localField", "foreignId"_sd},
- {"foreignField", "_id"_sd},
- {"as", "foreignDocs"_sd}}}}
- .toBson();
- auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx);
- auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get());
-
- // Mock its input, pausing every other result.
+ // Mock the input of a foreign namespace, pausing every other result.
auto mockLocalSource =
DocumentSourceMock::createForTest({Document{{"foreignId", 0}},
DocumentSource::GetNextResult::makePauseExecution(),
Document{{"foreignId", 1}},
DocumentSource::GetNextResult::makePauseExecution()});
- lookup->setSource(mockLocalSource.get());
-
// Mock out the foreign collection.
deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
Document{{"_id", 1}}};
expCtx->mongoProcessInterface =
std::make_shared<MockMongoInterface>(std::move(mockForeignContents));
+ // Set up the $lookup stage.
+ auto lookupSpec = Document{{"$lookup",
+ Document{{"from", fromNs.coll()},
+ {"localField", "foreignId"_sd},
+ {"foreignField", "_id"_sd},
+ {"as", "foreignDocs"_sd}}}}
+ .toBson();
+ auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx);
+ auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get());
+
+ lookup->setSource(mockLocalSource.get());
+
auto next = lookup->getNext();
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(
@@ -592,6 +575,12 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) {
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ // Mock out the foreign collection.
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
+ Document{{"_id", 1}}};
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::move(mockForeignContents));
+
// Set up the $lookup stage.
auto lookupSpec = Document{{"$lookup",
Document{{"from", fromNs.coll()},
@@ -615,12 +604,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) {
DocumentSource::GetNextResult::makePauseExecution()});
lookup->setSource(mockLocalSource.get());
- // Mock out the foreign collection.
- deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
- Document{{"_id", 1}}};
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::move(mockForeignContents));
-
auto next = lookup->getNext();
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(next.releaseDocument(),
@@ -703,6 +686,9 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) {
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
auto docSource = DocumentSourceLookUp::createFromBson(
fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, "
"{$addFields: {varField: '$$var1'}}], from: 'coll', as: 'as'}}")
@@ -712,9 +698,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) {
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -732,6 +715,9 @@ TEST_F(DocumentSourceLookUpTest,
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
// In the $facet stage here, the correlated $match stage comes after a $group stage which
// returns EXHAUSTIVE_ALL for its dependencies. Verify that we continue enumerating the $facet
// pipeline's variable dependencies after this point, so that the $facet stage is correctly
@@ -746,9 +732,6 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -768,6 +751,9 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized)
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
// This pipeline includes a $match stage that itself includes a $expr expression.
auto docSource = DocumentSourceLookUp::createFromBson(
fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: {$eq: "
@@ -778,9 +764,6 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized)
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -808,6 +791,9 @@ TEST_F(DocumentSourceLookUpTest,
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
// The $project stage defines a local variable with the same name as the $lookup 'let' variable.
// Verify that the $project is identified as non-correlated and the cache is placed after it.
auto docSource = DocumentSourceLookUp::createFromBson(
@@ -822,9 +808,6 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -844,6 +827,9 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup)
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
// Create a $lookup stage whose pipeline contains nested $lookups. The third-level $lookup
// refers to a 'let' variable defined in the top-level $lookup. Verify that the second-level
// $lookup is correctly identified as a correlated stage and the cache is placed before it.
@@ -858,9 +844,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup)
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -882,6 +865,9 @@ TEST_F(DocumentSourceLookUpTest,
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ expCtx->mongoProcessInterface =
+ std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
+
// The nested $lookup stage defines a 'let' variable with the same name as the top-level 'let'.
// Verify the nested $lookup is identified as non-correlated and the cache is placed after it.
auto docSource = DocumentSourceLookUp::createFromBson(
@@ -895,9 +881,6 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- expCtx->mongoProcessInterface =
- std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{});
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5));
ASSERT(subPipeline);
@@ -949,6 +932,10 @@ TEST_F(DocumentSourceLookUpTest,
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ deque<DocumentSource::GetNextResult> mockForeignContents{
+ Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}};
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents);
+
auto docSource = DocumentSourceLookUp::createFromBson(
fromjson(
"{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: "
@@ -959,17 +946,12 @@ TEST_F(DocumentSourceLookUpTest,
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- // Prepare the mocked local and foreign sources.
+ // Prepare the mocked local source.
auto mockLocalSource = DocumentSourceMock::createForTest(
{Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}});
lookupStage->setSource(mockLocalSource.get());
- deque<DocumentSource::GetNextResult> mockForeignContents{
- Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}};
-
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents);
-
// Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
ASSERT(subPipeline);
@@ -1024,6 +1006,10 @@ TEST_F(DocumentSourceLookUpTest,
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}},
+ Document{{"x", 1}}};
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents);
+
// Ensure the cache is abandoned after the first iteration by setting its max size to 0.
size_t maxCacheSizeBytes = 0;
auto docSource = DocumentSourceLookUp::createFromBsonWithCacheSize(
@@ -1043,11 +1029,6 @@ TEST_F(DocumentSourceLookUpTest,
lookupStage->setSource(mockLocalSource.get());
- deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}},
- Document{{"x", 1}}};
-
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents);
-
// Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields.
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
ASSERT(subPipeline);
@@ -1091,6 +1072,10 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl
expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
{fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+ const bool removeLeadingQueryStages = true;
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(
+ std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages);
+
auto docSource = DocumentSourceLookUp::createFromBson(
fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: { $gte: ['$x', "
"'$$var1']}}}, {$sort: {x: 1}}, {$addFields: {varField: {$sum: ['$x', "
@@ -1101,11 +1086,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl
auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get());
ASSERT(lookupStage);
- const bool removeLeadingQueryStages = true;
-
- expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(
- std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages);
-
auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0));
ASSERT(subPipeline);
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 088954a6439..6d77dc4ca1e 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -50,8 +50,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
ExpressionContext::ResolvedNamespace resolvedNs,
std::vector<BSONObj> currentPipeline) {
+ // Copy the ExpressionContext of the base aggregation, using the inner namespace instead.
+ auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns);
+
if (resolvedNs.pipeline.empty()) {
- return uassertStatusOK(Pipeline::parse(currentPipeline, expCtx->copyWith(resolvedNs.ns)));
+ return uassertStatusOK(Pipeline::parse(std::move(currentPipeline), unionExpCtx));
}
auto resolvedPipeline = std::move(resolvedNs.pipeline);
resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size());
@@ -59,8 +62,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
std::make_move_iterator(currentPipeline.begin()),
std::make_move_iterator(currentPipeline.end()));
- return uassertStatusOK(
- Pipeline::parse(std::move(resolvedPipeline), expCtx->copyWith(resolvedNs.ns)));
+ return uassertStatusOK(Pipeline::parse(std::move(resolvedPipeline), unionExpCtx));
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
index 90b6b9fefa4..9309b24ab7b 100644
--- a/src/mongo/db/pipeline/document_source_union_with_test.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -437,5 +437,22 @@ TEST_F(DocumentSourceUnionWithTest, ConcatenatesViewDefinitionToPipeline) {
ASSERT_TRUE(unionWith->getNext().isEOF());
}
+TEST_F(DocumentSourceUnionWithTest, RejectUnionWhenDepthLimitIsExceeded) {
+ auto expCtx = getExpCtx();
+ NamespaceString fromNs("test", "coll");
+ expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{
+ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}});
+
+ expCtx->subPipelineDepth = ExpressionContext::kMaxSubPipelineViewDepth;
+
+ ASSERT_THROWS_CODE(
+ DocumentSourceUnionWith::createFromBson(
+ BSON("$unionWith" << BSON("coll" << fromNs.coll() << "pipeline"
+ << BSON_ARRAY(BSON("$match" << BSON("x" << 1)))))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::MaxSubPipelineDepthExceeded);
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 559e6cd6596..11d0337c482 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -60,6 +60,7 @@ namespace mongo {
class ExpressionContext : public RefCountable {
public:
+ static constexpr size_t kMaxSubPipelineViewDepth = 20;
struct ResolvedNamespace {
ResolvedNamespace() = default;
ResolvedNamespace(NamespaceString ns, std::vector<BSONObj> pipeline);
@@ -209,6 +210,16 @@ public:
boost::optional<UUID> uuid = boost::none,
boost::optional<std::unique_ptr<CollatorInterface>> updatedCollator = boost::none) const;
+ boost::intrusive_ptr<ExpressionContext> copyForSubPipeline(NamespaceString nss) const {
+ uassert(ErrorCodes::MaxSubPipelineDepthExceeded,
+ str::stream() << "Maximum number of nested sub-pipelines exceeded. Limit is "
+ << ExpressionContext::kMaxSubPipelineViewDepth,
+ subPipelineDepth < kMaxSubPipelineViewDepth);
+ auto newCopy = copyWith(std::move(nss));
+ newCopy->subPipelineDepth += 1;
+ return newCopy;
+ }
+
/**
* Returns the ResolvedNamespace corresponding to 'nss'. It is an error to call this method on a
* namespace not involved in the pipeline.
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 63dc6bb89b3..bd8cf53df0c 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -657,4 +657,21 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
return popFront();
}
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) {
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+
+ if (opts.optimize) {
+ pipeline->optimizePipeline();
+ }
+
+ if (opts.attachCursorSource) {
+ pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ expCtx, pipeline.release(), opts.allowTargetingShards);
+ }
+
+ return pipeline;
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index aa08f8d98b8..c3bad653814 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -63,6 +63,12 @@ class PipelineDeleter;
*/
extern FailPoint disablePipelineOptimization;
+struct MakePipelineOptions {
+ bool optimize = true;
+ bool attachCursorSource = true;
+ bool allowTargetingShards = true;
+};
+
/**
* A Pipeline object represents a list of DocumentSources and is responsible for optimizing the
* pipeline.
@@ -141,6 +147,20 @@ public:
*/
static bool aggHasWriteStage(const BSONObj& cmd);
+ /**
+ * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the
+ * returned pipeline will depend upon the supplied MakePipelineOptions:
+ * - The boolean opts.optimize determines whether the pipeline will be optimized.
+ * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
+ * add an initial cursor source.
+ *
+ * This function throws if parsing the pipeline failed.
+ */
+ static std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts = MakePipelineOptions{});
+
const boost::intrusive_ptr<ExpressionContext>& getContext() const {
return pCtx;
}
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index fd2dac05fc0..b95439e7240 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -277,24 +277,6 @@ BSONObj CommonMongodProcessInterface::getCollectionOptions(OperationContext* opC
return collectionOptions;
}
-std::unique_ptr<Pipeline, PipelineDeleter> CommonMongodProcessInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline =
- attachCursorSourceToPipeline(expCtx, pipeline.release(), opts.allowTargetingShards);
- }
-
- return pipeline;
-}
-
std::unique_ptr<Pipeline, PipelineDeleter>
CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
@@ -355,11 +337,10 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument(
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
// When looking up on a mongoD, we only ever want to read from the local collection. By
// default, makePipeline will attach a cursor source which may read from remote if the
- // collection is sharded, so we manually attach a local-only cursor source here.
+ // collection is sharded, so we configure it to not allow that here.
MakePipelineOptions opts;
- opts.attachCursorSource = false;
- pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts);
- pipeline = attachCursorSourceToPipelineForLocalRead(foreignExpCtx, pipeline.release());
+ opts.allowTargetingShards = false;
+ pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts);
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index f8058e18d89..898d5380d13 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -81,10 +81,6 @@ public:
const NamespaceString& nss,
BSONObjBuilder* builder) const final override;
BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final;
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) final;
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index c4d5267f452..7179a05cb21 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -106,14 +106,6 @@ public:
*/
static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx);
- struct MakePipelineOptions {
- MakePipelineOptions(){};
-
- bool optimize = true;
- bool attachCursorSource = true;
- bool allowTargetingShards = true;
- };
-
/**
* This structure holds the result of a batched update operation, such as the number of
* documents that matched the query predicate, and the number of documents modified by the
@@ -250,20 +242,6 @@ public:
virtual void dropCollection(OperationContext* opCtx, const NamespaceString& collection) = 0;
/**
- * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the
- * returned pipeline will depend upon the supplied MakePipelineOptions:
- * - The boolean opts.optimize determines whether the pipeline will be optimized.
- * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
- * add an initial cursor source.
- *
- * This function throws if parsing the pipeline failed.
- */
- virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
-
- /**
* Accepts a pipeline and returns a new one which will draw input from the underlying
* collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
* thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 18b6e2125ca..6c28dfd47f6 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -100,25 +100,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) {
- // Explain is not supported for auxiliary lookups.
- invariant(!expCtx->explain);
-
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
- if (pipelineOptions.optimize) {
- pipeline->optimizePipeline();
- }
- if (pipelineOptions.attachCursorSource) {
- // 'attachCursorSourceToPipeline' handles any complexity related to sharding.
- pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false);
- }
-
- return pipeline;
-}
-
std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* ownedPipeline,
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 36abd00c6da..2e505f47ddd 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -173,11 +173,6 @@ public:
MONGO_UNREACHABLE;
}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final;
-
/**
* The following methods only make sense for data-bearing nodes and should never be called on
* a mongos.
diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
index cba2a97ffcc..782ad6e56ef 100644
--- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
@@ -33,27 +33,11 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/util/assert_util.h"
namespace mongo {
-std::unique_ptr<Pipeline, PipelineDeleter> StubLookupSingleDocumentProcessInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
-
- if (opts.optimize) {
- pipeline->optimizePipeline();
- }
-
- if (opts.attachCursorSource) {
- pipeline =
- attachCursorSourceToPipeline(expCtx, pipeline.release(), opts.allowTargetingShards);
- }
-
- return pipeline;
-}
std::unique_ptr<Pipeline, PipelineDeleter>
StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRead(
@@ -85,7 +69,7 @@ boost::optional<Document> StubLookupSingleDocumentProcessInterface::lookupSingle
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none);
std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
try {
- pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
} catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
index 421fb759ae2..68f8317ec0a 100644
--- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
@@ -67,11 +67,6 @@ public:
StubLookupSingleDocumentProcessInterface(std::deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) final;
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* ownedPipeline,
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 69d7b5eec49..d197918719b 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -140,17 +140,10 @@ public:
MONGO_UNREACHABLE;
}
- std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) override {
- MONGO_UNREACHABLE;
- }
-
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline,
- bool allowTargetingShards = true) override {
+ bool allowTargetingShards) override {
MONGO_UNREACHABLE;
}