diff options
author | Nicholas Zolnierz <nicholas.zolnierz@mongodb.com> | 2020-02-19 22:58:38 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-19 22:58:38 +0000 |
commit | b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a (patch) | |
tree | 63fc225aee7ed58fd3bd59f310acb181a2d04b67 /src/mongo | |
parent | c54a777a4a154984f5595b11993d7d009350a38c (diff) | |
download | mongo-b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a.tar.gz |
SERVER-46015 Cleanup Pipeline parsing for aggregation stages with child pipelines
Diffstat (limited to 'src/mongo')
27 files changed, 505 insertions, 633 deletions
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp index 098c702f408..4943ef16307 100644 --- a/src/mongo/db/commands/mr_common.cpp +++ b/src/mongo/db/commands/mr_common.cpp @@ -397,7 +397,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( } try { - auto pipeline = uassertStatusOK(Pipeline::create( + auto pipeline = Pipeline::create( makeFlattenedList<boost::intrusive_ptr<DocumentSource>>( parsedMr.getQuery().map( [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }), @@ -415,7 +415,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( std::move(targetCollectionVersion), parsedMr.getReduce().getCode(), parsedMr.getFinalize())), - expCtx)); + expCtx); pipeline->optimizePipeline(); return pipeline; } catch (DBException& ex) { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 369106d769b..8a306ca2014 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -441,7 +441,7 @@ std::vector<std::unique_ptr<Pipeline, PipelineDeleter>> createExchangePipelinesI // DocumentSourceExchange. boost::intrusive_ptr<DocumentSource> consumer = new DocumentSourceExchange( expCtx, exchange, idx, expCtx->mongoProcessInterface->getResourceYielder()); - pipelines.emplace_back(uassertStatusOK(Pipeline::create({consumer}, expCtx))); + pipelines.emplace_back(Pipeline::create({consumer}, expCtx)); } } else { pipelines.emplace_back(std::move(pipeline)); @@ -638,7 +638,7 @@ Status runAggregate(OperationContext* opCtx, // Use default value for the ExpressionContext's 'sortKeyFormat' member variable. } - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + auto pipeline = Pipeline::parse(request.getPipeline(), expCtx); // Check that the view's collation matches the collation of any views involved in the // pipeline. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index ae5b676769a..b699c1e3ae7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1489,8 +1489,8 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage // equivalent to the original serialization. // auto serializedBson = serializedDoc.toBson(); - auto roundTripped = uassertStatusOK(Pipeline::create( - DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx)); + auto roundTripped = Pipeline::create( + DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx); auto newSerialization = roundTripped->serialize(); diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp index 539cbc5cef9..8639cfd45a6 100644 --- a/src/mongo/db/pipeline/document_source_exchange_test.cpp +++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp @@ -172,8 +172,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) { spec.setConsumers(1); spec.setBufferSize(1024); - boost::intrusive_ptr<Exchange> ex = - new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx())); auto input = ex->getNext(getExpCtx()->opCtx, 0, nullptr); @@ -198,8 +197,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr<Exchange> ex = - new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -241,8 +239,7 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr<Exchange> ex = - new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -291,8 +288,7 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) { spec.setConsumers(nConsumers); spec.setBufferSize(1024); - boost::intrusive_ptr<Exchange> ex = - new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -339,7 +335,7 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) { spec.setBufferSize(1024); boost::intrusive_ptr<Exchange> ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -401,7 +397,7 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) { spec.setBufferSize(1024); boost::intrusive_ptr<Exchange> ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -454,7 +450,7 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) { spec.setBufferSize(1024); boost::intrusive_ptr<Exchange> ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -522,7 +518,7 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) { auto artificalGlobalMutex = MONGO_MAKE_LATCH(); boost::intrusive_ptr<Exchange> ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads; for (size_t idx = 0; idx < nConsumers; ++idx) { @@ -601,7 +597,7 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) { spec.setBufferSize(1024); boost::intrusive_ptr<Exchange> ex = - new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx()))); + new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx())); std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex); std::vector<executor::TaskExecutor::CallbackHandle> handles; @@ -641,9 +637,7 @@ TEST_F(DocumentSourceExchangeTest, RejectNoConsumers) { << "broadcast" << "consumers" << 0); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50901); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50901); } TEST_F(DocumentSourceExchangeTest, RejectInvalidKey) { @@ -651,9 +645,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKey) { << "broadcast" << "consumers" << 1 << "key" << BSON("a" << 2)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50896); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50896); } TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyHashExpected) { @@ -663,9 +655,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyHashExpected) { << BSON("a" << "nothash")); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50895); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50895); } TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyWrongType) { @@ -673,9 +663,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyWrongType) { << "broadcast" << "consumers" << 1 << "key" << BSON("a" << true)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50897); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50897); } TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyEmpty) { @@ -683,9 +671,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyEmpty) { << "broadcast" << "consumers" << 1 << "key" << BSON("" << 1)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 40352); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 40352); } TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundaries) { @@ -695,9 +681,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundaries) { << BSON_ARRAY(BSON("a" << MAXKEY) << BSON("a" << MINKEY)) << "consumerIds" << BSON_ARRAY(0)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50893); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50893); } TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMin) { @@ -707,9 +691,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMin) { << BSON_ARRAY(BSON("a" << 0) << BSON("a" << MAXKEY)) << "consumerIds" << BSON_ARRAY(0)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50958); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50958); } TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMax) { @@ -719,9 +701,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMax) { << BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << 0)) << "consumerIds" << BSON_ARRAY(0)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50959); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50959); } TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesAndConsumerIds) { @@ -731,9 +711,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesAndConsumerIds) { << BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds" << BSON_ARRAY(0 << 1)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50900); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50900); } TEST_F(DocumentSourceExchangeTest, RejectInvalidPolicyBoundaries) { @@ -743,9 +721,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidPolicyBoundaries) { << BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds" << BSON_ARRAY(0)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50899); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50899); } TEST_F(DocumentSourceExchangeTest, RejectInvalidConsumerIds) { @@ -755,9 +731,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidConsumerIds) { << BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds" << BSON_ARRAY(1)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50894); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50894); } TEST_F(DocumentSourceExchangeTest, RejectInvalidMissingKeys) { @@ -767,9 +741,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidMissingKeys) { << BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds" << BSON_ARRAY(0)); ASSERT_THROWS_CODE( - Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))), - AssertionException, - 50967); + Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50967); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 9414a87ad5c..1152e92f5f4 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -291,7 +291,24 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson( for (auto&& rawFacet : extractRawPipelines(elem)) { const auto facetName = rawFacet.first; - auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx)); + auto pipeline = Pipeline::parse(rawFacet.second, expCtx, [](const Pipeline& pipeline) { + auto sources = pipeline.getSources(); + uassert(ErrorCodes::BadValue, + "sub-pipeline in $facet stage cannot be empty", + !sources.empty()); + + std::for_each(sources.begin(), sources.end(), [](auto& stage) { + auto stageConstraints = stage->constraints(); + uassert(40600, + str::stream() << stage->getSourceName() + << " is not allowed to be used within a $facet stage", + stageConstraints.isAllowedInsideFacetStage()); + // We expect a stage within a $facet stage to have these properties. + invariant(stageConstraints.requiredPosition == + StageConstraints::PositionRequirement::kNone); + invariant(!stageConstraints.isIndependentOfAnyCollection); + }); + }); // Validate that none of the facet pipelines have any conflicting HostTypeRequirements. This // verifies both that all stages within each pipeline are consistent, and that the pipelines diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 98bb537d421..da7718fa5c6 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -257,7 +257,7 @@ public: TEST_F(DocumentSourceFacetTest, PassthroughFacetDoesntRequireDiskAndIsOKInaTxn) { auto ctx = getExpCtx(); auto passthrough = DocumentSourcePassthrough::create(); - auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx)); + auto passthroughPipe = Pipeline::create({passthrough}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("passthrough", std::move(passthroughPipe)); @@ -295,7 +295,7 @@ public: TEST_F(DocumentSourceFacetTest, FacetWithChildThatWritesDataAlsoReportsWritingData) { auto ctx = getExpCtx(); auto writesDataStage = DocumentSourceWritesPersistentData::create(); - auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({writesDataStage}, ctx)); + auto pipeline = Pipeline::create({writesDataStage}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("writes", std::move(pipeline)); @@ -316,9 +316,7 @@ TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { auto dummy = DocumentSourcePassthrough::create(); - auto statusWithPipeline = Pipeline::createFacetPipeline({dummy}, ctx); - ASSERT_OK(statusWithPipeline.getStatus()); - auto pipeline = std::move(statusWithPipeline.getValue()); + auto pipeline = Pipeline::create({dummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("results", std::move(pipeline)); @@ -344,10 +342,10 @@ TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) { auto mock = DocumentSourceMock::createForTest(inputs); auto firstDummy = DocumentSourcePassthrough::create(); - auto firstPipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx)); + auto firstPipeline = Pipeline::create({firstDummy}, ctx); auto secondDummy = DocumentSourcePassthrough::create(); - auto secondPipeline = uassertStatusOK(Pipeline::createFacetPipeline({secondDummy}, ctx)); + auto secondPipeline = Pipeline::create({secondDummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("first", std::move(firstPipeline)); @@ -383,10 +381,10 @@ TEST_F(DocumentSourceFacetTest, auto mock = DocumentSourceMock::createForTest(inputs); auto passthrough = DocumentSourcePassthrough::create(); - auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx)); + auto passthroughPipe = Pipeline::create({passthrough}, ctx); auto limit = DocumentSourceLimit::create(ctx, 1); - auto limitedPipe = uassertStatusOK(Pipeline::createFacetPipeline({limit}, ctx)); + auto limitedPipe = Pipeline::create({limit}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("all", std::move(passthroughPipe)); @@ -422,7 +420,7 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub auto firstDummy = DocumentSourcePassthrough::create(); auto secondDummy = DocumentSourcePassthrough::create(); - auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy, secondDummy}, ctx)); + auto pipeline = Pipeline::create({firstDummy, secondDummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("subPipe", std::move(pipeline)); @@ -441,9 +439,9 @@ TEST_F(DocumentSourceFacetTest, ShouldPropagateDisposeThroughToSource) { auto mockSource = DocumentSourceMock::createForTest(); auto firstDummy = DocumentSourcePassthrough::create(); - auto firstPipe = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx)); + auto firstPipe = Pipeline::create({firstDummy}, ctx); auto secondDummy = DocumentSourcePassthrough::create(); - auto secondPipe = uassertStatusOK(Pipeline::createFacetPipeline({secondDummy}, ctx)); + auto secondPipe = Pipeline::create({secondDummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("firstPipe", std::move(firstPipe)); @@ -465,7 +463,7 @@ DEATH_TEST_F(DocumentSourceFacetTest, DocumentSourceMock::createForTest(DocumentSource::GetNextResult::makePauseExecution()); auto firstDummy = DocumentSourcePassthrough::create(); - auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx)); + auto pipeline = Pipeline::create({firstDummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("subPipe", std::move(pipeline)); @@ -489,10 +487,10 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) { // skippedTwo: [{$skip: 2}] // }} auto firstSkip = DocumentSourceSkip::create(ctx, 1); - auto firstPipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstSkip}, ctx)); + auto firstPipeline = Pipeline::create({firstSkip}, ctx); auto secondSkip = DocumentSourceSkip::create(ctx, 2); - auto secondPipeline = uassertStatusOK(Pipeline::createFacetPipeline({secondSkip}, ctx)); + auto secondPipeline = Pipeline::create({secondSkip}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("skippedOne", std::move(firstPipeline)); @@ -532,7 +530,7 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) { auto ctx = getExpCtx(); auto dummy = DocumentSourcePassthrough::create(); - auto pipeline = unittest::assertGet(Pipeline::createFacetPipeline({dummy}, ctx)); + auto pipeline = Pipeline::create({dummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("subPipe", std::move(pipeline)); @@ -550,10 +548,10 @@ TEST_F(DocumentSourceFacetTest, ShouldPropagateDetachingAndReattachingOfOpCtx) { ctx->mongoProcessInterface = std::make_unique<StubMongoProcessInterface>(); auto firstDummy = DocumentSourcePassthrough::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({firstDummy}, ctx)); + auto firstPipeline = Pipeline::create({firstDummy}, ctx); auto secondDummy = DocumentSourcePassthrough::create(); - auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({secondDummy}, ctx)); + auto secondPipeline = Pipeline::create({secondDummy}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("one", std::move(firstPipeline)); @@ -607,7 +605,7 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) { auto ctx = getExpCtx(); auto needsA = DocumentSourceNeedsA::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx)); + auto firstPipeline = Pipeline::create({needsA}, ctx); auto firstPipelineDeps = firstPipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_FALSE(firstPipelineDeps.needWholeDocument); @@ -615,7 +613,7 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) { ASSERT_EQ(firstPipelineDeps.fields.count("a"), 1UL); auto needsB = DocumentSourceNeedsB::create(); - auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsB}, ctx)); + auto secondPipeline = Pipeline::create({needsB}, ctx); auto secondPipelineDeps = secondPipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_FALSE(secondPipelineDeps.needWholeDocument); @@ -654,11 +652,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireWholeDocumentIfAnyPipelineRequiresW auto ctx = getExpCtx(); auto needsA = DocumentSourceNeedsA::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx)); + auto firstPipeline = Pipeline::create({needsA}, ctx); auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create(); - auto secondPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({needsWholeDocument}, ctx)); + auto secondPipeline = Pipeline::create({needsWholeDocument}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("needsA", std::move(firstPipeline)); @@ -689,14 +686,13 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireTextScoreIfAnyPipelineRequiresTextS auto ctx = getExpCtx(); auto needsA = DocumentSourceNeedsA::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx)); + auto firstPipeline = Pipeline::create({needsA}, ctx); auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create(); - auto secondPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({needsWholeDocument}, ctx)); + auto secondPipeline = Pipeline::create({needsWholeDocument}, ctx); auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); - auto thirdPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsTextScore}, ctx)); + auto thirdPipeline = Pipeline::create({needsTextScore}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("needsA", std::move(firstPipeline)); @@ -714,10 +710,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs auto ctx = getExpCtx(); auto needsA = DocumentSourceNeedsA::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx)); + auto firstPipeline = Pipeline::create({needsA}, ctx); auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); - auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsTextScore}, ctx)); + auto secondPipeline = Pipeline::create({needsTextScore}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("needsA", std::move(firstPipeline)); @@ -753,11 +749,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima auto ctx = getExpCtx(); auto passthrough = DocumentSourcePassthrough::create(); - auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({passthrough}, ctx)); + auto firstPipeline = Pipeline::create({passthrough}, ctx); auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create(); - auto secondPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({needsPrimaryShard}, ctx)); + auto secondPipeline = Pipeline::create({needsPrimaryShard}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("passthrough", std::move(firstPipeline)); @@ -776,12 +771,10 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr auto ctx = getExpCtx(); auto firstPassthrough = DocumentSourcePassthrough::create(); - auto firstPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx)); + auto firstPipeline = Pipeline::create({firstPassthrough}, ctx); auto secondPassthrough = DocumentSourcePassthrough::create(); - auto secondPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx)); + auto secondPipeline = Pipeline::create({secondPassthrough}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("first", std::move(firstPipeline)); @@ -843,16 +836,13 @@ TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstrai auto ctx = getExpCtx(); auto firstPassthrough = DocumentSourcePassthrough::create(); - auto firstPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx)); + auto firstPipeline = Pipeline::create({firstPassthrough}, ctx); auto secondPassthrough = DocumentSourcePrimaryShardTmpDataNoTxn::create(); - auto secondPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx)); + auto secondPipeline = Pipeline::create({secondPassthrough}, ctx); auto thirdPassthrough = DocumentSourceBannedInLookup::create(); - auto thirdPipeline = - unittest::assertGet(Pipeline::createFacetPipeline({thirdPassthrough}, ctx)); + auto thirdPipeline = Pipeline::create({thirdPassthrough}, ctx); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("first", std::move(firstPipeline)); diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index b714ed53c9a..d01784a4bbf 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -607,7 +607,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( void DocumentSourceGraphLookUp::addInvolvedCollections( stdx::unordered_set<NamespaceString>* collectionNames) const { collectionNames->insert(_fromExpCtx->ns); - auto introspectionPipeline = uassertStatusOK(Pipeline::parse(_fromPipeline, _fromExpCtx)); + auto introspectionPipeline = Pipeline::parse(_fromPipeline, _fromExpCtx); for (auto&& stage : introspectionPipeline->getSources()) { stage->addInvolvedCollections(collectionNames); } diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index ddbe6ac0e59..e280277115f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -664,20 +664,15 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() { copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); _resolvedIntrospectionPipeline = - uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); - - auto& sources = _resolvedIntrospectionPipeline->getSources(); - - auto it = std::find_if( - sources.begin(), sources.end(), [](const boost::intrusive_ptr<DocumentSource>& src) { - return !src->constraints().isAllowedInLookupPipeline(); + Pipeline::parse(_resolvedPipeline, _fromExpCtx, [](const Pipeline& pipeline) { + const auto& sources = pipeline.getSources(); + std::for_each(sources.begin(), sources.end(), [](auto& src) { + uassert(51047, + str::stream() << src->getSourceName() + << " is not allowed within a $lookup's sub-pipeline", + src->constraints().isAllowedInLookupPipeline()); + }); }); - - // For other stages, use a generic error. - uassert(51047, - str::stream() << (*it)->getSourceName() - << " is not allowed within a $lookup's sub-pipeline", - it == sources.end()); } void DocumentSourceLookUp::serializeToArray( diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index ef6d685983c..f5763d42987 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -205,7 +205,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { cursors.emplace_back(makeRemoteCursor( kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); armParams.setRemotes(std::move(cursors)); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto pipeline = Pipeline::create({}, expCtx); auto mergeCursorsStage = DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx); @@ -229,7 +229,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { cursors.emplace_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); armParams.setRemotes(std::move(cursors)); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto pipeline = Pipeline::create({}, expCtx); pipeline->addInitialSource( DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); @@ -278,7 +278,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfTheyAreNotOwned) { cursors.emplace_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); armParams.setRemotes(std::move(cursors)); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto pipeline = Pipeline::create({}, expCtx); pipeline->addInitialSource( DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); @@ -300,7 +300,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); armParams.setRemotes(std::move(cursors)); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto pipeline = Pipeline::create({}, expCtx); pipeline->addInitialSource( DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); @@ -335,7 +335,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { auto expCtx = getExpCtx(); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto pipeline = Pipeline::create({}, expCtx); // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline. AsyncResultsMergerParams armParams; diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp index 95f5c799b7c..4f89e47acdf 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp @@ -116,7 +116,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, SerializesSuccessfullyAfterAbsorbingMat auto planCacheStats = DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx()); auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx()); - auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx())); + auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); pipeline->optimizePipeline(); @@ -133,7 +133,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, SerializesSuccessfullyAfterAbsorbingMat auto planCacheStats = DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx()); auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx()); - auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx())); + auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx()); ASSERT_EQ(2u, pipeline->getSources().size()); pipeline->optimizePipeline(); @@ -170,7 +170,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsOnlyMatchingStatsAfterAbsorbingM auto planCacheStats = DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx()); auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx()); - auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx())); + auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx()); pipeline->optimizePipeline(); ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(), @@ -197,7 +197,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsHostNameWhenNotFromMongos) { const auto specObj = fromjson("{$planCacheStats: {}}"); auto planCacheStats = DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx()); - auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats}, getExpCtx())); + auto pipeline = Pipeline::create({planCacheStats}, getExpCtx()); ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(), BSON("foo" << "bar" @@ -223,7 +223,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsShardAndHostNameWhenFromMongos) const auto specObj = fromjson("{$planCacheStats: {}}"); auto planCacheStats = DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx()); - auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats}, getExpCtx())); + auto pipeline = Pipeline::create({planCacheStats}, getExpCtx()); ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(), BSON("foo" << "bar" diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 151ea99c58a..1885bef0b51 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -52,11 +52,22 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( const boost::intrusive_ptr<ExpressionContext>& expCtx, ExpressionContext::ResolvedNamespace resolvedNs, std::vector<BSONObj> currentPipeline) { + + auto validatorCallback = [](const Pipeline& pipeline) { + const auto& sources = pipeline.getSources(); + std::for_each(sources.begin(), sources.end(), [](auto& src) { + uassert(31441, + str::stream() << src->getSourceName() + << " is not allowed within a $unionWith's sub-pipeline", + src->constraints().isAllowedInUnionPipeline()); + }); + }; + // Copy the ExpressionContext of the base aggregation, using the inner namespace instead. auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns); if (resolvedNs.pipeline.empty()) { - return uassertStatusOK(Pipeline::parse(std::move(currentPipeline), unionExpCtx)); + return Pipeline::parse(std::move(currentPipeline), unionExpCtx, validatorCallback); } auto resolvedPipeline = std::move(resolvedNs.pipeline); resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size()); @@ -64,7 +75,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( std::make_move_iterator(currentPipeline.begin()), std::make_move_iterator(currentPipeline.end())); - return uassertStatusOK(Pipeline::parse(std::move(resolvedPipeline), unionExpCtx)); + return Pipeline::parse(std::move(resolvedPipeline), unionExpCtx, validatorCallback); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h index 3225a1e6468..ade72532935 100644 --- a/src/mongo/db/pipeline/document_source_union_with.h +++ b/src/mongo/db/pipeline/document_source_union_with.h @@ -62,19 +62,7 @@ public: DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) - : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) { - if (_pipeline) { - const auto& sources = _pipeline->getSources(); - auto it = std::find_if(sources.begin(), sources.end(), [](const auto& src) { - return !src->constraints().isAllowedInUnionPipeline(); - }); - - uassert(31441, - str::stream() << (*it)->getSourceName() - << " is not allowed within a $unionWith's sub-pipeline", - it == sources.end()); - } - } + : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {} const char* getSourceName() const final { return kStageName.rawData(); diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index eaebccf21a7..61412ad421c 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -70,12 +70,10 @@ TEST_F(DocumentSourceUnionWithTest, BasicSerialUnions) { mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo); auto unionWithOne = DocumentSourceUnionWith( mockCtxOne, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); auto unionWithTwo = DocumentSourceUnionWith( mockCtxTwo, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); unionWithOne.setSource(mock.get()); unionWithTwo.setSource(&unionWithOne); @@ -106,12 +104,11 @@ TEST_F(DocumentSourceUnionWithTest, BasicNestedUnions) { mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo); auto unionWithOne = make_intrusive<DocumentSourceUnionWith>( mockCtxOne, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); auto unionWithTwo = DocumentSourceUnionWith( mockCtxTwo, - uassertStatusOK(Pipeline::create( - std::list<boost::intrusive_ptr<DocumentSource>>{unionWithOne}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{unionWithOne}, + getExpCtx())); unionWithTwo.setSource(mock.get()); auto comparator = DocumentComparator(); @@ -144,12 +141,10 @@ TEST_F(DocumentSourceUnionWithTest, UnionsWithNonEmptySubPipelines) { const auto proj = DocumentSourceAddFields::create(BSON("d" << 1), mockCtxTwo); auto unionWithOne = DocumentSourceUnionWith( mockCtxOne, - uassertStatusOK(Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{filter}, - getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{filter}, getExpCtx())); auto unionWithTwo = DocumentSourceUnionWith( mockCtxTwo, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{proj}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{proj}, getExpCtx())); unionWithOne.setSource(mock.get()); unionWithTwo.setSource(&unionWithOne); @@ -312,12 +307,10 @@ TEST_F(DocumentSourceUnionWithTest, PropagatePauses) { mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo); auto unionWithOne = DocumentSourceUnionWith( mockCtxOne, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); auto unionWithTwo = DocumentSourceUnionWith( mockCtxTwo, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); unionWithOne.setSource(mock.get()); unionWithTwo.setSource(&unionWithOne); @@ -339,13 +332,11 @@ TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsFullDoc) { .firstElement(), expCtx); const auto unionWith = make_intrusive<DocumentSourceUnionWith>( - expCtx, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx))); + expCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx)); // With the $unionWith *before* the $replaceRoot, the dependency analysis will report that all // fields are needed. - auto pipeline = uassertStatusOK(Pipeline::create({unionWith, replaceRoot}, expCtx)); + auto pipeline = Pipeline::create({unionWith, replaceRoot}, expCtx); auto deps = pipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_BSONOBJ_EQ(deps.toProjectionWithoutMetadata(), BSONObj()); @@ -361,13 +352,11 @@ TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsReferencedFieldsBef .firstElement(), expCtx); const auto unionWith = make_intrusive<DocumentSourceUnionWith>( - expCtx, - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx))); + expCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx)); // With the $unionWith *after* the $replaceRoot, the dependency analysis will now report only // the referenced fields. - auto pipeline = uassertStatusOK(Pipeline::create({replaceRoot, unionWith}, expCtx)); + auto pipeline = Pipeline::create({replaceRoot, unionWith}, expCtx); auto deps = pipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_BSONOBJ_EQ(deps.toProjectionWithoutMetadata(), BSON("b" << 1 << "_id" << 0)); @@ -459,8 +448,7 @@ TEST_F(DocumentSourceUnionWithTest, RejectUnionWhenDepthLimitIsExceeded) { TEST_F(DocumentSourceUnionWithTest, ConstraintsWithoutPipelineAreCorrect) { auto emptyUnion = DocumentSourceUnionWith( getExpCtx(), - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())); StageConstraints defaultConstraints(StageConstraints::StreamType::kStreaming, StageConstraints::PositionRequirement::kNone, StageConstraints::HostTypeRequirement::kAnyShard, @@ -485,8 +473,7 @@ TEST_F(DocumentSourceUnionWithTest, ConstraintsWithMixedSubPipelineAreCorrect) { mock->mockConstraints = stricterConstraint; auto unionWithOne = DocumentSourceUnionWith( getExpCtx(), - uassertStatusOK( - Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{mock}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{mock}, getExpCtx())); ASSERT_TRUE(unionWithOne.constraints(Pipeline::SplitState::kUnsplit) == stricterConstraint); } @@ -525,9 +512,9 @@ TEST_F(DocumentSourceUnionWithTest, ConstraintsWithStrictSubPipelineAreCorrect) mockThree->mockConstraints = constraintPersistentDataTransactionLookupNotAllowed; auto unionStage = DocumentSourceUnionWith( getExpCtx(), - uassertStatusOK(Pipeline::create( + Pipeline::create( std::list<boost::intrusive_ptr<DocumentSource>>{mockOne, mockTwo, mockThree}, - getExpCtx()))); + getExpCtx())); StageConstraints strict(StageConstraints::StreamType::kStreaming, StageConstraints::PositionRequirement::kNone, StageConstraints::HostTypeRequirement::kAnyShard, @@ -549,14 +536,13 @@ TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInhe StageConstraints::LookupRequirement::kNotAllowed, StageConstraints::UnionRequirement::kAllowed); mock->mockConstraints = strictConstraint; - auto facetPipeline = uassertStatusOK(Pipeline::createFacetPipeline({mock}, getExpCtx())); + auto facetPipeline = Pipeline::create({mock}, getExpCtx()); std::vector<DocumentSourceFacet::FacetPipeline> facets; facets.emplace_back("pipeline", std::move(facetPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), getExpCtx()); auto unionStage = DocumentSourceUnionWith( getExpCtx(), - uassertStatusOK(Pipeline::create( - std::list<boost::intrusive_ptr<DocumentSource>>{facetStage}, getExpCtx()))); + Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{facetStage}, getExpCtx())); StageConstraints expectedConstraints(StageConstraints::StreamType::kStreaming, StageConstraints::PositionRequirement::kNone, StageConstraints::HostTypeRequirement::kAnyShard, diff --git a/src/mongo/db/pipeline/expression_walker_test.cpp b/src/mongo/db/pipeline/expression_walker_test.cpp index 4cc5a1c4691..4c409a468b6 100644 --- a/src/mongo/db/pipeline/expression_walker_test.cpp +++ b/src/mongo/db/pipeline/expression_walker_test.cpp @@ -53,7 +53,7 @@ protected: NamespaceString testNss("test", "collection"); AggregationRequest request(testNss, rawPipeline); - return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx())); + return Pipeline::parse(request.getPipeline(), getExpCtx()); } auto parseExpression(std::string expressionString) { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index bd8cf53df0c..eb3a4d97914 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -69,6 +69,54 @@ Value appendExecStats(Value docSource, const CommonStats& stats) { doc.addField("executionTimeMillisEstimate", Value(executionTimeMillisEstimate)); return Value(doc.freeze()); } + +/** + * Performs validation checking specific to top-level pipelines. Throws an assertion if the + * pipeline is invalid. + */ +void validateTopLevelPipeline(const Pipeline& pipeline) { + // Verify that the specified namespace is valid for the initial stage of this pipeline. + const NamespaceString& nss = pipeline.getContext()->ns; + + auto sources = pipeline.getSources(); + + if (sources.empty()) { + uassert(ErrorCodes::InvalidNamespace, + "{aggregate: 1} is not valid for an empty pipeline.", + !nss.isCollectionlessAggregateNS()); + return; + } + + if ("$mergeCursors"_sd != sources.front()->getSourceName()) { + // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, + // {aggregate: 1} is only valid for collectionless sources, and vice-versa. + const auto firstStageConstraints = sources.front()->constraints(); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "{aggregate: 1} is not valid for '" + << sources.front()->getSourceName() << "'; a collection is required.", + !(nss.isCollectionlessAggregateNS() && + !firstStageConstraints.isIndependentOfAnyCollection)); + + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "'" << sources.front()->getSourceName() + << "' can only be run with {aggregate: 1}", + !(!nss.isCollectionlessAggregateNS() && + firstStageConstraints.isIndependentOfAnyCollection)); + + // If the first stage is a $changeStream stage, then all stages in the pipeline must be + // either $changeStream stages or whitelisted as being able to run in a change stream. + if (firstStageConstraints.isChangeStreamStage()) { + for (auto&& source : sources) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << source->getSourceName() + << " is not permitted in a $changeStream pipeline", + source->constraints().isAllowedInChangeStream()); + } + } + } +} + } // namespace MONGO_FAIL_POINT_DEFINE(disablePipelineOptimization); @@ -100,20 +148,10 @@ Pipeline::~Pipeline() { invariant(_disposed); } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse( - const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { - return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline( - const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { - return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline( +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx, - const bool isFacetPipeline) { + PipelineValidatorCallback validator) { SourceContainer stages; @@ -122,105 +160,32 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOr stages.insert(stages.end(), parsedSources.begin(), parsedSources.end()); } - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create( - SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline( - SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { - return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline( - SourceContainer stages, - const intrusive_ptr<ExpressionContext>& expCtx, - const bool isFacetPipeline) { std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx), PipelineDeleter(expCtx->opCtx)); - try { - pipeline->validate(isFacetPipeline); - } catch (const DBException& ex) { - return ex.toStatus(); - } - - pipeline->stitch(); - return std::move(pipeline); -} -void Pipeline::validate(bool isFacetPipeline) const { - if (isFacetPipeline) { - validateFacetPipeline(); - } else { - validateTopLevelPipeline(); + // First call the context-specific validator, which may be different for top-level pipelines + // versus nested pipelines. + if (validator) + validator(*pipeline); + else { + validateTopLevelPipeline(*pipeline); } - validateCommon(); -} - -void Pipeline::validateTopLevelPipeline() const { - // Verify that the specified namespace is valid for the initial stage of this pipeline. - const NamespaceString& nss = pCtx->ns; + // Next run through the common validation rules that apply to every pipeline. + pipeline->validateCommon(); - if (_sources.empty()) { - if (nss.isCollectionlessAggregateNS()) { - uasserted(ErrorCodes::InvalidNamespace, - "{aggregate: 1} is not valid for an empty pipeline."); - } - return; - } - if ("$mergeCursors"_sd != _sources.front()->getSourceName()) { - // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this, - // {aggregate: 1} is only valid for collectionless sources, and vice-versa. - const auto firstStageConstraints = _sources.front()->constraints(_splitState); - - if (nss.isCollectionlessAggregateNS() && - !firstStageConstraints.isIndependentOfAnyCollection) { - uasserted(ErrorCodes::InvalidNamespace, - str::stream() - << "{aggregate: 1} is not valid for '" - << _sources.front()->getSourceName() << "'; a collection is required."); - } - - if (!nss.isCollectionlessAggregateNS() && - firstStageConstraints.isIndependentOfAnyCollection) { - uasserted(ErrorCodes::InvalidNamespace, - str::stream() << "'" << _sources.front()->getSourceName() - << "' can only be run with {aggregate: 1}"); - } - - // If the first stage is a $changeStream stage, then all stages in the pipeline must be - // either $changeStream stages or whitelisted as being able to run in a change stream. - if (firstStageConstraints.isChangeStreamStage()) { - for (auto&& source : _sources) { - uassert(ErrorCodes::IllegalOperation, - str::stream() << source->getSourceName() - << " is not permitted in a $changeStream pipeline", - source->constraints(_splitState).isAllowedInChangeStream()); - } - } - } + pipeline->stitch(); + return pipeline; } -void Pipeline::validateFacetPipeline() const { - if (_sources.empty()) { - uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty"); - } +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create( + SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx), + PipelineDeleter(expCtx->opCtx)); - for (auto&& stage : _sources) { - auto stageConstraints = stage->constraints(_splitState); - if (!stageConstraints.isAllowedInsideFacetStage()) { - uasserted(40600, - str::stream() << stage->getSourceName() - << " is not allowed to be used within a $facet stage"); - } - // We expect a stage within a $facet stage to have these properties. - invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); - invariant(!stageConstraints.isIndependentOfAnyCollection); - } + pipeline->validateCommon(); + pipeline->stitch(); + return pipeline; } void Pipeline::validateCommon() const { @@ -229,22 +194,21 @@ void Pipeline::validateCommon() const { auto constraints = stage->constraints(_splitState); // Verify that all stages adhere to their PositionRequirement constraints. - if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) { - uasserted(40602, - str::stream() << stage->getSourceName() - << " is only valid as the first stage in a pipeline."); - } - auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); - if (i != 0 && matchStage && matchStage->isTextQuery()) { - uasserted(17313, "$match with $text is only allowed as the first pipeline stage"); - } + uassert(40602, + str::stream() << stage->getSourceName() + << " is only valid as the first stage in a pipeline.", + !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0)); - if (constraints.requiredPosition == PositionRequirement::kLast && - i != _sources.size() - 1) { - uasserted(40601, - str::stream() << stage->getSourceName() - << " can only be the final stage in the pipeline"); - } + auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get()); + uassert(17313, + "$match with $text is only allowed as the first pipeline stage", + !(i != 0 && matchStage && matchStage->isTextQuery())); + + uassert(40601, + str::stream() << stage->getSourceName() + << " can only be the final stage in the pipeline", + !(constraints.requiredPosition == PositionRequirement::kLast && + i != _sources.size() - 1)); ++i; // Verify that we are not attempting to run a mongoS-only stage on mongoD. @@ -252,12 +216,10 @@ void Pipeline::validateCommon() const { str::stream() << stage->getSourceName() << " can only be run on mongoS", !(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos)); - if (pCtx->inMultiDocumentTransaction) { - uassert(ErrorCodes::OperationNotSupportedInTransaction, - str::stream() << "Stage not supported inside of a multi-document transaction: " - << stage->getSourceName(), - constraints.isAllowedInTransaction()); - } + uassert(ErrorCodes::OperationNotSupportedInTransaction, + str::stream() << "Stage not supported inside of a multi-document transaction: " + << stage->getSourceName(), + !(pCtx->inMultiDocumentTransaction && !constraints.isAllowedInTransaction())); } } @@ -661,7 +623,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { - auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (opts.optimize) { pipeline->optimizePipeline(); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index c3bad653814..5eb096fc7e2 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -101,28 +101,22 @@ public: MatchExpressionParser::AllowedFeatures::kGeoNear; /** - * Parses a Pipeline from a vector of BSONObjs. Returns a non-OK status if it failed to parse. - * The returned pipeline is not optimized, but the caller may convert it to an optimized - * pipeline by calling optimizePipeline(). + * Parses a Pipeline from a vector of BSONObjs then invokes the optional 'validator' callback + * with a reference to the newly created Pipeline. If no validator callback is given, this + * method assumes that we're parsing a top-level pipeline. Throws an exception if it failed to + * parse or if any exception occurs in the validator. The returned pipeline is not optimized, + * but the caller may convert it to an optimized pipeline by calling optimizePipeline(). * * It is illegal to create a pipeline using an ExpressionContext which contains a collation that * will not be used during execution of the pipeline. Doing so may cause comparisons made during * parse-time to return the wrong results. */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parse( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx); + using PipelineValidatorCallback = std::function<void(const Pipeline&)>; - /** - * Parses a $facet Pipeline from a vector of BSONObjs. Validation checks which are only relevant - * to top-level pipelines are skipped, and additional checks applicable to $facet pipelines are - * performed. Returns a non-OK status if it failed to parse. The returned pipeline is not - * optimized, but the caller may convert it to an optimized pipeline by calling - * optimizePipeline(). - */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseFacetPipeline( + static std::unique_ptr<Pipeline, PipelineDeleter> parse( const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + PipelineValidatorCallback validator = nullptr); /** * Creates a Pipeline from an existing SourceContainer. @@ -130,16 +124,7 @@ public: * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage * is present but is not the last stage. */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> create( - SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); - - /** - * Creates a $facet Pipeline from an existing SourceContainer. - * - * Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if - * any stage is an initial source. - */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createFacetPipeline( + static std::unique_ptr<Pipeline, PipelineDeleter> create( SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** @@ -335,24 +320,6 @@ public: private: friend class PipelineDeleter; - /** - * Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the - * pipeline. - */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseTopLevelOrFacetPipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const bool isFacetPipeline); - - /** - * Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the - * pipeline. - */ - static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createTopLevelOrFacetPipeline( - SourceContainer sources, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const bool isSubPipeline); - Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx); Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx); @@ -372,25 +339,6 @@ private: void unstitch(); /** - * Throws if the pipeline fails any of a set of semantic checks. For example, if an $out stage - * is present then it must come last in the pipeline, while initial stages such as $indexStats - * must be at the start. - */ - void validate(bool isFacetPipeline) const; - - /** - * Performs validation checking specific to top-level pipelines. Throws if the pipeline is - * invalid. - */ - void validateTopLevelPipeline() const; - - /** - * Performs validation checking specific to nested $facet pipelines. Throws if the pipeline is - * invalid. - */ - void validateFacetPipeline() const; - - /** * Performs common validation for top-level or facet pipelines. Throws if the pipeline is * invalid. * diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp index 5a15074b361..c9d05ea2b4e 100644 --- a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp +++ b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp @@ -83,7 +83,7 @@ protected: AggregationRequest request(testNss, rawPipeline); getExpCtx()->ns = testNss; - return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx())); + return Pipeline::parse(request.getPipeline(), getExpCtx()); } template <typename T, typename... Args> @@ -391,7 +391,7 @@ TEST_F(PipelineMetadataTreeTest, ZipWalksAPipelineAndTreeInTandemAndInOrder) { } TEST_F(PipelineMetadataTreeTest, MakeTreeWithEmptyPipeline) { - auto pipeline = uassertStatusOK(Pipeline::parse({}, getExpCtx())); + auto pipeline = Pipeline::parse({}, getExpCtx()); auto result = makeTree<std::string>({{NamespaceString("unittests.pipeline_test"), std::string("input")}}, *pipeline, diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 4139cd1cd5a..912d96fe49f 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -116,7 +116,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}}); ctx->setResolvedNamespace(unionCollNs, {unionCollNs, std::vector<BSONObj>{}}); - auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); + auto outputPipe = Pipeline::parse(request.getPipeline(), ctx); outputPipe->optimizePipeline(); ASSERT_VALUE_EQ(Value(outputPipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)), @@ -1679,7 +1679,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) { auto matchPredicate = BSON("extra" << "predicate"); stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx)); - auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); // Make sure the $match stage has swapped before the change look up. @@ -1704,7 +1704,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage stages.push_back(DocumentSourceMatch::create( BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx)); - auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); // Make sure the $match stage stays at the end. @@ -1730,7 +1730,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen auto matchPredicate = BSON("extra" << "predicate"); stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx)); - auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); // Make sure the $match stage has swapped before the change look up. @@ -1756,7 +1756,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre stages.push_back(DocumentSourceMatch::create( BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL), expCtx)); - auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + auto pipeline = Pipeline::create(stages, expCtx); pipeline->optimizePipeline(); // Make sure the $match stage stays at the end. @@ -2095,7 +2095,7 @@ public: ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}}); // Test that we can both split the pipeline and reassemble it into its original form. - mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); + mergePipe = Pipeline::parse(request.getPipeline(), ctx); mergePipe->optimizePipeline(); auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(mergePipe)); @@ -2690,7 +2690,7 @@ TEST_F(PipelineMustRunOnMongoSTest, UnsplittablePipelineMustRunOnMongoS) { auto match = DocumentSourceMatch::create(fromjson("{x: 5}"), expCtx); auto runOnMongoS = DocumentSourceMustRunOnMongoS::create(); - auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS}, expCtx)); + auto pipeline = Pipeline::create({match, runOnMongoS}, expCtx); ASSERT_TRUE(pipeline->requiredToRunOnMongos()); pipeline->optimizePipeline(); @@ -2707,7 +2707,7 @@ TEST_F(PipelineMustRunOnMongoSTest, UnsplittableMongoSPipelineAssertsIfDisallowe auto runOnMongoS = DocumentSourceMustRunOnMongoS::create(); auto sort = DocumentSourceSort::create(expCtx, fromjson("{x: 1}")); - auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, sort}, expCtx)); + auto pipeline = Pipeline::create({match, runOnMongoS, sort}, expCtx); pipeline->optimizePipeline(); // The entire pipeline must run on mongoS, but $sort cannot do so when 'allowDiskUse' is true. @@ -2725,7 +2725,7 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest, auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); auto runOnMongoS = DocumentSourceMustRunOnMongoS::create(); - auto pipeline = uassertStatusOK(Pipeline::create({match, split, runOnMongoS}, expCtx)); + auto pipeline = Pipeline::create({match, split, runOnMongoS}, expCtx); // We don't need to run the entire pipeline on mongoS because we can split at // $_internalSplitPipeline. @@ -2766,7 +2766,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStageP auto outSpec = fromjson("{$out: 'outcoll'}"); auto out = DocumentSourceOut::createFromBson(outSpec["$out"], expCtx); - auto pipeline = uassertStatusOK(Pipeline::create({match, split, runOnMongoS, out}, expCtx)); + auto pipeline = Pipeline::create({match, split, runOnMongoS, out}, expCtx); // We don't need to run the entire pipeline on mongoS because we can split at // $_internalSplitPipeline. @@ -2789,7 +2789,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineAssertsIfMongoSStageOnShar auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kAnyShard); - auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, split}, expCtx)); + auto pipeline = Pipeline::create({match, runOnMongoS, split}, expCtx); pipeline->optimizePipeline(); // The 'runOnMongoS' stage comes before any splitpoint, so this entire pipeline must run on @@ -2808,7 +2808,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit auto runOnMongoS = DocumentSourceMustRunOnMongoS::create(); auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); - auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, split}, expCtx)); + auto pipeline = Pipeline::create({match, runOnMongoS, split}, expCtx); pipeline->optimizePipeline(); // The 'runOnMongoS' stage is before the splitpoint, so this entire pipeline must run on mongoS. @@ -2827,7 +2827,7 @@ TEST(PipelineInitialSource, GeoNearInitialQuery) { fromjson("{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}")}; intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest( &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline)); - auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx)); + auto pipe = Pipeline::parse(rawPipeline, ctx); ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 1)); } @@ -2837,7 +2837,7 @@ TEST(PipelineInitialSource, MatchInitialQuery) { intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest( &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline)); - auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx)); + auto pipe = Pipeline::parse(rawPipeline, ctx); ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4)); } @@ -2875,7 +2875,8 @@ TEST_F(PipelineValidateTest, AggregateOneNSNotValidForEmptyPipeline) { ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); - ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); + ASSERT_THROWS_CODE( + Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace); } TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) { @@ -2884,7 +2885,8 @@ TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollect ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); - ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); + ASSERT_THROWS_CODE( + Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace); } TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless) { @@ -2893,7 +2895,7 @@ TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless) ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a"); - ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); + Pipeline::create({collectionlessSource}, ctx); } TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless) { @@ -2902,16 +2904,20 @@ TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless) ctx->ns = kTestNss; - ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); + ASSERT_THROWS_CODE(Pipeline::parse({fromjson("{$listLocalSessions: {}}")}, + ctx), // Pipeline::create({collectionlessSource}, ctx), + AssertionException, + ErrorCodes::InvalidNamespace); } TEST_F(PipelineValidateTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")}; + const std::vector<BSONObj> rawPipeline = {fromjson("{$facet: {subPipe: [{$match: {}}]}}")}; auto ctx = getExpCtx(); ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests"); - ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus()); + ASSERT_THROWS_CODE( + Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace); } TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) { @@ -2919,7 +2925,7 @@ TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) { auto ctx = getExpCtx(); setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); - ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); + Pipeline::parse(rawPipeline, ctx); } TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) { @@ -2928,19 +2934,17 @@ TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) { auto ctx = getExpCtx(); setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40602)); + ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx), AssertionException, 40602); } TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) { - const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), - fromjson("{$changeStream: {}}")}; + const std::vector<BSONObj> rawPipeline = { + fromjson("{$facet: {subPipe: [{$match: {}}, {$changeStream: {}}]}}")}; + auto ctx = getExpCtx(); setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); - auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); - ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40600)); - ASSERT(std::string::npos != parseStatus.reason().find("$changeStream")); + ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx), AssertionException, 40600); } class DocumentSourceDisallowedInTransactions : public DocumentSourceMock { @@ -2964,8 +2968,6 @@ public: }; TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalInTransactions) { - BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level" - << "snapshot")); auto ctx = getExpCtx(); ctx->inMultiDocumentTransaction = true; @@ -2973,24 +2975,20 @@ TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalInTransact // creation fails with the expected error code. auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx); auto illegalStage = DocumentSourceDisallowedInTransactions::create(); - auto pipeline = Pipeline::create({matchStage, illegalStage}, ctx); - ASSERT_NOT_OK(pipeline.getStatus()); - ASSERT_EQ(pipeline.getStatus(), ErrorCodes::OperationNotSupportedInTransaction); + ASSERT_THROWS_CODE(Pipeline::create({matchStage, illegalStage}, ctx), + AssertionException, + ErrorCodes::OperationNotSupportedInTransaction); } TEST_F(PipelineValidateTest, FacetPipelineValidatedForStagesIllegalInTransactions) { - BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level" - << "snapshot")); auto ctx = getExpCtx(); ctx->inMultiDocumentTransaction = true; - // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline - // creation fails with the expected error code. - auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx); - auto illegalStage = DocumentSourceDisallowedInTransactions::create(); - auto pipeline = Pipeline::createFacetPipeline({matchStage, illegalStage}, ctx); - ASSERT_NOT_OK(pipeline.getStatus()); - ASSERT_EQ(pipeline.getStatus(), ErrorCodes::OperationNotSupportedInTransaction); + const std::vector<BSONObj> rawPipeline = { + fromjson("{$facet: {subPipe: [{$match: {}}, {$out: 'outColl'}]}}")}; + ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx), + AssertionException, + ErrorCodes::OperationNotSupportedInTransaction); } } // namespace pipeline_validate @@ -3000,7 +2998,7 @@ namespace Dependencies { using PipelineDependenciesTest = AggregationContextFixture; TEST_F(PipelineDependenciesTest, EmptyPipelineShouldRequireWholeDocument) { - auto pipeline = unittest::assertGet(Pipeline::create({}, getExpCtx())); + auto pipeline = Pipeline::create({}, getExpCtx()); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata); ASSERT_TRUE(depsTracker.needWholeDocument); @@ -3096,7 +3094,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp auto ctx = getExpCtx(); auto needsASeeNext = DocumentSourceNeedsASeeNext::create(); auto notSupported = DocumentSourceDependenciesNotSupported::create(); - auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext, notSupported}, ctx)); + auto pipeline = Pipeline::create({needsASeeNext, notSupported}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata); ASSERT_TRUE(depsTracker.needWholeDocument); @@ -3104,7 +3102,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp ASSERT_FALSE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore)); // Now in the other order. - pipeline = unittest::assertGet(Pipeline::create({notSupported, needsASeeNext}, ctx)); + pipeline = Pipeline::create({notSupported, needsASeeNext}, ctx); depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata); ASSERT_TRUE(depsTracker.needWholeDocument); @@ -3113,7 +3111,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfNoStageReturnsExhaustiveFields) { auto ctx = getExpCtx(); auto needsASeeNext = DocumentSourceNeedsASeeNext::create(); - auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext}, ctx)); + auto pipeline = Pipeline::create({needsASeeNext}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_TRUE(depsTracker.needWholeDocument); @@ -3123,7 +3121,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireWholeDocumentIfAnyStageReturnsE auto ctx = getExpCtx(); auto needsASeeNext = DocumentSourceNeedsASeeNext::create(); auto needsOnlyB = DocumentSourceNeedsOnlyB::create(); - auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext, needsOnlyB}, ctx)); + auto pipeline = Pipeline::create({needsASeeNext, needsOnlyB}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kNoMetadata); ASSERT_FALSE(depsTracker.needWholeDocument); @@ -3136,7 +3134,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotAddAnyRequiredFieldsAfterFirstStageWit auto ctx = getExpCtx(); auto needsOnlyB = DocumentSourceNeedsOnlyB::create(); auto needsASeeNext = DocumentSourceNeedsASeeNext::create(); - auto pipeline = unittest::assertGet(Pipeline::create({needsOnlyB, needsASeeNext}, ctx)); + auto pipeline = Pipeline::create({needsOnlyB, needsASeeNext}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata); ASSERT_FALSE(depsTracker.needWholeDocument); @@ -3150,7 +3148,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotAddAnyRequiredFieldsAfterFirstStageWit TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfThereIsNoScoreAvailable) { auto ctx = getExpCtx(); - auto pipeline = unittest::assertGet(Pipeline::create({}, ctx)); + auto pipeline = Pipeline::create({}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata); ASSERT_FALSE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore)); @@ -3159,21 +3157,21 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfThereIsNoScoreAvaila TEST_F(PipelineDependenciesTest, ShouldThrowIfTextScoreIsNeededButNotPresent) { auto ctx = getExpCtx(); auto needsText = DocumentSourceNeedsOnlyTextScore::create(); - auto pipeline = unittest::assertGet(Pipeline::create({needsText}, ctx)); + auto pipeline = Pipeline::create({needsText}, ctx); ASSERT_THROWS(pipeline->getDependencies(DepsTracker::kAllMetadata), AssertionException); } TEST_F(PipelineDependenciesTest, ShouldRequireTextScoreIfAvailableAndNoStageReturnsExhaustiveMeta) { auto ctx = getExpCtx(); - auto pipeline = unittest::assertGet(Pipeline::create({}, ctx)); + auto pipeline = Pipeline::create({}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore); ASSERT_TRUE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore)); auto needsASeeNext = DocumentSourceNeedsASeeNext::create(); - pipeline = unittest::assertGet(Pipeline::create({needsASeeNext}, ctx)); + pipeline = Pipeline::create({needsASeeNext}, ctx); depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore); ASSERT_TRUE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore)); @@ -3183,7 +3181,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite auto ctx = getExpCtx(); auto stripsTextScore = DocumentSourceStripsTextScore::create(); auto needsText = DocumentSourceNeedsOnlyTextScore::create(); - auto pipeline = unittest::assertGet(Pipeline::create({stripsTextScore, needsText}, ctx)); + auto pipeline = Pipeline::create({stripsTextScore, needsText}, ctx); auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore); @@ -3198,8 +3196,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite namespace { TEST(PipelineRenameTracking, ReportsIdentityMapWhenEmpty) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); - auto pipeline = - unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest()}, expCtx)); + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest()}, expCtx); { // Tracking renames backwards. auto renames = semantic_analysis::renamedPaths( @@ -3243,8 +3240,8 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); { // Tracking renames backwards. - auto pipeline = unittest::assertGet(Pipeline::create( - {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx)); + auto pipeline = Pipeline::create( + {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx); auto renames = semantic_analysis::renamedPaths( pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a", "b", "c.d"}); ASSERT(static_cast<bool>(renames)); @@ -3256,8 +3253,8 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) { } { // Tracking renames forwards. - auto pipeline = unittest::assertGet(Pipeline::create( - {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx)); + auto pipeline = Pipeline::create( + {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx); auto renames = semantic_analysis::renamedPaths( pipeline->getSources().cbegin(), pipeline->getSources().cend(), {"a", "b", "c.d"}); ASSERT(static_cast<bool>(renames)); @@ -3269,11 +3266,11 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) { } { // Tracking renames backwards. - auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(), - NoModifications::create(), - NoModifications::create(), - NoModifications::create()}, - expCtx)); + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), + NoModifications::create(), + NoModifications::create(), + NoModifications::create()}, + expCtx); auto renames = semantic_analysis::renamedPaths( pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a", "b", "c.d"}); auto nameMap = *renames; @@ -3284,11 +3281,11 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) { } { // Tracking renames forwards. - auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(), - NoModifications::create(), - NoModifications::create(), - NoModifications::create()}, - expCtx)); + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), + NoModifications::create(), + NoModifications::create(), + NoModifications::create()}, + expCtx); auto renames = semantic_analysis::renamedPaths( pipeline->getSources().cbegin(), pipeline->getSources().cend(), {"a", "b", "c.d"}); auto nameMap = *renames; @@ -3316,11 +3313,11 @@ public: TEST(PipelineRenameTracking, DoesNotReportRenamesIfAStageDoesNotSupportTrackingThem) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); - auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(), - NoModifications::create(), - NotSupported::create(), - NoModifications::create()}, - expCtx)); + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), + NoModifications::create(), + NotSupported::create(), + NoModifications::create()}, + expCtx); // Backwards case. ASSERT_FALSE(static_cast<bool>(semantic_analysis::renamedPaths( pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a"}))); @@ -3350,8 +3347,8 @@ public: TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); - auto pipeline = unittest::assertGet( - Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx)); + auto pipeline = + Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx); { // Tracking backwards. auto renames = semantic_analysis::renamedPaths( @@ -3416,8 +3413,8 @@ TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) { TEST(PipelineRenameTracking, ReportsIdentityMapWhenGivenEmptyIteratorRange) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); - auto pipeline = unittest::assertGet( - Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx)); + auto pipeline = + Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx); { // Tracking backwards. auto renames = semantic_analysis::renamedPaths( @@ -3474,9 +3471,9 @@ TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); { // Tracking backwards. - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToC::create()}, - expCtx)); + expCtx); auto stages = pipeline->getSources(); auto renames = semantic_analysis::renamedPaths(stages.crbegin(), stages.crend(), {"c"}); ASSERT(static_cast<bool>(renames)); @@ -3486,9 +3483,9 @@ TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) { } { // Tracking forwards. - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToC::create()}, - expCtx)); + expCtx); auto stages = pipeline->getSources(); auto renames = semantic_analysis::renamedPaths(stages.cbegin(), stages.cend(), {"a"}); ASSERT(static_cast<bool>(renames)); @@ -3513,9 +3510,9 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); { // Tracking backwards. - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToA::create()}, - expCtx)); + expCtx); auto stages = pipeline->getSources(); auto renames = semantic_analysis::renamedPaths(stages.crbegin(), stages.crend(), {"a"}); ASSERT(static_cast<bool>(renames)); @@ -3525,9 +3522,9 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) { } { // Tracking forwards. - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToA::create()}, - expCtx)); + expCtx); auto stages = pipeline->getSources(); auto renames = semantic_analysis::renamedPaths(stages.cbegin(), stages.cend(), {"a"}); ASSERT(static_cast<bool>(renames)); @@ -3539,12 +3536,12 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) { TEST(InvolvedNamespacesTest, NoInvolvedNamespacesForMatchSortProject) { boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest()); - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), DocumentSourceMatch::create(BSON("x" << 1), expCtx), DocumentSourceSort::create(expCtx, BSON("y" << -1)), DocumentSourceProject::create(BSON("x" << 1 << "y" << 1), expCtx, "$project"_sd)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT(involvedNssSet.empty()); } @@ -3556,10 +3553,10 @@ TEST(InvolvedNamespacesTest, IncludesLookupNamespace) { expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}}); auto lookupSpec = fromjson("{$lookup: {from: 'foo', as: 'x', localField: 'foo_id', foreignField: '_id'}}"); - auto pipeline = unittest::assertGet( + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT_EQ(involvedNssSet.size(), 1UL); @@ -3579,10 +3576,10 @@ TEST(InvolvedNamespacesTest, IncludesGraphLookupNamespace) { " connectToField: 'y'," " startWith: '$start'" "}}"); - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT_EQ(involvedNssSet.size(), 1UL); @@ -3603,10 +3600,10 @@ TEST(InvolvedNamespacesTest, IncludesLookupSubpipelineNamespaces) { " as: 'x', " " pipeline: [{$lookup: {from: 'foo_inner', as: 'y', pipeline: []}}]" "}}"); - auto pipeline = unittest::assertGet( + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT_EQ(involvedNssSet.size(), 2UL); @@ -3634,10 +3631,10 @@ TEST(InvolvedNamespacesTest, IncludesGraphLookupSubPipeline) { " connectToField: 'y'," " startWith: '$start'" "}}"); - auto pipeline = unittest::assertGet(Pipeline::create( + auto pipeline = Pipeline::create( {DocumentSourceMock::createForTest(), DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT_EQ(involvedNssSet.size(), 2UL); @@ -3677,10 +3674,10 @@ TEST(InvolvedNamespacesTest, IncludesAllCollectionsWhenResolvingViews) { " }}" " ]" "}}"); - auto pipeline = unittest::assertGet( + auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(), DocumentSourceFacet::createFromBson(facetSpec.firstElement(), expCtx)}, - expCtx)); + expCtx); auto involvedNssSet = pipeline->getInvolvedCollections(); ASSERT_EQ(involvedNssSet.size(), 3UL); diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp index 782ad6e56ef..b88b40e989d 100644 --- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp @@ -38,7 +38,6 @@ namespace mongo { - std::unique_ptr<Pipeline, PipelineDeleter> StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index e288214f60d..291b477341e 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -627,7 +627,7 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) Pipeline::SourceContainer shardStages; boost::optional<BSONObj> inputsSort = findSplitPoint(&shardStages, mergePipeline.get()); - auto shardsPipeline = uassertStatusOK(Pipeline::create(std::move(shardStages), expCtx)); + auto shardsPipeline = Pipeline::create(std::move(shardStages), expCtx); // The order in which optimizations are applied can have significant impact on the efficiency of // the final pipeline. Be Careful! @@ -987,7 +987,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( } else { // We have not split the pipeline, and will execute entirely on the remote shards. Set up an // empty local pipeline which we will attach the merge cursors stage to. - mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx)); + mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx); } addMergeCursorsSource(mergePipeline.get(), diff --git a/src/mongo/db/update/pipeline_executor.cpp b/src/mongo/db/update/pipeline_executor.cpp index 219f1ab6453..241143af138 100644 --- a/src/mongo/db/update/pipeline_executor.cpp +++ b/src/mongo/db/update/pipeline_executor.cpp @@ -67,7 +67,7 @@ PipelineExecutor::PipelineExecutor(const boost::intrusive_ptr<ExpressionContext> } _expCtx->setResolvedNamespaces(resolvedNamespaces); - _pipeline = uassertStatusOK(Pipeline::parse(pipeline, _expCtx)); + _pipeline = Pipeline::parse(pipeline, _expCtx); // Validate the update pipeline. for (auto&& stage : _pipeline->getSources()) { diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp index 01a104b14d3..ca81e4c5ac8 100644 --- a/src/mongo/db/views/view_catalog.cpp +++ b/src/mongo/db/views/view_catalog.cpp @@ -342,29 +342,38 @@ StatusWith<stdx::unordered_set<NamespaceString>> ViewCatalog::_validatePipeline( // to apply some additional checks. expCtx->isParsingViewDefinition = true; - auto pipelineStatus = Pipeline::parse(viewDef.pipeline(), std::move(expCtx)); - if (!pipelineStatus.isOK()) { - return pipelineStatus.getStatus(); - } - - // Validate that the view pipeline does not contain any ineligible stages. - const auto& sources = pipelineStatus.getValue()->getSources(); - if (!sources.empty()) { - const auto firstPersistentStage = - std::find_if(sources.begin(), sources.end(), [](const auto& source) { - return source->constraints().writesPersistentData(); + try { + auto pipeline = + Pipeline::parse(viewDef.pipeline(), std::move(expCtx), [&](const Pipeline& pipeline) { + // Validate that the view pipeline does not contain any ineligible stages. + const auto& sources = pipeline.getSources(); + const auto firstPersistentStage = + std::find_if(sources.begin(), sources.end(), [](const auto& source) { + return source->constraints().writesPersistentData(); + }); + + uassert(ErrorCodes::OptionNotSupportedOnView, + str::stream() + << "The aggregation stage " + << firstPersistentStage->get()->getSourceName() << " in location " + << std::distance(sources.begin(), firstPersistentStage) + << " of the pipeline cannot be used in the view definition of " + << viewDef.name().ns() << " because it writes to disk", + firstPersistentStage == sources.end()); + + uassert(ErrorCodes::OptionNotSupportedOnView, + "$changeStream cannot be used in a view definition", + sources.empty() || !sources.front()->constraints().isChangeStreamStage()); + + std::for_each(sources.begin(), sources.end(), [](auto& stage) { + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "'" << stage->getSourceName() + << "' cannot be used in a view definition", + !stage->constraints().isIndependentOfAnyCollection); + }); }); - if (sources.front()->constraints().isChangeStreamStage()) { - return {ErrorCodes::OptionNotSupportedOnView, - "$changeStream cannot be used in a view definition"}; - } else if (firstPersistentStage != sources.end()) { - mongo::StringBuilder errorMessage; - errorMessage << "The aggregation stage " << firstPersistentStage->get()->getSourceName() - << " in location " << std::distance(sources.begin(), firstPersistentStage) - << " of the pipeline cannot be used in the view definition of " - << viewDef.name().ns() << " because it writes to disk"; - return {ErrorCodes::OptionNotSupportedOnView, errorMessage.str()}; - } + } catch (const DBException& ex) { + return ex.toStatus(); } return std::move(involvedNamespaces); diff --git a/src/mongo/db/views/view_catalog_test.cpp b/src/mongo/db/views/view_catalog_test.cpp index 7eaab9fc6c7..c186573e437 100644 --- a/src/mongo/db/views/view_catalog_test.cpp +++ b/src/mongo/db/views/view_catalog_test.cpp @@ -269,7 +269,7 @@ TEST_F(ViewCatalogFixture, CreateViewWithPipelineFailsOnInvalidStageName) { AssertionException); } -TEST_F(ReplViewCatalogFixture, CreateViewWithPipelineFailsOnIneligibleStage) { +TEST_F(ReplViewCatalogFixture, CreateViewWithPipelineFailsOnChangeStreamsStage) { const NamespaceString viewName("db.view"); const NamespaceString viewOn("db.coll"); @@ -282,6 +282,18 @@ TEST_F(ReplViewCatalogFixture, CreateViewWithPipelineFailsOnIneligibleStage) { ErrorCodes::OptionNotSupportedOnView); } +TEST_F(ReplViewCatalogFixture, CreateViewWithPipelineFailsOnCollectionlessStage) { + const NamespaceString viewName("db.view"); + const NamespaceString viewOn("db.coll"); + + auto invalidPipeline = BSON_ARRAY(BSON("$currentOp" << BSONObj())); + + ASSERT_THROWS_CODE( + createView(operationContext(), viewName, viewOn, invalidPipeline, emptyCollation), + AssertionException, + ErrorCodes::InvalidNamespace); +} + TEST_F(ReplViewCatalogFixture, CreateViewWithPipelineFailsOnIneligibleStagePersistentWrite) { const NamespaceString viewName("db.view"); const NamespaceString viewOn("db.coll"); diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 92e99e1d3ae..2cc7026e4b9 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -215,7 +215,7 @@ TEST_F(PlanExecutorTest, DropIndexScanAgg) { // in the pipeline. innerExec->saveState(); auto cursorSource = DocumentSourceCursor::create(collection, std::move(innerExec), expCtx); - auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx)); + auto pipeline = Pipeline::create({cursorSource}, expCtx); // Create the output PlanExecutor that pulls results from the pipeline. auto ws = std::make_unique<WorkingSet>(); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 61c3d034f9b..a7a111cfc72 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -266,7 +266,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, opCtx, request, collationObj, uuid, resolveInvolvedNamespaces(involvedNamespaces)); // Parse and optimize the full pipeline. - auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); + auto pipeline = Pipeline::parse(request.getPipeline(), expCtx); pipeline->optimizePipeline(); return pipeline; }; diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 0a0eb8f109c..c3281cff249 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -374,8 +374,8 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( } // Create a pipeline for a consumer and add the merging stage. - auto consumerPipeline = uassertStatusOK(Pipeline::create( - shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx)); + auto consumerPipeline = Pipeline::create( + shardDispatchResults->splitPipeline->mergePipeline->getSources(), expCtx); sharded_agg_helpers::addMergeCursorsSource( consumerPipeline.get(), @@ -409,7 +409,7 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( // The merging pipeline is just a union of the results from each of the shards involved on the // consumer side of the exchange. - auto mergePipeline = uassertStatusOK(Pipeline::create({}, expCtx)); + auto mergePipeline = Pipeline::create({}, expCtx); mergePipeline->setSplitState(Pipeline::SplitState::kSplitForMerge); SplitPipeline splitPipeline{nullptr, std::move(mergePipeline), boost::none}; diff --git a/src/mongo/s/query/cluster_exchange_test.cpp b/src/mongo/s/query/cluster_exchange_test.cpp index a49ff306214..cde513be35a 100644 --- a/src/mongo/s/query/cluster_exchange_test.cpp +++ b/src/mongo/s/query/cluster_exchange_test.cpp @@ -132,12 +132,10 @@ protected: TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineDoesNotEndWithOut) { setupNShards(2); - auto mergePipe = - unittest::assertGet(Pipeline::create({DocumentSourceLimit::create(expCtx(), 1)}, expCtx())); + auto mergePipe = Pipeline::create({DocumentSourceLimit::create(expCtx(), 1)}, expCtx()); ASSERT_FALSE( sharded_agg_helpers::checkIfEligibleForExchange(operationContext(), mergePipe.get())); - mergePipe = unittest::assertGet( - Pipeline::create({DocumentSourceMatch::create(BSONObj(), expCtx())}, expCtx())); + mergePipe = Pipeline::create({DocumentSourceMatch::create(BSONObj(), expCtx())}, expCtx()); ASSERT_FALSE( sharded_agg_helpers::checkIfEligibleForExchange(operationContext(), mergePipe.get())); } @@ -150,24 +148,22 @@ TEST_F(ClusterExchangeTest, ShouldNotExchangeIfPipelineEndsWithOut) { expCtx()->mongoProcessInterface = std::make_shared<StubMongoProcessInterface>(); ON_BLOCK_EXIT([&]() { expCtx()->mongoProcessInterface = originalMongoProcessInterface; }); - auto mergePipe = unittest::assertGet( - Pipeline::create({DocumentSourceOut::create(kTestOutNss, expCtx())}, expCtx())); + auto mergePipe = Pipeline::create({DocumentSourceOut::create(kTestOutNss, expCtx())}, expCtx()); ASSERT_FALSE( sharded_agg_helpers::checkIfEligibleForExchange(operationContext(), mergePipe.get())); } TEST_F(ClusterExchangeTest, SingleMergeStageNotEligibleForExchangeIfOutputDatabaseDoesNotExist) { setupNShards(2); - auto mergePipe = unittest::assertGet( - Pipeline::create({DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { ASSERT_THROWS_CODE( @@ -187,17 +183,16 @@ TEST_F(ClusterExchangeTest, SingleMergeStageNotEligibleForExchangeIfOutputDataba // cannot insert an $exchange. The $merge stage should later create a new, unsharded collection. TEST_F(ClusterExchangeTest, SingleMergeStageNotEligibleForExchangeIfOutputCollectionDoesNotExist) { setupNShards(2); - auto mergePipe = unittest::assertGet( - Pipeline::create({DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - - expCtx())); + auto mergePipe = Pipeline::create({DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + + expCtx()); auto future = launchAsync([&] { ASSERT_FALSE( @@ -217,17 +212,16 @@ TEST_F(ClusterExchangeTest, LimitFollowedByMergeStageIsNotEligibleForExchange) { setupNShards(2); loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); - auto mergePipe = unittest::assertGet( - Pipeline::create({DocumentSourceLimit::create(expCtx(), 6), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({DocumentSourceLimit::create(expCtx(), 6), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { ASSERT_FALSE( @@ -242,17 +236,16 @@ TEST_F(ClusterExchangeTest, GroupFollowedByMergeIsEligbleForExchange) { setupNShards(2); loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -277,19 +270,18 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) { setupNShards(2); loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), - parse("{$project: {temporarily_renamed: '$_id'}}"), - parse("{$project: {_id: '$temporarily_renamed'}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), + parse("{$project: {temporarily_renamed: '$_id'}}"), + parse("{$project: {_id: '$temporarily_renamed'}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -318,18 +310,17 @@ TEST_F(ClusterExchangeTest, MatchesAreEligibleForExchange) { setupNShards(2); loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), - parse("{$match: {_id: {$gte: 0}}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {_id: '$x', $doingMerge: true}}"), + parse("{$match: {_id: {$gte: 0}}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -364,17 +355,16 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) { // {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}]. // No $sort stage appears in the merging half since we'd expect that to be absorbed by the // $mergeCursors and AsyncResultsMerger. - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {_id: '$x'}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {_id: '$x'}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -409,17 +399,16 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchangeHash) { // {$merge: {into: "sharded_by_id", whenMatched: "fail", whenNotMatched: "insert"}}]. // No $sort stage appears in the merging half since we'd expect that to be absorbed by the // $mergeCursors and AsyncResultsMerger. - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {_id: '$x'}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {_id: '$x'}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -450,7 +439,7 @@ TEST_F(ClusterExchangeTest, ProjectThroughDottedFieldDoesNotPreserveShardKey) { setupNShards(2); loadRoutingTableWithTwoChunksAndTwoShards(kTestOutNss); - auto mergePipe = unittest::assertGet(Pipeline::create( + auto mergePipe = Pipeline::create( {parse("{$group: {" " _id: {region: '$region', country: '$country'}," " population: {$sum: '$population'}," @@ -466,7 +455,7 @@ TEST_F(ClusterExchangeTest, ProjectThroughDottedFieldDoesNotPreserveShardKey) { _mergePipeline, _mergeOnFields, _mergeTargetCollectionVersion)}, - expCtx())); + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -488,21 +477,20 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExample) { // As an example of a pipeline that might replace a map reduce, imagine that we are performing a // word count, and the shards part of the pipeline tokenized some text field of each document // into {word: <token>, count: 1}. Then this is the merging half of the pipeline: - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {" - " _id: '$word'," - " count: {$sum: 1}," - " $doingMerge: true" - "}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {" + " _id: '$word'," + " count: {$sum: 1}," + " $doingMerge: true" + "}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -552,22 +540,21 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) { // As an example of a pipeline that might replace a map reduce, imagine that we are performing a // word count, and the shards part of the pipeline tokenized some text field of each document // into {word: <token>, count: 1}. Then this is the merging half of the pipeline: - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {" - " _id: '$word'," - " count: {$sum: 1}," - " $doingMerge: true" - "}}"), - parse("{$project: {word: '$_id', count: 1}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {" + " _id: '$word'," + " count: {$sum: 1}," + " $doingMerge: true" + "}}"), + parse("{$project: {word: '$_id', count: 1}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = @@ -636,21 +623,20 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { loadRoutingTable(kTestOutNss, epoch, shardKey, chunks); - auto mergePipe = unittest::assertGet( - Pipeline::create({parse("{$group: {" - " _id: '$x'," - " $doingMerge: true" - "}}"), - parse("{$project: {x: '$_id', y: '$_id'}}"), - DocumentSourceMerge::create(kTestOutNss, - expCtx(), - WhenMatched::kFail, - WhenNotMatched::kInsert, - _mergeLetVariables, - _mergePipeline, - _mergeOnFields, - _mergeTargetCollectionVersion)}, - expCtx())); + auto mergePipe = Pipeline::create({parse("{$group: {" + " _id: '$x'," + " $doingMerge: true" + "}}"), + parse("{$project: {x: '$_id', y: '$_id'}}"), + DocumentSourceMerge::create(kTestOutNss, + expCtx(), + WhenMatched::kFail, + WhenNotMatched::kInsert, + _mergeLetVariables, + _mergePipeline, + _mergeOnFields, + _mergeTargetCollectionVersion)}, + expCtx()); auto future = launchAsync([&] { auto exchangeSpec = |