/**
 *    Copyright (C) 2013-2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #include "mongo/client/dbclientcursor.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" #include "mongo/db/exec/merge_sort.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/json.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/query/plan_executor.h" #include "mongo/dbtests/dbtests.h" /** * This file tests db/exec/merge_sort.cpp */ namespace QueryStageMergeSortTests { using std::auto_ptr; using std::set; using std::string; class QueryStageMergeSortTestBase { public: QueryStageMergeSortTestBase() : _client(&_txn) { } virtual ~QueryStageMergeSortTestBase() { OldClientWriteContext ctx(&_txn, ns()); _client.dropCollection(ns()); } void addIndex(const BSONObj& obj) { ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj)); } IndexDescriptor* getIndex(const BSONObj& obj, Collection* coll) { return coll->getIndexCatalog()->findIndexByKeyPattern( &_txn, obj ); } void insert(const BSONObj& obj) { _client.insert(ns(), obj); } void remove(const BSONObj& obj) { _client.remove(ns(), obj); } void getLocs(set* out, Collection* coll) { RecordIterator* it = coll->getIterator(&_txn); while (!it->isEOF()) { RecordId nextLoc = it->getNext(); out->insert(nextLoc); } delete it; } BSONObj objWithMinKey(int start) { BSONObjBuilder startKeyBob; startKeyBob.append("", start); startKeyBob.appendMinKey(""); return startKeyBob.obj(); } BSONObj objWithMaxKey(int start) { BSONObjBuilder endKeyBob; endKeyBob.append("", start); endKeyBob.appendMaxKey(""); return endKeyBob.obj(); } static const char* ns() { return "unittests.QueryStageMergeSort"; } protected: OperationContextImpl _txn; private: DBDirectClient _client; }; // SERVER-1205: // find($or[{a:1}, {b:1}]).sort({c:1}) with indices {a:1, c:1} and {b:1, c:1}. 
