diff options
Diffstat (limited to 'src/mongo/dbtests/query_plan_executor.cpp')
-rw-r--r-- | src/mongo/dbtests/query_plan_executor.cpp | 848 |
1 files changed, 422 insertions, 426 deletions
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 58149558719..9c977bf5336 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -49,481 +49,477 @@ namespace QueryPlanExecutor { - using std::shared_ptr; - using std::string; - using std::unique_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; - class PlanExecutorBase { - public: - PlanExecutorBase() : _client(&_txn) { +class PlanExecutorBase { +public: + PlanExecutorBase() : _client(&_txn) {} - } + virtual ~PlanExecutorBase() { + _client.dropCollection(ns()); + } - virtual ~PlanExecutorBase() { - _client.dropCollection(ns()); - } + void addIndex(const BSONObj& obj) { + ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj)); + } - void addIndex(const BSONObj& obj) { - ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj)); - } + void insert(const BSONObj& obj) { + _client.insert(ns(), obj); + } - void insert(const BSONObj& obj) { - _client.insert(ns(), obj); - } + void remove(const BSONObj& obj) { + _client.remove(ns(), obj); + } - void remove(const BSONObj& obj) { - _client.remove(ns(), obj); - } + void dropCollection() { + _client.dropCollection(ns()); + } - void dropCollection() { - _client.dropCollection(ns()); - } + void update(BSONObj& query, BSONObj& updateSpec) { + _client.update(ns(), query, updateSpec, false, false); + } - void update(BSONObj& query, BSONObj& updateSpec) { - _client.update(ns(), query, updateSpec, false, false); - } + /** + * Given a match expression, represented as the BSON object 'filterObj', + * create a PlanExecutor capable of executing a simple collection + * scan. + * + * The caller takes ownership of the returned PlanExecutor*. + */ + PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) { + CollectionScanParams csparams; + csparams.collection = coll; + csparams.direction = CollectionScanParams::FORWARD; + unique_ptr<WorkingSet> ws(new WorkingSet()); + + // Canonicalize the query + CanonicalQuery* cq; + verify(CanonicalQuery::canonicalize(ns(), filterObj, &cq).isOK()); + verify(NULL != cq); + + // Make the stage. + unique_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root())); + + PlanExecutor* exec; + // Hand the plan off to the executor. + Status stat = PlanExecutor::make( + &_txn, ws.release(), root.release(), cq, coll, PlanExecutor::YIELD_MANUAL, &exec); + ASSERT_OK(stat); + return exec; + } - /** - * Given a match expression, represented as the BSON object 'filterObj', - * create a PlanExecutor capable of executing a simple collection - * scan. - * - * The caller takes ownership of the returned PlanExecutor*. - */ - PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) { - CollectionScanParams csparams; - csparams.collection = coll; - csparams.direction = CollectionScanParams::FORWARD; - unique_ptr<WorkingSet> ws(new WorkingSet()); - - // Canonicalize the query - CanonicalQuery* cq; - verify(CanonicalQuery::canonicalize(ns(), filterObj, &cq).isOK()); - verify(NULL != cq); - - // Make the stage. - unique_ptr<PlanStage> root(new CollectionScan(&_txn, csparams, ws.get(), cq->root())); - - PlanExecutor* exec; - // Hand the plan off to the executor. - Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll, - PlanExecutor::YIELD_MANUAL, &exec); - ASSERT_OK(stat); - return exec; - } + /** + * @param indexSpec -- a BSONObj giving the index over which to + * scan, e.g. {_id: 1}. + * @param start -- the lower bound (inclusive) at which to start + * the index scan + * @param end -- the lower bound (inclusive) at which to end the + * index scan + * + * Returns a PlanExecutor capable of executing an index scan + * over the specified index with the specified bounds. + * + * The caller takes ownership of the returned PlanExecutor*. + */ + PlanExecutor* makeIndexScanExec(Database* db, BSONObj& indexSpec, int start, int end) { + // Build the index scan stage. + IndexScanParams ixparams; + ixparams.descriptor = getIndex(db, indexSpec); + ixparams.bounds.isSimpleRange = true; + ixparams.bounds.startKey = BSON("" << start); + ixparams.bounds.endKey = BSON("" << end); + ixparams.bounds.endKeyInclusive = true; + ixparams.direction = 1; + + const Collection* coll = db->getCollection(ns()); + + unique_ptr<WorkingSet> ws(new WorkingSet()); + IndexScan* ix = new IndexScan(&_txn, ixparams, ws.get(), NULL); + unique_ptr<PlanStage> root(new FetchStage(&_txn, ws.get(), ix, NULL, coll)); + + CanonicalQuery* cq; + verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); + verify(NULL != cq); + + PlanExecutor* exec; + // Hand the plan off to the executor. + Status stat = PlanExecutor::make( + &_txn, ws.release(), root.release(), cq, coll, PlanExecutor::YIELD_MANUAL, &exec); + ASSERT_OK(stat); + return exec; + } + + static const char* ns() { + return "unittests.QueryPlanExecutor"; + } + + size_t numCursors() { + AutoGetCollectionForRead ctx(&_txn, ns()); + Collection* collection = ctx.getCollection(); + if (!collection) + return 0; + return collection->getCursorManager()->numCursors(); + } + + void registerExec(PlanExecutor* exec) { + // TODO: This is not correct (create collection under S-lock) + AutoGetCollectionForRead ctx(&_txn, ns()); + WriteUnitOfWork wunit(&_txn); + Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); + collection->getCursorManager()->registerExecutor(exec); + wunit.commit(); + } + + void deregisterExec(PlanExecutor* exec) { + // TODO: This is not correct (create collection under S-lock) + AutoGetCollectionForRead ctx(&_txn, ns()); + WriteUnitOfWork wunit(&_txn); + Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); + collection->getCursorManager()->deregisterExecutor(exec); + wunit.commit(); + } + +protected: + OperationContextImpl _txn; + +private: + IndexDescriptor* getIndex(Database* db, const BSONObj& obj) { + Collection* collection = db->getCollection(ns()); + return collection->getIndexCatalog()->findIndexByKeyPattern(&_txn, obj); + } + + DBDirectClient _client; +}; - /** - * @param indexSpec -- a BSONObj giving the index over which to - * scan, e.g. {_id: 1}. - * @param start -- the lower bound (inclusive) at which to start - * the index scan - * @param end -- the lower bound (inclusive) at which to end the - * index scan - * - * Returns a PlanExecutor capable of executing an index scan - * over the specified index with the specified bounds. - * - * The caller takes ownership of the returned PlanExecutor*. - */ - PlanExecutor* makeIndexScanExec(Database* db, BSONObj& indexSpec, int start, int end) { - // Build the index scan stage. - IndexScanParams ixparams; - ixparams.descriptor = getIndex(db, indexSpec); - ixparams.bounds.isSimpleRange = true; - ixparams.bounds.startKey = BSON("" << start); - ixparams.bounds.endKey = BSON("" << end); - ixparams.bounds.endKeyInclusive = true; - ixparams.direction = 1; - - const Collection* coll = db->getCollection(ns()); - - unique_ptr<WorkingSet> ws(new WorkingSet()); - IndexScan* ix = new IndexScan(&_txn, ixparams, ws.get(), NULL); - unique_ptr<PlanStage> root(new FetchStage(&_txn, ws.get(), ix, NULL, coll)); - - CanonicalQuery* cq; - verify(CanonicalQuery::canonicalize(ns(), BSONObj(), &cq).isOK()); - verify(NULL != cq); - - PlanExecutor* exec; - // Hand the plan off to the executor. - Status stat = PlanExecutor::make(&_txn, ws.release(), root.release(), cq, coll, - PlanExecutor::YIELD_MANUAL, &exec); - ASSERT_OK(stat); - return exec; - } +/** + * Test dropping the collection while the + * PlanExecutor is doing a collection scan. + */ +class DropCollScan : public PlanExecutorBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + insert(BSON("_id" << 1)); + insert(BSON("_id" << 2)); - static const char* ns() { return "unittests.QueryPlanExecutor"; } + BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); - size_t numCursors() { - AutoGetCollectionForRead ctx(&_txn, ns() ); - Collection* collection = ctx.getCollection(); - if ( !collection ) - return 0; - return collection->getCursorManager()->numCursors(); - } + Collection* coll = ctx.getCollection(); + unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj)); + registerExec(exec.get()); - void registerExec( PlanExecutor* exec ) { - // TODO: This is not correct (create collection under S-lock) - AutoGetCollectionForRead ctx(&_txn, ns()); - WriteUnitOfWork wunit(&_txn); - Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); - collection->getCursorManager()->registerExecutor( exec ); - wunit.commit(); - } + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(1, objOut["_id"].numberInt()); - void deregisterExec( PlanExecutor* exec ) { - // TODO: This is not correct (create collection under S-lock) - AutoGetCollectionForRead ctx(&_txn, ns()); - WriteUnitOfWork wunit(&_txn); - Collection* collection = ctx.getDb()->getOrCreateCollection(&_txn, ns()); - collection->getCursorManager()->deregisterExecutor( exec ); - wunit.commit(); - } + // After dropping the collection, the runner + // should be dead. + dropCollection(); + ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); - protected: - OperationContextImpl _txn; + deregisterExec(exec.get()); + } +}; - private: - IndexDescriptor* getIndex(Database* db, const BSONObj& obj) { - Collection* collection = db->getCollection( ns() ); - return collection->getIndexCatalog()->findIndexByKeyPattern(&_txn, obj); - } +/** + * Test dropping the collection while the PlanExecutor is doing an index scan. + */ +class DropIndexScan : public PlanExecutorBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + insert(BSON("_id" << 1 << "a" << 6)); + insert(BSON("_id" << 2 << "a" << 7)); + insert(BSON("_id" << 3 << "a" << 8)); + BSONObj indexSpec = BSON("a" << 1); + addIndex(indexSpec); + + unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); + registerExec(exec.get()); + + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(7, objOut["a"].numberInt()); + + // After dropping the collection, the runner + // should be dead. + dropCollection(); + ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); + + deregisterExec(exec.get()); + } +}; - DBDirectClient _client; - }; +/** + * Test dropping the collection while an agg PlanExecutor is doing an index scan. + */ +class DropIndexScanAgg : public PlanExecutorBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + + insert(BSON("_id" << 1 << "a" << 6)); + insert(BSON("_id" << 2 << "a" << 7)); + insert(BSON("_id" << 3 << "a" << 8)); + BSONObj indexSpec = BSON("a" << 1); + addIndex(indexSpec); + + // Create the PlanExecutor which feeds the aggregation pipeline. + std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); + + // Create the aggregation pipeline. + boost::intrusive_ptr<ExpressionContext> expCtx = + new ExpressionContext(&_txn, NamespaceString(ns())); + + string errmsg; + BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}"); + boost::intrusive_ptr<Pipeline> pipeline = Pipeline::parseCommand(errmsg, inputBson, expCtx); + ASSERT_EQUALS(errmsg, ""); + + // Create the output PlanExecutor that pulls results from the pipeline. + std::unique_ptr<WorkingSet> ws(new WorkingSet()); + std::unique_ptr<PipelineProxyStage> proxy( + new PipelineProxyStage(pipeline, innerExec, ws.get())); + Collection* collection = ctx.getCollection(); + + PlanExecutor* rawExec; + Status status = PlanExecutor::make( + &_txn, ws.release(), proxy.release(), collection, PlanExecutor::YIELD_MANUAL, &rawExec); + ASSERT_OK(status); + std::unique_ptr<PlanExecutor> outerExec(rawExec); + + // Only the outer executor gets registered. + registerExec(outerExec.get()); + + // Verify that both the "inner" and "outer" plan executors have been killed after + // dropping the collection. + BSONObj objOut; + dropCollection(); + ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL)); + ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL)); + + deregisterExec(outerExec.get()); + } +}; + +class SnapshotBase : public PlanExecutorBase { +protected: + void setupCollection() { + insert(BSON("_id" << 1 << "a" << 1)); + insert(BSON("_id" << 2 << "a" << 2 << "payload" + << "x")); + insert(BSON("_id" << 3 << "a" << 3)); + insert(BSON("_id" << 4 << "a" << 4)); + } /** - * Test dropping the collection while the - * PlanExecutor is doing a collection scan. + * Increases a document's size dramatically such that the document + * exceeds the available padding and must be moved to the end of + * the collection. */ - class DropCollScan : public PlanExecutorBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); - insert(BSON("_id" << 1)); - insert(BSON("_id" << 2)); - - BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); - - Collection* coll = ctx.getCollection(); - unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj)); - registerExec(exec.get()); + void forceDocumentMove() { + BSONObj query = BSON("_id" << 2); + BSONObj updateSpec = BSON("$set" << BSON("payload" << payload8k())); + update(query, updateSpec); + } - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(1, objOut["_id"].numberInt()); - - // After dropping the collection, the runner - // should be dead. - dropCollection(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); - - deregisterExec(exec.get()); - } - }; + std::string payload8k() { + return std::string(8 * 1024, 'x'); + } /** - * Test dropping the collection while the PlanExecutor is doing an index scan. + * Given an array of ints, 'expectedIds', and a PlanExecutor, + * 'exec', uses the executor to iterate through the collection. While + * iterating, asserts that the _id of each successive document equals + * the respective integer in 'expectedIds'. */ - class DropIndexScan : public PlanExecutorBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); - insert(BSON("_id" << 1 << "a" << 6)); - insert(BSON("_id" << 2 << "a" << 7)); - insert(BSON("_id" << 3 << "a" << 8)); - BSONObj indexSpec = BSON("a" << 1); - addIndex(indexSpec); + void checkIds(int* expectedIds, PlanExecutor* exec) { + BSONObj objOut; + int idcount = 0; + while (PlanExecutor::ADVANCED == exec->getNext(&objOut, NULL)) { + ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt()); + ++idcount; + } + } +}; - unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); - registerExec(exec.get()); +/** + * Create a scenario in which the same document is returned + * twice due to a concurrent document move and collection + * scan. + */ +class SnapshotControl : public SnapshotBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + setupCollection(); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(7, objOut["a"].numberInt()); + BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - // After dropping the collection, the runner - // should be dead. - dropCollection(); - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); + Collection* coll = ctx.getCollection(); + unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj)); - deregisterExec(exec.get()); - } - }; + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(2, objOut["a"].numberInt()); - /** - * Test dropping the collection while an agg PlanExecutor is doing an index scan. - */ - class DropIndexScanAgg : public PlanExecutorBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); + forceDocumentMove(); - insert(BSON("_id" << 1 << "a" << 6)); - insert(BSON("_id" << 2 << "a" << 7)); - insert(BSON("_id" << 3 << "a" << 8)); - BSONObj indexSpec = BSON("a" << 1); - addIndex(indexSpec); - - // Create the PlanExecutor which feeds the aggregation pipeline. - std::shared_ptr<PlanExecutor> innerExec( - makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); - - // Create the aggregation pipeline. - boost::intrusive_ptr<ExpressionContext> expCtx = - new ExpressionContext(&_txn, NamespaceString(ns())); - - string errmsg; - BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}"); - boost::intrusive_ptr<Pipeline> pipeline = - Pipeline::parseCommand(errmsg, inputBson, expCtx); - ASSERT_EQUALS(errmsg, ""); - - // Create the output PlanExecutor that pulls results from the pipeline. - std::unique_ptr<WorkingSet> ws(new WorkingSet()); - std::unique_ptr<PipelineProxyStage> proxy( - new PipelineProxyStage(pipeline, innerExec, ws.get())); - Collection* collection = ctx.getCollection(); + int ids[] = {3, 4, 2}; + checkIds(ids, exec.get()); + } +}; - PlanExecutor* rawExec; - Status status = PlanExecutor::make(&_txn, ws.release(), proxy.release(), collection, - PlanExecutor::YIELD_MANUAL, &rawExec); - ASSERT_OK(status); - std::unique_ptr<PlanExecutor> outerExec(rawExec); +/** + * A snapshot is really just a hint that means scan the _id index. + * Make sure that we do not see the document move with an _id + * index scan. + */ +class SnapshotTest : public SnapshotBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + setupCollection(); + BSONObj indexSpec = BSON("_id" << 1); + addIndex(indexSpec); - // Only the outer executor gets registered. - registerExec(outerExec.get()); + BSONObj filterObj = fromjson("{a: {$gte: 2}}"); + unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 2, 5)); - // Verify that both the "inner" and "outer" plan executors have been killed after - // dropping the collection. - BSONObj objOut; - dropCollection(); - ASSERT_EQUALS(PlanExecutor::DEAD, innerExec->getNext(&objOut, NULL)); - ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, NULL)); + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); + ASSERT_EQUALS(2, objOut["a"].numberInt()); - deregisterExec(outerExec.get()); - } - }; - - class SnapshotBase : public PlanExecutorBase { - protected: - void setupCollection() { - insert(BSON("_id" << 1 << "a" << 1)); - insert(BSON("_id" << 2 << "a" << 2 << "payload" << "x")); - insert(BSON("_id" << 3 << "a" << 3)); - insert(BSON("_id" << 4 << "a" << 4)); - } + forceDocumentMove(); - /** - * Increases a document's size dramatically such that the document - * exceeds the available padding and must be moved to the end of - * the collection. - */ - void forceDocumentMove() { - BSONObj query = BSON("_id" << 2); - BSONObj updateSpec = BSON("$set" << BSON("payload" << payload8k())); - update(query, updateSpec); - } + // Since this time we're scanning the _id index, + // we should not see the moved document again. + int ids[] = {3, 4}; + checkIds(ids, exec.get()); + } +}; - std::string payload8k() { - return std::string(8*1024, 'x'); - } +namespace ClientCursor { - /** - * Given an array of ints, 'expectedIds', and a PlanExecutor, - * 'exec', uses the executor to iterate through the collection. While - * iterating, asserts that the _id of each successive document equals - * the respective integer in 'expectedIds'. - */ - void checkIds(int* expectedIds, PlanExecutor* exec) { - BSONObj objOut; - int idcount = 0; - while (PlanExecutor::ADVANCED == exec->getNext(&objOut, NULL)) { - ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt()); - ++idcount; - } - } - }; +using mongo::ClientCursor; - /** - * Create a scenario in which the same document is returned - * twice due to a concurrent document move and collection - * scan. - */ - class SnapshotControl : public SnapshotBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); - setupCollection(); +/** + * Test invalidation of ClientCursor. + */ +class Invalidate : public PlanExecutorBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + insert(BSON("a" << 1 << "b" << 1)); - BSONObj filterObj = fromjson("{a: {$gte: 2}}"); + BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - Collection* coll = ctx.getCollection(); - unique_ptr<PlanExecutor> exec(makeCollScanExec(coll, filterObj)); + Collection* coll = ctx.getCollection(); + PlanExecutor* exec = makeCollScanExec(coll, filterObj); - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(2, objOut["a"].numberInt()); + // Make a client cursor from the runner. + new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj()); - forceDocumentMove(); + // There should be one cursor before invalidation, + // and zero cursors after invalidation. + ASSERT_EQUALS(1U, numCursors()); + coll->getCursorManager()->invalidateAll(false, "Invalidate Test"); + ASSERT_EQUALS(0U, numCursors()); + } +}; - int ids[] = {3, 4, 2}; - checkIds(ids, exec.get()); - } - }; +/** + * Test that pinned client cursors persist even after + * invalidation. + */ +class InvalidatePinned : public PlanExecutorBase { +public: + void run() { + OldClientWriteContext ctx(&_txn, ns()); + insert(BSON("a" << 1 << "b" << 1)); + + Collection* collection = ctx.getCollection(); + + BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); + PlanExecutor* exec = makeCollScanExec(collection, filterObj); + + // Make a client cursor from the runner. + ClientCursor* cc = + new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj()); + ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid()); + + // If the cursor is pinned, it sticks around, + // even after invalidation. + ASSERT_EQUALS(1U, numCursors()); + const std::string invalidateReason("InvalidatePinned Test"); + collection->getCursorManager()->invalidateAll(false, invalidateReason); + ASSERT_EQUALS(1U, numCursors()); + + // The invalidation should have killed the runner. + BSONObj objOut; + ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); + ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut)); + const Status status = WorkingSetCommon::getMemberObjectStatus(objOut); + ASSERT(status.reason().find(invalidateReason) != string::npos); + + // Deleting the underlying cursor should cause the + // number of cursors to return to 0. + ccPin.deleteUnderlying(); + ASSERT_EQUALS(0U, numCursors()); + } +}; - /** - * A snapshot is really just a hint that means scan the _id index. - * Make sure that we do not see the document move with an _id - * index scan. - */ - class SnapshotTest : public SnapshotBase { - public: - void run() { +/** + * Test that client cursors time out and get + * deleted. + */ +class Timeout : public PlanExecutorBase { +public: + void run() { + { OldClientWriteContext ctx(&_txn, ns()); - setupCollection(); - BSONObj indexSpec = BSON("_id" << 1); - addIndex(indexSpec); - - BSONObj filterObj = fromjson("{a: {$gte: 2}}"); - unique_ptr<PlanExecutor> exec(makeIndexScanExec(ctx.db(), indexSpec, 2, 5)); + insert(BSON("a" << 1 << "b" << 1)); + } - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); - ASSERT_EQUALS(2, objOut["a"].numberInt()); + { + AutoGetCollectionForRead ctx(&_txn, ns()); + Collection* collection = ctx.getCollection(); - forceDocumentMove(); + BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); + PlanExecutor* exec = makeCollScanExec(collection, filterObj); - // Since this time we're scanning the _id index, - // we should not see the moved document again. - int ids[] = {3, 4}; - checkIds(ids, exec.get()); - } - }; - - namespace ClientCursor { - - using mongo::ClientCursor; - - /** - * Test invalidation of ClientCursor. - */ - class Invalidate : public PlanExecutorBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); - insert(BSON("a" << 1 << "b" << 1)); - - BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - - Collection* coll = ctx.getCollection(); - PlanExecutor* exec = makeCollScanExec(coll,filterObj); - - // Make a client cursor from the runner. - new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj()); - - // There should be one cursor before invalidation, - // and zero cursors after invalidation. - ASSERT_EQUALS(1U, numCursors()); - coll->getCursorManager()->invalidateAll(false, "Invalidate Test"); - ASSERT_EQUALS(0U, numCursors()); - } - }; - - /** - * Test that pinned client cursors persist even after - * invalidation. - */ - class InvalidatePinned : public PlanExecutorBase { - public: - void run() { - OldClientWriteContext ctx(&_txn, ns()); - insert(BSON("a" << 1 << "b" << 1)); - - Collection* collection = ctx.getCollection(); - - BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - PlanExecutor* exec = makeCollScanExec(collection, filterObj); - - // Make a client cursor from the runner. - ClientCursor* cc = new ClientCursor(collection->getCursorManager(), - exec, - ns(), - 0, - BSONObj()); - ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid()); - - // If the cursor is pinned, it sticks around, - // even after invalidation. - ASSERT_EQUALS(1U, numCursors()); - const std::string invalidateReason("InvalidatePinned Test"); - collection->getCursorManager()->invalidateAll(false, invalidateReason); - ASSERT_EQUALS(1U, numCursors()); - - // The invalidation should have killed the runner. - BSONObj objOut; - ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); - ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut)); - const Status status = WorkingSetCommon::getMemberObjectStatus(objOut); - ASSERT(status.reason().find(invalidateReason) != string::npos); - - // Deleting the underlying cursor should cause the - // number of cursors to return to 0. - ccPin.deleteUnderlying(); - ASSERT_EQUALS(0U, numCursors()); - } - }; - - /** - * Test that client cursors time out and get - * deleted. - */ - class Timeout : public PlanExecutorBase { - public: - void run() { - { - OldClientWriteContext ctx(&_txn, ns()); - insert(BSON("a" << 1 << "b" << 1)); - } - - { - AutoGetCollectionForRead ctx(&_txn, ns()); - Collection* collection = ctx.getCollection(); - - BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); - PlanExecutor* exec = makeCollScanExec(collection, filterObj); - - // Make a client cursor from the runner. - new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj()); - } - - // There should be one cursor before timeout, - // and zero cursors after timeout. - ASSERT_EQUALS(1U, numCursors()); - CursorManager::timeoutCursorsGlobal(&_txn, 600001); - ASSERT_EQUALS(0U, numCursors()); - } - }; - - } // namespace ClientCursor - - class All : public Suite { - public: - All() : Suite( "query_plan_executor" ) { } - - void setupTests() { - add<DropCollScan>(); - add<DropIndexScan>(); - add<DropIndexScanAgg>(); - add<SnapshotControl>(); - add<SnapshotTest>(); - add<ClientCursor::Invalidate>(); - add<ClientCursor::InvalidatePinned>(); - add<ClientCursor::Timeout>(); + // Make a client cursor from the runner. + new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj()); } - }; - SuiteInstance<All> queryPlanExecutorAll; + // There should be one cursor before timeout, + // and zero cursors after timeout. + ASSERT_EQUALS(1U, numCursors()); + CursorManager::timeoutCursorsGlobal(&_txn, 600001); + ASSERT_EQUALS(0U, numCursors()); + } +}; + +} // namespace ClientCursor + +class All : public Suite { +public: + All() : Suite("query_plan_executor") {} + + void setupTests() { + add<DropCollScan>(); + add<DropIndexScan>(); + add<DropIndexScanAgg>(); + add<SnapshotControl>(); + add<SnapshotTest>(); + add<ClientCursor::Invalidate>(); + add<ClientCursor::InvalidatePinned>(); + add<ClientCursor::Timeout>(); + } +}; + +SuiteInstance<All> queryPlanExecutorAll; } // namespace QueryPlanExecutor |