summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_facet_test.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-04-05 11:35:23 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-04-13 16:15:20 -0400
commitcc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d (patch)
tree37df000f0d37d17bc82d5d1ad5436b4911249e4b /src/mongo/db/pipeline/document_source_facet_test.cpp
parentb02b7f7bb78d4fd0bb006591769faaa216e6f8a7 (diff)
downloadmongo-cc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d.tar.gz
SERVER-25694 Eliminate race in PlanExecutor cleanup.
Ensures that a collection lock is held in at least MODE_IS while deregistering a PlanExecutor from the cursor manager. Introduces new PlanExecutor::dispose() and ClientCursor::dispose() methods that must be called before destruction of those classes, and ensures they are called before destruction. These calls will thread an OperationContext all the way through to DocumentSource::dispose() for each stage in a Pipeline, which will give DocumentSourceCursor a chance to acquire locks and deregister its PlanExecutor if necessary.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_facet_test.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp129
1 files changed, 90 insertions, 39 deletions
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 733634b8994..54c22a306fc 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -186,17 +186,19 @@ public:
TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) {
auto ctx = getExpCtx();
+ deque<DocumentSource::GetNextResult> inputs = {
+ Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}};
+ auto mock = DocumentSourceMock::create(inputs);
+
auto dummy = DocumentSourcePassthrough::create();
auto statusWithPipeline = Pipeline::create({dummy}, ctx);
ASSERT_OK(statusWithPipeline.getStatus());
auto pipeline = std::move(statusWithPipeline.getValue());
- auto facetStage = DocumentSourceFacet::create({{"results", pipeline}}, ctx);
-
- deque<DocumentSource::GetNextResult> inputs = {
- Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}};
- auto mock = DocumentSourceMock::create(inputs);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("results", std::move(pipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
facetStage->setSource(mock.get());
auto output = facetStage->getNext();
@@ -213,18 +215,21 @@ TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) {
TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) {
auto ctx = getExpCtx();
+ deque<DocumentSource::GetNextResult> inputs = {
+ Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}};
+ auto mock = DocumentSourceMock::create(inputs);
+
auto firstDummy = DocumentSourcePassthrough::create();
auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
auto secondDummy = DocumentSourcePassthrough::create();
auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx));
- auto facetStage =
- DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("first", std::move(firstPipeline));
+ facets.emplace_back("second", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- deque<DocumentSource::GetNextResult> inputs = {
- Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}};
- auto mock = DocumentSourceMock::create(inputs);
facetStage->setSource(mock.get());
auto output = facetStage->getNext();
@@ -249,18 +254,21 @@ TEST_F(DocumentSourceFacetTest,
ShouldCorrectlyHandleSubPipelinesYieldingDifferentNumbersOfResults) {
auto ctx = getExpCtx();
+ deque<DocumentSource::GetNextResult> inputs = {
+ Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}};
+ auto mock = DocumentSourceMock::create(inputs);
+
auto passthrough = DocumentSourcePassthrough::create();
auto passthroughPipe = uassertStatusOK(Pipeline::create({passthrough}, ctx));
auto limit = DocumentSourceLimit::create(ctx, 1);
auto limitedPipe = uassertStatusOK(Pipeline::create({limit}, ctx));
- auto facetStage =
- DocumentSourceFacet::create({{"all", passthroughPipe}, {"first", limitedPipe}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("all", std::move(passthroughPipe));
+ facets.emplace_back("first", std::move(limitedPipe));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- deque<DocumentSource::GetNextResult> inputs = {
- Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}};
- auto mock = DocumentSourceMock::create(inputs);
facetStage->setSource(mock.get());
vector<Value> expectedPassthroughOutput;
@@ -285,14 +293,17 @@ TEST_F(DocumentSourceFacetTest,
TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSubPipeline) {
auto ctx = getExpCtx();
+ deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}};
+ auto mock = DocumentSourceMock::create(inputs);
+
auto firstDummy = DocumentSourcePassthrough::create();
auto secondDummy = DocumentSourcePassthrough::create();
auto pipeline = uassertStatusOK(Pipeline::create({firstDummy, secondDummy}, ctx));
- auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("subPipe", std::move(pipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}};
- auto mock = DocumentSourceMock::create(inputs);
facetStage->setSource(mock.get());
auto output = facetStage->getNext();
@@ -300,18 +311,41 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub
ASSERT_DOCUMENT_EQ(output.getDocument(), Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}")));
}
+TEST_F(DocumentSourceFacetTest, ShouldPropagateDisposeThroughToSource) {
+ auto ctx = getExpCtx();
+
+ auto mockSource = DocumentSourceMock::create();
+
+ auto firstDummy = DocumentSourcePassthrough::create();
+ auto firstPipe = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
+ auto secondDummy = DocumentSourcePassthrough::create();
+ auto secondPipe = uassertStatusOK(Pipeline::create({secondDummy}, ctx));
+
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("firstPipe", std::move(firstPipe));
+ facets.emplace_back("secondPipe", std::move(secondPipe));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
+
+ facetStage->setSource(mockSource.get());
+
+ facetStage->dispose();
+ ASSERT_TRUE(mockSource->isDisposed);
+}
+
// TODO: DocumentSourceFacet will have to propagate pauses if we ever allow nested $facets.
DEATH_TEST_F(DocumentSourceFacetTest,
ShouldFailIfGivenPausedInput,
"Invariant failure !input.isPaused()") {
auto ctx = getExpCtx();
+ auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution());
auto firstDummy = DocumentSourcePassthrough::create();
auto pipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
- auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("subPipe", std::move(pipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution());
facetStage->setSource(mock.get());
facetStage->getNext(); // This should cause a crash.
@@ -335,8 +369,10 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) {
auto secondSkip = DocumentSourceSkip::create(ctx, 2);
auto secondPipeline = uassertStatusOK(Pipeline::create({secondSkip}, ctx));
- auto facetStage = DocumentSourceFacet::create(
- {{"skippedOne", firstPipeline}, {"skippedTwo", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("skippedOne", std::move(firstPipeline));
+ facets.emplace_back("skippedTwo", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
// Serialize the facet stage.
vector<Value> serialization;
@@ -373,7 +409,9 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) {
auto dummy = DocumentSourcePassthrough::create();
auto pipeline = unittest::assertGet(Pipeline::create({dummy}, ctx));
- auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("subPipe", std::move(pipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT_FALSE(dummy->isOptimized);
facetStage->optimize();
@@ -389,8 +427,10 @@ TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) {
auto secondDummy = DocumentSourcePassthrough::create();
auto secondPipeline = unittest::assertGet(Pipeline::create({secondDummy}, ctx));
- auto facetStage =
- DocumentSourceFacet::create({{"one", firstPipeline}, {"two", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("one", std::move(firstPipeline));
+ facets.emplace_back("two", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
// Test detaching.
ASSERT_FALSE(firstDummy->isDetachedFromOpCtx);
@@ -456,8 +496,10 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) {
ASSERT_EQ(secondPipelineDeps.fields.size(), 1UL);
ASSERT_EQ(secondPipelineDeps.fields.count("b"), 1UL);
- auto facetStage =
- DocumentSourceFacet::create({{"needsA", firstPipeline}, {"needsB", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("needsA", std::move(firstPipeline));
+ facets.emplace_back("needsB", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata);
ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL);
@@ -491,8 +533,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireWholeDocumentIfAnyPipelineRequiresW
auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create();
auto secondPipeline = unittest::assertGet(Pipeline::create({needsWholeDocument}, ctx));
- auto facetStage = DocumentSourceFacet::create(
- {{"needsA", firstPipeline}, {"needsWholeDocument", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("needsA", std::move(firstPipeline));
+ facets.emplace_back("needsWholeDocument", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata);
ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL);
@@ -526,10 +570,11 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireTextScoreIfAnyPipelineRequiresTextS
auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create();
auto thirdPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx));
- auto facetStage = DocumentSourceFacet::create({{"needsA", firstPipeline},
- {"needsWholeDocument", secondPipeline},
- {"needsTextScore", thirdPipeline}},
- ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("needsA", std::move(firstPipeline));
+ facets.emplace_back("needsWholeDocument", std::move(secondPipeline));
+ facets.emplace_back("needsTextScore", std::move(thirdPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
DepsTracker deps(DepsTracker::MetadataAvailable::kTextScore);
ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL);
@@ -546,8 +591,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create();
auto secondPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx));
- auto facetStage = DocumentSourceFacet::create(
- {{"needsA", firstPipeline}, {"needsTextScore", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("needsA", std::move(firstPipeline));
+ facets.emplace_back("needsTextScore", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata);
ASSERT_THROWS(facetStage->getDependencies(&deps), UserException);
@@ -576,8 +623,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create();
auto secondPipeline = unittest::assertGet(Pipeline::create({needsPrimaryShard}, ctx));
- auto facetStage = DocumentSourceFacet::create(
- {{"passthrough", firstPipeline}, {"needsPrimaryShard", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("passthrough", std::move(firstPipeline));
+ facets.emplace_back("needsPrimaryShard", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT_TRUE(facetStage->needsPrimaryShard());
}
@@ -591,8 +640,10 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
auto secondPassthrough = DocumentSourcePassthrough::create();
auto secondPipeline = unittest::assertGet(Pipeline::create({secondPassthrough}, ctx));
- auto facetStage =
- DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx);
+ std::vector<DocumentSourceFacet::FacetPipeline> facets;
+ facets.emplace_back("first", std::move(firstPipeline));
+ facets.emplace_back("second", std::move(secondPipeline));
+ auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
ASSERT_FALSE(facetStage->needsPrimaryShard());
}