diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2020-01-09 12:34:47 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-13 03:13:43 +0000 |
commit | 70258e4babfcfa3725a4bf9cf06e853632917e57 (patch) | |
tree | c077f63382b148844d88805888b2c87c2d2676af /src/mongo/db | |
parent | 66002c604a9a2cd9c419bad318db0252f576dbd8 (diff) | |
download | mongo-70258e4babfcfa3725a4bf9cf06e853632917e57.tar.gz |
SERVER-45465 Add support for storing $unionWith in a view (unsharded)
Diffstat (limited to 'src/mongo/db')
21 files changed, 139 insertions, 239 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 2e951f66970..ac57422ea33 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -302,9 +302,8 @@ StatusWith<StringMap<ExpressionContext::ResolvedNamespace>> resolveInvolvedNames // If 'involvedNs' refers to a view namespace, then we resolve its definition. auto resolvedView = viewCatalog->resolveView(opCtx, involvedNs); if (!resolvedView.isOK()) { - return {ErrorCodes::FailedToParse, - str::stream() << "Failed to resolve view '" << involvedNs.ns() - << "': " << resolvedView.getStatus().toString()}; + return resolvedView.getStatus().withContext( + str::stream() << "Failed to resolve view '" << involvedNs.ns()); } resolvedNamespaces[involvedNs.coll()] = {resolvedView.getValue().getNamespace(), diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 4ba697ae599..3305b2b108b 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -327,7 +327,7 @@ void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory( // Look up the first document in the oplog and compare it with the resume token's clusterTime. auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx); + auto pipeline = Pipeline::makePipeline({matchSpec}, firstEntryExpCtx); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); // If the first entry in the oplog is the replset initialization, then it doesn't matter diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index f2319bc17a3..eace0a24f3d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -601,23 +601,6 @@ public: return false; } - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false); - } - - return pipeline; - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index 4c2eb0d013d..7b76bfc9d26 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -214,13 +214,12 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { // We've already allocated space for the trailing $match stage in '_fromPipeline'. _fromPipeline.back() = *matchStage; - MongoProcessInterface::MakePipelineOptions pipelineOpts; + MakePipelineOptions pipelineOpts; pipelineOpts.optimize = true; pipelineOpts.attachCursorSource = true; // By default, $graphLookup doesn't support a sharded 'from' collection. pipelineOpts.allowTargetingShards = internalQueryAllowShardedLookup.load(); - auto pipeline = pExpCtx->mongoProcessInterface->makePipeline( - _fromPipeline, _fromExpCtx, pipelineOpts); + auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index a3996c47480..084b71ac99b 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -62,24 +62,6 @@ public: MockMongoInterface(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline( - expCtx, pipeline.release(), false /* allowTargetingShards */); - } - - return pipeline; - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 1c5d8c49319..f998c3d66a8 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -50,8 +50,6 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; -constexpr size_t DocumentSourceLookUp::kMaxSubPipelineDepth; - DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, std::string as, const boost::intrusive_ptr<ExpressionContext>& expCtx) @@ -63,13 +61,7 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, const auto& resolvedNamespace = expCtx->getResolvedNamespace(_fromNs); _resolvedNs = resolvedNamespace.ns; _resolvedPipeline = resolvedNamespace.pipeline; - _fromExpCtx = expCtx->copyWith(_resolvedNs); - - _fromExpCtx->subPipelineDepth += 1; - uassert(ErrorCodes::MaxSubPipelineDepthExceeded, - str::stream() << "Maximum number of nested $lookup sub-pipelines exceeded. Limit is " - << kMaxSubPipelineDepth, - _fromExpCtx->subPipelineDepth <= kMaxSubPipelineDepth); + _fromExpCtx = expCtx->copyForSubPipeline(resolvedNamespace.ns); } DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, @@ -318,24 +310,20 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { - MongoProcessInterface::MakePipelineOptions pipelineOpts; + MakePipelineOptions pipelineOpts; pipelineOpts.optimize = true; pipelineOpts.attachCursorSource = true; // By default, $lookup doesnt support sharded 'from' collections. pipelineOpts.allowTargetingShards = internalQueryAllowShardedLookup.load(); - return pExpCtx->mongoProcessInterface->makePipeline( - _resolvedPipeline, _fromExpCtx, pipelineOpts); + return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); } - // Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a - // cursor source. - MongoProcessInterface::MakePipelineOptions pipelineOpts; + // Construct the basic pipeline without a cache stage. Avoid optimizing here since we need to + // add the cache first, as detailed below. + MakePipelineOptions pipelineOpts; pipelineOpts.optimize = false; pipelineOpts.attachCursorSource = false; - - // Construct the basic pipeline without a cache stage. - auto pipeline = - pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); + auto pipeline = Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); // Add the cache stage at the end and optimize. During the optimization process, the cache will // either move itself to the correct position in the pipeline, or will abandon itself if no diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index b437ecce2a2..7138db9748f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -48,7 +48,6 @@ namespace mongo { */ class DocumentSourceLookUp final : public DocumentSource { public: - static constexpr size_t kMaxSubPipelineDepth = 20; static constexpr StringData kStageName = "$lookup"_sd; struct LetVariable { diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index ae0f329e14f..e6c4e9e93f5 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -220,7 +220,7 @@ TEST_F(DocumentSourceLookUpTest, RejectLookupWhenDepthLimitIsExceeded) { expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); - expCtx->subPipelineDepth = DocumentSourceLookUp::kMaxSubPipelineDepth; + expCtx->subPipelineDepth = ExpressionContext::kMaxSubPipelineViewDepth; ASSERT_THROWS_CODE( DocumentSourceLookUp::createFromBson( @@ -493,23 +493,6 @@ public: return false; } - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false); - } - - return pipeline; - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, @@ -540,31 +523,31 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePauses) { expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); - // Set up the $lookup stage. - auto lookupSpec = Document{{"$lookup", - Document{{"from", fromNs.coll()}, - {"localField", "foreignId"_sd}, - {"foreignField", "_id"_sd}, - {"as", "foreignDocs"_sd}}}} - .toBson(); - auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx); - auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get()); - - // Mock its input, pausing every other result. + // Mock the input of a foreign namespace, pausing every other result. auto mockLocalSource = DocumentSourceMock::createForTest({Document{{"foreignId", 0}}, DocumentSource::GetNextResult::makePauseExecution(), Document{{"foreignId", 1}}, DocumentSource::GetNextResult::makePauseExecution()}); - lookup->setSource(mockLocalSource.get()); - // Mock out the foreign collection. deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, Document{{"_id", 1}}}; expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(std::move(mockForeignContents)); + // Set up the $lookup stage. + auto lookupSpec = Document{{"$lookup", + Document{{"from", fromNs.coll()}, + {"localField", "foreignId"_sd}, + {"foreignField", "_id"_sd}, + {"as", "foreignDocs"_sd}}}} + .toBson(); + auto parsed = DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx); + auto lookup = static_cast<DocumentSourceLookUp*>(parsed.get()); + + lookup->setSource(mockLocalSource.get()); + auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ( @@ -592,6 +575,12 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, + Document{{"_id", 1}}}; + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::move(mockForeignContents)); + // Set up the $lookup stage. auto lookupSpec = Document{{"$lookup", Document{{"from", fromNs.coll()}, @@ -615,12 +604,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldPropagatePausesWhileUnwinding) { DocumentSource::GetNextResult::makePauseExecution()}); lookup->setSource(mockLocalSource.get()); - // Mock out the foreign collection. - deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, - Document{{"_id", 1}}}; - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::move(mockForeignContents)); - auto next = lookup->getNext(); ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ(next.releaseDocument(), @@ -703,6 +686,9 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) { expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + auto docSource = DocumentSourceLookUp::createFromBson( fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x:1}}, {$sort: {x: 1}}, " "{$addFields: {varField: '$$var1'}}], from: 'coll', as: 'as'}}") @@ -712,9 +698,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldCacheNonCorrelatedSubPipelinePrefix) { auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -732,6 +715,9 @@ TEST_F(DocumentSourceLookUpTest, expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + // In the $facet stage here, the correlated $match stage comes after a $group stage which // returns EXHAUSTIVE_ALL for its dependencies. Verify that we continue enumerating the $facet // pipeline's variable dependencies after this point, so that the $facet stage is correctly @@ -746,9 +732,6 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -768,6 +751,9 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized) expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + // This pipeline includes a $match stage that itself includes a $expr expression. auto docSource = DocumentSourceLookUp::createFromBson( fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: {$eq: " @@ -778,9 +764,6 @@ TEST_F(DocumentSourceLookUpTest, ExprEmbeddedInMatchExpressionShouldBeOptimized) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -808,6 +791,9 @@ TEST_F(DocumentSourceLookUpTest, expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + // The $project stage defines a local variable with the same name as the $lookup 'let' variable. // Verify that the $project is identified as non-correlated and the cache is placed after it. auto docSource = DocumentSourceLookUp::createFromBson( @@ -822,9 +808,6 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -844,6 +827,9 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + // Create a $lookup stage whose pipeline contains nested $lookups. The third-level $lookup // refers to a 'let' variable defined in the top-level $lookup. Verify that the second-level // $lookup is correctly identified as a correlated stage and the cache is placed before it. @@ -858,9 +844,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldInsertCacheBeforeCorrelatedNestedLookup) auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -882,6 +865,9 @@ TEST_F(DocumentSourceLookUpTest, expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + expCtx->mongoProcessInterface = + std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); + // The nested $lookup stage defines a 'let' variable with the same name as the top-level 'let'. // Verify the nested $lookup is identified as non-correlated and the cache is placed after it. auto docSource = DocumentSourceLookUp::createFromBson( @@ -895,9 +881,6 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - expCtx->mongoProcessInterface = - std::make_shared<MockMongoInterface>(std::deque<DocumentSource::GetNextResult>{}); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 5)); ASSERT(subPipeline); @@ -949,6 +932,10 @@ TEST_F(DocumentSourceLookUpTest, expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + deque<DocumentSource::GetNextResult> mockForeignContents{ + Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents); + auto docSource = DocumentSourceLookUp::createFromBson( fromjson( "{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {x: {$gte: 0}}}, {$sort: {x: " @@ -959,17 +946,12 @@ TEST_F(DocumentSourceLookUpTest, auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - // Prepare the mocked local and foreign sources. + // Prepare the mocked local source. auto mockLocalSource = DocumentSourceMock::createForTest( {Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}); lookupStage->setSource(mockLocalSource.get()); - deque<DocumentSource::GetNextResult> mockForeignContents{ - Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; - - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents); - // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); ASSERT(subPipeline); @@ -1024,6 +1006,10 @@ TEST_F(DocumentSourceLookUpTest, expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}}, + Document{{"x", 1}}}; + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents); + // Ensure the cache is abandoned after the first iteration by setting its max size to 0. size_t maxCacheSizeBytes = 0; auto docSource = DocumentSourceLookUp::createFromBsonWithCacheSize( @@ -1043,11 +1029,6 @@ TEST_F(DocumentSourceLookUpTest, lookupStage->setSource(mockLocalSource.get()); - deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"x", 0}}, - Document{{"x", 1}}}; - - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockForeignContents); - // Confirm that the empty 'kBuilding' cache is placed just before the correlated $addFields. auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); ASSERT(subPipeline); @@ -1091,6 +1072,10 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + const bool removeLeadingQueryStages = true; + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>( + std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages); + auto docSource = DocumentSourceLookUp::createFromBson( fromjson("{$lookup: {let: {var1: '$_id'}, pipeline: [{$match: {$expr: { $gte: ['$x', " "'$$var1']}}}, {$sort: {x: 1}}, {$addFields: {varField: {$sum: ['$x', " @@ -1101,11 +1086,6 @@ TEST_F(DocumentSourceLookUpTest, ShouldNotCacheIfCorrelatedStageIsAbsorbedIntoPl auto lookupStage = static_cast<DocumentSourceLookUp*>(docSource.get()); ASSERT(lookupStage); - const bool removeLeadingQueryStages = true; - - expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>( - std::deque<DocumentSource::GetNextResult>{}, removeLeadingQueryStages); - auto subPipeline = lookupStage->getSubPipeline_forTest(DOC("_id" << 0)); ASSERT(subPipeline); diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 088954a6439..6d77dc4ca1e 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -50,8 +50,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( const boost::intrusive_ptr<ExpressionContext>& expCtx, ExpressionContext::ResolvedNamespace resolvedNs, std::vector<BSONObj> currentPipeline) { + // Copy the ExpressionContext of the base aggregation, using the inner namespace instead. + auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns); + if (resolvedNs.pipeline.empty()) { - return uassertStatusOK(Pipeline::parse(currentPipeline, expCtx->copyWith(resolvedNs.ns))); + return uassertStatusOK(Pipeline::parse(std::move(currentPipeline), unionExpCtx)); } auto resolvedPipeline = std::move(resolvedNs.pipeline); resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size()); @@ -59,8 +62,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( std::make_move_iterator(currentPipeline.begin()), std::make_move_iterator(currentPipeline.end())); - return uassertStatusOK( - Pipeline::parse(std::move(resolvedPipeline), expCtx->copyWith(resolvedNs.ns))); + return uassertStatusOK(Pipeline::parse(std::move(resolvedPipeline), unionExpCtx)); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index 90b6b9fefa4..9309b24ab7b 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -437,5 +437,22 @@ TEST_F(DocumentSourceUnionWithTest, ConcatenatesViewDefinitionToPipeline) { ASSERT_TRUE(unionWith->getNext().isEOF()); } +TEST_F(DocumentSourceUnionWithTest, RejectUnionWhenDepthLimitIsExceeded) { + auto expCtx = getExpCtx(); + NamespaceString fromNs("test", "coll"); + expCtx->setResolvedNamespaces(StringMap<ExpressionContext::ResolvedNamespace>{ + {fromNs.coll().toString(), {fromNs, std::vector<BSONObj>()}}}); + + expCtx->subPipelineDepth = ExpressionContext::kMaxSubPipelineViewDepth; + + ASSERT_THROWS_CODE( + DocumentSourceUnionWith::createFromBson( + BSON("$unionWith" << BSON("coll" << fromNs.coll() << "pipeline" + << BSON_ARRAY(BSON("$match" << BSON("x" << 1))))) + .firstElement(), + expCtx), + AssertionException, + ErrorCodes::MaxSubPipelineDepthExceeded); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 559e6cd6596..11d0337c482 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -60,6 +60,7 @@ namespace mongo { class ExpressionContext : public RefCountable { public: + static constexpr size_t kMaxSubPipelineViewDepth = 20; struct ResolvedNamespace { ResolvedNamespace() = default; ResolvedNamespace(NamespaceString ns, std::vector<BSONObj> pipeline); @@ -209,6 +210,16 @@ public: boost::optional<UUID> uuid = boost::none, boost::optional<std::unique_ptr<CollatorInterface>> updatedCollator = boost::none) const; + boost::intrusive_ptr<ExpressionContext> copyForSubPipeline(NamespaceString nss) const { + uassert(ErrorCodes::MaxSubPipelineDepthExceeded, + str::stream() << "Maximum number of nested sub-pipelines exceeded. Limit is " + << ExpressionContext::kMaxSubPipelineViewDepth, + subPipelineDepth < kMaxSubPipelineViewDepth); + auto newCopy = copyWith(std::move(nss)); + newCopy->subPipelineDepth += 1; + return newCopy; + } + /** * Returns the ResolvedNamespace corresponding to 'nss'. It is an error to call this method on a * namespace not involved in the pipeline. diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 63dc6bb89b3..bd8cf53df0c 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -657,4 +657,21 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria( return popFront(); } +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) { + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + + if (opts.optimize) { + pipeline->optimizePipeline(); + } + + if (opts.attachCursorSource) { + pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline( + expCtx, pipeline.release(), opts.allowTargetingShards); + } + + return pipeline; +} } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index aa08f8d98b8..c3bad653814 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -63,6 +63,12 @@ class PipelineDeleter; */ extern FailPoint disablePipelineOptimization; +struct MakePipelineOptions { + bool optimize = true; + bool attachCursorSource = true; + bool allowTargetingShards = true; +}; + /** * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the * pipeline. @@ -141,6 +147,20 @@ public: */ static bool aggHasWriteStage(const BSONObj& cmd); + /** + * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the + * returned pipeline will depend upon the supplied MakePipelineOptions: + * - The boolean opts.optimize determines whether the pipeline will be optimized. + * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to + * add an initial cursor source. + * + * This function throws if parsing the pipeline failed. + */ + static std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}); + const boost::intrusive_ptr<ExpressionContext>& getContext() const { return pCtx; } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index fd2dac05fc0..b95439e7240 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -277,24 +277,6 @@ BSONObj CommonMongodProcessInterface::getCollectionOptions(OperationContext* opC return collectionOptions; } -std::unique_ptr<Pipeline, PipelineDeleter> CommonMongodProcessInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = - attachCursorSourceToPipeline(expCtx, pipeline.release(), opts.allowTargetingShards); - } - - return pipeline; -} - std::unique_ptr<Pipeline, PipelineDeleter> CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { @@ -355,11 +337,10 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument( _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); // When looking up on a mongoD, we only ever want to read from the local collection. By // default, makePipeline will attach a cursor source which may read from remote if the - // collection is sharded, so we manually attach a local-only cursor source here. + // collection is sharded, so we configure it to not allow that here. MakePipelineOptions opts; - opts.attachCursorSource = false; - pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts); - pipeline = attachCursorSourceToPipelineForLocalRead(foreignExpCtx, pipeline.release()); + opts.allowTargetingShards = false; + pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index f8058e18d89..898d5380d13 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -81,10 +81,6 @@ public: const NamespaceString& nss, BSONObjBuilder* builder) const final override; BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final; - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) final; std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index c4d5267f452..7179a05cb21 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -106,14 +106,6 @@ public: */ static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx); - struct MakePipelineOptions { - MakePipelineOptions(){}; - - bool optimize = true; - bool attachCursorSource = true; - bool allowTargetingShards = true; - }; - /** * This structure holds the result of a batched update operation, such as the number of * documents that matched the query predicate, and the number of documents modified by the @@ -250,20 +242,6 @@ public: virtual void dropCollection(OperationContext* opCtx, const NamespaceString& collection) = 0; /** - * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. The state of the - * returned pipeline will depend upon the supplied MakePipelineOptions: - * - The boolean opts.optimize determines whether the pipeline will be optimized. - * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to - * add an initial cursor source. - * - * This function throws if parsing the pipeline failed. - */ - virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) = 0; - - /** * Accepts a pipeline and returns a new one which will draw input from the underlying * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 18b6e2125ca..6c28dfd47f6 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -100,25 +100,6 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) { - // Explain is not supported for auxiliary lookups. - invariant(!expCtx->explain); - - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - if (pipelineOptions.optimize) { - pipeline->optimizePipeline(); - } - if (pipelineOptions.attachCursorSource) { - // 'attachCursorSourceToPipeline' handles any complexity related to sharding. - pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false); - } - - return pipeline; -} - std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 36abd00c6da..2e505f47ddd 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -173,11 +173,6 @@ public: MONGO_UNREACHABLE; } - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final; - /** * The following methods only make sense for data-bearing nodes and should never be called on * a mongos. diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp index cba2a97ffcc..782ad6e56ef 100644 --- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp @@ -33,27 +33,11 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/util/assert_util.h" namespace mongo { -std::unique_ptr<Pipeline, PipelineDeleter> StubLookupSingleDocumentProcessInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); - - if (opts.optimize) { - pipeline->optimizePipeline(); - } - - if (opts.attachCursorSource) { - pipeline = - attachCursorSourceToPipeline(expCtx, pipeline.release(), opts.allowTargetingShards); - } - - return pipeline; -} std::unique_ptr<Pipeline, PipelineDeleter> StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRead( @@ -85,7 +69,7 @@ boost::optional<Document> StubLookupSingleDocumentProcessInterface::lookupSingle auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none); std::unique_ptr<Pipeline, PipelineDeleter> pipeline; try { - pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); + pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h index 421fb759ae2..68f8317ec0a 100644 --- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h @@ -67,11 +67,6 @@ public: StubLookupSingleDocumentProcessInterface(std::deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) final; - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 69d7b5eec49..d197918719b 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -140,17 +140,10 @@ public: MONGO_UNREACHABLE; } - std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) override { - MONGO_UNREACHABLE; - } - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline, - bool allowTargetingShards = true) override { + bool allowTargetingShards) override { MONGO_UNREACHABLE; } |