class QueryStageMergeSortPrefixIndex : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } const int N = 50; for (int i = 0; i < N; ++i) { insert(BSON("a" << 1 << "c" << i)); insert(BSON("b" << 1 << "c" << i)); } BSONObj firstIndex = BSON("a" << 1 << "c" << 1); BSONObj secondIndex = BSON("b" << 1 << "c" << 1); addIndex(firstIndex); addIndex(secondIndex); WorkingSet* ws = new WorkingSet(); // Sort by c:1 MergeSortStageParams msparams; msparams.pattern = BSON("c" << 1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); // a:1 IndexScanParams params; params.descriptor = getIndex(firstIndex, coll); params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; ms->addChild(new IndexScan(&_txn, params, ws, NULL)); // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); // Must fetch if we want to easily pull out an obj. PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) || (first.hasField("b") && second.hasField("a"))); } // Should be done now. 
BSONObj foo; ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL)); } }; // Each inserted document appears in both indices but is deduped and returned properly/sorted. class QueryStageMergeSortDups : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } const int N = 50; for (int i = 0; i < N; ++i) { insert(BSON("a" << 1 << "b" << 1 << "c" << i)); insert(BSON("a" << 1 << "b" << 1 << "c" << i)); } BSONObj firstIndex = BSON("a" << 1 << "c" << 1); BSONObj secondIndex = BSON("b" << 1 << "c" << 1); addIndex(firstIndex); addIndex(secondIndex); WorkingSet* ws = new WorkingSet(); // Sort by c:1 MergeSortStageParams msparams; msparams.pattern = BSON("c" << 1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); // a:1 IndexScanParams params; params.descriptor = getIndex(firstIndex, coll); params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; ms->addChild(new IndexScan(&_txn, params, ws, NULL)); // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) || (first.hasField("b") && second.hasField("a"))); } // Should be 
done now. BSONObj foo; ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; // Each inserted document appears in both indices, no deduping, get each result twice. class QueryStageMergeSortDupsNoDedup : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } const int N = 50; for (int i = 0; i < N; ++i) { insert(BSON("a" << 1 << "b" << 1 << "c" << i)); } BSONObj firstIndex = BSON("a" << 1 << "c" << 1); BSONObj secondIndex = BSON("b" << 1 << "c" << 1); addIndex(firstIndex); addIndex(secondIndex); WorkingSet* ws = new WorkingSet(); // Sort by c:1 MergeSortStageParams msparams; msparams.dedup = false; msparams.pattern = BSON("c" << 1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); // a:1 IndexScanParams params; params.descriptor = getIndex(firstIndex, coll); params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; ms->addChild(new IndexScan(&_txn, params, ws, NULL)); // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; // We inserted N objects but we get 2 * N from the runner because of dups. 
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(i, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) || (first.hasField("b") && second.hasField("a"))); } // Should be done now. BSONObj foo; ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; // Decreasing indices merged ok. Basically the test above but decreasing. class QueryStageMergeSortPrefixIndexReverse : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } const int N = 50; for (int i = 0; i < N; ++i) { // We insert a:1 c:i for i=0..49 but in reverse order for the heck of it. insert(BSON("a" << 1 << "c" << N - i - 1)); insert(BSON("b" << 1 << "c" << i)); } BSONObj firstIndex = BSON("a" << 1 << "c" << -1); BSONObj secondIndex = BSON("b" << 1 << "c" << -1); addIndex(firstIndex); addIndex(secondIndex); WorkingSet* ws = new WorkingSet(); // Sort by c:-1 MergeSortStageParams msparams; msparams.pattern = BSON("c" << -1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); // a:1 IndexScanParams params; params.descriptor = getIndex(firstIndex, coll); params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMaxKey(1); params.bounds.endKey = objWithMinKey(1); params.bounds.endKeyInclusive = true; // This is the direction along the index. 
params.direction = 1; ms->addChild(new IndexScan(&_txn, params, ws, NULL)); // b:1 params.descriptor = getIndex(secondIndex, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); for (int i = 0; i < N; ++i) { BSONObj first, second; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL)); ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL)); ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt()); ASSERT_EQUALS(N - i - 1, first["c"].numberInt()); ASSERT((first.hasField("a") && second.hasField("b")) || (first.hasField("b") && second.hasField("a"))); } // Should be done now. BSONObj foo; ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; // One stage EOF immediately class QueryStageMergeSortOneStageEOF : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } const int N = 50; for (int i = 0; i < N; ++i) { insert(BSON("a" << 1 << "c" << i)); insert(BSON("b" << 1 << "c" << i)); } BSONObj firstIndex = BSON("a" << 1 << "c" << 1); BSONObj secondIndex = BSON("b" << 1 << "c" << 1); addIndex(firstIndex); addIndex(secondIndex); WorkingSet* ws = new WorkingSet(); // Sort by c:1 MergeSortStageParams msparams; msparams.pattern = BSON("c" << 1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); // a:1 IndexScanParams params; params.descriptor = getIndex(firstIndex, coll); params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; ms->addChild(new IndexScan(&_txn, params, ws, 
NULL)); // b:51 (EOF) params.descriptor = getIndex(secondIndex, coll); params.bounds.startKey = BSON("" << 51 << "" << MinKey); params.bounds.endKey = BSON("" << 51 << "" << MaxKey); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); // Only getting results from the a:1 index scan. for (int i = 0; i < N; ++i) { BSONObj obj; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(i, obj["c"].numberInt()); ASSERT_EQUALS(1, obj["a"].numberInt()); } // Should be done now. BSONObj foo; ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; // N stages each have 1 result class QueryStageMergeSortManyShort : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } WorkingSet* ws = new WorkingSet(); // Sort by foo:1 MergeSortStageParams msparams; msparams.pattern = BSON("foo" << 1); MergeSortStage* ms = new MergeSortStage(msparams, ws, coll); IndexScanParams params; params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; int numIndices = 20; for (int i = 0; i < numIndices; ++i) { // 'a', 'b', ... 
string index(1, 'a' + i); insert(BSON(index << 1 << "foo" << i)); BSONObj indexSpec = BSON(index << 1 << "foo" << 1); addIndex(indexSpec); params.descriptor = getIndex(indexSpec, coll); ms->addChild(new IndexScan(&_txn, params, ws, NULL)); } PlanExecutor* rawExec; Status status = PlanExecutor::make(&_txn, ws, new FetchStage(&_txn, ws, ms, NULL, coll), coll, PlanExecutor::YIELD_MANUAL, &rawExec); ASSERT_OK(status); boost::scoped_ptr exec(rawExec); for (int i = 0; i < numIndices; ++i) { BSONObj obj; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL)); ASSERT_EQUALS(i, obj["foo"].numberInt()); string index(1, 'a' + i); ASSERT_EQUALS(1, obj[index].numberInt()); } // Should be done now. BSONObj foo; ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL)); } }; // Invalidation mid-run class QueryStageMergeSortInvalidation : public QueryStageMergeSortTestBase { public: void run() { OldClientWriteContext ctx(&_txn, ns()); Database* db = ctx.db(); Collection* coll = db->getCollection(ns()); if (!coll) { WriteUnitOfWork wuow(&_txn); coll = db->createCollection(&_txn, ns()); wuow.commit(); } WorkingSet ws; // Sort by foo:1 MergeSortStageParams msparams; msparams.pattern = BSON("foo" << 1); auto_ptr ms(new MergeSortStage(msparams, &ws, coll)); IndexScanParams params; params.bounds.isSimpleRange = true; params.bounds.startKey = objWithMinKey(1); params.bounds.endKey = objWithMaxKey(1); params.bounds.endKeyInclusive = true; params.direction = 1; // Index 'a'+i has foo equal to 'i'. int numIndices = 20; for (int i = 0; i < numIndices; ++i) { // 'a', 'b', ... string index(1, 'a' + i); insert(BSON(index << 1 << "foo" << i)); BSONObj indexSpec = BSON(index << 1 << "foo" << 1); addIndex(indexSpec); params.descriptor = getIndex(indexSpec, coll); ms->addChild(new IndexScan(&_txn, params, &ws, NULL)); } set locs; getLocs(&locs, coll); set::iterator it = locs.begin(); // Get 10 results. Should be getting results in order of 'locs'. 
int count = 0; while (!ms->isEOF() && count < 10) { WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status = ms->work(&id); if (PlanStage::ADVANCED != status) { continue; } WorkingSetMember* member = ws.get(id); ASSERT_EQUALS(member->loc, *it); BSONElement elt; string index(1, 'a' + count); ASSERT(member->getFieldDotted(index, &elt)); ASSERT_EQUALS(1, elt.numberInt()); ASSERT(member->getFieldDotted("foo", &elt)); ASSERT_EQUALS(count, elt.numberInt()); ++count; ++it; } // Invalidate locs[11]. Should force a fetch. We don't get it back. ms->saveState(); ms->invalidate(&_txn, *it, INVALIDATION_DELETION); ms->restoreState(&_txn); // Make sure locs[11] was fetched for us. { // TODO: If we have "return upon invalidation" ever triggerable, do the following test. /* WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status; do { status = ms->work(&id); } while (PlanStage::ADVANCED != status); WorkingSetMember* member = ws.get(id); ASSERT(!member->hasLoc()); ASSERT(member->hasObj()); string index(1, 'a' + count); BSONElement elt; ASSERT_TRUE(member->getFieldDotted(index, &elt)); ASSERT_EQUALS(1, elt.numberInt()); ASSERT(member->getFieldDotted("foo", &elt)); ASSERT_EQUALS(count, elt.numberInt()); */ ++it; ++count; } // And get the rest. while (!ms->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status = ms->work(&id); if (PlanStage::ADVANCED != status) { continue; } WorkingSetMember* member = ws.get(id); ASSERT_EQUALS(member->loc, *it); BSONElement elt; string index(1, 'a' + count); ASSERT_TRUE(member->getFieldDotted(index, &elt)); ASSERT_EQUALS(1, elt.numberInt()); ASSERT(member->getFieldDotted("foo", &elt)); ASSERT_EQUALS(count, elt.numberInt()); ++count; ++it; } } }; class All : public Suite { public: All() : Suite( "query_stage_merge_sort_test" ) { } void setupTests() { add(); add(); add(); add(); add(); add(); add(); } }; SuiteInstance queryStageMergeSortTest; } // namespace