/** * Copyright (C) 2013-2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/collection_scan.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" #include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/json.h" #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/matcher/extensions_callback_disallow_extensions.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_solution.h" #include "mongo/dbtests/dbtests.h" #include "mongo/stdx/memory.h" namespace QueryPlanExecutor { using std::shared_ptr; using std::string; using std::unique_ptr; using stdx::make_unique; static const NamespaceString nss("unittests.QueryPlanExecutor"); class PlanExecutorBase { public: PlanExecutorBase() : _client(&_opCtx) {} virtual ~PlanExecutorBase() { _client.dropCollection(nss.ns()); } void addIndex(const BSONObj& obj) { ASSERT_OK(dbtests::createIndex(&_opCtx, nss.ns(), obj)); } void insert(const BSONObj& obj) { _client.insert(nss.ns(), obj); } void remove(const BSONObj& obj) { _client.remove(nss.ns(), obj); } void dropCollection() { _client.dropCollection(nss.ns()); } void update(BSONObj& query, BSONObj& updateSpec) { _client.update(nss.ns(), query, updateSpec, false, false); } /** * Given a match expression, represented as the BSON object 'filterObj', * create a PlanExecutor capable of executing a simple collection * scan. * * The caller takes ownership of the returned PlanExecutor*. */ PlanExecutor* makeCollScanExec(Collection* coll, BSONObj& filterObj) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; unique_ptr ws(new WorkingSet()); // Canonicalize the query. auto qr = stdx::make_unique(nss); qr->setFilter(filterObj); auto statusWithCQ = CanonicalQuery::canonicalize( &_opCtx, std::move(qr), ExtensionsCallbackDisallowExtensions()); verify(statusWithCQ.isOK()); unique_ptr cq = std::move(statusWithCQ.getValue()); verify(NULL != cq.get()); // Make the stage. unique_ptr root( new CollectionScan(&_opCtx, csparams, ws.get(), cq.get()->root())); // Hand the plan off to the executor. auto statusWithPlanExecutor = PlanExecutor::make(&_opCtx, std::move(ws), std::move(root), std::move(cq), coll, PlanExecutor::YIELD_MANUAL); ASSERT_OK(statusWithPlanExecutor.getStatus()); return statusWithPlanExecutor.getValue().release(); } /** * @param indexSpec -- a BSONObj giving the index over which to * scan, e.g. {_id: 1}. * @param start -- the lower bound (inclusive) at which to start * the index scan * @param end -- the lower bound (inclusive) at which to end the * index scan * * Returns a PlanExecutor capable of executing an index scan * over the specified index with the specified bounds. * * The caller takes ownership of the returned PlanExecutor*. */ PlanExecutor* makeIndexScanExec(Database* db, BSONObj& indexSpec, int start, int end) { // Build the index scan stage. IndexScanParams ixparams; ixparams.descriptor = getIndex(db, indexSpec); ixparams.bounds.isSimpleRange = true; ixparams.bounds.startKey = BSON("" << start); ixparams.bounds.endKey = BSON("" << end); ixparams.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys; ixparams.direction = 1; const Collection* coll = db->getCollection(nss.ns()); unique_ptr ws(new WorkingSet()); IndexScan* ix = new IndexScan(&_opCtx, ixparams, ws.get(), NULL); unique_ptr root(new FetchStage(&_opCtx, ws.get(), ix, NULL, coll)); auto qr = stdx::make_unique(nss); auto statusWithCQ = CanonicalQuery::canonicalize( &_opCtx, std::move(qr), ExtensionsCallbackDisallowExtensions()); verify(statusWithCQ.isOK()); unique_ptr cq = std::move(statusWithCQ.getValue()); verify(NULL != cq.get()); // Hand the plan off to the executor. auto statusWithPlanExecutor = PlanExecutor::make(&_opCtx, std::move(ws), std::move(root), std::move(cq), coll, PlanExecutor::YIELD_MANUAL); ASSERT_OK(statusWithPlanExecutor.getStatus()); return statusWithPlanExecutor.getValue().release(); } size_t numCursors() { AutoGetCollectionForRead ctx(&_opCtx, nss); Collection* collection = ctx.getCollection(); if (!collection) return 0; return collection->getCursorManager()->numCursors(); } void registerExec(PlanExecutor* exec) { // TODO: This is not correct (create collection under S-lock) AutoGetCollectionForRead ctx(&_opCtx, nss); WriteUnitOfWork wunit(&_opCtx); Collection* collection = ctx.getDb()->getOrCreateCollection(&_opCtx, nss.ns()); collection->getCursorManager()->registerExecutor(exec); wunit.commit(); } void deregisterExec(PlanExecutor* exec) { // TODO: This is not correct (create collection under S-lock) AutoGetCollectionForRead ctx(&_opCtx, nss); WriteUnitOfWork wunit(&_opCtx); Collection* collection = ctx.getDb()->getOrCreateCollection(&_opCtx, nss.ns()); collection->getCursorManager()->deregisterExecutor(exec); wunit.commit(); } protected: const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext(); OperationContext& _opCtx = *_txnPtr; private: IndexDescriptor* getIndex(Database* db, const BSONObj& obj) { Collection* collection = db->getCollection(nss.ns()); std::vector indexes; collection->getIndexCatalog()->findIndexesByKeyPattern(&_opCtx, obj, false, &indexes); ASSERT_LTE(indexes.size(), 1U); return indexes.size() == 0 ? nullptr : indexes[0]; } DBDirectClient _client; }; /** * Test dropping the collection while the * PlanExecutor is doing a collection scan. */ class DropCollScan : public PlanExecutorBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("_id" << 1)); insert(BSON("_id" << 2)); BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); Collection* coll = ctx.getCollection(); unique_ptr exec(makeCollScanExec(coll, filterObj)); registerExec(exec.get()); BSONObj objOut; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(1, objOut["_id"].numberInt()); // After dropping the collection, the plan executor should be dead. dropCollection(); ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); deregisterExec(exec.get()); } }; /** * Test dropping the collection while the PlanExecutor is doing an index scan. */ class DropIndexScan : public PlanExecutorBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("_id" << 1 << "a" << 6)); insert(BSON("_id" << 2 << "a" << 7)); insert(BSON("_id" << 3 << "a" << 8)); BSONObj indexSpec = BSON("a" << 1); addIndex(indexSpec); unique_ptr exec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); registerExec(exec.get()); BSONObj objOut; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(7, objOut["a"].numberInt()); // After dropping the collection, the plan executor should be dead. dropCollection(); ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); deregisterExec(exec.get()); } }; /** * Test dropping the collection while an agg PlanExecutor is doing an index scan. */ class DropIndexScanAgg : public PlanExecutorBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("_id" << 1 << "a" << 6)); insert(BSON("_id" << 2 << "a" << 7)); insert(BSON("_id" << 3 << "a" << 8)); BSONObj indexSpec = BSON("a" << 1); addIndex(indexSpec); Collection* collection = ctx.getCollection(); // Create the aggregation pipeline. std::vector rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")}; boost::intrusive_ptr expCtx = new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, rawPipeline)); // Create an "inner" plan executor and register it with the cursor manager so that it can // get notified when the collection is dropped. unique_ptr innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); registerExec(innerExec.get()); // Wrap the "inner" plan executor in a DocumentSourceCursor and add it as the first source // in the pipeline. innerExec->saveState(); auto cursorSource = DocumentSourceCursor::create(collection, nss.ns(), std::move(innerExec), expCtx); auto pipeline = assertGet(Pipeline::create({cursorSource}, expCtx)); // Create the output PlanExecutor that pulls results from the pipeline. auto ws = make_unique(); auto proxy = make_unique(&_opCtx, pipeline, ws.get()); auto statusWithPlanExecutor = PlanExecutor::make( &_opCtx, std::move(ws), std::move(proxy), collection, PlanExecutor::YIELD_MANUAL); ASSERT_OK(statusWithPlanExecutor.getStatus()); unique_ptr outerExec = std::move(statusWithPlanExecutor.getValue()); // Register the "outer" plan executor with the cursor manager so it can get notified when // the collection is dropped. registerExec(outerExec.get()); dropCollection(); // Verify that the aggregation pipeline returns an error because its "inner" plan executor // has been killed due to the collection being dropped. ASSERT_THROWS_CODE(pipeline->getNext(), UserException, 16028); // Verify that the "outer" plan executor has been killed due to the collection being // dropped. BSONObj objOut; ASSERT_EQUALS(PlanExecutor::DEAD, outerExec->getNext(&objOut, nullptr)); deregisterExec(outerExec.get()); } }; class SnapshotBase : public PlanExecutorBase { protected: void setupCollection() { insert(BSON("_id" << 1 << "a" << 1)); insert(BSON("_id" << 2 << "a" << 2 << "payload" << "x")); insert(BSON("_id" << 3 << "a" << 3)); insert(BSON("_id" << 4 << "a" << 4)); } /** * Increases a document's size dramatically such that the document * exceeds the available padding and must be moved to the end of * the collection. */ void forceDocumentMove() { BSONObj query = BSON("_id" << 2); BSONObj updateSpec = BSON("$set" << BSON("payload" << payload8k())); update(query, updateSpec); } std::string payload8k() { return std::string(8 * 1024, 'x'); } /** * Given an array of ints, 'expectedIds', and a PlanExecutor, * 'exec', uses the executor to iterate through the collection. While * iterating, asserts that the _id of each successive document equals * the respective integer in 'expectedIds'. */ void checkIds(int* expectedIds, PlanExecutor* exec) { BSONObj objOut; int idcount = 0; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&objOut, NULL))) { ASSERT_EQUALS(expectedIds[idcount], objOut["_id"].numberInt()); ++idcount; } ASSERT_EQUALS(PlanExecutor::IS_EOF, state); } }; /** * Create a scenario in which the same document is returned * twice due to a concurrent document move and collection * scan. */ class SnapshotControl : public SnapshotBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); setupCollection(); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); Collection* coll = ctx.getCollection(); unique_ptr exec(makeCollScanExec(coll, filterObj)); BSONObj objOut; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(2, objOut["a"].numberInt()); forceDocumentMove(); int ids[] = {3, 4, 2}; checkIds(ids, exec.get()); } }; /** * A snapshot is really just a hint that means scan the _id index. * Make sure that we do not see the document move with an _id * index scan. */ class SnapshotTest : public SnapshotBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); setupCollection(); BSONObj indexSpec = BSON("_id" << 1); addIndex(indexSpec); BSONObj filterObj = fromjson("{a: {$gte: 2}}"); unique_ptr exec(makeIndexScanExec(ctx.db(), indexSpec, 2, 5)); BSONObj objOut; ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&objOut, NULL)); ASSERT_EQUALS(2, objOut["a"].numberInt()); forceDocumentMove(); // Since this time we're scanning the _id index, // we should not see the moved document again. int ids[] = {3, 4}; checkIds(ids, exec.get()); } }; namespace ClientCursor { using mongo::ClientCursor; /** * Test invalidation of ClientCursor. */ class Invalidate : public PlanExecutorBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("a" << 1 << "b" << 1)); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); Collection* coll = ctx.getCollection(); PlanExecutor* exec = makeCollScanExec(coll, filterObj); // Make a client cursor from the plan executor. coll->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); // There should be one cursor before invalidation, // and zero cursors after invalidation. ASSERT_EQUALS(1U, numCursors()); coll->getCursorManager()->invalidateAll(false, "Invalidate Test"); ASSERT_EQUALS(0U, numCursors()); } }; /** * Test that pinned client cursors persist even after * invalidation. */ class InvalidatePinned : public PlanExecutorBase { public: void run() { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("a" << 1 << "b" << 1)); Collection* collection = ctx.getCollection(); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); PlanExecutor* exec = makeCollScanExec(collection, filterObj); // Make a client cursor from the plan executor. auto ccPin = collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); // If the cursor is pinned, it sticks around, even after invalidation. ASSERT_EQUALS(1U, numCursors()); const std::string invalidateReason("InvalidatePinned Test"); collection->getCursorManager()->invalidateAll(false, invalidateReason); ASSERT_EQUALS(1U, numCursors()); // The invalidation should have killed the plan executor. BSONObj objOut; ASSERT_EQUALS(PlanExecutor::DEAD, exec->getNext(&objOut, NULL)); ASSERT(WorkingSetCommon::isValidStatusMemberObject(objOut)); const Status status = WorkingSetCommon::getMemberObjectStatus(objOut); ASSERT(status.reason().find(invalidateReason) != string::npos); // Deleting the underlying cursor should cause the // number of cursors to return to 0. ccPin.deleteUnderlying(); ASSERT_EQUALS(0U, numCursors()); } }; /** * Test that client cursors time out and get * deleted. */ class Timeout : public PlanExecutorBase { public: void run() { { OldClientWriteContext ctx(&_opCtx, nss.ns()); insert(BSON("a" << 1 << "b" << 1)); } { AutoGetCollectionForRead ctx(&_opCtx, nss); Collection* collection = ctx.getCollection(); BSONObj filterObj = fromjson("{_id: {$gt: 0}, b: {$gt: 0}}"); PlanExecutor* exec = makeCollScanExec(collection, filterObj); // Make a client cursor from the plan executor. collection->getCursorManager()->registerCursor({exec, nss.ns(), false, 0, BSONObj()}); } // There should be one cursor before timeout, // and zero cursors after timeout. ASSERT_EQUALS(1U, numCursors()); CursorManager::timeoutCursorsGlobal(&_opCtx, 600001); ASSERT_EQUALS(0U, numCursors()); } }; } // namespace ClientCursor class All : public Suite { public: All() : Suite("query_plan_executor") {} void setupTests() { add(); add(); add(); add(); add(); add(); add(); add(); } }; SuiteInstance queryPlanExecutorAll; } // namespace QueryPlanExecutor