author     Nicholas Zolnierz <nicholas.zolnierz@mongodb.com>   2020-02-19 22:58:38 +0000
committer  evergreen <evergreen@mongodb.com>                   2020-02-19 22:58:38 +0000
commit     b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a (patch)
tree       63fc225aee7ed58fd3bd59f310acb181a2d04b67 /src/mongo/db/pipeline
parent     c54a777a4a154984f5595b11993d7d009350a38c (diff)
download   mongo-b06b7d7dc5badc18c2977ee22ecb8ad339f5f27a.tar.gz
SERVER-46015 Cleanup Pipeline parsing for aggregation stages with child pipelines
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp                               |   4
-rw-r--r--  src/mongo/db/pipeline/document_source_exchange_test.cpp                                     |  70
-rw-r--r--  src/mongo/db/pipeline/document_source_facet.cpp                                             |  19
-rw-r--r--  src/mongo/db/pipeline/document_source_facet_test.cpp                                        |  74
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.cpp                                      |   2
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp                                            |  21
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors_test.cpp                                |  10
-rw-r--r--  src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp                             |  10
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.cpp                                        |  15
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with.h                                          |  14
-rw-r--r--  src/mongo/db/pipeline/document_source_union_with_test.cpp                                   |  52
-rw-r--r--  src/mongo/db/pipeline/expression_walker_test.cpp                                            |   2
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp                                                          | 210
-rw-r--r--  src/mongo/db/pipeline/pipeline.h                                                            |  72
-rw-r--r--  src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp                                       |   4
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp                                                     | 193
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp  |   1
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                                               |   4
18 files changed, 321 insertions, 456 deletions
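
What the per-file diffs below have in common: Pipeline::parse() and Pipeline::create() now throw on validation failure instead of returning a StatusWith, so the unittest::assertGet()/uassertStatusOK() wrappers disappear at every call site, and the facet-specific parseFacetPipeline()/createFacetPipeline() variants are folded into an optional validator callback on parse(). A minimal before/after sketch of the calling pattern (rawPipeline and expCtx are assumed to be in scope; this is illustrative, not code from the commit):

// Before this commit: parse() returned StatusWith<unique_ptr<Pipeline, PipelineDeleter>>,
// so callers unwrapped the result with uassertStatusOK() or unittest::assertGet().
auto before = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));

// After this commit: parse() throws a DBException on a validation failure and returns
// the pipeline directly; an optional callback supplies nested-pipeline validation.
auto after = Pipeline::parse(rawPipeline, expCtx);
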
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index ae5b676769a..b699c1e3ae7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1489,8 +1489,8 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
// equivalent to the original serialization.
//
auto serializedBson = serializedDoc.toBson();
- auto roundTripped = uassertStatusOK(Pipeline::create(
- DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx));
+ auto roundTripped = Pipeline::create(
+ DSChangeStream::createFromBson(serializedBson.firstElement(), expCtx), expCtx);
auto newSerialization = roundTripped->serialize();
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
index 539cbc5cef9..8639cfd45a6 100644
--- a/src/mongo/db/pipeline/document_source_exchange_test.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -172,8 +172,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) {
spec.setConsumers(1);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx()));
auto input = ex->getNext(getExpCtx()->opCtx, 0, nullptr);
@@ -198,8 +197,7 @@ TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -241,8 +239,7 @@ TEST_F(DocumentSourceExchangeTest, ExchangeNConsumerEarlyout) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -291,8 +288,7 @@ TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) {
spec.setConsumers(nConsumers);
spec.setBufferSize(1024);
- boost::intrusive_ptr<Exchange> ex =
- new Exchange(spec, unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec, Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -339,7 +335,7 @@ TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) {
spec.setBufferSize(1024);
boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -401,7 +397,7 @@ TEST_F(DocumentSourceExchangeTest, RangeShardingExchangeNConsumer) {
spec.setBufferSize(1024);
boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -454,7 +450,7 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
spec.setBufferSize(1024);
boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -522,7 +518,7 @@ TEST_F(DocumentSourceExchangeTest, RandomExchangeNConsumerResourceYielding) {
auto artificalGlobalMutex = MONGO_MAKE_LATCH();
boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads;
for (size_t idx = 0; idx < nConsumers; ++idx) {
@@ -601,7 +597,7 @@ TEST_F(DocumentSourceExchangeTest, RangeRandomHashExchangeNConsumer) {
spec.setBufferSize(1024);
boost::intrusive_ptr<Exchange> ex =
- new Exchange(std::move(spec), unittest::assertGet(Pipeline::create({source}, getExpCtx())));
+ new Exchange(std::move(spec), Pipeline::create({source}, getExpCtx()));
std::vector<ThreadInfo> threads = createNProducers(nConsumers, ex);
std::vector<executor::TaskExecutor::CallbackHandle> handles;
@@ -641,9 +637,7 @@ TEST_F(DocumentSourceExchangeTest, RejectNoConsumers) {
<< "broadcast"
<< "consumers" << 0);
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50901);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50901);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidKey) {
@@ -651,9 +645,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKey) {
<< "broadcast"
<< "consumers" << 1 << "key" << BSON("a" << 2));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50896);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50896);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyHashExpected) {
@@ -663,9 +655,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyHashExpected) {
<< BSON("a"
<< "nothash"));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50895);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50895);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyWrongType) {
@@ -673,9 +663,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyWrongType) {
<< "broadcast"
<< "consumers" << 1 << "key" << BSON("a" << true));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50897);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50897);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyEmpty) {
@@ -683,9 +671,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidKeyEmpty) {
<< "broadcast"
<< "consumers" << 1 << "key" << BSON("" << 1));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 40352);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 40352);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundaries) {
@@ -695,9 +681,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundaries) {
<< BSON_ARRAY(BSON("a" << MAXKEY) << BSON("a" << MINKEY)) << "consumerIds"
<< BSON_ARRAY(0));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50893);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50893);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMin) {
@@ -707,9 +691,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMin) {
<< BSON_ARRAY(BSON("a" << 0) << BSON("a" << MAXKEY)) << "consumerIds"
<< BSON_ARRAY(0));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50958);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50958);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMax) {
@@ -719,9 +701,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesMissingMax) {
<< BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << 0)) << "consumerIds"
<< BSON_ARRAY(0));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50959);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50959);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesAndConsumerIds) {
@@ -731,9 +711,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidBoundariesAndConsumerIds) {
<< BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds"
<< BSON_ARRAY(0 << 1));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50900);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50900);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidPolicyBoundaries) {
@@ -743,9 +721,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidPolicyBoundaries) {
<< BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds"
<< BSON_ARRAY(0));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50899);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50899);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidConsumerIds) {
@@ -755,9 +731,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidConsumerIds) {
<< BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds"
<< BSON_ARRAY(1));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50894);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50894);
}
TEST_F(DocumentSourceExchangeTest, RejectInvalidMissingKeys) {
@@ -767,9 +741,7 @@ TEST_F(DocumentSourceExchangeTest, RejectInvalidMissingKeys) {
<< BSON_ARRAY(BSON("a" << MINKEY) << BSON("a" << MAXKEY)) << "consumerIds"
<< BSON_ARRAY(0));
ASSERT_THROWS_CODE(
- Exchange(parseSpec(spec), unittest::assertGet(Pipeline::create({}, getExpCtx()))),
- AssertionException,
- 50967);
+ Exchange(parseSpec(spec), Pipeline::create({}, getExpCtx())), AssertionException, 50967);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 9414a87ad5c..1152e92f5f4 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -291,7 +291,24 @@ intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
for (auto&& rawFacet : extractRawPipelines(elem)) {
const auto facetName = rawFacet.first;
- auto pipeline = uassertStatusOK(Pipeline::parseFacetPipeline(rawFacet.second, expCtx));
+ auto pipeline = Pipeline::parse(rawFacet.second, expCtx, [](const Pipeline& pipeline) {
+ auto sources = pipeline.getSources();
+ uassert(ErrorCodes::BadValue,
+ "sub-pipeline in $facet stage cannot be empty",
+ !sources.empty());
+
+ std::for_each(sources.begin(), sources.end(), [](auto& stage) {
+ auto stageConstraints = stage->constraints();
+ uassert(40600,
+ str::stream() << stage->getSourceName()
+ << " is not allowed to be used within a $facet stage",
+ stageConstraints.isAllowedInsideFacetStage());
+ // We expect a stage within a $facet stage to have these properties.
+ invariant(stageConstraints.requiredPosition ==
+ StageConstraints::PositionRequirement::kNone);
+ invariant(!stageConstraints.isIndependentOfAnyCollection);
+ });
+ });
// Validate that none of the facet pipelines have any conflicting HostTypeRequirements. This
// verifies both that all stages within each pipeline are consistent, and that the pipelines
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 98bb537d421..da7718fa5c6 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -257,7 +257,7 @@ public:
TEST_F(DocumentSourceFacetTest, PassthroughFacetDoesntRequireDiskAndIsOKInaTxn) {
auto ctx = getExpCtx();
auto passthrough = DocumentSourcePassthrough::create();
- auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx));
+ auto passthroughPipe = Pipeline::create({passthrough}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("passthrough", std::move(passthroughPipe));
@@ -295,7 +295,7 @@ public:
TEST_F(DocumentSourceFacetTest, FacetWithChildThatWritesDataAlsoReportsWritingData) {
auto ctx = getExpCtx();
auto writesDataStage = DocumentSourceWritesPersistentData::create();
- auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({writesDataStage}, ctx));
+ auto pipeline = Pipeline::create({writesDataStage}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("writes", std::move(pipeline));
@@ -316,9 +316,7 @@ TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) {
auto dummy = DocumentSourcePassthrough::create();
- auto statusWithPipeline = Pipeline::createFacetPipeline({dummy}, ctx);
- ASSERT_OK(statusWithPipeline.getStatus());
- auto pipeline = std::move(statusWithPipeline.getValue());
+ auto pipeline = Pipeline::create({dummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("results", std::move(pipeline));
@@ -344,10 +342,10 @@ TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) {
auto mock = DocumentSourceMock::createForTest(inputs);
auto firstDummy = DocumentSourcePassthrough::create();
- auto firstPipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx));
+ auto firstPipeline = Pipeline::create({firstDummy}, ctx);
auto secondDummy = DocumentSourcePassthrough::create();
- auto secondPipeline = uassertStatusOK(Pipeline::createFacetPipeline({secondDummy}, ctx));
+ auto secondPipeline = Pipeline::create({secondDummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("first", std::move(firstPipeline));
@@ -383,10 +381,10 @@ TEST_F(DocumentSourceFacetTest,
auto mock = DocumentSourceMock::createForTest(inputs);
auto passthrough = DocumentSourcePassthrough::create();
- auto passthroughPipe = uassertStatusOK(Pipeline::createFacetPipeline({passthrough}, ctx));
+ auto passthroughPipe = Pipeline::create({passthrough}, ctx);
auto limit = DocumentSourceLimit::create(ctx, 1);
- auto limitedPipe = uassertStatusOK(Pipeline::createFacetPipeline({limit}, ctx));
+ auto limitedPipe = Pipeline::create({limit}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("all", std::move(passthroughPipe));
@@ -422,7 +420,7 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub
auto firstDummy = DocumentSourcePassthrough::create();
auto secondDummy = DocumentSourcePassthrough::create();
- auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy, secondDummy}, ctx));
+ auto pipeline = Pipeline::create({firstDummy, secondDummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("subPipe", std::move(pipeline));
@@ -441,9 +439,9 @@ TEST_F(DocumentSourceFacetTest, ShouldPropagateDisposeThroughToSource) {
auto mockSource = DocumentSourceMock::createForTest();
auto firstDummy = DocumentSourcePassthrough::create();
- auto firstPipe = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx));
+ auto firstPipe = Pipeline::create({firstDummy}, ctx);
auto secondDummy = DocumentSourcePassthrough::create();
- auto secondPipe = uassertStatusOK(Pipeline::createFacetPipeline({secondDummy}, ctx));
+ auto secondPipe = Pipeline::create({secondDummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("firstPipe", std::move(firstPipe));
@@ -465,7 +463,7 @@ DEATH_TEST_F(DocumentSourceFacetTest,
DocumentSourceMock::createForTest(DocumentSource::GetNextResult::makePauseExecution());
auto firstDummy = DocumentSourcePassthrough::create();
- auto pipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstDummy}, ctx));
+ auto pipeline = Pipeline::create({firstDummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("subPipe", std::move(pipeline));
@@ -489,10 +487,10 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) {
// skippedTwo: [{$skip: 2}]
// }}
auto firstSkip = DocumentSourceSkip::create(ctx, 1);
- auto firstPipeline = uassertStatusOK(Pipeline::createFacetPipeline({firstSkip}, ctx));
+ auto firstPipeline = Pipeline::create({firstSkip}, ctx);
auto secondSkip = DocumentSourceSkip::create(ctx, 2);
- auto secondPipeline = uassertStatusOK(Pipeline::createFacetPipeline({secondSkip}, ctx));
+ auto secondPipeline = Pipeline::create({secondSkip}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("skippedOne", std::move(firstPipeline));
@@ -532,7 +530,7 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) {
auto ctx = getExpCtx();
auto dummy = DocumentSourcePassthrough::create();
- auto pipeline = unittest::assertGet(Pipeline::createFacetPipeline({dummy}, ctx));
+ auto pipeline = Pipeline::create({dummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("subPipe", std::move(pipeline));
@@ -550,10 +548,10 @@ TEST_F(DocumentSourceFacetTest, ShouldPropagateDetachingAndReattachingOfOpCtx) {
ctx->mongoProcessInterface = std::make_unique<StubMongoProcessInterface>();
auto firstDummy = DocumentSourcePassthrough::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({firstDummy}, ctx));
+ auto firstPipeline = Pipeline::create({firstDummy}, ctx);
auto secondDummy = DocumentSourcePassthrough::create();
- auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({secondDummy}, ctx));
+ auto secondPipeline = Pipeline::create({secondDummy}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("one", std::move(firstPipeline));
@@ -607,7 +605,7 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) {
auto ctx = getExpCtx();
auto needsA = DocumentSourceNeedsA::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx));
+ auto firstPipeline = Pipeline::create({needsA}, ctx);
auto firstPipelineDeps = firstPipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_FALSE(firstPipelineDeps.needWholeDocument);
@@ -615,7 +613,7 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) {
ASSERT_EQ(firstPipelineDeps.fields.count("a"), 1UL);
auto needsB = DocumentSourceNeedsB::create();
- auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsB}, ctx));
+ auto secondPipeline = Pipeline::create({needsB}, ctx);
auto secondPipelineDeps = secondPipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_FALSE(secondPipelineDeps.needWholeDocument);
@@ -654,11 +652,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireWholeDocumentIfAnyPipelineRequiresW
auto ctx = getExpCtx();
auto needsA = DocumentSourceNeedsA::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx));
+ auto firstPipeline = Pipeline::create({needsA}, ctx);
auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create();
- auto secondPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({needsWholeDocument}, ctx));
+ auto secondPipeline = Pipeline::create({needsWholeDocument}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("needsA", std::move(firstPipeline));
@@ -689,14 +686,13 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireTextScoreIfAnyPipelineRequiresTextS
auto ctx = getExpCtx();
auto needsA = DocumentSourceNeedsA::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx));
+ auto firstPipeline = Pipeline::create({needsA}, ctx);
auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create();
- auto secondPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({needsWholeDocument}, ctx));
+ auto secondPipeline = Pipeline::create({needsWholeDocument}, ctx);
auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create();
- auto thirdPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsTextScore}, ctx));
+ auto thirdPipeline = Pipeline::create({needsTextScore}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("needsA", std::move(firstPipeline));
@@ -714,10 +710,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
auto ctx = getExpCtx();
auto needsA = DocumentSourceNeedsA::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsA}, ctx));
+ auto firstPipeline = Pipeline::create({needsA}, ctx);
auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create();
- auto secondPipeline = unittest::assertGet(Pipeline::createFacetPipeline({needsTextScore}, ctx));
+ auto secondPipeline = Pipeline::create({needsTextScore}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("needsA", std::move(firstPipeline));
@@ -753,11 +749,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
auto ctx = getExpCtx();
auto passthrough = DocumentSourcePassthrough::create();
- auto firstPipeline = unittest::assertGet(Pipeline::createFacetPipeline({passthrough}, ctx));
+ auto firstPipeline = Pipeline::create({passthrough}, ctx);
auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create();
- auto secondPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({needsPrimaryShard}, ctx));
+ auto secondPipeline = Pipeline::create({needsPrimaryShard}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("passthrough", std::move(firstPipeline));
@@ -776,12 +771,10 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
auto ctx = getExpCtx();
auto firstPassthrough = DocumentSourcePassthrough::create();
- auto firstPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx));
+ auto firstPipeline = Pipeline::create({firstPassthrough}, ctx);
auto secondPassthrough = DocumentSourcePassthrough::create();
- auto secondPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx));
+ auto secondPipeline = Pipeline::create({secondPassthrough}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("first", std::move(firstPipeline));
@@ -843,16 +836,13 @@ TEST_F(DocumentSourceFacetTest, ShouldSurfaceStrictestRequirementsOfEachConstrai
auto ctx = getExpCtx();
auto firstPassthrough = DocumentSourcePassthrough::create();
- auto firstPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({firstPassthrough}, ctx));
+ auto firstPipeline = Pipeline::create({firstPassthrough}, ctx);
auto secondPassthrough = DocumentSourcePrimaryShardTmpDataNoTxn::create();
- auto secondPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({secondPassthrough}, ctx));
+ auto secondPipeline = Pipeline::create({secondPassthrough}, ctx);
auto thirdPassthrough = DocumentSourceBannedInLookup::create();
- auto thirdPipeline =
- unittest::assertGet(Pipeline::createFacetPipeline({thirdPassthrough}, ctx));
+ auto thirdPipeline = Pipeline::create({thirdPassthrough}, ctx);
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("first", std::move(firstPipeline));
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index b714ed53c9a..d01784a4bbf 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -607,7 +607,7 @@ intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
void DocumentSourceGraphLookUp::addInvolvedCollections(
stdx::unordered_set<NamespaceString>* collectionNames) const {
collectionNames->insert(_fromExpCtx->ns);
- auto introspectionPipeline = uassertStatusOK(Pipeline::parse(_fromPipeline, _fromExpCtx));
+ auto introspectionPipeline = Pipeline::parse(_fromPipeline, _fromExpCtx);
for (auto&& stage : introspectionPipeline->getSources()) {
stage->addInvolvedCollections(collectionNames);
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index ddbe6ac0e59..e280277115f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -664,20 +664,15 @@ void DocumentSourceLookUp::resolveLetVariables(const Document& localDoc, Variabl
void DocumentSourceLookUp::initializeResolvedIntrospectionPipeline() {
copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
_resolvedIntrospectionPipeline =
- uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
-
- auto& sources = _resolvedIntrospectionPipeline->getSources();
-
- auto it = std::find_if(
- sources.begin(), sources.end(), [](const boost::intrusive_ptr<DocumentSource>& src) {
- return !src->constraints().isAllowedInLookupPipeline();
+ Pipeline::parse(_resolvedPipeline, _fromExpCtx, [](const Pipeline& pipeline) {
+ const auto& sources = pipeline.getSources();
+ std::for_each(sources.begin(), sources.end(), [](auto& src) {
+ uassert(51047,
+ str::stream() << src->getSourceName()
+ << " is not allowed within a $lookup's sub-pipeline",
+ src->constraints().isAllowedInLookupPipeline());
+ });
});
-
- // For other stages, use a generic error.
- uassert(51047,
- str::stream() << (*it)->getSourceName()
- << " is not allowed within a $lookup's sub-pipeline",
- it == sources.end());
}
void DocumentSourceLookUp::serializeToArray(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
index ef6d685983c..f5763d42987 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -205,7 +205,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
cursors.emplace_back(makeRemoteCursor(
kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
armParams.setRemotes(std::move(cursors));
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto pipeline = Pipeline::create({}, expCtx);
auto mergeCursorsStage =
DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx);
@@ -229,7 +229,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
armParams.setRemotes(std::move(cursors));
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto pipeline = Pipeline::create({}, expCtx);
pipeline->addInitialSource(
DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
@@ -278,7 +278,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfTheyAreNotOwned) {
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
armParams.setRemotes(std::move(cursors));
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto pipeline = Pipeline::create({}, expCtx);
pipeline->addInitialSource(
DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
@@ -300,7 +300,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
armParams.setRemotes(std::move(cursors));
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto pipeline = Pipeline::create({}, expCtx);
pipeline->addInitialSource(
DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
@@ -335,7 +335,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
auto expCtx = getExpCtx();
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ auto pipeline = Pipeline::create({}, expCtx);
// Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline.
AsyncResultsMergerParams armParams;
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp
index 95f5c799b7c..4f89e47acdf 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp
@@ -116,7 +116,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, SerializesSuccessfullyAfterAbsorbingMat
auto planCacheStats =
DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx());
auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx());
- auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx()));
+ auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
pipeline->optimizePipeline();
@@ -133,7 +133,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, SerializesSuccessfullyAfterAbsorbingMat
auto planCacheStats =
DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx());
auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx());
- auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx()));
+ auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx());
ASSERT_EQ(2u, pipeline->getSources().size());
pipeline->optimizePipeline();
@@ -170,7 +170,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsOnlyMatchingStatsAfterAbsorbingM
auto planCacheStats =
DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx());
auto match = DocumentSourceMatch::create(fromjson("{foo: 'bar'}"), getExpCtx());
- auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats, match}, getExpCtx()));
+ auto pipeline = Pipeline::create({planCacheStats, match}, getExpCtx());
pipeline->optimizePipeline();
ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(),
@@ -197,7 +197,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsHostNameWhenNotFromMongos) {
const auto specObj = fromjson("{$planCacheStats: {}}");
auto planCacheStats =
DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx());
- auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats}, getExpCtx()));
+ auto pipeline = Pipeline::create({planCacheStats}, getExpCtx());
ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(),
BSON("foo"
<< "bar"
@@ -223,7 +223,7 @@ TEST_F(DocumentSourcePlanCacheStatsTest, ReturnsShardAndHostNameWhenFromMongos)
const auto specObj = fromjson("{$planCacheStats: {}}");
auto planCacheStats =
DocumentSourcePlanCacheStats::createFromBson(specObj.firstElement(), getExpCtx());
- auto pipeline = unittest::assertGet(Pipeline::create({planCacheStats}, getExpCtx()));
+ auto pipeline = Pipeline::create({planCacheStats}, getExpCtx());
ASSERT_BSONOBJ_EQ(pipeline->getNext()->toBson(),
BSON("foo"
<< "bar"
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 151ea99c58a..1885bef0b51 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -52,11 +52,22 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
ExpressionContext::ResolvedNamespace resolvedNs,
std::vector<BSONObj> currentPipeline) {
+
+ auto validatorCallback = [](const Pipeline& pipeline) {
+ const auto& sources = pipeline.getSources();
+ std::for_each(sources.begin(), sources.end(), [](auto& src) {
+ uassert(31441,
+ str::stream() << src->getSourceName()
+ << " is not allowed within a $unionWith's sub-pipeline",
+ src->constraints().isAllowedInUnionPipeline());
+ });
+ };
+
// Copy the ExpressionContext of the base aggregation, using the inner namespace instead.
auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns);
if (resolvedNs.pipeline.empty()) {
- return uassertStatusOK(Pipeline::parse(std::move(currentPipeline), unionExpCtx));
+ return Pipeline::parse(std::move(currentPipeline), unionExpCtx, validatorCallback);
}
auto resolvedPipeline = std::move(resolvedNs.pipeline);
resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size());
@@ -64,7 +75,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
std::make_move_iterator(currentPipeline.begin()),
std::make_move_iterator(currentPipeline.end()));
- return uassertStatusOK(Pipeline::parse(std::move(resolvedPipeline), unionExpCtx));
+ return Pipeline::parse(std::move(resolvedPipeline), unionExpCtx, validatorCallback);
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_union_with.h b/src/mongo/db/pipeline/document_source_union_with.h
index 3225a1e6468..ade72532935 100644
--- a/src/mongo/db/pipeline/document_source_union_with.h
+++ b/src/mongo/db/pipeline/document_source_union_with.h
@@ -62,19 +62,7 @@ public:
DocumentSourceUnionWith(const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
- : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {
- if (_pipeline) {
- const auto& sources = _pipeline->getSources();
- auto it = std::find_if(sources.begin(), sources.end(), [](const auto& src) {
- return !src->constraints().isAllowedInUnionPipeline();
- });
-
- uassert(31441,
- str::stream() << (*it)->getSourceName()
- << " is not allowed within a $unionWith's sub-pipeline",
- it == sources.end());
- }
- }
+ : DocumentSource(kStageName, expCtx), _pipeline(std::move(pipeline)) {}
const char* getSourceName() const final {
return kStageName.rawData();
diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp
index eaebccf21a7..61412ad421c 100644
--- a/src/mongo/db/pipeline/document_source_union_with_test.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp
@@ -70,12 +70,10 @@ TEST_F(DocumentSourceUnionWithTest, BasicSerialUnions) {
mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
auto unionWithOne = DocumentSourceUnionWith(
mockCtxOne,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
auto unionWithTwo = DocumentSourceUnionWith(
mockCtxTwo,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
unionWithOne.setSource(mock.get());
unionWithTwo.setSource(&unionWithOne);
@@ -106,12 +104,11 @@ TEST_F(DocumentSourceUnionWithTest, BasicNestedUnions) {
mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
auto unionWithOne = make_intrusive<DocumentSourceUnionWith>(
mockCtxOne,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
auto unionWithTwo = DocumentSourceUnionWith(
mockCtxTwo,
- uassertStatusOK(Pipeline::create(
- std::list<boost::intrusive_ptr<DocumentSource>>{unionWithOne}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{unionWithOne},
+ getExpCtx()));
unionWithTwo.setSource(mock.get());
auto comparator = DocumentComparator();
@@ -144,12 +141,10 @@ TEST_F(DocumentSourceUnionWithTest, UnionsWithNonEmptySubPipelines) {
const auto proj = DocumentSourceAddFields::create(BSON("d" << 1), mockCtxTwo);
auto unionWithOne = DocumentSourceUnionWith(
mockCtxOne,
- uassertStatusOK(Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{filter},
- getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{filter}, getExpCtx()));
auto unionWithTwo = DocumentSourceUnionWith(
mockCtxTwo,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{proj}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{proj}, getExpCtx()));
unionWithOne.setSource(mock.get());
unionWithTwo.setSource(&unionWithOne);
@@ -312,12 +307,10 @@ TEST_F(DocumentSourceUnionWithTest, PropagatePauses) {
mockCtxTwo->mongoProcessInterface = std::make_unique<MockMongoInterface>(mockDequeTwo);
auto unionWithOne = DocumentSourceUnionWith(
mockCtxOne,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
auto unionWithTwo = DocumentSourceUnionWith(
mockCtxTwo,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
unionWithOne.setSource(mock.get());
unionWithTwo.setSource(&unionWithOne);
@@ -339,13 +332,11 @@ TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsFullDoc) {
.firstElement(),
expCtx);
const auto unionWith = make_intrusive<DocumentSourceUnionWith>(
- expCtx,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx)));
+ expCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx));
// With the $unionWith *before* the $replaceRoot, the dependency analysis will report that all
// fields are needed.
- auto pipeline = uassertStatusOK(Pipeline::create({unionWith, replaceRoot}, expCtx));
+ auto pipeline = Pipeline::create({unionWith, replaceRoot}, expCtx);
auto deps = pipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_BSONOBJ_EQ(deps.toProjectionWithoutMetadata(), BSONObj());
@@ -361,13 +352,11 @@ TEST_F(DocumentSourceUnionWithTest, DependencyAnalysisReportsReferencedFieldsBef
.firstElement(),
expCtx);
const auto unionWith = make_intrusive<DocumentSourceUnionWith>(
- expCtx,
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx)));
+ expCtx, Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, expCtx));
// With the $unionWith *after* the $replaceRoot, the dependency analysis will now report only
// the referenced fields.
- auto pipeline = uassertStatusOK(Pipeline::create({replaceRoot, unionWith}, expCtx));
+ auto pipeline = Pipeline::create({replaceRoot, unionWith}, expCtx);
auto deps = pipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_BSONOBJ_EQ(deps.toProjectionWithoutMetadata(), BSON("b" << 1 << "_id" << 0));
@@ -459,8 +448,7 @@ TEST_F(DocumentSourceUnionWithTest, RejectUnionWhenDepthLimitIsExceeded) {
TEST_F(DocumentSourceUnionWithTest, ConstraintsWithoutPipelineAreCorrect) {
auto emptyUnion = DocumentSourceUnionWith(
getExpCtx(),
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{}, getExpCtx()));
StageConstraints defaultConstraints(StageConstraints::StreamType::kStreaming,
StageConstraints::PositionRequirement::kNone,
StageConstraints::HostTypeRequirement::kAnyShard,
@@ -485,8 +473,7 @@ TEST_F(DocumentSourceUnionWithTest, ConstraintsWithMixedSubPipelineAreCorrect) {
mock->mockConstraints = stricterConstraint;
auto unionWithOne = DocumentSourceUnionWith(
getExpCtx(),
- uassertStatusOK(
- Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{mock}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{mock}, getExpCtx()));
ASSERT_TRUE(unionWithOne.constraints(Pipeline::SplitState::kUnsplit) == stricterConstraint);
}
@@ -525,9 +512,9 @@ TEST_F(DocumentSourceUnionWithTest, ConstraintsWithStrictSubPipelineAreCorrect)
mockThree->mockConstraints = constraintPersistentDataTransactionLookupNotAllowed;
auto unionStage = DocumentSourceUnionWith(
getExpCtx(),
- uassertStatusOK(Pipeline::create(
+ Pipeline::create(
std::list<boost::intrusive_ptr<DocumentSource>>{mockOne, mockTwo, mockThree},
- getExpCtx())));
+ getExpCtx()));
StageConstraints strict(StageConstraints::StreamType::kStreaming,
StageConstraints::PositionRequirement::kNone,
StageConstraints::HostTypeRequirement::kAnyShard,
@@ -549,14 +536,13 @@ TEST_F(DocumentSourceUnionWithTest, StricterConstraintsFromSubSubPipelineAreInhe
StageConstraints::LookupRequirement::kNotAllowed,
StageConstraints::UnionRequirement::kAllowed);
mock->mockConstraints = strictConstraint;
- auto facetPipeline = uassertStatusOK(Pipeline::createFacetPipeline({mock}, getExpCtx()));
+ auto facetPipeline = Pipeline::create({mock}, getExpCtx());
std::vector<DocumentSourceFacet::FacetPipeline> facets;
facets.emplace_back("pipeline", std::move(facetPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), getExpCtx());
auto unionStage = DocumentSourceUnionWith(
getExpCtx(),
- uassertStatusOK(Pipeline::create(
- std::list<boost::intrusive_ptr<DocumentSource>>{facetStage}, getExpCtx())));
+ Pipeline::create(std::list<boost::intrusive_ptr<DocumentSource>>{facetStage}, getExpCtx()));
StageConstraints expectedConstraints(StageConstraints::StreamType::kStreaming,
StageConstraints::PositionRequirement::kNone,
StageConstraints::HostTypeRequirement::kAnyShard,
diff --git a/src/mongo/db/pipeline/expression_walker_test.cpp b/src/mongo/db/pipeline/expression_walker_test.cpp
index 4cc5a1c4691..4c409a468b6 100644
--- a/src/mongo/db/pipeline/expression_walker_test.cpp
+++ b/src/mongo/db/pipeline/expression_walker_test.cpp
@@ -53,7 +53,7 @@ protected:
NamespaceString testNss("test", "collection");
AggregationRequest request(testNss, rawPipeline);
- return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx()));
+ return Pipeline::parse(request.getPipeline(), getExpCtx());
}
auto parseExpression(std::string expressionString) {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index bd8cf53df0c..eb3a4d97914 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -69,6 +69,54 @@ Value appendExecStats(Value docSource, const CommonStats& stats) {
doc.addField("executionTimeMillisEstimate", Value(executionTimeMillisEstimate));
return Value(doc.freeze());
}
+
+/**
+ * Performs validation checking specific to top-level pipelines. Throws an assertion if the
+ * pipeline is invalid.
+ */
+void validateTopLevelPipeline(const Pipeline& pipeline) {
+ // Verify that the specified namespace is valid for the initial stage of this pipeline.
+ const NamespaceString& nss = pipeline.getContext()->ns;
+
+ auto sources = pipeline.getSources();
+
+ if (sources.empty()) {
+ uassert(ErrorCodes::InvalidNamespace,
+ "{aggregate: 1} is not valid for an empty pipeline.",
+ !nss.isCollectionlessAggregateNS());
+ return;
+ }
+
+ if ("$mergeCursors"_sd != sources.front()->getSourceName()) {
+ // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
+ // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
+ const auto firstStageConstraints = sources.front()->constraints();
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "{aggregate: 1} is not valid for '"
+ << sources.front()->getSourceName() << "'; a collection is required.",
+ !(nss.isCollectionlessAggregateNS() &&
+ !firstStageConstraints.isIndependentOfAnyCollection));
+
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "'" << sources.front()->getSourceName()
+ << "' can only be run with {aggregate: 1}",
+ !(!nss.isCollectionlessAggregateNS() &&
+ firstStageConstraints.isIndependentOfAnyCollection));
+
+ // If the first stage is a $changeStream stage, then all stages in the pipeline must be
+ // either $changeStream stages or whitelisted as being able to run in a change stream.
+ if (firstStageConstraints.isChangeStreamStage()) {
+ for (auto&& source : sources) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << source->getSourceName()
+ << " is not permitted in a $changeStream pipeline",
+ source->constraints().isAllowedInChangeStream());
+ }
+ }
+ }
+}
+
} // namespace
MONGO_FAIL_POINT_DEFINE(disablePipelineOptimization);
@@ -100,20 +148,10 @@ Pipeline::~Pipeline() {
invariant(_disposed);
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parse(
- const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
- return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, false);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseFacetPipeline(
- const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
- return parseTopLevelOrFacetPipeline(rawPipeline, expCtx, true);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOrFacetPipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline,
const intrusive_ptr<ExpressionContext>& expCtx,
- const bool isFacetPipeline) {
+ PipelineValidatorCallback validator) {
SourceContainer stages;
@@ -122,105 +160,32 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::parseTopLevelOr
stages.insert(stages.end(), parsedSources.begin(), parsedSources.end());
}
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, isFacetPipeline);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::create(
- SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, false);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createFacetPipeline(
- SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
- return createTopLevelOrFacetPipeline(std::move(stages), expCtx, true);
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelOrFacetPipeline(
- SourceContainer stages,
- const intrusive_ptr<ExpressionContext>& expCtx,
- const bool isFacetPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
PipelineDeleter(expCtx->opCtx));
- try {
- pipeline->validate(isFacetPipeline);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
-
- pipeline->stitch();
- return std::move(pipeline);
-}
-void Pipeline::validate(bool isFacetPipeline) const {
- if (isFacetPipeline) {
- validateFacetPipeline();
- } else {
- validateTopLevelPipeline();
+ // First call the context-specific validator, which may be different for top-level pipelines
+ // versus nested pipelines.
+ if (validator)
+ validator(*pipeline);
+ else {
+ validateTopLevelPipeline(*pipeline);
}
- validateCommon();
-}
-
-void Pipeline::validateTopLevelPipeline() const {
- // Verify that the specified namespace is valid for the initial stage of this pipeline.
- const NamespaceString& nss = pCtx->ns;
+ // Next run through the common validation rules that apply to every pipeline.
+ pipeline->validateCommon();
- if (_sources.empty()) {
- if (nss.isCollectionlessAggregateNS()) {
- uasserted(ErrorCodes::InvalidNamespace,
- "{aggregate: 1} is not valid for an empty pipeline.");
- }
- return;
- }
- if ("$mergeCursors"_sd != _sources.front()->getSourceName()) {
- // The $mergeCursors stage can take {aggregate: 1} or a normal namespace. Aside from this,
- // {aggregate: 1} is only valid for collectionless sources, and vice-versa.
- const auto firstStageConstraints = _sources.front()->constraints(_splitState);
-
- if (nss.isCollectionlessAggregateNS() &&
- !firstStageConstraints.isIndependentOfAnyCollection) {
- uasserted(ErrorCodes::InvalidNamespace,
- str::stream()
- << "{aggregate: 1} is not valid for '"
- << _sources.front()->getSourceName() << "'; a collection is required.");
- }
-
- if (!nss.isCollectionlessAggregateNS() &&
- firstStageConstraints.isIndependentOfAnyCollection) {
- uasserted(ErrorCodes::InvalidNamespace,
- str::stream() << "'" << _sources.front()->getSourceName()
- << "' can only be run with {aggregate: 1}");
- }
-
- // If the first stage is a $changeStream stage, then all stages in the pipeline must be
- // either $changeStream stages or whitelisted as being able to run in a change stream.
- if (firstStageConstraints.isChangeStreamStage()) {
- for (auto&& source : _sources) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << source->getSourceName()
- << " is not permitted in a $changeStream pipeline",
- source->constraints(_splitState).isAllowedInChangeStream());
- }
- }
- }
+ pipeline->stitch();
+ return pipeline;
}
-void Pipeline::validateFacetPipeline() const {
- if (_sources.empty()) {
- uasserted(ErrorCodes::BadValue, "sub-pipeline in $facet stage cannot be empty");
- }
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::create(
+ SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
+ PipelineDeleter(expCtx->opCtx));
- for (auto&& stage : _sources) {
- auto stageConstraints = stage->constraints(_splitState);
- if (!stageConstraints.isAllowedInsideFacetStage()) {
- uasserted(40600,
- str::stream() << stage->getSourceName()
- << " is not allowed to be used within a $facet stage");
- }
- // We expect a stage within a $facet stage to have these properties.
- invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
- invariant(!stageConstraints.isIndependentOfAnyCollection);
- }
+ pipeline->validateCommon();
+ pipeline->stitch();
+ return pipeline;
}
void Pipeline::validateCommon() const {
@@ -229,22 +194,21 @@ void Pipeline::validateCommon() const {
auto constraints = stage->constraints(_splitState);
// Verify that all stages adhere to their PositionRequirement constraints.
- if (constraints.requiredPosition == PositionRequirement::kFirst && i != 0) {
- uasserted(40602,
- str::stream() << stage->getSourceName()
- << " is only valid as the first stage in a pipeline.");
- }
- auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
- if (i != 0 && matchStage && matchStage->isTextQuery()) {
- uasserted(17313, "$match with $text is only allowed as the first pipeline stage");
- }
+ uassert(40602,
+ str::stream() << stage->getSourceName()
+ << " is only valid as the first stage in a pipeline.",
+ !(constraints.requiredPosition == PositionRequirement::kFirst && i != 0));
- if (constraints.requiredPosition == PositionRequirement::kLast &&
- i != _sources.size() - 1) {
- uasserted(40601,
- str::stream() << stage->getSourceName()
- << " can only be the final stage in the pipeline");
- }
+ auto matchStage = dynamic_cast<DocumentSourceMatch*>(stage.get());
+ uassert(17313,
+ "$match with $text is only allowed as the first pipeline stage",
+ !(i != 0 && matchStage && matchStage->isTextQuery()));
+
+ uassert(40601,
+ str::stream() << stage->getSourceName()
+ << " can only be the final stage in the pipeline",
+ !(constraints.requiredPosition == PositionRequirement::kLast &&
+ i != _sources.size() - 1));
++i;
// Verify that we are not attempting to run a mongoS-only stage on mongoD.
@@ -252,12 +216,10 @@ void Pipeline::validateCommon() const {
str::stream() << stage->getSourceName() << " can only be run on mongoS",
!(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
- if (pCtx->inMultiDocumentTransaction) {
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Stage not supported inside of a multi-document transaction: "
- << stage->getSourceName(),
- constraints.isAllowedInTransaction());
- }
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Stage not supported inside of a multi-document transaction: "
+ << stage->getSourceName(),
+ !(pCtx->inMultiDocumentTransaction && !constraints.isAllowedInTransaction()));
}
}
@@ -661,7 +623,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
- auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
if (opts.optimize) {
pipeline->optimizePipeline();
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index c3bad653814..5eb096fc7e2 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -101,28 +101,22 @@ public:
MatchExpressionParser::AllowedFeatures::kGeoNear;
/**
- * Parses a Pipeline from a vector of BSONObjs. Returns a non-OK status if it failed to parse.
- * The returned pipeline is not optimized, but the caller may convert it to an optimized
- * pipeline by calling optimizePipeline().
+ * Parses a Pipeline from a vector of BSONObjs then invokes the optional 'validator' callback
+ * with a reference to the newly created Pipeline. If no validator callback is given, this
+ * method assumes that we're parsing a top-level pipeline. Throws an exception if it failed to
+ * parse or if any exception occurs in the validator. The returned pipeline is not optimized,
+ * but the caller may convert it to an optimized pipeline by calling optimizePipeline().
*
* It is illegal to create a pipeline using an ExpressionContext which contains a collation that
* will not be used during execution of the pipeline. Doing so may cause comparisons made during
* parse-time to return the wrong results.
*/
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parse(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ using PipelineValidatorCallback = std::function<void(const Pipeline&)>;
- /**
- * Parses a $facet Pipeline from a vector of BSONObjs. Validation checks which are only relevant
- * to top-level pipelines are skipped, and additional checks applicable to $facet pipelines are
- * performed. Returns a non-OK status if it failed to parse. The returned pipeline is not
- * optimized, but the caller may convert it to an optimized pipeline by calling
- * optimizePipeline().
- */
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseFacetPipeline(
+ static std::unique_ptr<Pipeline, PipelineDeleter> parse(
const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ PipelineValidatorCallback validator = nullptr);
/**
* Creates a Pipeline from an existing SourceContainer.
@@ -130,16 +124,7 @@ public:
* Throws if any stage is in an invalid position. For example, if an $out stage
* is present but is not the last stage.
*/
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> create(
- SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
- /**
- * Creates a $facet Pipeline from an existing SourceContainer.
- *
- * Returns a non-OK status if any stage is invalid. For example, if the pipeline is empty or if
- * any stage is an initial source.
- */
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createFacetPipeline(
+ static std::unique_ptr<Pipeline, PipelineDeleter> create(
SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
@@ -335,24 +320,6 @@ public:
private:
friend class PipelineDeleter;
- /**
- * Used by both Pipeline::parse() and Pipeline::parseFacetPipeline() to build and validate the
- * pipeline.
- */
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> parseTopLevelOrFacetPipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const bool isFacetPipeline);
-
- /**
- * Used by both Pipeline::create() and Pipeline::createFacetPipeline() to build and validate the
- * pipeline.
- */
- static StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> createTopLevelOrFacetPipeline(
- SourceContainer sources,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const bool isSubPipeline);
-
Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx);
Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx);
@@ -372,25 +339,6 @@ private:
void unstitch();
/**
- * Throws if the pipeline fails any of a set of semantic checks. For example, if an $out stage
- * is present then it must come last in the pipeline, while initial stages such as $indexStats
- * must be at the start.
- */
- void validate(bool isFacetPipeline) const;
-
- /**
- * Performs validation checking specific to top-level pipelines. Throws if the pipeline is
- * invalid.
- */
- void validateTopLevelPipeline() const;
-
- /**
- * Performs validation checking specific to nested $facet pipelines. Throws if the pipeline is
- * invalid.
- */
- void validateFacetPipeline() const;
-
- /**
* Performs common validation for top-level or facet pipelines. Throws if the pipeline is
* invalid.
*
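The validator callback above is what replaces the dedicated parseFacetPipeline()/createFacetPipeline() entry points: a stage that owns a child pipeline now passes its extra checks as a lambda. A minimal sketch of the pattern, assuming 'rawStages' and 'expCtx' are already in scope and using a purely illustrative check rather than one taken from this patch:

    // Illustrative validator: a caller embedding a sub-pipeline enforces its own rules here.
    auto subPipelineValidator = [](const Pipeline& pipeline) {
        uassert(ErrorCodes::BadValue,
                "expected the sub-pipeline to contain at least one stage",
                !pipeline.getSources().empty());
    };
    auto subPipeline = Pipeline::parse(rawStages, expCtx, subPipelineValidator);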
diff --git a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
index 5a15074b361..c9d05ea2b4e 100644
--- a/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_metadata_tree_test.cpp
@@ -83,7 +83,7 @@ protected:
AggregationRequest request(testNss, rawPipeline);
getExpCtx()->ns = testNss;
- return uassertStatusOK(Pipeline::parse(request.getPipeline(), getExpCtx()));
+ return Pipeline::parse(request.getPipeline(), getExpCtx());
}
template <typename T, typename... Args>
@@ -391,7 +391,7 @@ TEST_F(PipelineMetadataTreeTest, ZipWalksAPipelineAndTreeInTandemAndInOrder) {
}
TEST_F(PipelineMetadataTreeTest, MakeTreeWithEmptyPipeline) {
- auto pipeline = uassertStatusOK(Pipeline::parse({}, getExpCtx()));
+ auto pipeline = Pipeline::parse({}, getExpCtx());
auto result =
makeTree<std::string>({{NamespaceString("unittests.pipeline_test"), std::string("input")}},
*pipeline,
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 4139cd1cd5a..912d96fe49f 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -116,7 +116,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}});
ctx->setResolvedNamespace(unionCollNs, {unionCollNs, std::vector<BSONObj>{}});
- auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
+ auto outputPipe = Pipeline::parse(request.getPipeline(), ctx);
outputPipe->optimizePipeline();
ASSERT_VALUE_EQ(Value(outputPipe->writeExplainOps(ExplainOptions::Verbosity::kQueryPlanner)),
@@ -1679,7 +1679,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
auto matchPredicate = BSON("extra"
<< "predicate");
stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx));
- auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
// Make sure the $match stage has swapped before the change look up.
@@ -1704,7 +1704,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
stages.push_back(DocumentSourceMatch::create(
BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx));
- auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
// Make sure the $match stage stays at the end.
@@ -1730,7 +1730,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeLookupSwapsWithIndependen
auto matchPredicate = BSON("extra"
<< "predicate");
stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx));
- auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
// Make sure the $match stage has swapped before the change look up.
@@ -1756,7 +1756,7 @@ TEST(PipelineOptimizationTest, FullDocumentBeforeChangeDoesNotSwapWithMatchOnPre
stages.push_back(DocumentSourceMatch::create(
BSON(DocumentSourceLookupChangePreImage::kFullDocumentBeforeChangeFieldName << BSONNULL),
expCtx));
- auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ auto pipeline = Pipeline::create(stages, expCtx);
pipeline->optimizePipeline();
// Make sure the $match stage stays at the end.
@@ -2095,7 +2095,7 @@ public:
ctx->setResolvedNamespace(lookupCollNs, {lookupCollNs, std::vector<BSONObj>{}});
// Test that we can both split the pipeline and reassemble it into its original form.
- mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
+ mergePipe = Pipeline::parse(request.getPipeline(), ctx);
mergePipe->optimizePipeline();
auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(mergePipe));
@@ -2690,7 +2690,7 @@ TEST_F(PipelineMustRunOnMongoSTest, UnsplittablePipelineMustRunOnMongoS) {
auto match = DocumentSourceMatch::create(fromjson("{x: 5}"), expCtx);
auto runOnMongoS = DocumentSourceMustRunOnMongoS::create();
- auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS}, expCtx));
+ auto pipeline = Pipeline::create({match, runOnMongoS}, expCtx);
ASSERT_TRUE(pipeline->requiredToRunOnMongos());
pipeline->optimizePipeline();
@@ -2707,7 +2707,7 @@ TEST_F(PipelineMustRunOnMongoSTest, UnsplittableMongoSPipelineAssertsIfDisallowe
auto runOnMongoS = DocumentSourceMustRunOnMongoS::create();
auto sort = DocumentSourceSort::create(expCtx, fromjson("{x: 1}"));
- auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, sort}, expCtx));
+ auto pipeline = Pipeline::create({match, runOnMongoS, sort}, expCtx);
pipeline->optimizePipeline();
// The entire pipeline must run on mongoS, but $sort cannot do so when 'allowDiskUse' is true.
@@ -2725,7 +2725,7 @@ DEATH_TEST_F(PipelineMustRunOnMongoSTest,
auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
auto runOnMongoS = DocumentSourceMustRunOnMongoS::create();
- auto pipeline = uassertStatusOK(Pipeline::create({match, split, runOnMongoS}, expCtx));
+ auto pipeline = Pipeline::create({match, split, runOnMongoS}, expCtx);
// We don't need to run the entire pipeline on mongoS because we can split at
// $_internalSplitPipeline.
@@ -2766,7 +2766,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplitMongoSMergePipelineAssertsIfShardStageP
auto outSpec = fromjson("{$out: 'outcoll'}");
auto out = DocumentSourceOut::createFromBson(outSpec["$out"], expCtx);
- auto pipeline = uassertStatusOK(Pipeline::create({match, split, runOnMongoS, out}, expCtx));
+ auto pipeline = Pipeline::create({match, split, runOnMongoS, out}, expCtx);
// We don't need to run the entire pipeline on mongoS because we can split at
// $_internalSplitPipeline.
@@ -2789,7 +2789,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineAssertsIfMongoSStageOnShar
auto split =
DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kAnyShard);
- auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, split}, expCtx));
+ auto pipeline = Pipeline::create({match, runOnMongoS, split}, expCtx);
pipeline->optimizePipeline();
// The 'runOnMongoS' stage comes before any splitpoint, so this entire pipeline must run on
@@ -2808,7 +2808,7 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit
auto runOnMongoS = DocumentSourceMustRunOnMongoS::create();
auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone);
- auto pipeline = uassertStatusOK(Pipeline::create({match, runOnMongoS, split}, expCtx));
+ auto pipeline = Pipeline::create({match, runOnMongoS, split}, expCtx);
pipeline->optimizePipeline();
// The 'runOnMongoS' stage is before the splitpoint, so this entire pipeline must run on mongoS.
@@ -2827,7 +2827,7 @@ TEST(PipelineInitialSource, GeoNearInitialQuery) {
fromjson("{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}")};
intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(
&_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
- auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx));
+ auto pipe = Pipeline::parse(rawPipeline, ctx);
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 1));
}
@@ -2837,7 +2837,7 @@ TEST(PipelineInitialSource, MatchInitialQuery) {
intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(
&_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
- auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx));
+ auto pipe = Pipeline::parse(rawPipeline, ctx);
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4));
}
@@ -2875,7 +2875,8 @@ TEST_F(PipelineValidateTest, AggregateOneNSNotValidForEmptyPipeline) {
ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
- ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+ ASSERT_THROWS_CODE(
+ Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace);
}
TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) {
@@ -2884,7 +2885,8 @@ TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollect
ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
- ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+ ASSERT_THROWS_CODE(
+ Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace);
}
TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless) {
@@ -2893,7 +2895,7 @@ TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless)
ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("a");
- ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+ Pipeline::create({collectionlessSource}, ctx);
}
TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless) {
@@ -2902,16 +2904,20 @@ TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless)
ctx->ns = kTestNss;
- ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
+ ASSERT_THROWS_CODE(Pipeline::parse({fromjson("{$listLocalSessions: {}}")}, ctx),
+ AssertionException,
+ ErrorCodes::InvalidNamespace);
}
TEST_F(PipelineValidateTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) {
- const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")};
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$facet: {subPipe: [{$match: {}}]}}")};
auto ctx = getExpCtx();
ctx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests");
- ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus());
+ ASSERT_THROWS_CODE(
+ Pipeline::parse(rawPipeline, ctx), AssertionException, ErrorCodes::InvalidNamespace);
}
TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) {
@@ -2919,7 +2925,7 @@ TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) {
auto ctx = getExpCtx();
setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
- ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
+ Pipeline::parse(rawPipeline, ctx);
}
TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) {
@@ -2928,19 +2934,17 @@ TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) {
auto ctx = getExpCtx();
setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
- auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus();
- ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40602));
+ ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx), AssertionException, 40602);
}
TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) {
- const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
- fromjson("{$changeStream: {}}")};
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$facet: {subPipe: [{$match: {}}, {$changeStream: {}}]}}")};
+
auto ctx = getExpCtx();
setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
- auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus();
- ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40600));
- ASSERT(std::string::npos != parseStatus.reason().find("$changeStream"));
+ ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx), AssertionException, 40600);
}
class DocumentSourceDisallowedInTransactions : public DocumentSourceMock {
@@ -2964,8 +2968,6 @@ public:
};
TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalInTransactions) {
- BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level"
- << "snapshot"));
auto ctx = getExpCtx();
ctx->inMultiDocumentTransaction = true;
@@ -2973,24 +2975,20 @@ TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalInTransact
// creation fails with the expected error code.
auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx);
auto illegalStage = DocumentSourceDisallowedInTransactions::create();
- auto pipeline = Pipeline::create({matchStage, illegalStage}, ctx);
- ASSERT_NOT_OK(pipeline.getStatus());
- ASSERT_EQ(pipeline.getStatus(), ErrorCodes::OperationNotSupportedInTransaction);
+ ASSERT_THROWS_CODE(Pipeline::create({matchStage, illegalStage}, ctx),
+ AssertionException,
+ ErrorCodes::OperationNotSupportedInTransaction);
}
TEST_F(PipelineValidateTest, FacetPipelineValidatedForStagesIllegalInTransactions) {
- BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level"
- << "snapshot"));
auto ctx = getExpCtx();
ctx->inMultiDocumentTransaction = true;
- // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline
- // creation fails with the expected error code.
- auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx);
- auto illegalStage = DocumentSourceDisallowedInTransactions::create();
- auto pipeline = Pipeline::createFacetPipeline({matchStage, illegalStage}, ctx);
- ASSERT_NOT_OK(pipeline.getStatus());
- ASSERT_EQ(pipeline.getStatus(), ErrorCodes::OperationNotSupportedInTransaction);
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$facet: {subPipe: [{$match: {}}, {$out: 'outColl'}]}}")};
+ ASSERT_THROWS_CODE(Pipeline::parse(rawPipeline, ctx),
+ AssertionException,
+ ErrorCodes::OperationNotSupportedInTransaction);
}
} // namespace pipeline_validate
@@ -3000,7 +2998,7 @@ namespace Dependencies {
using PipelineDependenciesTest = AggregationContextFixture;
TEST_F(PipelineDependenciesTest, EmptyPipelineShouldRequireWholeDocument) {
- auto pipeline = unittest::assertGet(Pipeline::create({}, getExpCtx()));
+ auto pipeline = Pipeline::create({}, getExpCtx());
auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata);
ASSERT_TRUE(depsTracker.needWholeDocument);
@@ -3096,7 +3094,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp
auto ctx = getExpCtx();
auto needsASeeNext = DocumentSourceNeedsASeeNext::create();
auto notSupported = DocumentSourceDependenciesNotSupported::create();
- auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext, notSupported}, ctx));
+ auto pipeline = Pipeline::create({needsASeeNext, notSupported}, ctx);
auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata);
ASSERT_TRUE(depsTracker.needWholeDocument);
@@ -3104,7 +3102,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp
ASSERT_FALSE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore));
// Now in the other order.
- pipeline = unittest::assertGet(Pipeline::create({notSupported, needsASeeNext}, ctx));
+ pipeline = Pipeline::create({notSupported, needsASeeNext}, ctx);
depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata);
ASSERT_TRUE(depsTracker.needWholeDocument);
@@ -3113,7 +3111,7 @@ TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfAnyStageDoesNotSupp
TEST_F(PipelineDependenciesTest, ShouldRequireWholeDocumentIfNoStageReturnsExhaustiveFields) {
auto ctx = getExpCtx();
auto needsASeeNext = DocumentSourceNeedsASeeNext::create();
- auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext}, ctx));
+ auto pipeline = Pipeline::create({needsASeeNext}, ctx);
auto depsTracker = pipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_TRUE(depsTracker.needWholeDocument);
@@ -3123,7 +3121,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireWholeDocumentIfAnyStageReturnsE
auto ctx = getExpCtx();
auto needsASeeNext = DocumentSourceNeedsASeeNext::create();
auto needsOnlyB = DocumentSourceNeedsOnlyB::create();
- auto pipeline = unittest::assertGet(Pipeline::create({needsASeeNext, needsOnlyB}, ctx));
+ auto pipeline = Pipeline::create({needsASeeNext, needsOnlyB}, ctx);
auto depsTracker = pipeline->getDependencies(DepsTracker::kNoMetadata);
ASSERT_FALSE(depsTracker.needWholeDocument);
@@ -3136,7 +3134,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotAddAnyRequiredFieldsAfterFirstStageWit
auto ctx = getExpCtx();
auto needsOnlyB = DocumentSourceNeedsOnlyB::create();
auto needsASeeNext = DocumentSourceNeedsASeeNext::create();
- auto pipeline = unittest::assertGet(Pipeline::create({needsOnlyB, needsASeeNext}, ctx));
+ auto pipeline = Pipeline::create({needsOnlyB, needsASeeNext}, ctx);
auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata);
ASSERT_FALSE(depsTracker.needWholeDocument);
@@ -3150,7 +3148,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotAddAnyRequiredFieldsAfterFirstStageWit
TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfThereIsNoScoreAvailable) {
auto ctx = getExpCtx();
- auto pipeline = unittest::assertGet(Pipeline::create({}, ctx));
+ auto pipeline = Pipeline::create({}, ctx);
auto depsTracker = pipeline->getDependencies(DepsTracker::kAllMetadata);
ASSERT_FALSE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore));
@@ -3159,21 +3157,21 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfThereIsNoScoreAvaila
TEST_F(PipelineDependenciesTest, ShouldThrowIfTextScoreIsNeededButNotPresent) {
auto ctx = getExpCtx();
auto needsText = DocumentSourceNeedsOnlyTextScore::create();
- auto pipeline = unittest::assertGet(Pipeline::create({needsText}, ctx));
+ auto pipeline = Pipeline::create({needsText}, ctx);
ASSERT_THROWS(pipeline->getDependencies(DepsTracker::kAllMetadata), AssertionException);
}
TEST_F(PipelineDependenciesTest, ShouldRequireTextScoreIfAvailableAndNoStageReturnsExhaustiveMeta) {
auto ctx = getExpCtx();
- auto pipeline = unittest::assertGet(Pipeline::create({}, ctx));
+ auto pipeline = Pipeline::create({}, ctx);
auto depsTracker =
pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore);
ASSERT_TRUE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore));
auto needsASeeNext = DocumentSourceNeedsASeeNext::create();
- pipeline = unittest::assertGet(Pipeline::create({needsASeeNext}, ctx));
+ pipeline = Pipeline::create({needsASeeNext}, ctx);
depsTracker =
pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore);
ASSERT_TRUE(depsTracker.getNeedsMetadata(DocumentMetadataFields::kTextScore));
@@ -3183,7 +3181,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite
auto ctx = getExpCtx();
auto stripsTextScore = DocumentSourceStripsTextScore::create();
auto needsText = DocumentSourceNeedsOnlyTextScore::create();
- auto pipeline = unittest::assertGet(Pipeline::create({stripsTextScore, needsText}, ctx));
+ auto pipeline = Pipeline::create({stripsTextScore, needsText}, ctx);
auto depsTracker =
pipeline->getDependencies(DepsTracker::kAllMetadata & ~DepsTracker::kOnlyTextScore);
@@ -3198,8 +3196,7 @@ TEST_F(PipelineDependenciesTest, ShouldNotRequireTextScoreIfAvailableButDefinite
namespace {
TEST(PipelineRenameTracking, ReportsIdentityMapWhenEmpty) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
- auto pipeline =
- unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest()}, expCtx));
+ auto pipeline = Pipeline::create({DocumentSourceMock::createForTest()}, expCtx);
{
// Tracking renames backwards.
auto renames = semantic_analysis::renamedPaths(
@@ -3243,8 +3240,8 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
{
// Tracking renames backwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
- {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx));
+ auto pipeline = Pipeline::create(
+ {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx);
auto renames = semantic_analysis::renamedPaths(
pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a", "b", "c.d"});
ASSERT(static_cast<bool>(renames));
@@ -3256,8 +3253,8 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) {
}
{
// Tracking renames forwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
- {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx));
+ auto pipeline = Pipeline::create(
+ {DocumentSourceMock::createForTest(), NoModifications::create()}, expCtx);
auto renames = semantic_analysis::renamedPaths(
pipeline->getSources().cbegin(), pipeline->getSources().cend(), {"a", "b", "c.d"});
ASSERT(static_cast<bool>(renames));
@@ -3269,11 +3266,11 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) {
}
{
// Tracking renames backwards.
- auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(),
- NoModifications::create(),
- NoModifications::create(),
- NoModifications::create()},
- expCtx));
+ auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(),
+ NoModifications::create(),
+ NoModifications::create(),
+ NoModifications::create()},
+ expCtx);
auto renames = semantic_analysis::renamedPaths(
pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a", "b", "c.d"});
auto nameMap = *renames;
@@ -3284,11 +3281,11 @@ TEST(PipelineRenameTracking, ReportsIdentityWhenNoStageModifiesAnything) {
}
{
// Tracking renames forwards.
- auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(),
- NoModifications::create(),
- NoModifications::create(),
- NoModifications::create()},
- expCtx));
+ auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(),
+ NoModifications::create(),
+ NoModifications::create(),
+ NoModifications::create()},
+ expCtx);
auto renames = semantic_analysis::renamedPaths(
pipeline->getSources().cbegin(), pipeline->getSources().cend(), {"a", "b", "c.d"});
auto nameMap = *renames;
@@ -3316,11 +3313,11 @@ public:
TEST(PipelineRenameTracking, DoesNotReportRenamesIfAStageDoesNotSupportTrackingThem) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
- auto pipeline = unittest::assertGet(Pipeline::create({DocumentSourceMock::createForTest(),
- NoModifications::create(),
- NotSupported::create(),
- NoModifications::create()},
- expCtx));
+ auto pipeline = Pipeline::create({DocumentSourceMock::createForTest(),
+ NoModifications::create(),
+ NotSupported::create(),
+ NoModifications::create()},
+ expCtx);
// Backwards case.
ASSERT_FALSE(static_cast<bool>(semantic_analysis::renamedPaths(
pipeline->getSources().crbegin(), pipeline->getSources().crend(), {"a"})));
@@ -3350,8 +3347,8 @@ public:
TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
- auto pipeline = unittest::assertGet(
- Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx));
+ auto pipeline =
+ Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx);
{
// Tracking backwards.
auto renames = semantic_analysis::renamedPaths(
@@ -3416,8 +3413,8 @@ TEST(PipelineRenameTracking, ReportsNewNamesWhenSingleStageRenames) {
TEST(PipelineRenameTracking, ReportsIdentityMapWhenGivenEmptyIteratorRange) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
- auto pipeline = unittest::assertGet(
- Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx));
+ auto pipeline =
+ Pipeline::create({DocumentSourceMock::createForTest(), RenamesAToB::create()}, expCtx);
{
// Tracking backwards.
auto renames = semantic_analysis::renamedPaths(
@@ -3474,9 +3471,9 @@ TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
{
// Tracking backwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToC::create()},
- expCtx));
+ expCtx);
auto stages = pipeline->getSources();
auto renames = semantic_analysis::renamedPaths(stages.crbegin(), stages.crend(), {"c"});
ASSERT(static_cast<bool>(renames));
@@ -3486,9 +3483,9 @@ TEST(PipelineRenameTracking, ReportsNewNameAcrossMultipleRenames) {
}
{
// Tracking forwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToC::create()},
- expCtx));
+ expCtx);
auto stages = pipeline->getSources();
auto renames = semantic_analysis::renamedPaths(stages.cbegin(), stages.cend(), {"a"});
ASSERT(static_cast<bool>(renames));
@@ -3513,9 +3510,9 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
{
// Tracking backwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToA::create()},
- expCtx));
+ expCtx);
auto stages = pipeline->getSources();
auto renames = semantic_analysis::renamedPaths(stages.crbegin(), stages.crend(), {"a"});
ASSERT(static_cast<bool>(renames));
@@ -3525,9 +3522,9 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) {
}
{
// Tracking forwards.
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(), RenamesAToB::create(), RenamesBToA::create()},
- expCtx));
+ expCtx);
auto stages = pipeline->getSources();
auto renames = semantic_analysis::renamedPaths(stages.cbegin(), stages.cend(), {"a"});
ASSERT(static_cast<bool>(renames));
@@ -3539,12 +3536,12 @@ TEST(PipelineRenameTracking, CanHandleBackAndForthRename) {
TEST(InvolvedNamespacesTest, NoInvolvedNamespacesForMatchSortProject) {
boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest());
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(),
DocumentSourceMatch::create(BSON("x" << 1), expCtx),
DocumentSourceSort::create(expCtx, BSON("y" << -1)),
DocumentSourceProject::create(BSON("x" << 1 << "y" << 1), expCtx, "$project"_sd)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT(involvedNssSet.empty());
}
@@ -3556,10 +3553,10 @@ TEST(InvolvedNamespacesTest, IncludesLookupNamespace) {
expCtx->setResolvedNamespace(lookupNss, {resolvedNss, vector<BSONObj>{}});
auto lookupSpec =
fromjson("{$lookup: {from: 'foo', as: 'x', localField: 'foo_id', foreignField: '_id'}}");
- auto pipeline = unittest::assertGet(
+ auto pipeline =
Pipeline::create({DocumentSourceMock::createForTest(),
DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT_EQ(involvedNssSet.size(), 1UL);
@@ -3579,10 +3576,10 @@ TEST(InvolvedNamespacesTest, IncludesGraphLookupNamespace) {
" connectToField: 'y',"
" startWith: '$start'"
"}}");
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(),
DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT_EQ(involvedNssSet.size(), 1UL);
@@ -3603,10 +3600,10 @@ TEST(InvolvedNamespacesTest, IncludesLookupSubpipelineNamespaces) {
" as: 'x', "
" pipeline: [{$lookup: {from: 'foo_inner', as: 'y', pipeline: []}}]"
"}}");
- auto pipeline = unittest::assertGet(
+ auto pipeline =
Pipeline::create({DocumentSourceMock::createForTest(),
DocumentSourceLookUp::createFromBson(lookupSpec.firstElement(), expCtx)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT_EQ(involvedNssSet.size(), 2UL);
@@ -3634,10 +3631,10 @@ TEST(InvolvedNamespacesTest, IncludesGraphLookupSubPipeline) {
" connectToField: 'y',"
" startWith: '$start'"
"}}");
- auto pipeline = unittest::assertGet(Pipeline::create(
+ auto pipeline = Pipeline::create(
{DocumentSourceMock::createForTest(),
DocumentSourceGraphLookUp::createFromBson(graphLookupSpec.firstElement(), expCtx)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT_EQ(involvedNssSet.size(), 2UL);
@@ -3677,10 +3674,10 @@ TEST(InvolvedNamespacesTest, IncludesAllCollectionsWhenResolvingViews) {
" }}"
" ]"
"}}");
- auto pipeline = unittest::assertGet(
+ auto pipeline =
Pipeline::create({DocumentSourceMock::createForTest(),
DocumentSourceFacet::createFromBson(facetSpec.firstElement(), expCtx)},
- expCtx));
+ expCtx);
auto involvedNssSet = pipeline->getInvolvedCollections();
ASSERT_EQ(involvedNssSet.size(), 3UL);
diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
index 782ad6e56ef..b88b40e989d 100644
--- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
@@ -38,7 +38,6 @@
namespace mongo {
-
std::unique_ptr<Pipeline, PipelineDeleter>
StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRead(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) {
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index e288214f60d..291b477341e 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -627,7 +627,7 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
Pipeline::SourceContainer shardStages;
boost::optional<BSONObj> inputsSort = findSplitPoint(&shardStages, mergePipeline.get());
- auto shardsPipeline = uassertStatusOK(Pipeline::create(std::move(shardStages), expCtx));
+ auto shardsPipeline = Pipeline::create(std::move(shardStages), expCtx);
// The order in which optimizations are applied can have significant impact on the efficiency of
// the final pipeline. Be Careful!
@@ -987,7 +987,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
} else {
// We have not split the pipeline, and will execute entirely on the remote shards. Set up an
// empty local pipeline which we will attach the merge cursors stage to.
- mergePipeline = uassertStatusOK(Pipeline::parse(std::vector<BSONObj>(), expCtx));
+ mergePipeline = Pipeline::parse(std::vector<BSONObj>(), expCtx);
}
addMergeCursorsSource(mergePipeline.get(),