summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Jason Rassi <rassi@10gen.com> 2014-07-30 15:12:11 -0400
committer Jason Rassi <rassi@10gen.com> 2014-07-30 16:12:02 -0400
commit 13fe3b061fa3c5970c40973d6f36f69fa06130a5 (patch)
tree 22f051cf09c9c64149f68580aa001b0195983ef0
parent 76be742def16b4081e559374d66ede6a00cf3ff0 (diff)
download mongo-13fe3b061fa3c5970c40973d6f36f69fa06130a5.tar.gz
SERVER-14498 Add DeleteStage, rewrite DeleteExecutor to use it
-rw-r--r-- jstests/core/stages_delete.js 27
-rw-r--r-- src/mongo/db/exec/SConscript 1
-rw-r--r-- src/mongo/db/exec/delete.cpp 182
-rw-r--r-- src/mongo/db/exec/delete.h 106
-rw-r--r-- src/mongo/db/exec/plan_stats.h 10
-rw-r--r-- src/mongo/db/exec/stagedebug_cmd.cpp 26
-rw-r--r-- src/mongo/db/ops/delete_executor.cpp 97
-rw-r--r-- src/mongo/db/query/get_executor.cpp 71
-rw-r--r-- src/mongo/db/query/get_executor.h 35
-rw-r--r-- src/mongo/db/query/plan_executor.h 5
-rw-r--r-- src/mongo/db/query/stage_types.h 2
-rw-r--r-- src/mongo/dbtests/query_stage_delete.cpp 174
12 files changed, 656 insertions, 80 deletions
diff --git a/jstests/core/stages_delete.js b/jstests/core/stages_delete.js
new file mode 100644
index 00000000000..f8c7fd14ea1
--- /dev/null
+++ b/jstests/core/stages_delete.js
@@ -0,0 +1,27 @@
+// Test basic delete stage functionality.
+var coll = db.stages_delete;
+var collScanStage = {cscan: {args: {direction: 1}, filter: {deleteMe: true}}};
+var deleteStage;
+
+// Test delete stage with isMulti: true.
+coll.drop();
+assert.writeOK(coll.insert({deleteMe: true}));
+assert.writeOK(coll.insert({deleteMe: true}));
+assert.writeOK(coll.insert({deleteMe: false}));
+deleteStage = {delete: {args: {node: collScanStage, isMulti: true, shouldCallLogOp: true}}};
+assert.eq(coll.count(), 3);
+assert.commandWorked(db.runCommand({stageDebug: {collection: coll.getName(), plan: deleteStage}}));
+assert.eq(coll.count(), 1);
+assert.eq(coll.count({deleteMe: false}), 1);
+
+// Test delete stage with isMulti: false.
+coll.drop();
+assert.writeOK(coll.insert({deleteMe: true}));
+assert.writeOK(coll.insert({deleteMe: true}));
+assert.writeOK(coll.insert({deleteMe: false}));
+deleteStage = {delete: {args: {node: collScanStage, isMulti: false, shouldCallLogOp: true}}};
+assert.eq(coll.count(), 3);
+assert.commandWorked(db.runCommand({stageDebug: {collection: coll.getName(), plan: deleteStage}}));
+assert.eq(coll.count(), 2);
+assert.eq(coll.count({deleteMe: true}), 1);
+assert.eq(coll.count({deleteMe: false}), 1);
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index abfd4c0c458..d3bea2e5974 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -40,6 +40,7 @@ env.Library(
"cached_plan.cpp",
"collection_scan.cpp",
"count.cpp",
+ "delete.cpp",
"distinct_scan.cpp",
"eof.cpp",
"fetch.cpp",
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
new file mode 100644
index 00000000000..f06fac15398
--- /dev/null
+++ b/src/mongo/db/exec/delete.cpp
@@ -0,0 +1,182 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/delete.h"
+
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_coordinator_global.h"
+
+namespace mongo {
+
+ // static
+ const char* DeleteStage::kStageType = "DELETE";
+
+ DeleteStage::DeleteStage(OperationContext* txn,
+ const DeleteStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child)
+ : _txn(txn),
+ _params(params),
+ _ws(ws),
+ _collection(collection),
+ _child(child),
+ _commonStats(kStageType) { }
+
+ DeleteStage::~DeleteStage() {}
+
+ bool DeleteStage::isEOF() {
+ if (!_collection) {
+ return true;
+ }
+ if (!_params.isMulti && _specificStats.docsDeleted > 0) {
+ return true;
+ }
+ return _child->isEOF();
+ }
+
+ PlanStage::StageState DeleteStage::work(WorkingSetID* out) {
+ ++_commonStats.works;
+
+ // Adds the amount of time taken by work() to executionTimeMillis.
+ ScopedTimer timer(&_commonStats.executionTimeMillis);
+
+ if (isEOF()) { return PlanStage::IS_EOF; }
+ invariant(_collection); // If isEOF() returns false, we must have a collection.
+
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ StageState status = _child->work(&id);
+
+ if (PlanStage::ADVANCED == status) {
+ WorkingSetMember* member = _ws->get(id);
+ if (!member->hasLoc()) {
+ _ws->free(id);
+ const std::string errmsg = "delete stage failed to read member w/ loc from child";
+ *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError,
+ errmsg));
+ return PlanStage::FAILURE;
+ }
+ DiskLoc rloc = member->loc;
+ _ws->free(id);
+
+ BSONObj deletedDoc;
+
+ WriteUnitOfWork wunit(_txn->recoveryUnit());
+
+ // TODO: Do we want to buffer docs and delete them in a group rather than
+ // saving/restoring state repeatedly?
+ saveState();
+ const bool deleteCappedOK = false;
+ const bool deleteNoWarn = false;
+ _collection->deleteDocument(_txn, rloc, deleteCappedOK, deleteNoWarn,
+ _params.shouldCallLogOp ? &deletedDoc : NULL);
+ restoreState(_txn);
+
+ ++_specificStats.docsDeleted;
+
+ if (_params.shouldCallLogOp) {
+ if (deletedDoc.isEmpty()) {
+ log() << "Deleted object without id in collection " << _collection->ns()
+ << ", not logging.";
+ }
+ else {
+ bool replJustOne = true;
+ repl::logOp(_txn, "d", _collection->ns().ns().c_str(), deletedDoc, 0,
+ &replJustOne);
+ }
+ }
+
+ wunit.commit();
+
+ _txn->recoveryUnit()->commitIfNeeded();
+
+ ++_commonStats.needTime;
+ return PlanStage::NEED_TIME;
+ }
+ else if (PlanStage::FAILURE == status) {
+ *out = id;
+ // If a stage fails, it may create a status WSM to indicate why it failed, in which case
+ // 'id' is valid. If 'id' is invalid, we create our own error message.
+ if (WorkingSet::INVALID_ID == id) {
+ const std::string errmsg = "delete stage failed to read in results from child";
+ *out = WorkingSetCommon::allocateStatusMember(_ws, Status(ErrorCodes::InternalError,
+ errmsg));
+ return PlanStage::FAILURE;
+ }
+ return status;
+ }
+ else {
+ if (PlanStage::NEED_TIME == status) {
+ ++_commonStats.needTime;
+ }
+ return status;
+ }
+ }
+
+ void DeleteStage::saveState() {
+ ++_commonStats.yields;
+ _child->saveState();
+ }
+
+ void DeleteStage::restoreState(OperationContext* opCtx) {
+ ++_commonStats.unyields;
+ _child->restoreState(opCtx);
+ }
+
+ void DeleteStage::invalidate(const DiskLoc& dl, InvalidationType type) {
+ ++_commonStats.invalidates;
+ _child->invalidate(dl, type);
+ }
+
+ vector<PlanStage*> DeleteStage::getChildren() const {
+ vector<PlanStage*> children;
+ children.push_back(_child.get());
+ return children;
+ }
+
+ PlanStageStats* DeleteStage::getStats() {
+ _commonStats.isEOF = isEOF();
+ auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_DELETE));
+ ret->specific.reset(new DeleteStats(_specificStats));
+ ret->children.push_back(_child->getStats());
+ return ret.release();
+ }
+
+ const CommonStats* DeleteStage::getCommonStats() {
+ return &_commonStats;
+ }
+
+ const SpecificStats* DeleteStage::getSpecificStats() {
+ return &_specificStats;
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h
new file mode 100644
index 00000000000..3e24375a5dc
--- /dev/null
+++ b/src/mongo/db/exec/delete.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/plan_stage.h"
+#include "mongo/db/jsobj.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+ struct DeleteStageParams {
+ DeleteStageParams() : isMulti(false), shouldCallLogOp(false) { }
+
+ // Should we delete all documents returned from the child (a "multi delete"), or at most one
+ // (a "single delete")?
+ bool isMulti;
+
+ // Should we write each delete to the oplog?
+ bool shouldCallLogOp;
+ };
+
+ /**
+ * This stage deletes documents by DiskLoc that are returned from its child. NEED_TIME
+ * is returned after deleting a document.
+ *
+ * Callers of work() must be holding a write lock (and, for shouldCallLogOp=true deletes,
+ * callers must have had the replication coordinator approve the write).
+ */
+ class DeleteStage : public PlanStage {
+ MONGO_DISALLOW_COPYING(DeleteStage);
+ public:
+ DeleteStage(OperationContext* txn,
+ const DeleteStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child);
+ virtual ~DeleteStage();
+
+ virtual bool isEOF();
+ virtual StageState work(WorkingSetID* out);
+
+ virtual void saveState();
+ virtual void restoreState(OperationContext* opCtx);
+ virtual void invalidate(const DiskLoc& dl, InvalidationType type);
+
+ virtual std::vector<PlanStage*> getChildren() const;
+
+ virtual StageType stageType() const { return STAGE_DELETE; }
+
+ virtual PlanStageStats* getStats();
+
+ virtual const CommonStats* getCommonStats();
+
+ virtual const SpecificStats* getSpecificStats();
+
+ static const char* kStageType;
+
+ private:
+ // Transactional context. Not owned by us.
+ OperationContext* _txn;
+
+ DeleteStageParams _params;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // Collection to operate on. Not owned by us. Can be NULL (if NULL, isEOF() will always
+ // return true). If non-NULL, the lifetime of the collection must exceed that of the
+ // stage.
+ Collection* _collection;
+
+ scoped_ptr<PlanStage> _child;
+
+ // Stats
+ CommonStats _commonStats;
+ DeleteStats _specificStats;
+ };
+
+} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h
index f22b9bef7c2..adbc0b6d986 100644
--- a/src/mongo/db/exec/plan_stats.h
+++ b/src/mongo/db/exec/plan_stats.h
@@ -264,6 +264,16 @@ namespace mongo {
};
+ struct DeleteStats : public SpecificStats {
+ DeleteStats() : docsDeleted(0) { }
+
+ virtual SpecificStats* clone() const {
+ return new DeleteStats(*this);
+ }
+
+ size_t docsDeleted;
+ };
+
struct DistinctScanStats : public SpecificStats {
DistinctScanStats() : keysExamined(0) { }
diff --git a/src/mongo/db/exec/stagedebug_cmd.cpp b/src/mongo/db/exec/stagedebug_cmd.cpp
index ea8fff78edd..6251076f8a8 100644
--- a/src/mongo/db/exec/stagedebug_cmd.cpp
+++ b/src/mongo/db/exec/stagedebug_cmd.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/exec/and_hash.h"
#include "mongo/db/exec/and_sorted.h"
#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/exec/limit.h"
@@ -78,6 +79,7 @@ namespace mongo {
* node -> {skip: {args: {node: node, num: posint}}}
* node -> {sort: {args: {node: node, pattern: objWithSortCriterion }}}
* node -> {mergeSort: {args: {nodes: [node, node], pattern: objWithSortCriterion}}}
+ * node -> {delete: {args: {node: node, isMulti: bool, shouldCallLogOp: bool}}}
*
* Forthcoming Nodes:
*
@@ -116,7 +118,10 @@ namespace mongo {
string collName = collElt.String();
// Need a context to get the actual Collection*
- Client::ReadContext ctx(txn, dbname);
+ // TODO A write lock is currently taken here to accommodate stages that perform writes
+ // (e.g. DeleteStage). This should be changed to use a read lock for read-only
+ // execution trees.
+ Client::WriteContext ctx(txn, dbname);
// Make sure the collection is valid.
Database* db = ctx.ctx().db();
@@ -409,6 +414,25 @@ namespace mongo {
return new TextStage(txn, params, workingSet, matcher);
}
+ else if ("delete" == nodeName) {
+ uassert(18636, "Delete stage doesn't have a filter (put it on the child)",
+ NULL == matcher);
+ uassert(18637, "node argument must be provided to delete",
+ nodeArgs["node"].isABSONObj());
+ uassert(18638, "isMulti argument must be provided to delete",
+ nodeArgs["isMulti"].type() == Bool);
+ uassert(18639, "shouldCallLogOp argument must be provided to delete",
+ nodeArgs["shouldCallLogOp"].type() == Bool);
+ PlanStage* subNode = parseQuery(txn,
+ collection,
+ nodeArgs["node"].Obj(),
+ workingSet,
+ exprs);
+ DeleteStageParams params;
+ params.isMulti = nodeArgs["isMulti"].Bool();
+ params.shouldCallLogOp = nodeArgs["shouldCallLogOp"].Bool();
+ return new DeleteStage(txn, params, workingSet, collection, subNode);
+ }
else {
return NULL;
}
diff --git a/src/mongo/db/ops/delete_executor.cpp b/src/mongo/db/ops/delete_executor.cpp
index 42459e77f05..ba36650e634 100644
--- a/src/mongo/db/ops/delete_executor.cpp
+++ b/src/mongo/db/ops/delete_executor.cpp
@@ -32,15 +32,11 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
-#include "mongo/db/client.h"
-#include "mongo/db/curop.h"
+#include "mongo/db/exec/delete.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/query/lite_parsed_query.h"
-#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/repl/repl_coordinator_global.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -87,7 +83,7 @@ namespace mongo {
mongoutils::str::stream() <<
"DeleteExecutor::prepare() failed to parse query " << _request->getQuery(),
_isQueryParsed);
- const bool logop = _request->shouldCallLogOp();
+
const NamespaceString& ns(_request->getNamespaceString());
if (!_request->isGod()) {
if (ns.isSystem()) {
@@ -97,7 +93,7 @@ namespace mongo {
}
if (ns.ns().find('$') != string::npos) {
log() << "cannot delete from collection with reserved $ in name: " << ns << endl;
- uasserted( 10100, "cannot delete from collection with reserved $ in name" );
+ uasserted(10100, "cannot delete from collection with reserved $ in name");
}
}
@@ -112,86 +108,35 @@ namespace mongo {
uassert(ErrorCodes::NotMaster,
str::stream() << "Not primary while removing from " << ns.ns(),
- !logop ||
+ !_request->shouldCallLogOp() ||
repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db()));
- long long nDeleted = 0;
-
PlanExecutor* rawExec;
if (_canonicalQuery.get()) {
- uassertStatusOK(getExecutor(_request->getOpCtx(),
- collection,
- _canonicalQuery.release(),
- &rawExec));
+ // This is the non-idhack branch.
+ uassertStatusOK(getExecutorDelete(_request->getOpCtx(), collection,
+ _canonicalQuery.release(), _request->isMulti(),
+ _request->shouldCallLogOp(), &rawExec));
}
else {
- uassertStatusOK(getExecutor(_request->getOpCtx(),
- collection,
- ns.ns(),
- _request->getQuery(),
- &rawExec));
+ // This is the idhack branch.
+ uassertStatusOK(getExecutorDelete(_request->getOpCtx(), collection, ns.ns(),
+ _request->getQuery(), _request->isMulti(),
+ _request->shouldCallLogOp(), &rawExec));
}
-
- auto_ptr<PlanExecutor> exec(rawExec);
+ scoped_ptr<PlanExecutor> exec(rawExec);
// Concurrently mutating state (by us) so we need to register 'exec'.
- ScopedExecutorRegistration safety(exec.get());
-
- DiskLoc rloc;
- PlanExecutor::ExecState state;
- CurOp* curOp = _request->getOpCtx()->getCurOp();
- int oldYieldCount = curOp->numYields();
- while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &rloc))) {
- if (oldYieldCount != curOp->numYields()) {
- uassert(ErrorCodes::NotMaster,
- str::stream() << "No longer primary while removing from " << ns.ns(),
- !logop ||
- repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- ns.db()));
- oldYieldCount = curOp->numYields();
- }
- BSONObj toDelete;
-
- WriteUnitOfWork wunit(_request->getOpCtx()->recoveryUnit());
-
- // TODO: do we want to buffer docs and delete them in a group rather than
- // saving/restoring state repeatedly?
- exec->saveState();
- collection->deleteDocument(
- _request->getOpCtx(), rloc, false, false, logop ? &toDelete : NULL);
- exec->restoreState(_request->getOpCtx());
-
- nDeleted++;
-
- if (logop) {
- if ( toDelete.isEmpty() ) {
- log() << "Deleted object without id in collection " << collection->ns()
- << ", not logging.";
- }
- else {
- bool replJustOne = true;
- repl::logOp(
- _request->getOpCtx(), "d", ns.ns().c_str(), toDelete, 0, &replJustOne);
- }
- }
-
- wunit.commit();
+ const ScopedExecutorRegistration safety(exec.get());
- if (!_request->isMulti()) {
- break;
- }
-
- if (!_request->isGod()) {
- _request->getOpCtx()->recoveryUnit()->commitIfNeeded();
- }
-
- if (debug && _request->isGod() && nDeleted == 100) {
- log() << "warning high number of deletes with god=true "
- << " which could use significant memory b/c we don't commit journal";
- }
- }
+ uassertStatusOK(exec->executePlan());
- return nDeleted;
+ // Extract the number of documents deleted from the DeleteStage stats.
+ invariant(exec->getRootStage()->stageType() == STAGE_DELETE);
+ DeleteStage* deleteStage = static_cast<DeleteStage*>(exec->getRootStage());
+ const DeleteStats* deleteStats =
+ static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
+ return deleteStats->docsDeleted;
}
} // namespace mongo
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index daa832034c6..305888b8c64 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -35,6 +35,7 @@
#include "mongo/base/parse_number.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/exec/cached_plan.h"
+#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/eof.h"
#include "mongo/db/exec/idhack.h"
#include "mongo/db/exec/multi_plan.h"
@@ -458,6 +459,76 @@ namespace mongo {
return Status::OK();
}
+ Status getExecutorDelete(OperationContext* txn,
+ Collection* collection,
+ CanonicalQuery* rawCanonicalQuery,
+ bool isMulti,
+ bool shouldCallLogOp,
+ PlanExecutor** out) {
+ auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery);
+ auto_ptr<WorkingSet> ws(new WorkingSet());
+ PlanStage* root;
+ QuerySolution* querySolution;
+ Status status = prepareExecution(txn, collection, ws.get(), canonicalQuery.get(), 0, &root,
+ &querySolution);
+ if (!status.isOK()) {
+ return status;
+ }
+ invariant(root);
+ DeleteStageParams deleteStageParams;
+ deleteStageParams.isMulti = isMulti;
+ deleteStageParams.shouldCallLogOp = shouldCallLogOp;
+ root = new DeleteStage(txn, deleteStageParams, ws.get(), collection, root);
+ // We must have a tree of stages in order to have a valid plan executor, but the query
+ // solution may be null.
+ *out = new PlanExecutor(ws.release(), root, querySolution, canonicalQuery.release(),
+ collection);
+ return Status::OK();
+ }
+
+ Status getExecutorDelete(OperationContext* txn,
+ Collection* collection,
+ const std::string& ns,
+ const BSONObj& unparsedQuery,
+ bool isMulti,
+ bool shouldCallLogOp,
+ PlanExecutor** out) {
+ auto_ptr<WorkingSet> ws(new WorkingSet());
+ DeleteStageParams deleteStageParams;
+ deleteStageParams.isMulti = isMulti;
+ deleteStageParams.shouldCallLogOp = shouldCallLogOp;
+ if (!collection) {
+ LOG(2) << "Collection " << ns << " does not exist."
+ << " Using EOF stage: " << unparsedQuery.toString();
+ DeleteStage* deleteStage = new DeleteStage(txn, deleteStageParams, ws.get(), NULL,
+ new EOFStage());
+ *out = new PlanExecutor(ws.release(), deleteStage, ns);
+ return Status::OK();
+ }
+
+ if (CanonicalQuery::isSimpleIdQuery(unparsedQuery) &&
+ collection->getIndexCatalog()->findIdIndex()) {
+ LOG(2) << "Using idhack: " << unparsedQuery.toString();
+
+ PlanStage* idHackStage = new IDHackStage(txn, collection, unparsedQuery["_id"].wrap(),
+ ws.get());
+ DeleteStage* root = new DeleteStage(txn, deleteStageParams, ws.get(), collection,
+ idHackStage);
+ *out = new PlanExecutor(ws.release(), root, collection);
+ return Status::OK();
+ }
+
+ const WhereCallbackReal whereCallback(txn, collection->ns().db());
+ CanonicalQuery* cq;
+ Status status = CanonicalQuery::canonicalize(collection->ns(), unparsedQuery, &cq,
+ whereCallback);
+ if (!status.isOK())
+ return status;
+
+ // Takes ownership of 'cq'.
+ return getExecutorDelete(txn, collection, cq, isMulti, shouldCallLogOp, out);
+ }
+
//
// Count hack
//
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index d9238741253..3e8cb781bd0 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -121,4 +121,39 @@ namespace mongo {
const BSONObj& hintObj,
PlanExecutor** execOut);
+ /**
+ * Get a PlanExecutor for a delete operation. 'rawCanonicalQuery' describes the predicate for
+ * the documents to be deleted. A write lock is required to execute the returned plan.
+ *
+ * Takes ownership of 'rawCanonicalQuery'.
+ *
+ * If the query is valid and an executor could be created, returns Status::OK() and populates
+ * *execOut with the PlanExecutor.
+ *
+ * If the query cannot be executed, returns a Status indicating why.
+ */
+ Status getExecutorDelete(OperationContext* txn,
+ Collection* collection,
+ CanonicalQuery* rawCanonicalQuery,
+ bool isMulti,
+ bool shouldCallLogOp,
+ PlanExecutor** execOut);
+
+ /**
+ * Overload of getExecutorDelete() above, for when a canonicalQuery is not available. Used to
+ * support idhack-powered deletes.
+ *
+ * If the query is valid and an executor could be created, returns Status::OK() and populates
+ * *execOut with the PlanExecutor.
+ *
+ * If the query cannot be executed, returns a Status indicating why.
+ */
+ Status getExecutorDelete(OperationContext* txn,
+ Collection* collection,
+ const std::string& ns,
+ const BSONObj& unparsedQuery,
+ bool isMulti,
+ bool shouldCallLogOp,
+ PlanExecutor** execOut);
+
} // namespace mongo
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index a0d80f28615..b592ee1e5c7 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -220,9 +220,8 @@ namespace mongo {
void kill();
/**
- * Execute the plan to completion, throwing out the results.
- *
- * Used by explain.
+ * Execute the plan to completion, throwing out the results. Used when you want to work the
+ * underlying tree without getting results back.
*/
Status executePlan();
diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h
index 6b841645ffb..791145cce91 100644
--- a/src/mongo/db/query/stage_types.h
+++ b/src/mongo/db/query/stage_types.h
@@ -44,6 +44,8 @@ namespace mongo {
// them.
STAGE_COUNT,
+ STAGE_DELETE,
+
// If we're running a distinct, we only care about one value for each key. The distinct
// stage is an ixscan with some key-skipping behvaior that only distinct uses.
STAGE_DISTINCT,
diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp
new file mode 100644
index 00000000000..96c7fad666a
--- /dev/null
+++ b/src/mongo/dbtests/query_stage_delete.cpp
@@ -0,0 +1,174 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+/**
+ * This file tests db/exec/delete.cpp.
+ */
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/exec/collection_scan.h"
+#include "mongo/db/exec/delete.h"
+#include "mongo/db/operation_context_impl.h"
+#include "mongo/dbtests/dbtests.h"
+
+namespace QueryStageDelete {
+
+ //
+ // Stage-specific tests.
+ //
+
+ class QueryStageDeleteBase {
+ public:
+ QueryStageDeleteBase() : _client(&_txn) {
+ Client::WriteContext ctx(&_txn, ns());
+
+ for (size_t i = 0; i < numObj(); ++i) {
+ BSONObjBuilder bob;
+ bob.append("foo", static_cast<long long int>(i));
+ _client.insert(ns(), bob.obj());
+ }
+ ctx.commit();
+ }
+
+ virtual ~QueryStageDeleteBase() {
+ Client::WriteContext ctx(&_txn, ns());
+ _client.dropCollection(ns());
+ ctx.commit();
+ }
+
+ void remove(const BSONObj& obj) {
+ _client.remove(ns(), obj);
+ }
+
+ void getLocs(Collection* collection,
+ CollectionScanParams::Direction direction,
+ vector<DiskLoc>* out) {
+ WorkingSet ws;
+
+ CollectionScanParams params;
+ params.collection = collection;
+ params.direction = direction;
+ params.tailable = false;
+
+ scoped_ptr<CollectionScan> scan(new CollectionScan(&_txn, params, &ws, NULL));
+ while (!scan->isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = scan->work(&id);
+ if (PlanStage::ADVANCED == state) {
+ WorkingSetMember* member = ws.get(id);
+ verify(member->hasLoc());
+ out->push_back(member->loc);
+ }
+ }
+ }
+
+ static size_t numObj() { return 50; }
+
+ static const char* ns() { return "unittests.QueryStageDelete"; }
+
+ protected:
+ OperationContextImpl _txn;
+
+ private:
+ DBDirectClient _client;
+ };
+
+ //
+ // Test invalidation for the delete stage. Use the delete stage to delete some objects
+ // retrieved by a collscan, then invalidate the upcoming object, then expect the delete stage to
+ // skip over it and successfully delete the rest.
+ //
+ class QueryStageDeleteInvalidateUpcomingObject : public QueryStageDeleteBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(&_txn, ns());
+
+ Collection* coll = ctx.ctx().db()->getCollection(&_txn, ns());
+
+ // Get the DiskLocs that would be returned by an in-order scan.
+ vector<DiskLoc> locs;
+ getLocs(coll, CollectionScanParams::FORWARD, &locs);
+
+ // Configure the scan.
+ CollectionScanParams collScanParams;
+ collScanParams.collection = coll;
+ collScanParams.direction = CollectionScanParams::FORWARD;
+ collScanParams.tailable = false;
+
+ // Configure the delete stage.
+ DeleteStageParams deleteStageParams;
+ deleteStageParams.isMulti = true;
+ deleteStageParams.shouldCallLogOp = false;
+
+ WorkingSet ws;
+ DeleteStage deleteStage(&_txn, deleteStageParams, &ws, coll,
+ new CollectionScan(&_txn, collScanParams, &ws, NULL));
+
+ const DeleteStats* stats =
+ static_cast<const DeleteStats*>(deleteStage.getSpecificStats());
+
+ const size_t targetDocIndex = 10;
+
+ while (stats->docsDeleted < targetDocIndex) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = deleteStage.work(&id);
+ ASSERT_EQUALS(PlanStage::NEED_TIME, state);
+ }
+
+ // Invalidate locs[targetDocIndex] in the delete stage, then remove that document directly.
+ deleteStage.saveState();
+ deleteStage.invalidate(locs[targetDocIndex], INVALIDATION_DELETION);
+ BSONObj targetDoc = coll->docFor(locs[targetDocIndex]);
+ ASSERT(!targetDoc.isEmpty());
+ remove(targetDoc);
+ deleteStage.restoreState(&_txn);
+
+ // Remove the rest.
+ while (!deleteStage.isEOF()) {
+ WorkingSetID id = WorkingSet::INVALID_ID;
+ PlanStage::StageState state = deleteStage.work(&id);
+ invariant(PlanStage::NEED_TIME == state || PlanStage::IS_EOF == state);
+ }
+
+ ASSERT_EQUALS(numObj() - 1, stats->docsDeleted);
+
+ ctx.commit();
+ }
+ };
+
+ class All : public Suite {
+ public:
+ All() : Suite("query_stage_delete") {}
+
+ void setupTests() {
+ // Stage-specific tests below.
+ add<QueryStageDeleteInvalidateUpcomingObject>();
+ }
+ } all;
+
+}