author    Charlie Swanson <charlie.swanson@mongodb.com>  2017-07-28 17:17:51 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com>  2017-08-28 11:24:48 -0400
commit    55a85da4980f1967f88bbccbd43646ee89c6301f (patch)
tree      d0911d9ca87de609e2a3d4d5391ec0752a472f5f /src/mongo/dbtests/documentsourcetests.cpp
parent    6e2cc35d6d4370804f09665b243d1e4d5d418ec0 (diff)
download  mongo-55a85da4980f1967f88bbccbd43646ee89c6301f.tar.gz
SERVER-30410 Ensure executor is saved after tailable cursor time out.
Diffstat (limited to 'src/mongo/dbtests/documentsourcetests.cpp')
-rw-r--r--  src/mongo/dbtests/documentsourcetests.cpp | 537
1 file changed, 322 insertions(+), 215 deletions(-)
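
The new tests in this patch all follow one pattern, condensed below as a sketch; every name comes from the diff itself, and it assumes the same fixtures (opCtx(), ctx(), readLock, workingSet, collectionScan, canonicalQuery). DocumentSourceCursor::create() expects a PlanExecutor whose state has already been saved, and the mock yield policies pulled in via mock_yield_policies.h simulate a timeout or a kill. For a tailable, awaitData cursor the simulated timeout surfaces as EOF and the source stays usable; in the other three tests getNext() throws QueryPlanKilled.

    // Sketch condensed from the tailable/awaitData test below.
    auto planExecutor =
        uassertStatusOK(PlanExecutor::make(opCtx(),
                                           std::move(workingSet),
                                           std::move(collectionScan),
                                           std::move(canonicalQuery),
                                           readLock.getCollection(),
                                           PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
    ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
    planExecutor->saveState();  // DocumentSourceCursor requires a saved executor.
    auto cursor =
        DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx());
    ASSERT(cursor->getNext().isEOF());  // The simulated timeout surfaces as EOF, not an error.
    cursor->dispose();
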
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 4af64c6be73..f5bd327705f 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/pipeline/dependencies.h"
@@ -43,12 +44,15 @@
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/get_executor.h"
+#include "mongo/db/query/mock_yield_policies.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/stage_builder.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/util/scopeguard.h"
-namespace DocumentSourceCursorTests {
+namespace mongo {
+namespace {
using boost::intrusive_ptr;
using std::unique_ptr;
@@ -65,28 +69,16 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
return arr[0].getDocument().toBson();
}
-class CollectionBase {
+class DocumentSourceCursorTest : public unittest::Test {
public:
- CollectionBase() : client(&_opCtx) {}
-
- ~CollectionBase() {
- client.dropCollection(nss.ns());
+ DocumentSourceCursorTest()
+ : client(_opCtx.get()),
+ _ctx(new ExpressionContextForTest(_opCtx.get(), AggregationRequest(nss, {}))) {
+ _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp";
}
-protected:
- const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext();
- OperationContext& _opCtx = *_opCtxPtr;
- DBDirectClient client;
-};
-
-namespace DocumentSourceCursor {
-
-using mongo::DocumentSourceCursor;
-
-class Base : public CollectionBase {
-public:
- Base() : _ctx(new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, {}))) {
- _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+ virtual ~DocumentSourceCursorTest() {
+ client.dropCollection(nss.ns());
}
protected:
@@ -94,16 +86,16 @@ protected:
// clean up first if this was called before
_source.reset();
- OldClientWriteContext ctx(&_opCtx, nss.ns());
+ OldClientWriteContext ctx(opCtx(), nss.ns());
auto qr = stdx::make_unique<QueryRequest>(nss);
if (hint) {
qr->setHint(*hint);
}
- auto cq = uassertStatusOK(CanonicalQuery::canonicalize(&_opCtx, std::move(qr)));
+ auto cq = uassertStatusOK(CanonicalQuery::canonicalize(opCtx(), std::move(qr)));
auto exec = uassertStatusOK(
- getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD));
+ getExecutor(opCtx(), ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD));
exec->saveState();
_source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx);
@@ -117,6 +109,14 @@ protected:
return _source.get();
}
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+protected:
+ const ServiceContext::UniqueOperationContext _opCtx = cc().makeOperationContext();
+ DBDirectClient client;
+
private:
// It is important that these are ordered to ensure correct destruction order.
intrusive_ptr<ExpressionContextForTest> _ctx;
@@ -124,78 +124,66 @@ private:
};
/** Create a DocumentSourceCursor. */
-class Empty : public Base {
-public:
- void run() {
- createSource();
- // The DocumentSourceCursor doesn't hold a read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- // The collection is empty, so the source produces no results.
- ASSERT(source()->getNext().isEOF());
- // Exhausting the source releases the read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- }
-};
+TEST_F(DocumentSourceCursorTest, Empty) {
+ createSource();
+ // The DocumentSourceCursor doesn't hold a read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ // The collection is empty, so the source produces no results.
+ ASSERT(source()->getNext().isEOF());
+ // Exhausting the source releases the read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+}
/** Iterate a DocumentSourceCursor. */
-class Iterate : public Base {
-public:
- void run() {
- client.insert(nss.ns(), BSON("a" << 1));
- createSource();
- // The DocumentSourceCursor doesn't hold a read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- // The cursor will produce the expected result.
- auto next = source()->getNext();
- ASSERT(next.isAdvanced());
- ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a"));
- // There are no more results.
- ASSERT(source()->getNext().isEOF());
- // Exhausting the source releases the read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- }
-};
+TEST_F(DocumentSourceCursorTest, Iterate) {
+ client.insert(nss.ns(), BSON("a" << 1));
+ createSource();
+ // The DocumentSourceCursor doesn't hold a read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ // The cursor will produce the expected result.
+ auto next = source()->getNext();
+ ASSERT(next.isAdvanced());
+ ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a"));
+ // There are no more results.
+ ASSERT(source()->getNext().isEOF());
+ // Exhausting the source releases the read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+}
/** Dispose of a DocumentSourceCursor. */
-class Dispose : public Base {
-public:
- void run() {
- createSource();
- // The DocumentSourceCursor doesn't hold a read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- source()->dispose();
- // Releasing the cursor releases the read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- // The source is marked as exhausted.
- ASSERT(source()->getNext().isEOF());
- }
-};
+TEST_F(DocumentSourceCursorTest, Dispose) {
+ createSource();
+ // The DocumentSourceCursor doesn't hold a read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ source()->dispose();
+ // Releasing the cursor releases the read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ // The source is marked as exhausted.
+ ASSERT(source()->getNext().isEOF());
+}
/** Iterate a DocumentSourceCursor and then dispose of it. */
-class IterateDispose : public Base {
-public:
- void run() {
- client.insert(nss.ns(), BSON("a" << 1));
- client.insert(nss.ns(), BSON("a" << 2));
- client.insert(nss.ns(), BSON("a" << 3));
- createSource();
- // The result is as expected.
- auto next = source()->getNext();
- ASSERT(next.isAdvanced());
- ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a"));
- // The next result is as expected.
- next = source()->getNext();
- ASSERT(next.isAdvanced());
- ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a"));
- // The DocumentSourceCursor doesn't hold a read lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- source()->dispose();
- // Disposing of the source releases the lock.
- ASSERT(!_opCtx.lockState()->isReadLocked());
- // The source cannot be advanced further.
- ASSERT(source()->getNext().isEOF());
- }
-};
+TEST_F(DocumentSourceCursorTest, IterateDispose) {
+ client.insert(nss.ns(), BSON("a" << 1));
+ client.insert(nss.ns(), BSON("a" << 2));
+ client.insert(nss.ns(), BSON("a" << 3));
+ createSource();
+ // The result is as expected.
+ auto next = source()->getNext();
+ ASSERT(next.isAdvanced());
+ ASSERT_VALUE_EQ(Value(1), next.getDocument().getField("a"));
+ // The next result is as expected.
+ next = source()->getNext();
+ ASSERT(next.isAdvanced());
+ ASSERT_VALUE_EQ(Value(2), next.getDocument().getField("a"));
+ // The DocumentSourceCursor doesn't hold a read lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ source()->dispose();
+ // Disposing of the source releases the lock.
+ ASSERT(!opCtx()->lockState()->isReadLocked());
+ // The source cannot be advanced further.
+ ASSERT(source()->getNext().isEOF());
+}
/** Set a value or await an expected value. */
class PendingValue {
@@ -221,148 +209,267 @@ private:
/** Test coalescing a limit into a cursor */
-class LimitCoalesce : public Base {
-public:
- intrusive_ptr<DocumentSourceLimit> mkLimit(long long limit) {
- return DocumentSourceLimit::create(ctx(), limit);
- }
- void run() {
- client.insert(nss.ns(), BSON("a" << 1));
- client.insert(nss.ns(), BSON("a" << 2));
- client.insert(nss.ns(), BSON("a" << 3));
- createSource();
-
- Pipeline::SourceContainer container;
- container.push_back(source());
- container.push_back(mkLimit(10));
- source()->optimizeAt(container.begin(), &container);
-
- // initial limit becomes limit of cursor
- ASSERT_EQUALS(container.size(), 1U);
- ASSERT_EQUALS(source()->getLimit(), 10);
-
- container.push_back(mkLimit(2));
- source()->optimizeAt(container.begin(), &container);
- // smaller limit lowers cursor limit
- ASSERT_EQUALS(container.size(), 1U);
- ASSERT_EQUALS(source()->getLimit(), 2);
-
- container.push_back(mkLimit(3));
- source()->optimizeAt(container.begin(), &container);
- // higher limit doesn't affect cursor limit
- ASSERT_EQUALS(container.size(), 1U);
- ASSERT_EQUALS(source()->getLimit(), 2);
-
- // The cursor allows exactly 2 documents through
- ASSERT(source()->getNext().isAdvanced());
- ASSERT(source()->getNext().isAdvanced());
- ASSERT(source()->getNext().isEOF());
- }
-};
+TEST_F(DocumentSourceCursorTest, LimitCoalesce) {
+ client.insert(nss.ns(), BSON("a" << 1));
+ client.insert(nss.ns(), BSON("a" << 2));
+ client.insert(nss.ns(), BSON("a" << 3));
+ createSource();
+
+ Pipeline::SourceContainer container;
+ container.push_back(source());
+ container.push_back(DocumentSourceLimit::create(ctx(), 10));
+ source()->optimizeAt(container.begin(), &container);
+
+ // initial limit becomes limit of cursor
+ ASSERT_EQUALS(container.size(), 1U);
+ ASSERT_EQUALS(source()->getLimit(), 10);
+
+ container.push_back(DocumentSourceLimit::create(ctx(), 2));
+ source()->optimizeAt(container.begin(), &container);
+ // smaller limit lowers cursor limit
+ ASSERT_EQUALS(container.size(), 1U);
+ ASSERT_EQUALS(source()->getLimit(), 2);
+
+ container.push_back(DocumentSourceLimit::create(ctx(), 3));
+ source()->optimizeAt(container.begin(), &container);
+ // higher limit doesn't affect cursor limit
+ ASSERT_EQUALS(container.size(), 1U);
+ ASSERT_EQUALS(source()->getLimit(), 2);
+
+ // The cursor allows exactly 2 documents through
+ ASSERT(source()->getNext().isAdvanced());
+ ASSERT(source()->getNext().isAdvanced());
+ ASSERT(source()->getNext().isEOF());
+}
//
// Test cursor output sort.
//
-class CollectionScanProvidesNoSort : public Base {
-public:
- void run() {
- createSource(BSON("$natural" << 1));
- ASSERT_EQ(source()->getOutputSorts().size(), 0U);
- source()->dispose();
- }
-};
-
-class IndexScanProvidesSortOnKeys : public Base {
-public:
- void run() {
- client.createIndex(nss.ns(), BSON("a" << 1));
- createSource(BSON("a" << 1));
+TEST_F(DocumentSourceCursorTest, CollectionScanProvidesNoSort) {
+ createSource(BSON("$natural" << 1));
+ ASSERT_EQ(source()->getOutputSorts().size(), 0U);
+ source()->dispose();
+}
- ASSERT_EQ(source()->getOutputSorts().size(), 1U);
- ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U);
- source()->dispose();
- }
-};
+TEST_F(DocumentSourceCursorTest, IndexScanProvidesSortOnKeys) {
+ client.createIndex(nss.ns(), BSON("a" << 1));
+ createSource(BSON("a" << 1));
-class ReverseIndexScanProvidesSort : public Base {
-public:
- void run() {
- client.createIndex(nss.ns(), BSON("a" << -1));
- createSource(BSON("a" << -1));
+ ASSERT_EQ(source()->getOutputSorts().size(), 1U);
+ ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U);
+ source()->dispose();
+}
- ASSERT_EQ(source()->getOutputSorts().size(), 1U);
- ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U);
- source()->dispose();
- }
-};
+TEST_F(DocumentSourceCursorTest, ReverseIndexScanProvidesSort) {
+ client.createIndex(nss.ns(), BSON("a" << -1));
+ createSource(BSON("a" << -1));
-class CompoundIndexScanProvidesMultipleSorts : public Base {
-public:
- void run() {
- client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1));
- createSource(BSON("a" << 1 << "b" << -1));
-
- ASSERT_EQ(source()->getOutputSorts().size(), 2U);
- ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U);
- ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U);
- source()->dispose();
- }
-};
+ ASSERT_EQ(source()->getOutputSorts().size(), 1U);
+ ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << -1)), 1U);
+ source()->dispose();
+}
-class SerializationRespectsExplainModes : public Base {
-public:
- void run() {
- createSource();
+TEST_F(DocumentSourceCursorTest, CompoundIndexScanProvidesMultipleSorts) {
+ client.createIndex(nss.ns(), BSON("a" << 1 << "b" << -1));
+ createSource(BSON("a" << 1 << "b" << -1));
- {
- // Nothing serialized when no explain mode specified.
- auto explainResult = source()->serialize();
- ASSERT_TRUE(explainResult.missing());
- }
+ ASSERT_EQ(source()->getOutputSorts().size(), 2U);
+ ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1)), 1U);
+ ASSERT_EQ(source()->getOutputSorts().count(BSON("a" << 1 << "b" << -1)), 1U);
+ source()->dispose();
+}
- {
- auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner);
- ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
- ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing());
- }
+TEST_F(DocumentSourceCursorTest, SerializationRespectsExplainModes) {
+ createSource();
- {
- auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats);
- ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
- ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing());
- ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing());
- }
+ {
+ // Nothing serialized when no explain mode specified.
+ auto explainResult = source()->serialize();
+ ASSERT_TRUE(explainResult.missing());
+ }
- {
- auto explainResult =
- source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument();
- ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
- ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing());
- ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing());
- }
- source()->dispose();
+ {
+ auto explainResult = source()->serialize(ExplainOptions::Verbosity::kQueryPlanner);
+ ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
+ ASSERT_TRUE(explainResult["$cursor"]["executionStats"].missing());
}
-};
-} // namespace DocumentSourceCursor
+ {
+ auto explainResult = source()->serialize(ExplainOptions::Verbosity::kExecStats);
+ ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
+ ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing());
+ ASSERT_TRUE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing());
+ }
-class All : public Suite {
-public:
- All() : Suite("documentsource") {}
- void setupTests() {
- add<DocumentSourceCursor::Empty>();
- add<DocumentSourceCursor::Iterate>();
- add<DocumentSourceCursor::Dispose>();
- add<DocumentSourceCursor::IterateDispose>();
- add<DocumentSourceCursor::LimitCoalesce>();
- add<DocumentSourceCursor::CollectionScanProvidesNoSort>();
- add<DocumentSourceCursor::IndexScanProvidesSortOnKeys>();
- add<DocumentSourceCursor::ReverseIndexScanProvidesSort>();
- add<DocumentSourceCursor::CompoundIndexScanProvidesMultipleSorts>();
- add<DocumentSourceCursor::SerializationRespectsExplainModes>();
+ {
+ auto explainResult =
+ source()->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument();
+ ASSERT_FALSE(explainResult["$cursor"]["queryPlanner"].missing());
+ ASSERT_FALSE(explainResult["$cursor"]["executionStats"].missing());
+ ASSERT_FALSE(explainResult["$cursor"]["executionStats"]["allPlansExecution"].missing());
}
-};
+ source()->dispose();
+}
-SuiteInstance<All> myall;
+TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) {
+ // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy.
+ const bool capped = true;
+ const long long cappedSize = 1024;
+ ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped));
+ client.insert(nss.ns(), BSON("a" << 1));
+
+ // Make a tailable collection scan wrapped up in a PlanExecutor.
+ AutoGetCollectionForRead readLock(opCtx(), nss);
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ CollectionScanParams collScanParams;
+ collScanParams.collection = readLock.getCollection();
+ collScanParams.tailable = true;
+ auto filter = BSON("a" << 1);
+ auto matchExpression =
+ uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator()));
+ auto collectionScan = stdx::make_unique<CollectionScan>(
+ opCtx(), collScanParams, workingSet.get(), matchExpression.get());
+ auto queryRequest = stdx::make_unique<QueryRequest>(nss);
+ queryRequest->setFilter(filter);
+ queryRequest->setTailable(true);
+ queryRequest->setAwaitData(true);
+ auto canonicalQuery = unittest::assertGet(
+ CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
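+ // The ALWAYS_TIME_OUT mock yield policy (from mock_yield_policies.h, included above) makes
+ // every yield attempt fail as though the operation timed out.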
+ auto planExecutor =
+ uassertStatusOK(PlanExecutor::make(opCtx(),
+ std::move(workingSet),
+ std::move(collectionScan),
+ std::move(canonicalQuery),
+ readLock.getCollection(),
+ PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
+
+ // Make a DocumentSourceCursor.
+ ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ // DocumentSourceCursor expects a PlanExecutor that has had its state saved.
+ planExecutor->saveState();
+ auto cursor =
+ DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx());
+
+ ASSERT(cursor->getNext().isEOF());
+ cursor->dispose();
+}
+
+TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) {
+ // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy.
+ ASSERT_TRUE(client.createCollection(nss.ns()));
+ client.insert(nss.ns(), BSON("a" << 1));
+
+ // Make a (non-tailable) collection scan wrapped up in a PlanExecutor.
+ AutoGetCollectionForRead readLock(opCtx(), nss);
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ CollectionScanParams collScanParams;
+ collScanParams.collection = readLock.getCollection();
+ auto filter = BSON("a" << 1);
+ auto matchExpression =
+ uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator()));
+ auto collectionScan = stdx::make_unique<CollectionScan>(
+ opCtx(), collScanParams, workingSet.get(), matchExpression.get());
+ auto queryRequest = stdx::make_unique<QueryRequest>(nss);
+ queryRequest->setFilter(filter);
+ auto canonicalQuery = unittest::assertGet(
+ CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
+ auto planExecutor =
+ uassertStatusOK(PlanExecutor::make(opCtx(),
+ std::move(workingSet),
+ std::move(collectionScan),
+ std::move(canonicalQuery),
+ readLock.getCollection(),
+ PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
+
+ // Make a DocumentSourceCursor.
+ ctx()->tailableMode = ExpressionContext::TailableMode::kNormal;
+ // DocumentSourceCursor expects a PlanExecutor that has had its state saved.
+ planExecutor->saveState();
+ auto cursor =
+ DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx());
+
+ ON_BLOCK_EXIT([cursor]() { cursor->dispose(); });
+ ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled);
+}
+
+TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKilled) {
+ // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy.
+ const bool capped = true;
+ const long long cappedSize = 1024;
+ ASSERT_TRUE(client.createCollection(nss.ns(), cappedSize, capped));
+ client.insert(nss.ns(), BSON("a" << 1));
+
+ // Make a tailable collection scan wrapped up in a PlanExecutor.
+ AutoGetCollectionForRead readLock(opCtx(), nss);
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ CollectionScanParams collScanParams;
+ collScanParams.collection = readLock.getCollection();
+ collScanParams.tailable = true;
+ auto filter = BSON("a" << 1);
+ auto matchExpression =
+ uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator()));
+ auto collectionScan = stdx::make_unique<CollectionScan>(
+ opCtx(), collScanParams, workingSet.get(), matchExpression.get());
+ auto queryRequest = stdx::make_unique<QueryRequest>(nss);
+ queryRequest->setFilter(filter);
+ auto canonicalQuery = unittest::assertGet(
+ CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
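+ // The ALWAYS_MARK_KILLED mock yield policy marks the plan as killed on every yield attempt,
+ // so the next getNext() is expected to fail with ErrorCodes::QueryPlanKilled.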
+ auto planExecutor =
+ uassertStatusOK(PlanExecutor::make(opCtx(),
+ std::move(workingSet),
+ std::move(collectionScan),
+ std::move(canonicalQuery),
+ readLock.getCollection(),
+ PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
+
+ // Make a DocumentSourceCursor.
+ ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ // DocumentSourceCursor expects a PlanExecutor that has had its state saved.
+ planExecutor->saveState();
+ auto cursor =
+ DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx());
+
+ ON_BLOCK_EXIT([cursor]() { cursor->dispose(); });
+ ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled);
+}
+
+TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) {
+ // Make sure the collection exists, otherwise we'll default to a NO_YIELD yield policy.
+ ASSERT_TRUE(client.createCollection(nss.ns()));
+ client.insert(nss.ns(), BSON("a" << 1));
+
+ // Make a (non-tailable) collection scan wrapped up in a PlanExecutor.
+ AutoGetCollectionForRead readLock(opCtx(), nss);
+ auto workingSet = stdx::make_unique<WorkingSet>();
+ CollectionScanParams collScanParams;
+ collScanParams.collection = readLock.getCollection();
+ auto filter = BSON("a" << 1);
+ auto matchExpression =
+ uassertStatusOK(MatchExpressionParser::parse(filter, ctx()->getCollator()));
+ auto collectionScan = stdx::make_unique<CollectionScan>(
+ opCtx(), collScanParams, workingSet.get(), matchExpression.get());
+ auto queryRequest = stdx::make_unique<QueryRequest>(nss);
+ queryRequest->setFilter(filter);
+ auto canonicalQuery = unittest::assertGet(
+ CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
+ auto planExecutor =
+ uassertStatusOK(PlanExecutor::make(opCtx(),
+ std::move(workingSet),
+ std::move(collectionScan),
+ std::move(canonicalQuery),
+ readLock.getCollection(),
+ PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
+
+ // Make a DocumentSourceCursor.
+ ctx()->tailableMode = ExpressionContext::TailableMode::kNormal;
+ // DocumentSourceCursor expects a PlanExecutor that has had its state saved.
+ planExecutor->saveState();
+ auto cursor =
+ DocumentSourceCursor::create(readLock.getCollection(), std::move(planExecutor), ctx());
+
+ ON_BLOCK_EXIT([cursor]() { cursor->dispose(); });
+ ASSERT_THROWS_CODE(cursor->getNext().isEOF(), AssertionException, ErrorCodes::QueryPlanKilled);
+}
-} // namespace DocumentSourceCursorTests
+} // namespace
+} // namespace mongo