diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-07-28 17:17:51 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-28 11:24:48 -0400 |
commit | 55a85da4980f1967f88bbccbd43646ee89c6301f (patch) | |
tree | d0911d9ca87de609e2a3d4d5391ec0752a472f5f /src/mongo/dbtests/documentsourcetests.cpp | |
parent | 6e2cc35d6d4370804f09665b243d1e4d5d418ec0 (diff) | |
download | mongo-55a85da4980f1967f88bbccbd43646ee89c6301f.tar.gz |
SERVER-30410 Ensure executor is saved after tailable cursor time out.
Diffstat (limited to 'src/mongo/dbtests/documentsourcetests.cpp')
-rw-r--r-- | src/mongo/dbtests/documentsourcetests.cpp | 537 |
1 files changed, 322 insertions, 215 deletions
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 4af64c6be73..f5bd327705f 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -34,6 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/multi_plan.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/pipeline/dependencies.h" @@ -43,12 +44,15 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/query/mock_yield_policies.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/query/stage_builder.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/util/scopeguard.h" -namespace DocumentSourceCursorTests { +namespace mongo { +namespace { using boost::intrusive_ptr; using std::unique_ptr; @@ -65,28 +69,16 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { return arr[0].getDocument().toBson(); } -class CollectionBase { +class DocumentSourceCursorTest : public unittest::Test { public: - CollectionBase() : client(&_opCtx) {} - - ~CollectionBase() { - client.dropCollection(nss.ns()); + DocumentSourceCursorTest() + : client(_opCtx.get()), + _ctx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) { + _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; } -protected: - const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext(); - OperationContext& _opCtx = *_opCtxPtr; - DBDirectClient client; -}; - -namespace DocumentSourceCursor { - -using mongo::DocumentSourceCursor; - -class Base : public CollectionBase { -public: - Base() : _ctx(new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, {}))) { - _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; + virtual ~DocumentSourceCursorTest() { + client.dropCollection(nss.ns()); } protected: @@ -94,16 +86,16 @@ protected: // clean up first if this was called before _source.reset(); - OldClientWriteContext ctx(&_opCtx, nss.ns()); + OldClientWriteContext ctx(opCtx(), nss.ns()); auto qr = stdx::make_unique<QueryRequest>(nss); if (hint) { qr->setHint(*hint); } - auto cq = uassertStatusOK(CanonicalQuery::canonicalize(&_opCtx, std::move(qr))); + auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr))); auto exec = uassertStatusOK( - getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD)); + getExecutor(opCtx(), ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD)); exec->saveState(); _source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx); @@ -117,6 +109,14 @@ protected: return _source.get(); } + OperationContext* opCtx() { + return _opCtx.get(); + } + +protected: + const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext(); + DBDirectClient client; + private: // It is important that these are ordered to ensure correct destruction order. intrusive_ptr<ExpressionContextForTest> _ctx; @@ -124,78 +124,66 @@ private: }; /** Create a DocumentSourceCursor. */ -class Empty : public Base { -public: - void run() { - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The collection is empty, so the source produces no results. - ASSERT(source()->getNext().isEOF()); - // Exhausting the source releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - } -}; +TEST_F(DocumentSourceCursorTest, Empty) { + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The collection is empty, so the source produces no results. + ASSERT(source()->getNext().isEOF()); + // Exhausting the source releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); +} /** Iterate a DocumentSourceCursor. */ -class Iterate : public Base { -public: - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The cursor will produce the expected result. - auto next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); - // There are no more results. - ASSERT(source()->getNext().isEOF()); - // Exhausting the source releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - } -}; +TEST_F(DocumentSourceCursorTest, Iterate) { + client.insert(nss.ns(), BSON("a" << 1)); + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The cursor will produce the expected result. + auto next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); + // There are no more results. + ASSERT(source()->getNext().isEOF()); + // Exhausting the source releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); +} /** Dispose of a DocumentSourceCursor. */ -class Dispose : public Base { -public: - void run() { - createSource(); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - source()->dispose(); - // Releasing the cursor releases the read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The source is marked as exhausted. - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, Dispose) { + createSource(); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + source()->dispose(); + // Releasing the cursor releases the read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The source is marked as exhausted. + ASSERT(source()->getNext().isEOF()); +} /** Iterate a DocumentSourceCursor and then dispose of it. */ -class IterateDispose : public Base { -public: - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - client.insert(nss.ns(), BSON("a" << 2)); - client.insert(nss.ns(), BSON("a" << 3)); - createSource(); - // The result is as expected. - auto next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); - // The next result is as expected. - next = source()->getNext(); - ASSERT(next.isAdvanced()); - ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a")); - // The DocumentSourceCursor doesn't hold a read lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - source()->dispose(); - // Disposing of the source releases the lock. - ASSERT(!_opCtx.lockState()->isReadLocked()); - // The source cannot be advanced further. - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, IterateDispose) { + client.insert(nss.ns(), BSON("a" << 1)); + client.insert(nss.ns(), BSON("a" << 2)); + client.insert(nss.ns(), BSON("a" << 3)); + createSource(); + // The result is as expected. + auto next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a")); + // The next result is as expected. + next = source()->getNext(); + ASSERT(next.isAdvanced()); + ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a")); + // The DocumentSourceCursor doesn't hold a read lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + source()->dispose(); + // Disposing of the source releases the lock. + ASSERT(!opCtx()->lockState()->isReadLocked()); + // The source cannot be advanced further. + ASSERT(source()->getNext().isEOF()); +} /** Set a value or await an expected value. */ class PendingValue { @@ -221,148 +209,267 @@ private: /** Test coalescing a limit into a cursor */ -class LimitCoalesce : public Base { -public: - intrusive_ptr<DocumentSourceLimit> mkLimit(long long limit) { - return DocumentSourceLimit::create(ctx(), limit); - } - void run() { - client.insert(nss.ns(), BSON("a" << 1)); - client.insert(nss.ns(), BSON("a" << 2)); - client.insert(nss.ns(), BSON("a" << 3)); - createSource(); - - Pipeline::SourceContainer container; - container.push_back(source()); - container.push_back(mkLimit(10)); - source()->optimizeAt(container.begin(), &container); - - // initial limit becomes limit of cursor - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 10); - - container.push_back(mkLimit(2)); - source()->optimizeAt(container.begin(), &container); - // smaller limit lowers cursor limit - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 2); - - container.push_back(mkLimit(3)); - source()->optimizeAt(container.begin(), &container); - // higher limit doesn't effect cursor limit - ASSERT_EQUALS(container.size(), 1U); - ASSERT_EQUALS(source()->getLimit(), 2); - - // The cursor allows exactly 2 documents through - ASSERT(source()->getNext().isAdvanced()); - ASSERT(source()->getNext().isAdvanced()); - ASSERT(source()->getNext().isEOF()); - } -}; +TEST_F(DocumentSourceCursorTest, LimitCoalesce) { + client.insert(nss.ns(), BSON("a" << 1)); + client.insert(nss.ns(), BSON("a" << 2)); + client.insert(nss.ns(), BSON("a" << 3)); + createSource(); + + Pipeline::SourceContainer container; + container.push_back(source()); + container.push_back(DocumentSourceLimit::create(ctx(), 10)); + source()->optimizeAt(container.begin(), &container); + + // initial limit becomes limit of cursor + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 10); + + container.push_back(DocumentSourceLimit::create(ctx(), 2)); + source()->optimizeAt(container.begin(), &container); + // smaller limit lowers cursor limit + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 2); + + container.push_back(DocumentSourceLimit::create(ctx(), 3)); + source()->optimizeAt(container.begin(), &container); + // higher limit doesn't effect cursor limit + ASSERT_EQUALS(container.size(), 1U); + ASSERT_EQUALS(source()->getLimit(), 2); + + // The cursor allows exactly 2 documents through + ASSERT(source()->getNext().isAdvanced()); + ASSERT(source()->getNext().isAdvanced()); + ASSERT(source()->getNext().isEOF()); +} // // Test cursor output sort. // -class CollectionScanProvidesNoSort : public Base { -public: - void run() { - createSource(BSON("$natural" << 1)); - ASSERT_EQ(source()->getOutputSorts().size(), 0U); - source()->dispose(); - } -}; - -class IndexScanProvidesSortOnKeys : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << 1)); - createSource(BSON("a" << 1)); +TEST_F(DocumentSourceCursorTest, CollectionScanProvidesNoSort) { + createSource(BSON("$natural" << 1)); + ASSERT_EQ(source()->getOutputSorts().size(), 0U); + source()->dispose(); +} - ASSERT_EQ(source()->getOutputSorts().size(), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); - source()->dispose(); - } -}; +TEST_F(DocumentSourceCursorTest, IndexScanProvidesSortOnKeys) { + client.createIndex(nss.ns(), BSON("a" << 1)); + createSource(BSON("a" << 1)); -class ReverseIndexScanProvidesSort : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << -1)); - createSource(BSON("a" << -1)); + ASSERT_EQ(source()->getOutputSorts().size(), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); + source()->dispose(); +} - ASSERT_EQ(source()->getOutputSorts().size(), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U); - source()->dispose(); - } -}; +TEST_F(DocumentSourceCursorTest, ReverseIndexScanProvidesSort) { + client.createIndex(nss.ns(), BSON("a" << -1)); + createSource(BSON("a" << -1)); -class CompoundIndexScanProvidesMultipleSorts : public Base { -public: - void run() { - client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1)); - createSource(BSON("a" << 1 << "b" << -1)); - - ASSERT_EQ(source()->getOutputSorts().size(), 2U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); - ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U); - source()->dispose(); - } -}; + ASSERT_EQ(source()->getOutputSorts().size(), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U); + source()->dispose(); +} -class SerializationRespectsExplainModes : public Base { -public: - void run() { - createSource(); +TEST_F(DocumentSourceCursorTest, CompoundIndexScanProvidesMultipleSorts) { + client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1)); + createSource(BSON("a" << 1 << "b" << -1)); - { - // Nothing serialized when no explain mode specified. - auto explainResult = source()->serialize(); - ASSERT_TRUE(explainResult.missing()); - } + ASSERT_EQ(source()->getOutputSorts().size(), 2U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U); + ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U); + source()->dispose(); +} - { - auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing()); - } +TEST_F(DocumentSourceCursorTest, SerializationRespectsExplainModes) { + createSource(); - { - auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); - ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); - } + { + // Nothing serialized when no explain mode specified. + auto explainResult = source()->serialize(); + ASSERT_TRUE(explainResult.missing()); + } - { - auto explainResult = - source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument(); - ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); - ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); - } - source()->dispose(); + { + auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing()); } -}; -} // namespace DocumentSourceCursor + { + auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); + ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); + } -class All : public Suite { -public: - All() : Suite("documentsource") {} - void setupTests() { - add<DocumentSourceCursor::Empty>(); - add<DocumentSourceCursor::Iterate>(); - add<DocumentSourceCursor::Dispose>(); - add<DocumentSourceCursor::IterateDispose>(); - add<DocumentSourceCursor::LimitCoalesce>(); - add<DocumentSourceCursor::CollectionScanProvidesNoSort>(); - add<DocumentSourceCursor::IndexScanProvidesSortOnKeys>(); - add<DocumentSourceCursor::ReverseIndexScanProvidesSort>(); - add<DocumentSourceCursor::CompoundIndexScanProvidesMultipleSorts>(); - add<DocumentSourceCursor::SerializationRespectsExplainModes>(); + { + auto explainResult = + source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument(); + ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing()); + ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing()); } -}; + source()->dispose(); +} -SuiteInstance<All> myall; +TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + const bool capped = true; + const bool cappedSize = 1024; + ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped)); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + collScanParams.tailable = true; + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + queryRequest->setTailable(true); + queryRequest->setAwaitData(true); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ASSERT(cursor->getNext().isEOF()); + cursor->dispose(); +} + +TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + ASSERT_TRUE(client.createCollection(nss.ns())); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} + +TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKilled) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + const bool capped = true; + const bool cappedSize = 1024; + ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped)); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + collScanParams.tailable = true; + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} + +TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { + // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy. + ASSERT_TRUE(client.createCollection(nss.ns())); + client.insert(nss.ns(), BSON("a" << 1)); + + // Make a tailable collection scan wrapped up in a PlanExecutor. + AutoGetCollectionForRead readLock(opCtx(), nss); + auto workingSet = stdx::make_unique<WorkingSet>(); + CollectionScanParams collScanParams; + collScanParams.collection = readLock.getCollection(); + auto filter = BSON("a" << 1); + auto matchExpression = + uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator())); + auto collectionScan = stdx::make_unique<CollectionScan>( + opCtx(), collScanParams, workingSet.get(), matchExpression.get()); + auto queryRequest = stdx::make_unique<QueryRequest>(nss); + queryRequest->setFilter(filter); + auto canonicalQuery = unittest::assertGet( + CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); + auto planExecutor = + uassertStatusOK(PlanExecutor::make(opCtx(), + std::move(workingSet), + std::move(collectionScan), + std::move(canonicalQuery), + readLock.getCollection(), + PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); + + // Make a DocumentSourceCursor. + ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + // DocumentSourceCursor expects a PlanExecutor that has had its state saved. + planExecutor->saveState(); + auto cursor = + DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx()); + + ON_BLOCK_EXIT([cursor]() { cursor->dispose(); }); + ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled); +} -} // namespace DocumentSourceCursorTests +} // namespace +} // namespace mongo |