diff options
Diffstat (limited to 'src/mongo/dbtests/query_stage_count.cpp')
-rw-r--r-- | src/mongo/dbtests/query_stage_count.cpp | 575 |
1 files changed, 285 insertions, 290 deletions
diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp index 7c326b15c11..652747bc482 100644 --- a/src/mongo/dbtests/query_stage_count.cpp +++ b/src/mongo/dbtests/query_stage_count.cpp @@ -42,319 +42,314 @@ namespace QueryStageCount { - using std::unique_ptr; - using std::vector; +using std::unique_ptr; +using std::vector; - const int kDocuments = 100; - const int kInterjections = kDocuments; +const int kDocuments = 100; +const int kInterjections = kDocuments; - class CountStageTest { - public: - CountStageTest() - : _txn(), - _scopedXact(&_txn, MODE_IX), - _dbLock(_txn.lockState(), nsToDatabaseSubstring(ns()), MODE_X), - _ctx(&_txn, ns()), - _coll(NULL) { +class CountStageTest { +public: + CountStageTest() + : _txn(), + _scopedXact(&_txn, MODE_IX), + _dbLock(_txn.lockState(), nsToDatabaseSubstring(ns()), MODE_X), + _ctx(&_txn, ns()), + _coll(NULL) {} - } - - virtual ~CountStageTest() {} - - virtual void interject(CountStage&, int) {} - - virtual void setup() { - WriteUnitOfWork wunit(&_txn); - - _ctx.db()->dropCollection(&_txn, ns()); - _coll = _ctx.db()->createCollection(&_txn, ns()); - - _coll->getIndexCatalog()->createIndexOnEmptyCollection( - &_txn, - BSON( - "key" << BSON("x" << 1) << - "name" << "x_1" << - "ns" << ns() << - "v" << 1 - ) - ); - - for (int i=0; i<kDocuments; i++) { - insert(BSON(GENOID << "x" << i)); - } - - wunit.commit(); - } - - void getLocs() { - _locs.clear(); - WorkingSet ws; - - CollectionScanParams params; - params.collection = _coll; - params.direction = CollectionScanParams::FORWARD; - params.tailable = false; - - unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); - while (!scan->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState state = scan->work(&id); - if (PlanStage::ADVANCED == state) { - WorkingSetMember* member = ws.get(id); - verify(member->hasLoc()); - _locs.push_back(member->loc); - } - } - } + virtual ~CountStageTest() {} - void insert(const BSONObj& doc) { - WriteUnitOfWork wunit(&_txn); - _coll->insertDocument(&_txn, doc, false); - wunit.commit(); - } + virtual void interject(CountStage&, int) {} - void remove(const RecordId& loc) { - WriteUnitOfWork wunit(&_txn); - _coll->deleteDocument(&_txn, loc, false, false, NULL); - wunit.commit(); - } + virtual void setup() { + WriteUnitOfWork wunit(&_txn); - void update(const RecordId& oldLoc, const BSONObj& newDoc) { - WriteUnitOfWork wunit(&_txn); - BSONObj oldDoc = _coll->getRecordStore()->dataFor( &_txn, oldLoc ).releaseToBson(); - oplogUpdateEntryArgs args; - _coll->updateDocument(&_txn, - oldLoc, - Snapshotted<BSONObj>(_txn.recoveryUnit()->getSnapshotId(), - oldDoc), - newDoc, - false, - true, - NULL, - args); - wunit.commit(); - } + _ctx.db()->dropCollection(&_txn, ns()); + _coll = _ctx.db()->createCollection(&_txn, ns()); - // testcount is a wrapper around runCount that - // - sets up a countStage - // - runs it - // - asserts count is not trivial - // - asserts nCounted is equal to expected_n - // - asserts nSkipped is correct - void testCount(const CountRequest& request, int expected_n=kDocuments, bool indexed=false) { - setup(); - getLocs(); - - unique_ptr<WorkingSet> ws(new WorkingSet); - - StatusWithMatchExpression swme = MatchExpressionParser::parse(request.getQuery()); - unique_ptr<MatchExpression> expression(swme.getValue()); - - PlanStage* scan; - if (indexed) { - scan = createIndexScan(expression.get(), ws.get()); - } else { - scan = createCollScan(expression.get(), ws.get()); - } + _coll->getIndexCatalog()->createIndexOnEmptyCollection(&_txn, + BSON("key" + << BSON("x" << 1) << "name" + << "x_1" + << "ns" << ns() << "v" << 1)); - CountStage countStage(&_txn, _coll, request, ws.get(), scan); - - const CountStats* stats = runCount(countStage); - - ASSERT_FALSE(stats->trivialCount); - ASSERT_EQUALS(stats->nCounted, expected_n); - ASSERT_EQUALS(stats->nSkipped, request.getSkip()); + for (int i = 0; i < kDocuments; i++) { + insert(BSON(GENOID << "x" << i)); } - // Performs a test using a count stage whereby each unit of work is interjected - // in some way by the invocation of interject(). - const CountStats* runCount(CountStage& count_stage) { - int interjection = 0; - WorkingSetID wsid; - - while (!count_stage.isEOF()) { - // do some work -- assumes that one work unit counts a single doc - PlanStage::StageState state = count_stage.work(&wsid); - ASSERT_NOT_EQUALS(state, PlanStage::FAILURE); - ASSERT_NOT_EQUALS(state, PlanStage::DEAD); - - // prepare for yield - count_stage.saveState(); - - // interject in some way kInterjection times - if (interjection < kInterjections) { - interject(count_stage, interjection++); - } - - // resume from yield - count_stage.restoreState(&_txn); + wunit.commit(); + } + + void getLocs() { + _locs.clear(); + WorkingSet ws; + + CollectionScanParams params; + params.collection = _coll; + params.direction = CollectionScanParams::FORWARD; + params.tailable = false; + + unique_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasLoc()); + _locs.push_back(member->loc); } - - return static_cast<const CountStats*>(count_stage.getSpecificStats()); - } - - IndexScan* createIndexScan(MatchExpression* expr, WorkingSet* ws) { - IndexCatalog* catalog = _coll->getIndexCatalog(); - IndexDescriptor* descriptor = catalog->findIndexByKeyPattern(&_txn, BSON("x" << 1)); - invariant(descriptor); - - // We are not testing indexing here so use maximal bounds - IndexScanParams params; - params.descriptor = descriptor; - params.bounds.isSimpleRange = true; - params.bounds.startKey = BSON("" << 0); - params.bounds.endKey = BSON("" << kDocuments+1); - params.bounds.endKeyInclusive = true; - params.direction = 1; - - // This child stage gets owned and freed by its parent CountStage - return new IndexScan(&_txn, params, ws, expr); } - - CollectionScan* createCollScan(MatchExpression* expr, WorkingSet* ws) { - CollectionScanParams params; - params.collection = _coll; - - // This child stage gets owned and freed by its parent CountStage - return new CollectionScan(&_txn, params, ws, expr); - } - - static const char* ns() { return "unittest.QueryStageCount"; } - - protected: - vector<RecordId> _locs; - OperationContextImpl _txn; - ScopedTransaction _scopedXact; - Lock::DBLock _dbLock; - OldClientContext _ctx; - Collection* _coll; - }; - - class QueryStageCountNoChangeDuringYield : public CountStageTest { - public: - void run() { - CountRequest request(ns(), BSON("x" << LT << kDocuments / 2)); - - testCount(request, kDocuments/2); - testCount(request, kDocuments/2, true); - } - }; - - class QueryStageCountYieldWithSkip : public CountStageTest { - public: - void run() { - CountRequest request(ns(), BSON("x" << GTE << 0)); - request.setSkip(2); - - testCount(request, kDocuments-2); - testCount(request, kDocuments-2, true); - } - }; - - class QueryStageCountYieldWithLimit : public CountStageTest { - public: - void run() { - CountRequest request(ns(), BSON("x" << GTE << 0)); - request.setSkip(0); - request.setLimit(2); - - testCount(request, 2); - testCount(request, 2, true); + } + + void insert(const BSONObj& doc) { + WriteUnitOfWork wunit(&_txn); + _coll->insertDocument(&_txn, doc, false); + wunit.commit(); + } + + void remove(const RecordId& loc) { + WriteUnitOfWork wunit(&_txn); + _coll->deleteDocument(&_txn, loc, false, false, NULL); + wunit.commit(); + } + + void update(const RecordId& oldLoc, const BSONObj& newDoc) { + WriteUnitOfWork wunit(&_txn); + BSONObj oldDoc = _coll->getRecordStore()->dataFor(&_txn, oldLoc).releaseToBson(); + oplogUpdateEntryArgs args; + _coll->updateDocument(&_txn, + oldLoc, + Snapshotted<BSONObj>(_txn.recoveryUnit()->getSnapshotId(), oldDoc), + newDoc, + false, + true, + NULL, + args); + wunit.commit(); + } + + // testcount is a wrapper around runCount that + // - sets up a countStage + // - runs it + // - asserts count is not trivial + // - asserts nCounted is equal to expected_n + // - asserts nSkipped is correct + void testCount(const CountRequest& request, int expected_n = kDocuments, bool indexed = false) { + setup(); + getLocs(); + + unique_ptr<WorkingSet> ws(new WorkingSet); + + StatusWithMatchExpression swme = MatchExpressionParser::parse(request.getQuery()); + unique_ptr<MatchExpression> expression(swme.getValue()); + + PlanStage* scan; + if (indexed) { + scan = createIndexScan(expression.get(), ws.get()); + } else { + scan = createCollScan(expression.get(), ws.get()); } - }; + CountStage countStage(&_txn, _coll, request, ws.get(), scan); - class QueryStageCountInsertDuringYield : public CountStageTest { - public: - void run() { - CountRequest request(ns(), BSON("x" << 1)); + const CountStats* stats = runCount(countStage); - testCount(request, kInterjections+1); - testCount(request, kInterjections+1, true); - } - - // This is called 100 times as we scan the collection - void interject(CountStage&, int) { - insert(BSON(GENOID << "x" << 1)); - } - }; + ASSERT_FALSE(stats->trivialCount); + ASSERT_EQUALS(stats->nCounted, expected_n); + ASSERT_EQUALS(stats->nSkipped, request.getSkip()); + } - class QueryStageCountDeleteDuringYield : public CountStageTest { - public: - void run() { - // expected count would be 99 but we delete the second record - // after doing the first unit of work - CountRequest request(ns(), BSON("x" << GTE << 1)); + // Performs a test using a count stage whereby each unit of work is interjected + // in some way by the invocation of interject(). + const CountStats* runCount(CountStage& count_stage) { + int interjection = 0; + WorkingSetID wsid; - testCount(request, kDocuments-2); - testCount(request, kDocuments-2, true); - } + while (!count_stage.isEOF()) { + // do some work -- assumes that one work unit counts a single doc + PlanStage::StageState state = count_stage.work(&wsid); + ASSERT_NOT_EQUALS(state, PlanStage::FAILURE); + ASSERT_NOT_EQUALS(state, PlanStage::DEAD); - // At the point which this is called we are in between counting the first + second record - void interject(CountStage& count_stage, int interjection) { - if (interjection == 0) { - // At this point, our first interjection, we've counted _locs[0] - // and are about to count _locs[1] - count_stage.invalidate(&_txn, _locs[interjection], INVALIDATION_DELETION); - remove(_locs[interjection]); + // prepare for yield + count_stage.saveState(); - count_stage.invalidate(&_txn, _locs[interjection+1], INVALIDATION_DELETION); - remove(_locs[interjection+1]); + // interject in some way kInterjection times + if (interjection < kInterjections) { + interject(count_stage, interjection++); } - } - }; - - class QueryStageCountUpdateDuringYield : public CountStageTest { - public: - void run() { - // expected count would be kDocuments-2 but we update the first and second records - // after doing the first unit of work so they wind up getting counted later on - CountRequest request(ns(), BSON("x" << GTE << 2)); - testCount(request, kDocuments); - testCount(request, kDocuments, true); + // resume from yield + count_stage.restoreState(&_txn); } - // At the point which this is called we are in between the first and second record - void interject(CountStage& count_stage, int interjection) { - if (interjection == 0) { - count_stage.invalidate(&_txn, _locs[0], INVALIDATION_MUTATION); - OID id1 = _coll->docFor(&_txn, _locs[0]).value().getField("_id").OID(); - update(_locs[0], BSON("_id" << id1 << "x" << 100)); - - count_stage.invalidate(&_txn, _locs[1], INVALIDATION_MUTATION); - OID id2 = _coll->docFor(&_txn, _locs[1]).value().getField("_id").OID(); - update(_locs[1], BSON("_id" << id2 << "x" << 100)); - } - } - }; - - class QueryStageCountMultiKeyDuringYield : public CountStageTest { - public: - void run() { - CountRequest request(ns(), BSON("x" << 1)); - testCount(request, kDocuments+1, true); // only applies to indexed case + return static_cast<const CountStats*>(count_stage.getSpecificStats()); + } + + IndexScan* createIndexScan(MatchExpression* expr, WorkingSet* ws) { + IndexCatalog* catalog = _coll->getIndexCatalog(); + IndexDescriptor* descriptor = catalog->findIndexByKeyPattern(&_txn, BSON("x" << 1)); + invariant(descriptor); + + // We are not testing indexing here so use maximal bounds + IndexScanParams params; + params.descriptor = descriptor; + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 0); + params.bounds.endKey = BSON("" << kDocuments + 1); + params.bounds.endKeyInclusive = true; + params.direction = 1; + + // This child stage gets owned and freed by its parent CountStage + return new IndexScan(&_txn, params, ws, expr); + } + + CollectionScan* createCollScan(MatchExpression* expr, WorkingSet* ws) { + CollectionScanParams params; + params.collection = _coll; + + // This child stage gets owned and freed by its parent CountStage + return new CollectionScan(&_txn, params, ws, expr); + } + + static const char* ns() { + return "unittest.QueryStageCount"; + } + +protected: + vector<RecordId> _locs; + OperationContextImpl _txn; + ScopedTransaction _scopedXact; + Lock::DBLock _dbLock; + OldClientContext _ctx; + Collection* _coll; +}; + +class QueryStageCountNoChangeDuringYield : public CountStageTest { +public: + void run() { + CountRequest request(ns(), BSON("x" << LT << kDocuments / 2)); + + testCount(request, kDocuments / 2); + testCount(request, kDocuments / 2, true); + } +}; + +class QueryStageCountYieldWithSkip : public CountStageTest { +public: + void run() { + CountRequest request(ns(), BSON("x" << GTE << 0)); + request.setSkip(2); + + testCount(request, kDocuments - 2); + testCount(request, kDocuments - 2, true); + } +}; + +class QueryStageCountYieldWithLimit : public CountStageTest { +public: + void run() { + CountRequest request(ns(), BSON("x" << GTE << 0)); + request.setSkip(0); + request.setLimit(2); + + testCount(request, 2); + testCount(request, 2, true); + } +}; + + +class QueryStageCountInsertDuringYield : public CountStageTest { +public: + void run() { + CountRequest request(ns(), BSON("x" << 1)); + + testCount(request, kInterjections + 1); + testCount(request, kInterjections + 1, true); + } + + // This is called 100 times as we scan the collection + void interject(CountStage&, int) { + insert(BSON(GENOID << "x" << 1)); + } +}; + +class QueryStageCountDeleteDuringYield : public CountStageTest { +public: + void run() { + // expected count would be 99 but we delete the second record + // after doing the first unit of work + CountRequest request(ns(), BSON("x" << GTE << 1)); + + testCount(request, kDocuments - 2); + testCount(request, kDocuments - 2, true); + } + + // At the point which this is called we are in between counting the first + second record + void interject(CountStage& count_stage, int interjection) { + if (interjection == 0) { + // At this point, our first interjection, we've counted _locs[0] + // and are about to count _locs[1] + count_stage.invalidate(&_txn, _locs[interjection], INVALIDATION_DELETION); + remove(_locs[interjection]); + + count_stage.invalidate(&_txn, _locs[interjection + 1], INVALIDATION_DELETION); + remove(_locs[interjection + 1]); } - - void interject(CountStage&, int) { - // Should cause index to be converted to multikey - insert(BSON(GENOID << "x" << BSON_ARRAY(1 << 2))); - } - }; - - class All : public Suite { - public: - All() : Suite("query_stage_count") {} - - void setupTests() { - add<QueryStageCountNoChangeDuringYield>(); - add<QueryStageCountYieldWithSkip>(); - add<QueryStageCountYieldWithLimit>(); - add<QueryStageCountInsertDuringYield>(); - add<QueryStageCountDeleteDuringYield>(); - add<QueryStageCountUpdateDuringYield>(); - add<QueryStageCountMultiKeyDuringYield>(); + } +}; + +class QueryStageCountUpdateDuringYield : public CountStageTest { +public: + void run() { + // expected count would be kDocuments-2 but we update the first and second records + // after doing the first unit of work so they wind up getting counted later on + CountRequest request(ns(), BSON("x" << GTE << 2)); + + testCount(request, kDocuments); + testCount(request, kDocuments, true); + } + + // At the point which this is called we are in between the first and second record + void interject(CountStage& count_stage, int interjection) { + if (interjection == 0) { + count_stage.invalidate(&_txn, _locs[0], INVALIDATION_MUTATION); + OID id1 = _coll->docFor(&_txn, _locs[0]).value().getField("_id").OID(); + update(_locs[0], BSON("_id" << id1 << "x" << 100)); + + count_stage.invalidate(&_txn, _locs[1], INVALIDATION_MUTATION); + OID id2 = _coll->docFor(&_txn, _locs[1]).value().getField("_id").OID(); + update(_locs[1], BSON("_id" << id2 << "x" << 100)); } - } QueryStageCountAll; - -} // namespace QueryStageCount + } +}; + +class QueryStageCountMultiKeyDuringYield : public CountStageTest { +public: + void run() { + CountRequest request(ns(), BSON("x" << 1)); + testCount(request, kDocuments + 1, true); // only applies to indexed case + } + + void interject(CountStage&, int) { + // Should cause index to be converted to multikey + insert(BSON(GENOID << "x" << BSON_ARRAY(1 << 2))); + } +}; + +class All : public Suite { +public: + All() : Suite("query_stage_count") {} + + void setupTests() { + add<QueryStageCountNoChangeDuringYield>(); + add<QueryStageCountYieldWithSkip>(); + add<QueryStageCountYieldWithLimit>(); + add<QueryStageCountInsertDuringYield>(); + add<QueryStageCountDeleteDuringYield>(); + add<QueryStageCountUpdateDuringYield>(); + add<QueryStageCountMultiKeyDuringYield>(); + } +} QueryStageCountAll; + +} // namespace QueryStageCount |