summaryrefslogtreecommitdiff
path: root/src/mongo/dbtests/query_plan_executor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/dbtests/query_plan_executor.cpp')
-rw-r--r-- src/mongo/dbtests/query_plan_executor.cpp | 848
1 files changed, 422 insertions, 426 deletions
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 58149558719..9c977bf5336 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -49,481 +49,477 @@
namespace QueryPlanExecutor {
- using std::shared_ptr;
- using std::string;
- using std::unique_ptr;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
- class PlanExecutorBase {
- public:
- PlanExecutorBase() : _client(&_txn) {
+class PlanExecutorBase {
+public:
+ PlanExecutorBase() : _client(&_txn) {}
- }
+ virtual ~PlanExecutorBase() {
+ _client.dropCollection(ns());
+ }
- virtual ~PlanExecutorBase() {
- _client.dropCollection(ns());
- }
+ void addIndex(const BSONObj& obj) {
+ ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj));
+ }
- void addIndex(const BSONObj& obj) {
- ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj));
- }
+ void insert(const BSONObj& obj) {
+ _client.insert(ns(), obj);
+ }
- void insert(const BSONObj& obj) {
- _client.insert(ns(), obj);
- }
+ void remove(const BSONObj& obj) {
+ _client.remove(ns(), obj);
+ }
- void remove(const BSONObj& obj) {
- _client.remove(ns(), obj);
- }
+ void dropCollection() {
+ _client.dropCollection(ns());
+ }
- void dropCollection() {
- _client.dropCollection(ns());
- }
+ void update(BSONObj& query, BSONObj& updateSpec) {
+ _client.update(ns(), query, updateSpec, false, false);
+ }
- void update(BSONObj& query, BSONObj& updateSpec) {
- _client.update(ns(), query, updateSpec, false, false);
- }
+ /**
+ * Given a match expression, represented as the BSON object 'filterObj',
+ * create a PlanExecutor capable of executing a simple collection
+ * scan.
+ *
+ * The caller takes ownership of the returned PlanExecutor*.
+ */
+ PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) {
+ CollectionScanParams csparams;
+ csparams.collection = coll;
+ csparams.direction = CollectionScanParams::FORWARD;
+ unique_ptr<WorkingSet> ws(new WorkingSet());
+
+ // Canonicalize the query
+ CanonicalQuery* cq;
+ verify(CanonicalQuery::canonicalize(ns(), filterObj, &cq).isOK());
+ verify(NULL != cq);
+
+ // Make the stage.
+ unique_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root()));
+
+ PlanExecutor* exec;
+ // Hand the plan off to the executor.
+ Status stat = PlanExecutor::make(
+ &_txn, ws.release(), root.release(), cq, coll, PlanExecutor::YIELD_MANUAL, &exec);
+ ASSERT_OK(stat);
+ return exec;
+ }
- /**
- * Given a match expression, represented as the BSON object 'filterObj',
- * create a PlanExecutor capable of executing a simple collection
- * scan.
- *
- * The caller takes ownership of the returned PlanExecutor*.
- */
- PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) {
- CollectionScanParams csparams;
- csparams.collection = coll;
- csparams.direction = CollectionScanParams::FORWARD;
- unique_ptr<WorkingSet> ws(new WorkingSet());
-
- // Canonicalize the query
- CanonicalQuery* cq;
- verify(CanonicalQuery::canonicalize(ns(), filterObj, &cq).isOK());
- verify(NULL != cq);
-
- // Make the stage.
- unique_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root()));
-
- PlanExecutor* exec;
- // Hand the plan off to the executor.
- Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll,
- PlanExecutor::YIELD_MANUAL, &exec);
- ASSERT_OK(stat);
- return exec;
- }
+ /**
+ * @param indexSpec -- a BSONObj giving the index over which to
+ * scan, e.g. {_id: 1}.
+ * @param start -- the lower bound (inclusive) at which to start
+ * the index scan
+ * @param end -- the upper bound (inclusive) at which to end the
+ * index scan
+ *
+ * Returns a PlanExecutor capable of executing an index scan
+ * over the specified index with the specified bounds.
+ *
+ * The caller takes ownership of the returned PlanExecutor*.
+ */
+ PlanExecutor* makeIndexScanExec(Database* db, BSONObj& indexSpec, int start, int end) {
+ // Build the index scan stage.
+ IndexScanParams ixparams;
+ ixparams.descriptor = getIndex(db, indexSpec);
+ ixparams.bounds.isSimpleRange = true;
+ ixparams.bounds.startKey = BSON("" << start);
+ ixparams.bounds.endKey = BSON("" << end);
+ ixparams.bounds.endKeyInclusive = true;
+ ixparams.direction = 1;
+
+ const Collection* coll = db->getCollection(ns());
+
+ unique_ptr<WorkingSet> ws(new WorkingSet());
+ IndexScan* ix = new IndexScan(&_txn, ixparams, ws.get(), NULL);
+ unique_ptr<PlanStage> root(new FetchStage(&_txn, ws.get(), ix, NULL, coll));
+
+ CanonicalQuery* cq;
+ verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK());
+ verify(NULL != cq);
+
+ PlanExecutor* exec;
+ // Hand the plan off to the executor.
+ Status stat = PlanExecutor::make(
+ &_txn, ws.release(), root.release(), cq, coll, PlanExecutor::YIELD_MANUAL, &exec);
+ ASSERT_OK(stat);
+ return exec;
+ }
+
+ static const char* ns() {
+ return "unittests.QueryPlanExecutor";
+ }
+
+ size_t numCursors() {
+ AutoGetCollectionForRead ctx(&_txn, ns());
+ Collection* collection = ctx.getCollection();
+ if (!collection)
+ return 0;
+ return collection->getCursorManager()->numCursors();
+ }
+
+ void registerExec(PlanExecutor* exec) {
+ // TODO: This is not correct (create collection under S-lock)
+ AutoGetCollectionForRead ctx(&_txn, ns());
+ WriteUnitOfWork wunit(&_txn);
+ Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
+ collection->getCursorManager()->registerExecutor(exec);
+ wunit.commit();
+ }
+
+ void deregisterExec(PlanExecutor* exec) {
+ // TODO: This is not correct (create collection under S-lock)
+ AutoGetCollectionForRead ctx(&_txn, ns());
+ WriteUnitOfWork wunit(&_txn);
+ Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
+ collection->getCursorManager()->deregisterExecutor(exec);
+ wunit.commit();
+ }
+
+protected:
+ OperationContextImpl _txn;
+
+private:
+ IndexDescriptor* getIndex(Database* db, const BSONObj& obj) {
+ Collection* collection = db->getCollection(ns());
+ return collection->getIndexCatalog()->findIndexByKeyPattern(&_txn, obj);
+ }
+
+ DBDirectClient _client;
+};
- /**
- * @param indexSpec -- a BSONObj giving the index over which to
- * scan, e.g. {_id: 1}.
- * @param start -- the lower bound (inclusive) at which to start
- * the index scan
- * @param end -- the lower bound (inclusive) at which to end the
- * index scan
- *
- * Returns a PlanExecutor capable of executing an index scan
- * over the specified index with the specified bounds.
- *
- * The caller takes ownership of the returned PlanExecutor*.
- */
- PlanExecutor* makeIndexScanExec(Database* db, BSONObj& indexSpec, int start, int end) {
- // Build the index scan stage.
- IndexScanParams ixparams;
- ixparams.descriptor = getIndex(db, indexSpec);
- ixparams.bounds.isSimpleRange = true;
- ixparams.bounds.startKey = BSON("" << start);
- ixparams.bounds.endKey = BSON("" << end);
- ixparams.bounds.endKeyInclusive = true;
- ixparams.direction = 1;
-
- const Collection* coll = db->getCollection(ns());
-
- unique_ptr<WorkingSet> ws(new WorkingSet());
- IndexScan* ix = new IndexScan(&_txn, ixparams, ws.get(), NULL);
- unique_ptr<PlanStage> root(new FetchStage(&_txn, ws.get(), ix, NULL, coll));
-
- CanonicalQuery* cq;
- verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK());
- verify(NULL != cq);
-
- PlanExecutor* exec;
- // Hand the plan off to the executor.
- Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll,
- PlanExecutor::YIELD_MANUAL, &exec);
- ASSERT_OK(stat);
- return exec;
- }
+/**
+ * Test dropping the collection while the
+ * PlanExecutor is doing a collection scan.
+ */
+class DropCollScan : public PlanExecutorBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ insert(BSON("_id" << 1));
+ insert(BSON("_id" << 2));
- static const char* ns() { return "unittests.QueryPlanExecutor"; }
+ BSONObj filterObj = fromjson("{_id: {$gt: 0}}");
- size_t numCursors() {
- AutoGetCollectionForRead ctx(&_txn, ns() );
- Collection* collection = ctx.getCollection();
- if ( !collection )
- return 0;
- return collection->getCursorManager()->numCursors();
- }
+ Collection* coll = ctx.getCollection();
+ unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj));
+ registerExec(exec.get());
- void registerExec( PlanExecutor* exec ) {
- // TODO: This is not correct (create collection under S-lock)
- AutoGetCollectionForRead ctx(&_txn, ns());
- WriteUnitOfWork wunit(&_txn);
- Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
- collection->getCursorManager()->registerExecutor( exec );
- wunit.commit();
- }
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(1, objOut["_id"].numberInt());
- void deregisterExec( PlanExecutor* exec ) {
- // TODO: This is not correct (create collection under S-lock)
- AutoGetCollectionForRead ctx(&_txn, ns());
- WriteUnitOfWork wunit(&_txn);
- Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns());
- collection->getCursorManager()->deregisterExecutor( exec );
- wunit.commit();
- }
+ // After dropping the collection, the runner
+ // should be dead.
+ dropCollection();
+ ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
- protected:
- OperationContextImpl _txn;
+ deregisterExec(exec.get());
+ }
+};
- private:
- IndexDescriptor* getIndex(Database* db, const BSONObj& obj) {
- Collection* collection = db->getCollection( ns() );
- return collection->getIndexCatalog()->findIndexByKeyPattern(&_txn, obj);
- }
+/**
+ * Test dropping the collection while the PlanExecutor is doing an index scan.
+ */
+class DropIndexScan : public PlanExecutorBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ insert(BSON("_id" << 1 << "a" << 6));
+ insert(BSON("_id" << 2 << "a" << 7));
+ insert(BSON("_id" << 3 << "a" << 8));
+ BSONObj indexSpec = BSON("a" << 1);
+ addIndex(indexSpec);
+
+ unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+ registerExec(exec.get());
+
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(7, objOut["a"].numberInt());
+
+ // After dropping the collection, the runner
+ // should be dead.
+ dropCollection();
+ ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
+
+ deregisterExec(exec.get());
+ }
+};
- DBDirectClient _client;
- };
+/**
+ * Test dropping the collection while an agg PlanExecutor is doing an index scan.
+ */
+class DropIndexScanAgg : public PlanExecutorBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+
+ insert(BSON("_id" << 1 << "a" << 6));
+ insert(BSON("_id" << 2 << "a" << 7));
+ insert(BSON("_id" << 3 << "a" << 8));
+ BSONObj indexSpec = BSON("a" << 1);
+ addIndex(indexSpec);
+
+ // Create the PlanExecutor which feeds the aggregation pipeline.
+ std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
+
+ // Create the aggregation pipeline.
+ boost::intrusive_ptr<ExpressionContext> expCtx =
+ new ExpressionContext(&_txn, NamespaceString(ns()));
+
+ string errmsg;
+ BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}");
+ boost::intrusive_ptr<Pipeline> pipeline = Pipeline::parseCommand(errmsg, inputBson, expCtx);
+ ASSERT_EQUALS(errmsg, "");
+
+ // Create the output PlanExecutor that pulls results from the pipeline.
+ std::unique_ptr<WorkingSet> ws(new WorkingSet());
+ std::unique_ptr<PipelineProxyStage> proxy(
+ new PipelineProxyStage(pipeline, innerExec, ws.get()));
+ Collection* collection = ctx.getCollection();
+
+ PlanExecutor* rawExec;
+ Status status = PlanExecutor::make(
+ &_txn, ws.release(), proxy.release(), collection, PlanExecutor::YIELD_MANUAL, &rawExec);
+ ASSERT_OK(status);
+ std::unique_ptr<PlanExecutor> outerExec(rawExec);
+
+ // Only the outer executor gets registered.
+ registerExec(outerExec.get());
+
+ // Verify that both the "inner" and "outer" plan executors have been killed after
+ // dropping the collection.
+ BSONObj objOut;
+ dropCollection();
+ ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
+
+ deregisterExec(outerExec.get());
+ }
+};
+
+class SnapshotBase : public PlanExecutorBase {
+protected:
+ void setupCollection() {
+ insert(BSON("_id" << 1 << "a" << 1));
+ insert(BSON("_id" << 2 << "a" << 2 << "payload"
+ << "x"));
+ insert(BSON("_id" << 3 << "a" << 3));
+ insert(BSON("_id" << 4 << "a" << 4));
+ }
/**
- * Test dropping the collection while the
- * PlanExecutor is doing a collection scan.
+ * Increases a document's size dramatically such that the document
+ * exceeds the available padding and must be moved to the end of
+ * the collection.
*/
- class DropCollScan : public PlanExecutorBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
- insert(BSON("_id" << 1));
- insert(BSON("_id" << 2));
-
- BSONObj filterObj = fromjson("{_id: {$gt: 0}}");
-
- Collection* coll = ctx.getCollection();
- unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj));
- registerExec(exec.get());
+ void forceDocumentMove() {
+ BSONObj query = BSON("_id" << 2);
+ BSONObj updateSpec = BSON("$set" << BSON("payload" << payload8k()));
+ update(query, updateSpec);
+ }
- BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
- ASSERT_EQUALS(1, objOut["_id"].numberInt());
-
- // After dropping the collection, the runner
- // should be dead.
- dropCollection();
- ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
-
- deregisterExec(exec.get());
- }
- };
+ std::string payload8k() {
+ return std::string(8 * 1024, 'x');
+ }
/**
- * Test dropping the collection while the PlanExecutor is doing an index scan.
+ * Given an array of ints, 'expectedIds', and a PlanExecutor,
+ * 'exec', uses the executor to iterate through the collection. While
+ * iterating, asserts that the _id of each successive document equals
+ * the respective integer in 'expectedIds'.
*/
- class DropIndexScan : public PlanExecutorBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
- insert(BSON("_id" << 1 << "a" << 6));
- insert(BSON("_id" << 2 << "a" << 7));
- insert(BSON("_id" << 3 << "a" << 8));
- BSONObj indexSpec = BSON("a" << 1);
- addIndex(indexSpec);
+ void checkIds(int* expectedIds, PlanExecutor* exec) {
+ BSONObj objOut;
+ int idcount = 0;
+ while (PlanExecutor::ADVANCED == exec->getNext(&objOut, NULL)) {
+ ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt());
+ ++idcount;
+ }
+ }
+};
- unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
- registerExec(exec.get());
+/**
+ * Create a scenario in which the same document is returned
+ * twice due to a concurrent document move and collection
+ * scan.
+ */
+class SnapshotControl : public SnapshotBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ setupCollection();
- BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
- ASSERT_EQUALS(7, objOut["a"].numberInt());
+ BSONObj filterObj = fromjson("{a: {$gte: 2}}");
- // After dropping the collection, the runner
- // should be dead.
- dropCollection();
- ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
+ Collection* coll = ctx.getCollection();
+ unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj));
- deregisterExec(exec.get());
- }
- };
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(2, objOut["a"].numberInt());
- /**
- * Test dropping the collection while an agg PlanExecutor is doing an index scan.
- */
- class DropIndexScanAgg : public PlanExecutorBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
+ forceDocumentMove();
- insert(BSON("_id" << 1 << "a" << 6));
- insert(BSON("_id" << 2 << "a" << 7));
- insert(BSON("_id" << 3 << "a" << 8));
- BSONObj indexSpec = BSON("a" << 1);
- addIndex(indexSpec);
-
- // Create the PlanExecutor which feeds the aggregation pipeline.
- std::shared_ptr<PlanExecutor> innerExec(
- makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
-
- // Create the aggregation pipeline.
- boost::intrusive_ptr<ExpressionContext> expCtx =
- new ExpressionContext(&_txn, NamespaceString(ns()));
-
- string errmsg;
- BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}");
- boost::intrusive_ptr<Pipeline> pipeline =
- Pipeline::parseCommand(errmsg, inputBson, expCtx);
- ASSERT_EQUALS(errmsg, "");
-
- // Create the output PlanExecutor that pulls results from the pipeline.
- std::unique_ptr<WorkingSet> ws(new WorkingSet());
- std::unique_ptr<PipelineProxyStage> proxy(
- new PipelineProxyStage(pipeline, innerExec, ws.get()));
- Collection* collection = ctx.getCollection();
+ int ids[] = {3, 4, 2};
+ checkIds(ids, exec.get());
+ }
+};
- PlanExecutor* rawExec;
- Status status = PlanExecutor::make(&_txn, ws.release(), proxy.release(), collection,
- PlanExecutor::YIELD_MANUAL, &rawExec);
- ASSERT_OK(status);
- std::unique_ptr<PlanExecutor> outerExec(rawExec);
+/**
+ * A snapshot is really just a hint that means scan the _id index.
+ * Make sure that we do not see the document move with an _id
+ * index scan.
+ */
+class SnapshotTest : public SnapshotBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ setupCollection();
+ BSONObj indexSpec = BSON("_id" << 1);
+ addIndex(indexSpec);
- // Only the outer executor gets registered.
- registerExec(outerExec.get());
+ BSONObj filterObj = fromjson("{a: {$gte: 2}}");
+ unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 2, 5));
- // Verify that both the "inner" and "outer" plan executors have been killed after
- // dropping the collection.
- BSONObj objOut;
- dropCollection();
- ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL));
- ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL));
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
+ ASSERT_EQUALS(2, objOut["a"].numberInt());
- deregisterExec(outerExec.get());
- }
- };
-
- class SnapshotBase : public PlanExecutorBase {
- protected:
- void setupCollection() {
- insert(BSON("_id" << 1 << "a" << 1));
- insert(BSON("_id" << 2 << "a" << 2 << "payload" << "x"));
- insert(BSON("_id" << 3 << "a" << 3));
- insert(BSON("_id" << 4 << "a" << 4));
- }
+ forceDocumentMove();
- /**
- * Increases a document's size dramatically such that the document
- * exceeds the available padding and must be moved to the end of
- * the collection.
- */
- void forceDocumentMove() {
- BSONObj query = BSON("_id" << 2);
- BSONObj updateSpec = BSON("$set" << BSON("payload" << payload8k()));
- update(query, updateSpec);
- }
+ // Since this time we're scanning the _id index,
+ // we should not see the moved document again.
+ int ids[] = {3, 4};
+ checkIds(ids, exec.get());
+ }
+};
- std::string payload8k() {
- return std::string(8*1024, 'x');
- }
+namespace ClientCursor {
- /**
- * Given an array of ints, 'expectedIds', and a PlanExecutor,
- * 'exec', uses the executor to iterate through the collection. While
- * iterating, asserts that the _id of each successive document equals
- * the respective integer in 'expectedIds'.
- */
- void checkIds(int* expectedIds, PlanExecutor* exec) {
- BSONObj objOut;
- int idcount = 0;
- while (PlanExecutor::ADVANCED == exec->getNext(&objOut, NULL)) {
- ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt());
- ++idcount;
- }
- }
- };
+using mongo::ClientCursor;
- /**
- * Create a scenario in which the same document is returned
- * twice due to a concurrent document move and collection
- * scan.
- */
- class SnapshotControl : public SnapshotBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
- setupCollection();
+/**
+ * Test invalidation of ClientCursor.
+ */
+class Invalidate : public PlanExecutorBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ insert(BSON("a" << 1 << "b" << 1));
- BSONObj filterObj = fromjson("{a: {$gte: 2}}");
+ BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- Collection* coll = ctx.getCollection();
- unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj));
+ Collection* coll = ctx.getCollection();
+ PlanExecutor* exec = makeCollScanExec(coll, filterObj);
- BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
- ASSERT_EQUALS(2, objOut["a"].numberInt());
+ // Make a client cursor from the runner.
+ new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj());
- forceDocumentMove();
+ // There should be one cursor before invalidation,
+ // and zero cursors after invalidation.
+ ASSERT_EQUALS(1U, numCursors());
+ coll->getCursorManager()->invalidateAll(false, "Invalidate Test");
+ ASSERT_EQUALS(0U, numCursors());
+ }
+};
- int ids[] = {3, 4, 2};
- checkIds(ids, exec.get());
- }
- };
+/**
+ * Test that pinned client cursors persist even after
+ * invalidation.
+ */
+class InvalidatePinned : public PlanExecutorBase {
+public:
+ void run() {
+ OldClientWriteContext ctx(&_txn, ns());
+ insert(BSON("a" << 1 << "b" << 1));
+
+ Collection* collection = ctx.getCollection();
+
+ BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
+ PlanExecutor* exec = makeCollScanExec(collection, filterObj);
+
+ // Make a client cursor from the runner.
+ ClientCursor* cc =
+ new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj());
+ ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid());
+
+ // If the cursor is pinned, it sticks around,
+ // even after invalidation.
+ ASSERT_EQUALS(1U, numCursors());
+ const std::string invalidateReason("InvalidatePinned Test");
+ collection->getCursorManager()->invalidateAll(false, invalidateReason);
+ ASSERT_EQUALS(1U, numCursors());
+
+ // The invalidation should have killed the runner.
+ BSONObj objOut;
+ ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
+ ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut));
+ const Status status = WorkingSetCommon::getMemberObjectStatus(objOut);
+ ASSERT(status.reason().find(invalidateReason) != string::npos);
+
+ // Deleting the underlying cursor should cause the
+ // number of cursors to return to 0.
+ ccPin.deleteUnderlying();
+ ASSERT_EQUALS(0U, numCursors());
+ }
+};
- /**
- * A snapshot is really just a hint that means scan the _id index.
- * Make sure that we do not see the document move with an _id
- * index scan.
- */
- class SnapshotTest : public SnapshotBase {
- public:
- void run() {
+/**
+ * Test that client cursors time out and get
+ * deleted.
+ */
+class Timeout : public PlanExecutorBase {
+public:
+ void run() {
+ {
OldClientWriteContext ctx(&_txn, ns());
- setupCollection();
- BSONObj indexSpec = BSON("_id" << 1);
- addIndex(indexSpec);
-
- BSONObj filterObj = fromjson("{a: {$gte: 2}}");
- unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 2, 5));
+ insert(BSON("a" << 1 << "b" << 1));
+ }
- BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL));
- ASSERT_EQUALS(2, objOut["a"].numberInt());
+ {
+ AutoGetCollectionForRead ctx(&_txn, ns());
+ Collection* collection = ctx.getCollection();
- forceDocumentMove();
+ BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
+ PlanExecutor* exec = makeCollScanExec(collection, filterObj);
- // Since this time we're scanning the _id index,
- // we should not see the moved document again.
- int ids[] = {3, 4};
- checkIds(ids, exec.get());
- }
- };
-
- namespace ClientCursor {
-
- using mongo::ClientCursor;
-
- /**
- * Test invalidation of ClientCursor.
- */
- class Invalidate : public PlanExecutorBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
- insert(BSON("a" << 1 << "b" << 1));
-
- BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
-
- Collection* coll = ctx.getCollection();
- PlanExecutor* exec = makeCollScanExec(coll,filterObj);
-
- // Make a client cursor from the runner.
- new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj());
-
- // There should be one cursor before invalidation,
- // and zero cursors after invalidation.
- ASSERT_EQUALS(1U, numCursors());
- coll->getCursorManager()->invalidateAll(false, "Invalidate Test");
- ASSERT_EQUALS(0U, numCursors());
- }
- };
-
- /**
- * Test that pinned client cursors persist even after
- * invalidation.
- */
- class InvalidatePinned : public PlanExecutorBase {
- public:
- void run() {
- OldClientWriteContext ctx(&_txn, ns());
- insert(BSON("a" << 1 << "b" << 1));
-
- Collection* collection = ctx.getCollection();
-
- BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
-
- // Make a client cursor from the runner.
- ClientCursor* cc = new ClientCursor(collection->getCursorManager(),
- exec,
- ns(),
- 0,
- BSONObj());
- ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid());
-
- // If the cursor is pinned, it sticks around,
- // even after invalidation.
- ASSERT_EQUALS(1U, numCursors());
- const std::string invalidateReason("InvalidatePinned Test");
- collection->getCursorManager()->invalidateAll(false, invalidateReason);
- ASSERT_EQUALS(1U, numCursors());
-
- // The invalidation should have killed the runner.
- BSONObj objOut;
- ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL));
- ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut));
- const Status status = WorkingSetCommon::getMemberObjectStatus(objOut);
- ASSERT(status.reason().find(invalidateReason) != string::npos);
-
- // Deleting the underlying cursor should cause the
- // number of cursors to return to 0.
- ccPin.deleteUnderlying();
- ASSERT_EQUALS(0U, numCursors());
- }
- };
-
- /**
- * Test that client cursors time out and get
- * deleted.
- */
- class Timeout : public PlanExecutorBase {
- public:
- void run() {
- {
- OldClientWriteContext ctx(&_txn, ns());
- insert(BSON("a" << 1 << "b" << 1));
- }
-
- {
- AutoGetCollectionForRead ctx(&_txn, ns());
- Collection* collection = ctx.getCollection();
-
- BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}");
- PlanExecutor* exec = makeCollScanExec(collection, filterObj);
-
- // Make a client cursor from the runner.
- new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj());
- }
-
- // There should be one cursor before timeout,
- // and zero cursors after timeout.
- ASSERT_EQUALS(1U, numCursors());
- CursorManager::timeoutCursorsGlobal(&_txn, 600001);
- ASSERT_EQUALS(0U, numCursors());
- }
- };
-
- } // namespace ClientCursor
-
- class All : public Suite {
- public:
- All() : Suite( "query_plan_executor" ) { }
-
- void setupTests() {
- add<DropCollScan>();
- add<DropIndexScan>();
- add<DropIndexScanAgg>();
- add<SnapshotControl>();
- add<SnapshotTest>();
- add<ClientCursor::Invalidate>();
- add<ClientCursor::InvalidatePinned>();
- add<ClientCursor::Timeout>();
+ // Make a client cursor from the runner.
+ new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj());
}
- };
- SuiteInstance<All> queryPlanExecutorAll;
+ // There should be one cursor before timeout,
+ // and zero cursors after timeout.
+ ASSERT_EQUALS(1U, numCursors());
+ CursorManager::timeoutCursorsGlobal(&_txn, 600001);
+ ASSERT_EQUALS(0U, numCursors());
+ }
+};
+
+} // namespace ClientCursor
+
+class All : public Suite {
+public:
+ All() : Suite("query_plan_executor") {}
+
+ void setupTests() {
+ add<DropCollScan>();
+ add<DropIndexScan>();
+ add<DropIndexScanAgg>();
+ add<SnapshotControl>();
+ add<SnapshotTest>();
+ add<ClientCursor::Invalidate>();
+ add<ClientCursor::InvalidatePinned>();
+ add<ClientCursor::Timeout>();
+ }
+};
+
+SuiteInstance<All> queryPlanExecutorAll;
} // namespace QueryPlanExecutor