diff options
author | Jason Rassi <rassi@10gen.com> | 2014-07-30 15:12:11 -0400 |
---|---|---|
committer | Jason Rassi <rassi@10gen.com> | 2014-07-30 16:12:02 -0400 |
commit | 13fe3b061fa3c5970c40973d6f36f69fa06130a5 (patch) | |
tree | 22f051cf09c9c64149f68580aa001b0195983ef0 | |
parent | 76be742def16b4081e559374d66ede6a00cf3ff0 (diff) | |
download | mongo-13fe3b061fa3c5970c40973d6f36f69fa06130a5.tar.gz |
SERVER-14498 Add DeleteStage, rewrite DeleteExecutor to use it
-rw-r--r-- | jstests/core/stages_delete.js | 27 | ||||
-rw-r--r-- | src/mongo/db/exec/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/delete.cpp | 182 | ||||
-rw-r--r-- | src/mongo/db/exec/delete.h | 106 | ||||
-rw-r--r-- | src/mongo/db/exec/plan_stats.h | 10 | ||||
-rw-r--r-- | src/mongo/db/exec/stagedebug_cmd.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/ops/delete_executor.cpp | 97 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 71 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.h | 35 | ||||
-rw-r--r-- | src/mongo/db/query/plan_executor.h | 5 | ||||
-rw-r--r-- | src/mongo/db/query/stage_types.h | 2 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_delete.cpp | 174 |
12 files changed, 656 insertions, 80 deletions
diff --git a/jstests/core/stages_delete.js b/jstests/core/stages_delete.js new file mode 100644 index 00000000000..f8c7fd14ea1 --- /dev/null +++ b/jstests/core/stages_delete.js @@ -0,0 +1,27 @@ +// Test basic delete stage functionality. +var coll = db.stages_delete; +var collScanStage = {cscan: {args: {direction: 1}, filter: {deleteMe: true}}}; +var deleteStage; + +// Test delete stage with isMulti: true. +coll.drop(); +assert.writeOK(coll.insert({deleteMe: true})); +assert.writeOK(coll.insert({deleteMe: true})); +assert.writeOK(coll.insert({deleteMe: false})); +deleteStage = {delete: {args: {node: collScanStage, isMulti: true, shouldCallLogOp: true}}}; +assert.eq(coll.count(), 3); +assert.commandWorked(db.runCommand({stageDebug: {collection: coll.getName(), plan: deleteStage}})); +assert.eq(coll.count(), 1); +assert.eq(coll.count({deleteMe: false}), 1); + +// Test delete stage with isMulti: false. +coll.drop(); +assert.writeOK(coll.insert({deleteMe: true})); +assert.writeOK(coll.insert({deleteMe: true})); +assert.writeOK(coll.insert({deleteMe: false})); +deleteStage = {delete: {args: {node: collScanStage, isMulti: false, shouldCallLogOp: true}}}; +assert.eq(coll.count(), 3); +assert.commandWorked(db.runCommand({stageDebug: {collection: coll.getName(), plan: deleteStage}})); +assert.eq(coll.count(), 2); +assert.eq(coll.count({deleteMe: true}), 1); +assert.eq(coll.count({deleteMe: false}), 1); diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index abfd4c0c458..d3bea2e5974 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -40,6 +40,7 @@ env.Library( "cached_plan.cpp", "collection_scan.cpp", "count.cpp", + "delete.cpp", "distinct_scan.cpp", "eof.cpp", "fetch.cpp", diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp new file mode 100644 index 00000000000..f06fac15398 --- /dev/null +++ b/src/mongo/db/exec/delete.cpp @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2014 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/delete.h" + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_coordinator_global.h" + +namespace mongo { + + // static + const char* DeleteStage::kStageType = "DELETE"; + + DeleteStage::DeleteStage(OperationContext* txn, + const DeleteStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child) + : _txn(txn), + _params(params), + _ws(ws), + _collection(collection), + _child(child), + _commonStats(kStageType) { } + + DeleteStage::~DeleteStage() {} + + bool DeleteStage::isEOF() { + if (!_collection) { + return true; + } + if (!_params.isMulti && _specificStats.docsDeleted > 0) { + return true; + } + return _child->isEOF(); + } + + PlanStage::StageState DeleteStage::work(WorkingSetID* out) { + ++_commonStats.works; + + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_commonStats.executionTimeMillis); + + if (isEOF()) { return PlanStage::IS_EOF; } + invariant(_collection); // If isEOF() returns false, we must have a collection. + + WorkingSetID id = WorkingSet::INVALID_ID; + StageState status = _child->work(&id); + + if (PlanStage::ADVANCED == status) { + WorkingSetMember* member = _ws->get(id); + if (!member->hasLoc()) { + _ws->free(id); + const std::string errmsg = "delete stage failed to read member w/ loc from child"; + *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError, + errmsg)); + return PlanStage::FAILURE; + } + DiskLoc rloc = member->loc; + _ws->free(id); + + BSONObj deletedDoc; + + WriteUnitOfWork wunit(_txn->recoveryUnit()); + + // TODO: Do we want to buffer docs and delete them in a group rather than + // saving/restoring state repeatedly? 
+ saveState(); + const bool deleteCappedOK = false; + const bool deleteNoWarn = false; + _collection->deleteDocument(_txn, rloc, deleteCappedOK, deleteNoWarn, + _params.shouldCallLogOp ? &deletedDoc : NULL); + restoreState(_txn); + + ++_specificStats.docsDeleted; + + if (_params.shouldCallLogOp) { + if (deletedDoc.isEmpty()) { + log() << "Deleted object without id in collection " << _collection->ns() + << ", not logging."; + } + else { + bool replJustOne = true; + repl::logOp(_txn, "d", _collection->ns().ns().c_str(), deletedDoc, 0, + &replJustOne); + } + } + + wunit.commit(); + + _txn->recoveryUnit()->commitIfNeeded(); + + ++_commonStats.needTime; + return PlanStage::NEED_TIME; + } + else if (PlanStage::FAILURE == status) { + *out = id; + // If a stage fails, it may create a status WSM to indicate why it failed, in which case + // 'id' is valid. If ID is invalid, we create our own error message. + if (WorkingSet::INVALID_ID == id) { + const std::string errmsg = "delete stage failed to read in results from child"; + *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError, + errmsg)); + return PlanStage::FAILURE; + } + return status; + } + else { + if (PlanStage::NEED_TIME == status) { + ++_commonStats.needTime; + } + return status; + } + } + + void DeleteStage::saveState() { + ++_commonStats.yields; + _child->saveState(); + } + + void DeleteStage::restoreState(OperationContext* opCtx) { + ++_commonStats.unyields; + _child->restoreState(opCtx); + } + + void DeleteStage::invalidate(const DiskLoc& dl, InvalidationType type) { + ++_commonStats.invalidates; + _child->invalidate(dl, type); + } + + vector<PlanStage*> DeleteStage::getChildren() const { + vector<PlanStage*> children; + children.push_back(_child.get()); + return children; + } + + PlanStageStats* DeleteStage::getStats() { + _commonStats.isEOF = isEOF(); + auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_DELETE)); + ret->specific.reset(new 
DeleteStats(_specificStats)); + ret->children.push_back(_child->getStats()); + return ret.release(); + } + + const CommonStats* DeleteStage::getCommonStats() { + return &_commonStats; + } + + const SpecificStats* DeleteStage::getSpecificStats() { + return &_specificStats; + } + +} // namespace mongo diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h new file mode 100644 index 00000000000..3e24375a5dc --- /dev/null +++ b/src/mongo/db/exec/delete.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/exec/plan_stage.h" +#include "mongo/db/jsobj.h" + +namespace mongo { + + class OperationContext; + + struct DeleteStageParams { + DeleteStageParams() : isMulti(false), shouldCallLogOp(false) { } + + // Should we delete all documents returned from the child (a "multi delete"), or at most one + // (a "single delete")? + bool isMulti; + + // Should we write each delete to the oplog? + bool shouldCallLogOp; + }; + + /** + * This stage deletes documents by DiskLoc that are returned from its child. NEED_TIME + * is returned after deleting a document. + * + * Callers of work() must be holding a write lock (and, for shouldCallLogOp=true deletes, + * callers must have had the replication coordinator approve the write). + */ + class DeleteStage : public PlanStage { + MONGO_DISALLOW_COPYING(DeleteStage); + public: + DeleteStage(OperationContext* txn, + const DeleteStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child); + virtual ~DeleteStage(); + + virtual bool isEOF(); + virtual StageState work(WorkingSetID* out); + + virtual void saveState(); + virtual void restoreState(OperationContext* opCtx); + virtual void invalidate(const DiskLoc& dl, InvalidationType type); + + virtual std::vector<PlanStage*> getChildren() const; + + virtual StageType stageType() const { return STAGE_DELETE; } + + virtual PlanStageStats* getStats(); + + virtual const CommonStats* getCommonStats(); + + virtual const SpecificStats* getSpecificStats(); + + static const char* kStageType; + + private: + // Transactional context. Not owned by us. + OperationContext* _txn; + + DeleteStageParams _params; + + // Not owned by us. + WorkingSet* _ws; + + // Collection to operate on. Not owned by us. Can be NULL (if NULL, isEOF() will always + // return true). If non-NULL, the lifetime of the collection must exceed that of the + // stage. 
+ Collection* _collection; + + scoped_ptr<PlanStage> _child; + + // Stats + CommonStats _commonStats; + DeleteStats _specificStats; + }; + +} // namespace mongo diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index f22b9bef7c2..adbc0b6d986 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -264,6 +264,16 @@ namespace mongo { }; + struct DeleteStats : public SpecificStats { + DeleteStats() : docsDeleted(0) { } + + virtual SpecificStats* clone() const { + return new DeleteStats(*this); + } + + size_t docsDeleted; + }; + struct DistinctScanStats : public SpecificStats { DistinctScanStats() : keysExamined(0) { } diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp index ea8fff78edd..6251076f8a8 100644 --- a/src/mongo/db/exec/stagedebug_cmd.cpp +++ b/src/mongo/db/exec/stagedebug_cmd.cpp @@ -35,6 +35,7 @@ #include "mongo/db/exec/and_hash.h" #include "mongo/db/exec/and_sorted.h" #include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/delete.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/index_scan.h" #include "mongo/db/exec/limit.h" @@ -78,6 +79,7 @@ namespace mongo { * node -> {skip: {args: {node: node, num: posint}}} * node -> {sort: {args: {node: node, pattern: objWithSortCriterion }}} * node -> {mergeSort: {args: {nodes: [node, node], pattern: objWithSortCriterion}}} + * node -> {delete: {args: {node: node, isMulti: bool, shouldCallLogOp: bool}}} * * Forthcoming Nodes: * @@ -116,7 +118,10 @@ namespace mongo { string collName = collElt.String(); // Need a context to get the actual Collection* - Client::ReadContext ctx(txn, dbname); + // TODO A write lock is currently taken here to accommodate stages that perform writes + // (e.g. DeleteStage). This should be changed to use a read lock for read-only + // execution trees. + Client::WriteContext ctx(txn, dbname); // Make sure the collection is valid. 
Database* db = ctx.ctx().db(); @@ -409,6 +414,25 @@ namespace mongo { return new TextStage(txn, params, workingSet, matcher); } + else if ("delete" == nodeName) { + uassert(18636, "Delete stage doesn't have a filter (put it on the child)", + NULL == matcher); + uassert(18637, "node argument must be provided to delete", + nodeArgs["node"].isABSONObj()); + uassert(18638, "isMulti argument must be provided to delete", + nodeArgs["isMulti"].type() == Bool); + uassert(18639, "shouldCallLogOp argument must be provided to delete", + nodeArgs["shouldCallLogOp"].type() == Bool); + PlanStage* subNode = parseQuery(txn, + collection, + nodeArgs["node"].Obj(), + workingSet, + exprs); + DeleteStageParams params; + params.isMulti = nodeArgs["isMulti"].Bool(); + params.shouldCallLogOp = nodeArgs["shouldCallLogOp"].Bool(); + return new DeleteStage(txn, params, workingSet, collection, subNode); + } else { return NULL; } diff --git a/src/mongo/db/ops/delete_executor.cpp b/src/mongo/db/ops/delete_executor.cpp index 42459e77f05..ba36650e634 100644 --- a/src/mongo/db/ops/delete_executor.cpp +++ b/src/mongo/db/ops/delete_executor.cpp @@ -32,15 +32,11 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" -#include "mongo/db/client.h" -#include "mongo/db/curop.h" +#include "mongo/db/exec/delete.h" #include "mongo/db/ops/delete_request.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/query/lite_parsed_query.h" -#include "mongo/db/query/query_planner_common.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/oplog.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -87,7 +83,7 @@ namespace mongo { mongoutils::str::stream() << "DeleteExecutor::prepare() failed to parse query " << _request->getQuery(), _isQueryParsed); - const bool logop = _request->shouldCallLogOp(); + const NamespaceString& ns(_request->getNamespaceString()); if (!_request->isGod()) 
{ if (ns.isSystem()) { @@ -97,7 +93,7 @@ namespace mongo { } if (ns.ns().find('$') != string::npos) { log() << "cannot delete from collection with reserved $ in name: " << ns << endl; - uasserted( 10100, "cannot delete from collection with reserved $ in name" ); + uasserted(10100, "cannot delete from collection with reserved $ in name"); } } @@ -112,86 +108,35 @@ namespace mongo { uassert(ErrorCodes::NotMaster, str::stream() << "Not primary while removing from " << ns.ns(), - !logop || + !_request->shouldCallLogOp() || repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db())); - long long nDeleted = 0; - PlanExecutor* rawExec; if (_canonicalQuery.get()) { - uassertStatusOK(getExecutor(_request->getOpCtx(), - collection, - _canonicalQuery.release(), - &rawExec)); + // This is the non-idhack branch. + uassertStatusOK(getExecutorDelete(_request->getOpCtx(), collection, + _canonicalQuery.release(), _request->isMulti(), + _request->shouldCallLogOp(), &rawExec)); } else { - uassertStatusOK(getExecutor(_request->getOpCtx(), - collection, - ns.ns(), - _request->getQuery(), - &rawExec)); + // This is the idhack branch. + uassertStatusOK(getExecutorDelete(_request->getOpCtx(), collection, ns.ns(), + _request->getQuery(), _request->isMulti(), + _request->shouldCallLogOp(), &rawExec)); } - - auto_ptr<PlanExecutor> exec(rawExec); + scoped_ptr<PlanExecutor> exec(rawExec); // Concurrently mutating state (by us) so we need to register 'exec'. 
- ScopedExecutorRegistration safety(exec.get()); - - DiskLoc rloc; - PlanExecutor::ExecState state; - CurOp* curOp = _request->getOpCtx()->getCurOp(); - int oldYieldCount = curOp->numYields(); - while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &rloc))) { - if (oldYieldCount != curOp->numYields()) { - uassert(ErrorCodes::NotMaster, - str::stream() << "No longer primary while removing from " << ns.ns(), - !logop || - repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - ns.db())); - oldYieldCount = curOp->numYields(); - } - BSONObj toDelete; - - WriteUnitOfWork wunit(_request->getOpCtx()->recoveryUnit()); - - // TODO: do we want to buffer docs and delete them in a group rather than - // saving/restoring state repeatedly? - exec->saveState(); - collection->deleteDocument( - _request->getOpCtx(), rloc, false, false, logop ? &toDelete : NULL); - exec->restoreState(_request->getOpCtx()); - - nDeleted++; - - if (logop) { - if ( toDelete.isEmpty() ) { - log() << "Deleted object without id in collection " << collection->ns() - << ", not logging."; - } - else { - bool replJustOne = true; - repl::logOp( - _request->getOpCtx(), "d", ns.ns().c_str(), toDelete, 0, &replJustOne); - } - } - - wunit.commit(); + const ScopedExecutorRegistration safety(exec.get()); - if (!_request->isMulti()) { - break; - } - - if (!_request->isGod()) { - _request->getOpCtx()->recoveryUnit()->commitIfNeeded(); - } - - if (debug && _request->isGod() && nDeleted == 100) { - log() << "warning high number of deletes with god=true " - << " which could use significant memory b/c we don't commit journal"; - } - } + uassertStatusOK(exec->executePlan()); - return nDeleted; + // Extract the number of documents deleted from the DeleteStage stats. 
+ invariant(exec->getRootStage()->stageType() == STAGE_DELETE); + DeleteStage* deleteStage = static_cast<DeleteStage*>(exec->getRootStage()); + const DeleteStats* deleteStats = + static_cast<const DeleteStats*>(deleteStage->getSpecificStats()); + return deleteStats->docsDeleted; } } // namespace mongo diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index daa832034c6..305888b8c64 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -35,6 +35,7 @@ #include "mongo/base/parse_number.h" #include "mongo/client/dbclientinterface.h" #include "mongo/db/exec/cached_plan.h" +#include "mongo/db/exec/delete.h" #include "mongo/db/exec/eof.h" #include "mongo/db/exec/idhack.h" #include "mongo/db/exec/multi_plan.h" @@ -458,6 +459,76 @@ namespace mongo { return Status::OK(); } + Status getExecutorDelete(OperationContext* txn, + Collection* collection, + CanonicalQuery* rawCanonicalQuery, + bool isMulti, + bool shouldCallLogOp, + PlanExecutor** out) { + auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); + auto_ptr<WorkingSet> ws(new WorkingSet()); + PlanStage* root; + QuerySolution* querySolution; + Status status = prepareExecution(txn, collection, ws.get(), canonicalQuery.get(), 0, &root, + &querySolution); + if (!status.isOK()) { + return status; + } + invariant(root); + DeleteStageParams deleteStageParams; + deleteStageParams.isMulti = isMulti; + deleteStageParams.shouldCallLogOp = shouldCallLogOp; + root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, root); + // We must have a tree of stages in order to have a valid plan executor, but the query + // solution may be null. 
+ *out = new PlanExecutor(ws.release(), root, querySolution, canonicalQuery.release(), + collection); + return Status::OK(); + } + + Status getExecutorDelete(OperationContext* txn, + Collection* collection, + const std::string& ns, + const BSONObj& unparsedQuery, + bool isMulti, + bool shouldCallLogOp, + PlanExecutor** out) { + auto_ptr<WorkingSet> ws(new WorkingSet()); + DeleteStageParams deleteStageParams; + deleteStageParams.isMulti = isMulti; + deleteStageParams.shouldCallLogOp = shouldCallLogOp; + if (!collection) { + LOG(2) << "Collection " << ns << " does not exist." + << " Using EOF stage: " << unparsedQuery.toString(); + DeleteStage* deleteStage = new DeleteStage(txn, deleteStageParams, ws.get(), NULL, + new EOFStage()); + *out = new PlanExecutor(ws.release(), deleteStage, ns); + return Status::OK(); + } + + if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) && + collection->getIndexCatalog()->findIdIndex()) { + LOG(2) << "Using idhack: " << unparsedQuery.toString(); + + PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(), + ws.get()); + DeleteStage* root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, + idHackStage); + *out = new PlanExecutor(ws.release(), root, collection); + return Status::OK(); + } + + const WhereCallbackReal whereCallback(txn, collection->ns().db()); + CanonicalQuery* cq; + Status status = CanonicalQuery::canonicalize(collection->ns(), unparsedQuery, &cq, + whereCallback); + if (!status.isOK()) + return status; + + // Takes ownership of 'cq'. + return getExecutorDelete(txn, collection, cq, isMulti, shouldCallLogOp, out); + } + // // Count hack // diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index d9238741253..3e8cb781bd0 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -121,4 +121,39 @@ namespace mongo { const BSONObj& hintObj, PlanExecutor** execOut); + /** + * Get a PlanExecutor for a delete operation. 
'rawCanonicalQuery' describes the predicate for + * the documents to be deleted. A write lock is required to execute the returned plan. + * + * Takes ownership of 'rawCanonicalQuery'. + * + * If the query is valid and an executor could be created, returns Status::OK() and populates + * *out with the PlanExecutor. + * + * If the query cannot be executed, returns a Status indicating why. + */ + Status getExecutorDelete(OperationContext* txn, + Collection* collection, + CanonicalQuery* rawCanonicalQuery, + bool isMulti, + bool shouldCallLogOp, + PlanExecutor** execOut); + + /** + * Overload of getExecutorDelete() above, for when a canonicalQuery is not available. Used to + * support idhack-powered deletes. + * + * If the query is valid and an executor could be created, returns Status::OK() and populates + * *out with the PlanExecutor. + * + * If the query cannot be executed, returns a Status indicating why. + */ + Status getExecutorDelete(OperationContext* txn, + Collection* collection, + const std::string& ns, + const BSONObj& unparsedQuery, + bool isMulti, + bool shouldCallLogOp, + PlanExecutor** execOut); + } // namespace mongo diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index a0d80f28615..b592ee1e5c7 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -220,9 +220,8 @@ namespace mongo { void kill(); /** - * Execute the plan to completion, throwing out the results. - * - * Used by explain. + * Execute the plan to completion, throwing out the results. Used when you want to work the + * underlying tree without getting results back. */ Status executePlan(); diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index 6b841645ffb..791145cce91 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -44,6 +44,8 @@ namespace mongo { // them. 
STAGE_COUNT, + STAGE_DELETE, + // If we're running a distinct, we only care about one value for each key. The distinct // stage is an ixscan with some key-skipping behavior that only distinct uses. STAGE_DISTINCT, diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp new file mode 100644 index 00000000000..96c7fad666a --- /dev/null +++ b/src/mongo/dbtests/query_stage_delete.cpp @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +/** + * This file tests db/exec/delete.cpp. 
+ */ + +#include "mongo/db/catalog/database.h" +#include "mongo/db/exec/collection_scan.h" +#include "mongo/db/exec/delete.h" +#include "mongo/db/operation_context_impl.h" +#include "mongo/dbtests/dbtests.h" + +namespace QueryStageDelete { + + // + // Stage-specific tests. + // + + class QueryStageDeleteBase { + public: + QueryStageDeleteBase() : _client(&_txn) { + Client::WriteContext ctx(&_txn, ns()); + + for (size_t i = 0; i < numObj(); ++i) { + BSONObjBuilder bob; + bob.append("foo", static_cast<long long int>(i)); + _client.insert(ns(), bob.obj()); + } + ctx.commit(); + } + + virtual ~QueryStageDeleteBase() { + Client::WriteContext ctx(&_txn, ns()); + _client.dropCollection(ns()); + ctx.commit(); + } + + void remove(const BSONObj& obj) { + _client.remove(ns(), obj); + } + + void getLocs(Collection* collection, + CollectionScanParams::Direction direction, + vector<DiskLoc>* out) { + WorkingSet ws; + + CollectionScanParams params; + params.collection = collection; + params.direction = direction; + params.tailable = false; + + scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL)); + while (!scan->isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = scan->work(&id); + if (PlanStage::ADVANCED == state) { + WorkingSetMember* member = ws.get(id); + verify(member->hasLoc()); + out->push_back(member->loc); + } + } + } + + static size_t numObj() { return 50; } + + static const char* ns() { return "unittests.QueryStageDelete"; } + + protected: + OperationContextImpl _txn; + + private: + DBDirectClient _client; + }; + + // + // Test invalidation for the delete stage. Use the delete stage to delete some objects + // retrieved by a collscan, then invalidate the upcoming object, then expect the delete stage to + // skip over it and successfully delete the rest. 
+ // + class QueryStageDeleteInvalidateUpcomingObject : public QueryStageDeleteBase { + public: + void run() { + Client::WriteContext ctx(&_txn, ns()); + + Collection* coll = ctx.ctx().db()->getCollection(&_txn, ns()); + + // Get the DiskLocs that would be returned by an in-order scan. + vector<DiskLoc> locs; + getLocs(coll, CollectionScanParams::FORWARD, &locs); + + // Configure the scan. + CollectionScanParams collScanParams; + collScanParams.collection = coll; + collScanParams.direction = CollectionScanParams::FORWARD; + collScanParams.tailable = false; + + // Configure the delete stage. + DeleteStageParams deleteStageParams; + deleteStageParams.isMulti = true; + deleteStageParams.shouldCallLogOp = false; + + WorkingSet ws; + DeleteStage deleteStage(&_txn, deleteStageParams, &ws, coll, + new CollectionScan(&_txn, collScanParams, &ws, NULL)); + + const DeleteStats* stats = + static_cast<const DeleteStats*>(deleteStage.getSpecificStats()); + + const size_t targetDocIndex = 10; + + while (stats->docsDeleted < targetDocIndex) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = deleteStage.work(&id); + ASSERT_EQUALS(PlanStage::NEED_TIME, state); + } + + // Remove locs[targetDocIndex]; + deleteStage.saveState(); + deleteStage.invalidate(locs[targetDocIndex], INVALIDATION_DELETION); + BSONObj targetDoc = coll->docFor(locs[targetDocIndex]); + ASSERT(!targetDoc.isEmpty()); + remove(targetDoc); + deleteStage.restoreState(&_txn); + + // Remove the rest. + while (!deleteStage.isEOF()) { + WorkingSetID id = WorkingSet::INVALID_ID; + PlanStage::StageState state = deleteStage.work(&id); + invariant(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state); + } + + ASSERT_EQUALS(numObj() - 1, stats->docsDeleted); + + ctx.commit(); + } + }; + + class All : public Suite { + public: + All() : Suite("query_stage_delete") {} + + void setupTests() { + // Stage-specific tests below. + add<QueryStageDeleteInvalidateUpcomingObject>(); + } + } all; + +} |