diff options
Diffstat (limited to 'src/mongo/dbtests/query_stage_sort.cpp')
-rw-r--r-- | src/mongo/dbtests/query_stage_sort.cpp | 912 |
1 files changed, 461 insertions, 451 deletions
diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index eac35313223..c2b30c1d084 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -46,520 +46,530 @@ namespace QueryStageSortTests { - using std::unique_ptr; - using std::set; +using std::unique_ptr; +using std::set; - class QueryStageSortTestBase { - public: - QueryStageSortTestBase() : _client(&_txn) { - - } +class QueryStageSortTestBase { +public: + QueryStageSortTestBase() : _client(&_txn) {} - void fillData() { - for (int i = 0; i < numObj(); ++i) { - insert(BSON("foo" << i)); - } + void fillData() { + for (int i = 0; i < numObj(); ++i) { + insert(BSON("foo" << i)); } + } - virtual ~QueryStageSortTestBase() { - _client.dropCollection(ns()); - } + virtual ~QueryStageSortTestBase() { + _client.dropCollection(ns()); + } - void insert(const BSONObj& obj) { - _client.insert(ns(), obj); - } + void insert(const BSONObj& obj) { + _client.insert(ns(), obj); + } - void getLocs(set<RecordId>* out, Collection* coll) { - auto cursor = coll->getCursor(&_txn); - while (auto record = cursor->next()) { - out->insert(record->id); - } + void getLocs(set<RecordId>* out, Collection* coll) { + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - - /** - * We feed a mix of (key, unowned, owned) data to the sort stage. - */ - void insertVarietyOfObjects(QueuedDataStage* ms, Collection* coll) { - set<RecordId> locs; - getLocs(&locs, coll); - - set<RecordId>::iterator it = locs.begin(); - - for (int i = 0; i < numObj(); ++i, ++it) { - ASSERT_FALSE(it == locs.end()); - - // Insert some owned obj data. - WorkingSetMember member; - member.loc = *it; - member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - member.obj = coll->docFor(&_txn, *it); - ms->pushBack(member); - } + } + + /** + * We feed a mix of (key, unowned, owned) data to the sort stage. + */ + void insertVarietyOfObjects(QueuedDataStage* ms, Collection* coll) { + set<RecordId> locs; + getLocs(&locs, coll); + + set<RecordId>::iterator it = locs.begin(); + + for (int i = 0; i < numObj(); ++i, ++it) { + ASSERT_FALSE(it == locs.end()); + + // Insert some owned obj data. + WorkingSetMember member; + member.loc = *it; + member.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + member.obj = coll->docFor(&_txn, *it); + ms->pushBack(member); } - - /* - * Wraps a sort stage with a QueuedDataStage in a plan executor. Returns the plan executor, - * which is owned by the caller. - */ - PlanExecutor* makePlanExecutorWithSortStage(Collection* coll) { - PlanExecutor* exec; - // Build the mock scan stage which feeds the data. - std::unique_ptr<WorkingSet> ws(new WorkingSet()); - unique_ptr<QueuedDataStage> ms(new QueuedDataStage(ws.get())); - insertVarietyOfObjects(ms.get(), coll); - - SortStageParams params; - params.collection = coll; - params.pattern = BSON("foo" << 1); - params.limit = limit(); - unique_ptr<SortStage> ss(new SortStage(params, ws.get(), ms.release())); - - // The PlanExecutor will be automatically registered on construction due to the auto - // yield policy, so it can receive invalidations when we remove documents later. - Status execStatus = PlanExecutor::make(&_txn, - ws.release(), - ss.release(), - coll, - PlanExecutor::YIELD_AUTO, - &exec); - invariant(execStatus.isOK()); - return exec; + } + + /* + * Wraps a sort stage with a QueuedDataStage in a plan executor. Returns the plan executor, + * which is owned by the caller. + */ + PlanExecutor* makePlanExecutorWithSortStage(Collection* coll) { + PlanExecutor* exec; + // Build the mock scan stage which feeds the data. + std::unique_ptr<WorkingSet> ws(new WorkingSet()); + unique_ptr<QueuedDataStage> ms(new QueuedDataStage(ws.get())); + insertVarietyOfObjects(ms.get(), coll); + + SortStageParams params; + params.collection = coll; + params.pattern = BSON("foo" << 1); + params.limit = limit(); + unique_ptr<SortStage> ss(new SortStage(params, ws.get(), ms.release())); + + // The PlanExecutor will be automatically registered on construction due to the auto + // yield policy, so it can receive invalidations when we remove documents later. + Status execStatus = PlanExecutor::make( + &_txn, ws.release(), ss.release(), coll, PlanExecutor::YIELD_AUTO, &exec); + invariant(execStatus.isOK()); + return exec; + } + + // Return a value in the set {-1, 0, 1} to represent the sign of parameter i. Used to + // normalize woCompare calls. + int sgn(int i) { + if (i == 0) + return 0; + return i > 0 ? 1 : -1; + } + + /** + * A template used by many tests below. + * Fill out numObj objects, sort them in the order provided by 'direction'. + * If extAllowed is true, sorting will use use external sorting if available. + * If limit is not zero, we limit the output of the sort stage to 'limit' results. + */ + void sortAndCheck(int direction, Collection* coll) { + WorkingSet* ws = new WorkingSet(); + QueuedDataStage* ms = new QueuedDataStage(ws); + + // Insert a mix of the various types of data. + insertVarietyOfObjects(ms, coll); + + SortStageParams params; + params.collection = coll; + params.pattern = BSON("foo" << direction); + params.limit = limit(); + + // Must fetch so we can look at the doc as a BSONObj. + PlanExecutor* rawExec; + Status status = + PlanExecutor::make(&_txn, + ws, + new FetchStage(&_txn, ws, new SortStage(params, ws, ms), NULL, coll), + coll, + PlanExecutor::YIELD_MANUAL, + &rawExec); + ASSERT_OK(status); + std::unique_ptr<PlanExecutor> exec(rawExec); + + // Look at pairs of objects to make sure that the sort order is pairwise (and therefore + // totally) correct. + BSONObj last; + ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&last, NULL)); + + // Count 'last'. + int count = 1; + + BSONObj current; + while (PlanExecutor::ADVANCED == exec->getNext(¤t, NULL)) { + int cmp = sgn(current.woSortOrder(last, params.pattern)); + // The next object should be equal to the previous or oriented according to the sort + // pattern. + ASSERT(cmp == 0 || cmp == 1); + ++count; + last = current; } - // Return a value in the set {-1, 0, 1} to represent the sign of parameter i. Used to - // normalize woCompare calls. - int sgn(int i) { - if (i == 0) - return 0; - return i > 0 ? 1 : -1; - } + checkCount(count); + } - /** - * A template used by many tests below. - * Fill out numObj objects, sort them in the order provided by 'direction'. - * If extAllowed is true, sorting will use use external sorting if available. - * If limit is not zero, we limit the output of the sort stage to 'limit' results. - */ - void sortAndCheck(int direction, Collection* coll) { - WorkingSet* ws = new WorkingSet(); - QueuedDataStage* ms = new QueuedDataStage(ws); - - // Insert a mix of the various types of data. - insertVarietyOfObjects(ms, coll); - - SortStageParams params; - params.collection = coll; - params.pattern = BSON("foo" << direction); - params.limit = limit(); - - // Must fetch so we can look at the doc as a BSONObj. - PlanExecutor* rawExec; - Status status = - PlanExecutor::make(&_txn, - ws, - new FetchStage(&_txn, ws, - new SortStage(params, ws, ms), NULL, coll), - coll, PlanExecutor::YIELD_MANUAL, &rawExec); - ASSERT_OK(status); - std::unique_ptr<PlanExecutor> exec(rawExec); - - // Look at pairs of objects to make sure that the sort order is pairwise (and therefore - // totally) correct. - BSONObj last; - ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&last, NULL)); - - // Count 'last'. - int count = 1; - - BSONObj current; - while (PlanExecutor::ADVANCED == exec->getNext(¤t, NULL)) { - int cmp = sgn(current.woSortOrder(last, params.pattern)); - // The next object should be equal to the previous or oriented according to the sort - // pattern. - ASSERT(cmp == 0 || cmp == 1); - ++count; - last = current; - } - - checkCount(count); - } - - /** - * Check number of results returned from sort. - */ - void checkCount(int count) { - // No limit, should get all objects back. - // Otherwise, result set should be smaller of limit and input data size. - if (limit() > 0 && limit() < numObj()) { - ASSERT_EQUALS(limit(), count); - } - else { - ASSERT_EQUALS(numObj(), count); - } + /** + * Check number of results returned from sort. + */ + void checkCount(int count) { + // No limit, should get all objects back. + // Otherwise, result set should be smaller of limit and input data size. + if (limit() > 0 && limit() < numObj()) { + ASSERT_EQUALS(limit(), count); + } else { + ASSERT_EQUALS(numObj(), count); } + } - virtual int numObj() = 0; - - // Returns sort limit - // Leave as 0 to disable limit. - virtual int limit() const { return 0; }; - + virtual int numObj() = 0; - static const char* ns() { return "unittests.QueryStageSort"; } - - protected: - OperationContextImpl _txn; - DBDirectClient _client; + // Returns sort limit + // Leave as 0 to disable limit. + virtual int limit() const { + return 0; }; - // Sort some small # of results in increasing order. - class QueryStageSortInc: public QueryStageSortTestBase { - public: - virtual int numObj() { return 100; } - - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } + static const char* ns() { + return "unittests.QueryStageSort"; + } - fillData(); - sortAndCheck(1, coll); - } - }; +protected: + OperationContextImpl _txn; + DBDirectClient _client; +}; - // Sort some small # of results in decreasing order. - class QueryStageSortDec : public QueryStageSortTestBase { - public: - virtual int numObj() { return 100; } - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } +// Sort some small # of results in increasing order. +class QueryStageSortInc : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 100; + } - fillData(); - sortAndCheck(-1, coll); + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); } - }; - // Sort in descreasing order with limit applied - template <int LIMIT> - class QueryStageSortDecWithLimit : public QueryStageSortDec { - public: - virtual int limit() const { - return LIMIT; + fillData(); + sortAndCheck(1, coll); + } +}; + +// Sort some small # of results in decreasing order. +class QueryStageSortDec : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 100; + } + + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); } - }; - // Sort a big bunch of objects. - class QueryStageSortExt : public QueryStageSortTestBase { - public: - virtual int numObj() { return 10000; } + fillData(); + sortAndCheck(-1, coll); + } +}; + +// Sort in descreasing order with limit applied +template <int LIMIT> +class QueryStageSortDecWithLimit : public QueryStageSortDec { +public: + virtual int limit() const { + return LIMIT; + } +}; + +// Sort a big bunch of objects. +class QueryStageSortExt : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 10000; + } + + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); + } - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } + fillData(); + sortAndCheck(-1, coll); + } +}; + +// Mutation invalidation of docs fed to sort. +class QueryStageSortMutationInvalidation : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 2000; + } + virtual int limit() const { + return 10; + } + + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); + } + { + WriteUnitOfWork wuow(&_txn); fillData(); - sortAndCheck(-1, coll); + wuow.commit(); } - }; - // Mutation invalidation of docs fed to sort. - class QueryStageSortMutationInvalidation : public QueryStageSortTestBase { - public: - virtual int numObj() { return 2000; } - virtual int limit() const { return 10; } - - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } + // The data we're going to later invalidate. + set<RecordId> locs; + getLocs(&locs, coll); - { - WriteUnitOfWork wuow(&_txn); - fillData(); - wuow.commit(); - } + std::unique_ptr<PlanExecutor> exec(makePlanExecutorWithSortStage(coll)); + SortStage* ss = static_cast<SortStage*>(exec->getRootStage()); + QueuedDataStage* ms = static_cast<QueuedDataStage*>(ss->getChildren()[0]); - // The data we're going to later invalidate. - set<RecordId> locs; - getLocs(&locs, coll); + // Have sort read in data from the queued data stage. + const int firstRead = 5; + for (int i = 0; i < firstRead; ++i) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status = ss->work(&id); + ASSERT_NOT_EQUALS(PlanStage::ADVANCED, status); + } - std::unique_ptr<PlanExecutor> exec(makePlanExecutorWithSortStage(coll)); - SortStage * ss = static_cast<SortStage*>(exec->getRootStage()); - QueuedDataStage* ms = static_cast<QueuedDataStage*>(ss->getChildren()[0]); + // We should have read in the first 'firstRead' locs. Invalidate the first one. + // Since it's in the WorkingSet, the updates should not be reflected in the output. + exec->saveState(); + set<RecordId>::iterator it = locs.begin(); + Snapshotted<BSONObj> oldDoc = coll->docFor(&_txn, *it); + + OID updatedId = oldDoc.value().getField("_id").OID(); + SnapshotId idBeforeUpdate = oldDoc.snapshotId(); + // We purposefully update the document to have a 'foo' value greater than limit(). + // This allows us to check that we don't return the new copy of a doc by asserting + // foo < limit(). + BSONObj newDoc = BSON("_id" << updatedId << "foo" << limit() + 10); + oplogUpdateEntryArgs args; + { + WriteUnitOfWork wuow(&_txn); + coll->updateDocument(&_txn, *it, oldDoc, newDoc, false, false, NULL, args); + wuow.commit(); + } + exec->restoreState(&_txn); - // Have sort read in data from the queued data stage. - const int firstRead = 5; - for (int i = 0; i < firstRead; ++i) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status = ss->work(&id); - ASSERT_NOT_EQUALS(PlanStage::ADVANCED, status); - } + // Read the rest of the data from the queued data stage. + while (!ms->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + ss->work(&id); + } - // We should have read in the first 'firstRead' locs. Invalidate the first one. - // Since it's in the WorkingSet, the updates should not be reflected in the output. - exec->saveState(); - set<RecordId>::iterator it = locs.begin(); - Snapshotted<BSONObj> oldDoc = coll->docFor(&_txn, *it); - - OID updatedId = oldDoc.value().getField("_id").OID(); - SnapshotId idBeforeUpdate = oldDoc.snapshotId(); - // We purposefully update the document to have a 'foo' value greater than limit(). - // This allows us to check that we don't return the new copy of a doc by asserting - // foo < limit(). - BSONObj newDoc = BSON("_id" << updatedId << "foo" << limit() + 10); - oplogUpdateEntryArgs args; + // Let's just invalidate everything now. Already read into ss, so original values + // should be fetched. + exec->saveState(); + while (it != locs.end()) { + oldDoc = coll->docFor(&_txn, *it); { WriteUnitOfWork wuow(&_txn); - coll->updateDocument(&_txn, *it, oldDoc, newDoc, false, false, NULL, args); + coll->updateDocument(&_txn, *it++, oldDoc, newDoc, false, false, NULL, args); wuow.commit(); } - exec->restoreState(&_txn); - - // Read the rest of the data from the queued data stage. - while (!ms->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - ss->work(&id); - } - - // Let's just invalidate everything now. Already read into ss, so original values - // should be fetched. - exec->saveState(); - while (it != locs.end()) { - oldDoc = coll->docFor(&_txn, *it); - { - WriteUnitOfWork wuow(&_txn); - coll->updateDocument(&_txn, *it++, oldDoc, newDoc, false, false, NULL, args); - wuow.commit(); - } + } + exec->restoreState(&_txn); + + // Verify that it's sorted, the right number of documents are returned, and they're all + // in the expected range. + int count = 0; + int lastVal = 0; + int thisVal; + while (!ss->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status = ss->work(&id); + if (PlanStage::ADVANCED != status) { + ASSERT_NE(status, PlanStage::FAILURE); + ASSERT_NE(status, PlanStage::DEAD); + continue; } - exec->restoreState(&_txn); - - // Verify that it's sorted, the right number of documents are returned, and they're all - // in the expected range. - int count = 0; - int lastVal = 0; - int thisVal; - while (!ss->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status = ss->work(&id); - if (PlanStage::ADVANCED != status) { - ASSERT_NE(status, PlanStage::FAILURE); - ASSERT_NE(status, PlanStage::DEAD); - continue; - } - WorkingSetMember* member = exec->getWorkingSet()->get(id); - ASSERT(member->hasObj()); - if (member->obj.value().getField("_id").OID() == updatedId) { - ASSERT(idBeforeUpdate == member->obj.snapshotId()); - } - thisVal = member->obj.value().getField("foo").Int(); - ASSERT_LTE(lastVal, thisVal); - // Expect docs in range [0, limit) - ASSERT_LTE(0, thisVal); - ASSERT_LT(thisVal, limit()); - lastVal = thisVal; - ++count; + WorkingSetMember* member = exec->getWorkingSet()->get(id); + ASSERT(member->hasObj()); + if (member->obj.value().getField("_id").OID() == updatedId) { + ASSERT(idBeforeUpdate == member->obj.snapshotId()); } - // Returns all docs. - ASSERT_EQUALS(limit(), count); + thisVal = member->obj.value().getField("foo").Int(); + ASSERT_LTE(lastVal, thisVal); + // Expect docs in range [0, limit) + ASSERT_LTE(0, thisVal); + ASSERT_LT(thisVal, limit()); + lastVal = thisVal; + ++count; + } + // Returns all docs. + ASSERT_EQUALS(limit(), count); + } +}; + +// Deletion invalidation of everything fed to sort. +class QueryStageSortDeletionInvalidation : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 2000; + } + + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); } - }; - // Deletion invalidation of everything fed to sort. - class QueryStageSortDeletionInvalidation : public QueryStageSortTestBase { - public: - virtual int numObj() { return 2000; } + { + WriteUnitOfWork wuow(&_txn); + fillData(); + wuow.commit(); + } - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } + // The data we're going to later invalidate. + set<RecordId> locs; + getLocs(&locs, coll); - { - WriteUnitOfWork wuow(&_txn); - fillData(); - wuow.commit(); - } + std::unique_ptr<PlanExecutor> exec(makePlanExecutorWithSortStage(coll)); + SortStage* ss = static_cast<SortStage*>(exec->getRootStage()); + QueuedDataStage* ms = static_cast<QueuedDataStage*>(ss->getChildren()[0]); - // The data we're going to later invalidate. - set<RecordId> locs; - getLocs(&locs, coll); + const int firstRead = 10; + // Have sort read in data from the queued data stage. + for (int i = 0; i < firstRead; ++i) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status = ss->work(&id); + ASSERT_NOT_EQUALS(PlanStage::ADVANCED, status); + } - std::unique_ptr<PlanExecutor> exec(makePlanExecutorWithSortStage(coll)); - SortStage * ss = static_cast<SortStage*>(exec->getRootStage()); - QueuedDataStage* ms = static_cast<QueuedDataStage*>(ss->getChildren()[0]); + // We should have read in the first 'firstRead' locs. Invalidate the first. + exec->saveState(); + set<RecordId>::iterator it = locs.begin(); + { + WriteUnitOfWork wuow(&_txn); + coll->deleteDocument(&_txn, *it++, false, false, NULL); + wuow.commit(); + } + exec->restoreState(&_txn); - const int firstRead = 10; - // Have sort read in data from the queued data stage. - for (int i = 0; i < firstRead; ++i) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status = ss->work(&id); - ASSERT_NOT_EQUALS(PlanStage::ADVANCED, status); - } + // Read the rest of the data from the queued data stage. + while (!ms->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + ss->work(&id); + } - // We should have read in the first 'firstRead' locs. Invalidate the first. - exec->saveState(); - set<RecordId>::iterator it = locs.begin(); + // Let's just invalidate everything now. + exec->saveState(); + while (it != locs.end()) { { WriteUnitOfWork wuow(&_txn); coll->deleteDocument(&_txn, *it++, false, false, NULL); wuow.commit(); } - exec->restoreState(&_txn); - - // Read the rest of the data from the queued data stage. - while (!ms->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - ss->work(&id); - } - - // Let's just invalidate everything now. - exec->saveState(); - while (it != locs.end()) { - { - WriteUnitOfWork wuow(&_txn); - coll->deleteDocument(&_txn, *it++, false, false, NULL); - wuow.commit(); - } - } - exec->restoreState(&_txn); - - // Regardless of storage engine, all the documents should come back with their objects - int count = 0; - while (!ss->isEOF()) { - WorkingSetID id = WorkingSet::INVALID_ID; - PlanStage::StageState status = ss->work(&id); - if (PlanStage::ADVANCED != status) { - ASSERT_NE(status, PlanStage::FAILURE); - ASSERT_NE(status, PlanStage::DEAD); - continue; - } - WorkingSetMember* member = exec->getWorkingSet()->get(id); - ASSERT(member->hasObj()); - ++count; + } + exec->restoreState(&_txn); + + // Regardless of storage engine, all the documents should come back with their objects + int count = 0; + while (!ss->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState status = ss->work(&id); + if (PlanStage::ADVANCED != status) { + ASSERT_NE(status, PlanStage::FAILURE); + ASSERT_NE(status, PlanStage::DEAD); + continue; } - - // Returns all docs. - ASSERT_EQUALS(limit() ? limit() : numObj(), count); + WorkingSetMember* member = exec->getWorkingSet()->get(id); + ASSERT(member->hasObj()); + ++count; } - }; - - // Deletion invalidation of everything fed to sort with limit enabled. - // Limit size of working set within sort stage to a small number - // Sort stage implementation should not try to invalidate DiskLocc that - // are no longer in the working set. - template<int LIMIT> - class QueryStageSortDeletionInvalidationWithLimit : public QueryStageSortDeletionInvalidation { - public: - virtual int limit() const { - return LIMIT; + // Returns all docs. + ASSERT_EQUALS(limit() ? limit() : numObj(), count); + } +}; + +// Deletion invalidation of everything fed to sort with limit enabled. +// Limit size of working set within sort stage to a small number +// Sort stage implementation should not try to invalidate DiskLocc that +// are no longer in the working set. + +template <int LIMIT> +class QueryStageSortDeletionInvalidationWithLimit : public QueryStageSortDeletionInvalidation { +public: + virtual int limit() const { + return LIMIT; + } +}; + +// Should error out if we sort with parallel arrays. +class QueryStageSortParallelArrays : public QueryStageSortTestBase { +public: + virtual int numObj() { + return 100; + } + + void run() { + OldClientWriteContext ctx(&_txn, ns()); + Database* db = ctx.db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + WriteUnitOfWork wuow(&_txn); + coll = db->createCollection(&_txn, ns()); + wuow.commit(); } - }; - // Should error out if we sort with parallel arrays. - class QueryStageSortParallelArrays : public QueryStageSortTestBase { - public: - virtual int numObj() { return 100; } + WorkingSet* ws = new WorkingSet(); + QueuedDataStage* ms = new QueuedDataStage(ws); - void run() { - OldClientWriteContext ctx(&_txn, ns()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(ns()); - if (!coll) { - WriteUnitOfWork wuow(&_txn); - coll = db->createCollection(&_txn, ns()); - wuow.commit(); - } + for (int i = 0; i < numObj(); ++i) { + WorkingSetMember member; + member.state = WorkingSetMember::OWNED_OBJ; - WorkingSet* ws = new WorkingSet(); - QueuedDataStage* ms = new QueuedDataStage(ws); + member.obj = Snapshotted<BSONObj>( + SnapshotId(), fromjson("{a: [1,2,3], b:[1,2,3], c:[1,2,3], d:[1,2,3,4]}")); + ms->pushBack(member); - for (int i = 0; i < numObj(); ++i) { - WorkingSetMember member; - member.state = WorkingSetMember::OWNED_OBJ; - - member.obj = Snapshotted<BSONObj>( - SnapshotId(), - fromjson("{a: [1,2,3], b:[1,2,3], c:[1,2,3], d:[1,2,3,4]}") - ); - ms->pushBack(member); - - member.obj = Snapshotted<BSONObj>( SnapshotId(), fromjson("{a:1, b:1, c:1}")); - ms->pushBack(member); - } - - SortStageParams params; - params.collection = coll; - params.pattern = BSON("b" << -1 << "c" << 1 << "a" << 1); - params.limit = 0; - - // We don't get results back since we're sorting some parallel arrays. - PlanExecutor* rawExec; - Status status = - PlanExecutor::make(&_txn, - ws, - new FetchStage(&_txn, - ws, - new SortStage(params, ws, ms), NULL, coll), - coll, PlanExecutor::YIELD_MANUAL, &rawExec); - std::unique_ptr<PlanExecutor> exec(rawExec); - - PlanExecutor::ExecState runnerState = exec->getNext(NULL, NULL); - ASSERT_EQUALS(PlanExecutor::FAILURE, runnerState); + member.obj = Snapshotted<BSONObj>(SnapshotId(), fromjson("{a:1, b:1, c:1}")); + ms->pushBack(member); } - }; - - class All : public Suite { - public: - All() : Suite( "query_stage_sort_test" ) { } - - void setupTests() { - add<QueryStageSortInc>(); - add<QueryStageSortDec>(); - // Sort with limit has a general limiting strategy for limit > 1 - add<QueryStageSortDecWithLimit<10> >(); - // and a special case for limit == 1 - add<QueryStageSortDecWithLimit<1> >(); - add<QueryStageSortExt>(); - add<QueryStageSortMutationInvalidation>(); - add<QueryStageSortDeletionInvalidation>(); - add<QueryStageSortDeletionInvalidationWithLimit<10> >(); - add<QueryStageSortDeletionInvalidationWithLimit<1> >(); - add<QueryStageSortParallelArrays>(); - } - }; - SuiteInstance<All> queryStageSortTest; + SortStageParams params; + params.collection = coll; + params.pattern = BSON("b" << -1 << "c" << 1 << "a" << 1); + params.limit = 0; + + // We don't get results back since we're sorting some parallel arrays. + PlanExecutor* rawExec; + Status status = + PlanExecutor::make(&_txn, + ws, + new FetchStage(&_txn, ws, new SortStage(params, ws, ms), NULL, coll), + coll, + PlanExecutor::YIELD_MANUAL, + &rawExec); + std::unique_ptr<PlanExecutor> exec(rawExec); + + PlanExecutor::ExecState runnerState = exec->getNext(NULL, NULL); + ASSERT_EQUALS(PlanExecutor::FAILURE, runnerState); + } +}; + +class All : public Suite { +public: + All() : Suite("query_stage_sort_test") {} + + void setupTests() { + add<QueryStageSortInc>(); + add<QueryStageSortDec>(); + // Sort with limit has a general limiting strategy for limit > 1 + add<QueryStageSortDecWithLimit<10>>(); + // and a special case for limit == 1 + add<QueryStageSortDecWithLimit<1>>(); + add<QueryStageSortExt>(); + add<QueryStageSortMutationInvalidation>(); + add<QueryStageSortDeletionInvalidation>(); + add<QueryStageSortDeletionInvalidationWithLimit<10>>(); + add<QueryStageSortDeletionInvalidationWithLimit<1>>(); + add<QueryStageSortParallelArrays>(); + } +}; + +SuiteInstance<All> queryStageSortTest; } // namespace |