diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-04-05 11:35:23 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-04-13 16:15:20 -0400 |
commit | cc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d (patch) | |
tree | 37df000f0d37d17bc82d5d1ad5436b4911249e4b /src/mongo/db/pipeline/document_source_facet_test.cpp | |
parent | b02b7f7bb78d4fd0bb006591769faaa216e6f8a7 (diff) | |
download | mongo-cc954e9e1d88b30d1ab89ee3bbbd9db0bb15263d.tar.gz |
SERVER-25694 Eliminate race in PlanExecutor cleanup.
Ensures that a collection lock is held in at least MODE_IS while
deregistering a PlanExecutor from the cursor manager. Introduces new
PlanExecutor::dispose() and ClientCursor::dispose() methods that must be
called before destruction of those classes, and ensures they are called
before destruction. These calls will thread an OperationContext all the
way through to DocumentSource::dispose() for each stage in a Pipeline,
which will give DocumentSourceCursor a chance to acquire locks and
deregister its PlanExecutor if necessary.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_facet_test.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet_test.cpp | 129 |
1 files changed, 90 insertions, 39 deletions
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 733634b8994..54c22a306fc 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -186,17 +186,19 @@ public: TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; + auto mock = DocumentSourceMock::create(inputs); + auto dummy = DocumentSourcePassthrough::create(); auto statusWithPipeline = Pipeline::create({dummy}, ctx); ASSERT_OK(statusWithPipeline.getStatus()); auto pipeline = std::move(statusWithPipeline.getValue()); - auto facetStage = DocumentSourceFacet::create({{"results", pipeline}}, ctx); - - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; - auto mock = DocumentSourceMock::create(inputs); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("results", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -213,18 +215,21 @@ TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; + auto mock = DocumentSourceMock::create(inputs); + auto firstDummy = DocumentSourcePassthrough::create(); auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); auto secondDummy = DocumentSourcePassthrough::create(); auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("first", std::move(firstPipeline)); + facets.emplace_back("second", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -249,18 +254,21 @@ TEST_F(DocumentSourceFacetTest, ShouldCorrectlyHandleSubPipelinesYieldingDifferentNumbersOfResults) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = { + Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}}; + auto mock = DocumentSourceMock::create(inputs); + auto passthrough = DocumentSourcePassthrough::create(); auto passthroughPipe = uassertStatusOK(Pipeline::create({passthrough}, ctx)); auto limit = DocumentSourceLimit::create(ctx, 1); auto limitedPipe = uassertStatusOK(Pipeline::create({limit}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"all", passthroughPipe}, {"first", limitedPipe}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("all", std::move(passthroughPipe)); + facets.emplace_back("first", std::move(limitedPipe)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = { - Document{{"_id", 0}}, Document{{"_id", 1}}, Document{{"_id", 2}}, Document{{"_id", 3}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); vector<Value> expectedPassthroughOutput; @@ -285,14 +293,17 @@ TEST_F(DocumentSourceFacetTest, TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSubPipeline) { auto ctx = getExpCtx(); + deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; + auto mock = DocumentSourceMock::create(inputs); + auto firstDummy = DocumentSourcePassthrough::create(); auto secondDummy = DocumentSourcePassthrough::create(); auto pipeline = uassertStatusOK(Pipeline::create({firstDummy, secondDummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - deque<DocumentSource::GetNextResult> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; - auto mock = DocumentSourceMock::create(inputs); facetStage->setSource(mock.get()); auto output = facetStage->getNext(); @@ -300,18 +311,41 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSub ASSERT_DOCUMENT_EQ(output.getDocument(), Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}"))); } +TEST_F(DocumentSourceFacetTest, ShouldPropagateDisposeThroughToSource) { + auto ctx = getExpCtx(); + + auto mockSource = DocumentSourceMock::create(); + + auto firstDummy = DocumentSourcePassthrough::create(); + auto firstPipe = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); + auto secondDummy = DocumentSourcePassthrough::create(); + auto secondPipe = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); + + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("firstPipe", std::move(firstPipe)); + facets.emplace_back("secondPipe", std::move(secondPipe)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); + + facetStage->setSource(mockSource.get()); + + facetStage->dispose(); + ASSERT_TRUE(mockSource->isDisposed); +} + // TODO: DocumentSourceFacet will have to propagate pauses if we ever allow nested $facets. DEATH_TEST_F(DocumentSourceFacetTest, ShouldFailIfGivenPausedInput, "Invariant failure !input.isPaused()") { auto ctx = getExpCtx(); + auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution()); auto firstDummy = DocumentSourcePassthrough::create(); auto pipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - auto mock = DocumentSourceMock::create(DocumentSource::GetNextResult::makePauseExecution()); facetStage->setSource(mock.get()); facetStage->getNext(); // This should cause a crash. @@ -335,8 +369,10 @@ TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) { auto secondSkip = DocumentSourceSkip::create(ctx, 2); auto secondPipeline = uassertStatusOK(Pipeline::create({secondSkip}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"skippedOne", firstPipeline}, {"skippedTwo", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("skippedOne", std::move(firstPipeline)); + facets.emplace_back("skippedTwo", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); // Serialize the facet stage. vector<Value> serialization; @@ -373,7 +409,9 @@ TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) { auto dummy = DocumentSourcePassthrough::create(); auto pipeline = unittest::assertGet(Pipeline::create({dummy}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("subPipe", std::move(pipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_FALSE(dummy->isOptimized); facetStage->optimize(); @@ -389,8 +427,10 @@ TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) { auto secondDummy = DocumentSourcePassthrough::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({secondDummy}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"one", firstPipeline}, {"two", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("one", std::move(firstPipeline)); + facets.emplace_back("two", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); // Test detaching. ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); @@ -456,8 +496,10 @@ TEST_F(DocumentSourceFacetTest, ShouldUnionDependenciesOfInnerPipelines) { ASSERT_EQ(secondPipelineDeps.fields.size(), 1UL); ASSERT_EQ(secondPipelineDeps.fields.count("b"), 1UL); - auto facetStage = - DocumentSourceFacet::create({{"needsA", firstPipeline}, {"needsB", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsB", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -491,8 +533,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireWholeDocumentIfAnyPipelineRequiresW auto needsWholeDocument = DocumentSourceNeedsWholeDocument::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsWholeDocument}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"needsA", firstPipeline}, {"needsWholeDocument", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsWholeDocument", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -526,10 +570,11 @@ TEST_F(DocumentSourceFacetTest, ShouldRequireTextScoreIfAnyPipelineRequiresTextS auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); auto thirdPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx)); - auto facetStage = DocumentSourceFacet::create({{"needsA", firstPipeline}, - {"needsWholeDocument", secondPipeline}, - {"needsTextScore", thirdPipeline}}, - ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsWholeDocument", std::move(secondPipeline)); + facets.emplace_back("needsTextScore", std::move(thirdPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kTextScore); ASSERT_EQ(facetStage->getDependencies(&deps), DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL); @@ -546,8 +591,10 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs auto needsTextScore = DocumentSourceNeedsOnlyTextScore::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsTextScore}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"needsA", firstPipeline}, {"needsTextScore", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("needsA", std::move(firstPipeline)); + facets.emplace_back("needsTextScore", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); DepsTracker deps(DepsTracker::MetadataAvailable::kNoMetadata); ASSERT_THROWS(facetStage->getDependencies(&deps), UserException); @@ -576,8 +623,10 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({needsPrimaryShard}, ctx)); - auto facetStage = DocumentSourceFacet::create( - {{"passthrough", firstPipeline}, {"needsPrimaryShard", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("passthrough", std::move(firstPipeline)); + facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_TRUE(facetStage->needsPrimaryShard()); } @@ -591,8 +640,10 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr auto secondPassthrough = DocumentSourcePassthrough::create(); auto secondPipeline = unittest::assertGet(Pipeline::create({secondPassthrough}, ctx)); - auto facetStage = - DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + std::vector<DocumentSourceFacet::FacetPipeline> facets; + facets.emplace_back("first", std::move(firstPipeline)); + facets.emplace_back("second", std::move(secondPipeline)); + auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); ASSERT_FALSE(facetStage->needsPrimaryShard()); } |