summary refs log tree commitdiff
path: root/src/mongo/db/ops/write_ops_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp  630
1 files changed, 630 insertions, 0 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
new file mode 100644
index 00000000000..a995d1b25eb
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -0,0 +1,630 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/db/audit.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_metrics.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/delete.h"
+#include "mongo/db/exec/update.h"
+#include "mongo/db/introspect.h"
+#include "mongo/db/lasterror.h"
+#include "mongo/db/ops/delete_request.h"
+#include "mongo/db/ops/insert.h"
+#include "mongo/db/ops/parsed_delete.h"
+#include "mongo/db/ops/parsed_update.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/query/get_executor.h"
+#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/query_knobs.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/db/stats/top.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/rpc/command_reply.h"
+#include "mongo/rpc/command_reply_builder.h"
+#include "mongo/rpc/command_request.h"
+#include "mongo/rpc/command_request_builder.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/stale_exception.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+// Convention in this file: generic helpers go in the anonymous namespace. Helpers that are for a
+// single type of operation are static functions defined above their caller.
+namespace {
+
/**
 * Finalizes a per-op CurOp after a write sub-operation: records timing metrics, Top
 * statistics, slow-op logging, and (when enabled) DB profiling.
 *
 * Never lets an error escape: any DBException raised while recording stats is swallowed
 * and logged, so a successful op isn't failed — and a failing op's error isn't replaced —
 * by a stats-recording failure.
 */
void finishCurOp(OperationContext* txn, CurOp* curOp) {
    try {
        curOp->done();
        int executionTimeMs = curOp->totalTimeMillis();
        curOp->debug().executionTime = executionTimeMs;

        recordCurOpMetrics(txn);
        Top::get(txn->getServiceContext())
            .record(curOp->getNS(),
                    curOp->getLogicalOp(),
                    1,  // "write locked"
                    curOp->totalTimeMicros(),
                    curOp->isCommand());

        // A non-empty exceptionInfo means the op failed; surface it at debug log level.
        if (!curOp->debug().exceptionInfo.empty()) {
            LOG(3) << "Caught Assertion in " << logicalOpToString(curOp->getLogicalOp()) << ": "
                   << curOp->debug().exceptionInfo.toString();
        }

        const bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand,
                                                                 logger::LogSeverity::Debug(1));
        const bool logSlow =
            executionTimeMs > (serverGlobalParams.slowMS + curOp->getExpectedLatencyMs());

        if (logAll || logSlow) {
            Locker::LockerInfo lockerInfo;
            txn->lockState()->getLockerInfo(&lockerInfo);
            log() << curOp->debug().report(*curOp, lockerInfo.stats);
        }

        if (curOp->shouldDBProfile(executionTimeMs)) {
            profile(txn, CurOp::get(txn)->getNetworkOp());
        }
    } catch (const DBException& ex) {
        // We need to ignore all errors here. We don't want a successful op to fail because of a
        // failure to record stats. We also don't want to replace the error reported for an op that
        // is failing.
        log() << "Ignoring error from finishCurOp: " << ex.toString();
    }
}
+
/**
 * Sets the Client's LastOp to the system OpTime if needed.
 *
 * RAII guard: the destructor advances the client's lastOp to the system's last OpTime
 * unless the most recent op is known to have advanced it already, or the namespace is in
 * the "local" database (whose writes are not replicated).
 */
class LastOpFixer {
public:
    LastOpFixer(OperationContext* txn, const NamespaceString& ns)
        : _txn(txn), _isOnLocalDb(ns.isLocal()) {}

    ~LastOpFixer() {
        if (_needToFixLastOp && !_isOnLocalDb) {
            // If this operation has already generated a new lastOp, don't bother setting it
            // here. No-op updates will not generate a new lastOp, so we still need the
            // guard to fire in that case. Operations on the local DB aren't replicated, so they
            // don't need to bump the lastOp.
            replClientInfo().setLastOpToSystemLastOpTime(_txn);
        }
    }

    // Call before each op: snapshot the current lastOp so finishedOpSuccessfully() can
    // detect whether the op bumped it.
    void startingOp() {
        _needToFixLastOp = true;
        _opTimeAtLastOpStart = replClientInfo().getLastOp();
    }

    void finishedOpSuccessfully() {
        // If the op was successful and bumped LastOp, we don't need to do it again. However, we
        // still need to for no-ops and all failing ops.
        _needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart);
    }

private:
    repl::ReplClientInfo& replClientInfo() {
        return repl::ReplClientInfo::forClient(_txn->getClient());
    }

    OperationContext* const _txn;
    bool _needToFixLastOp = true;       // True until a successful op is seen to bump lastOp.
    const bool _isOnLocalDb;            // local-DB writes never replicate; never fix lastOp.
    repl::OpTime _opTimeAtLastOpStart;  // lastOp value observed at startingOp().
};
+
+void assertCanWrite_inlock(OperationContext* txn, const NamespaceString& ns) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while writing to " << ns.ns(),
+ repl::ReplicationCoordinator::get(txn->getServiceContext())->canAcceptWritesFor(ns));
+ CollectionShardingState::get(txn, ns)->checkShardVersionOrThrow(txn);
+}
+
/**
 * Implicitly creates collection 'ns' for a write, retrying on WriteConflict. Takes the
 * database X lock, re-checks writability under the lock, and tolerates a concurrent
 * creation by another thread (in which case it does nothing).
 */
void makeCollection(OperationContext* txn, const NamespaceString& ns) {
    MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
        AutoGetOrCreateDb db(txn, ns.db(), MODE_X);
        assertCanWrite_inlock(txn, ns);
        if (!db.getDb()->getCollection(ns.ns())) {  // someone else may have beat us to it.
            WriteUnitOfWork wuow(txn);
            uassertStatusOK(userCreateNS(txn, db.getDb(), ns.ns(), BSONObj()));
            wuow.commit();
        }
    }
    MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "implicit collection creation", ns.ns());
}
+
/**
 * Returns true if the operation can continue.
 *
 * Records the error in LastError and the current CurOp, then decides how to proceed:
 *  - interruption errors rethrow (failing the whole batch);
 *  - stale-sharding errors refresh the shard version, stash the exception on the
 *    WriteResult, and stop the batch;
 *  - everything else becomes a per-op error result, and continuation follows the
 *    batch's continueOnError flag.
 *
 * NOTE: uses a bare `throw;` to rethrow, so this must be called from within a catch
 * block handling 'ex'.
 */
bool handleError(OperationContext* txn,
                 const DBException& ex,
                 const ParsedWriteOp& wholeOp,
                 WriteResult* out) {
    LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
    auto& curOp = *CurOp::get(txn);
    curOp.debug().exceptionInfo = ex.getInfo();

    if (ErrorCodes::isInterruption(ErrorCodes::Error(ex.getCode()))) {
        throw;  // These have always failed the whole batch.
    }

    if (ErrorCodes::isStaleShardingError(ErrorCodes::Error(ex.getCode()))) {
        auto staleConfigException = dynamic_cast<const SendStaleConfigException*>(&ex);
        if (!staleConfigException) {
            // We need to get extra info off of the SCE, but some common patterns can result in the
            // exception being converted to a Status then rethrown as a UserException, losing the
            // info we need. It would be a bug if this happens so we want to detect it in testing,
            // but it isn't severe enough that we should bring down the server if it happens in
            // production.
            dassert(staleConfigException);
            msgassertedNoTrace(35475,
                               str::stream()
                                   << "Got a StaleConfig error but exception was the wrong type: "
                                   << demangleName(typeid(ex)));
        }

        ShardingState::get(txn)
            ->onStaleShardVersion(txn, wholeOp.ns, staleConfigException->getVersionReceived());
        out->staleConfigException =
            stdx::make_unique<SendStaleConfigException>(*staleConfigException);
        return false;  // Stale-config always halts the batch so the router can retry.
    }

    out->results.emplace_back(ex.toStatus());
    return wholeOp.continueOnError;
}
+
+} // namespace
+
/**
 * Emulates a legacy index-creating insert into "<db>.system.indexes" by translating the
 * inserted 'spec' into a createIndexes command and running it in-process through the rpc
 * command machinery. Returns the number of indexes actually added (0 for duplicates).
 *
 * Throws on a missing/mistyped "ns" field, a cross-database target, or any failure
 * reported by the createIndexes command itself.
 */
static WriteResult::SingleResult createIndex(OperationContext* txn,
                                             const NamespaceString& systemIndexes,
                                             const BSONObj& spec) {
    BSONElement nsElement = spec["ns"];
    uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo());
    uassert(ErrorCodes::TypeMismatch,
            str::stream() << "Expected \"ns\" field of index description to be a "
                             "string, "
                             "but found a " << typeName(nsElement.type()),
            nsElement.type() == String);
    const NamespaceString ns(nsElement.valueStringData());
    uassert(ErrorCodes::InvalidOptions,
            str::stream() << "Cannot create an index on " << ns.ns() << " with an insert to "
                          << systemIndexes.ns(),
            ns.db() == systemIndexes.db());

    // Build the equivalent createIndexes command for the target collection.
    BSONObjBuilder cmdBuilder;
    cmdBuilder << "createIndexes" << ns.coll();
    cmdBuilder << "indexes" << BSON_ARRAY(spec);
    BSONObj cmd = cmdBuilder.done();

    // Run the command in-process via the rpc request/reply plumbing rather than the
    // network, then surface any command-level error as an exception.
    rpc::CommandRequestBuilder requestBuilder;
    auto cmdRequestMsg = requestBuilder.setDatabase(ns.db())
                             .setCommandName("createIndexes")
                             .setCommandArgs(cmd)
                             .setMetadata(rpc::makeEmptyMetadata())
                             .done();
    rpc::CommandRequest cmdRequest(&cmdRequestMsg);
    rpc::CommandReplyBuilder cmdReplyBuilder;
    Command::findCommand("createIndexes")->run(txn, cmdRequest, &cmdReplyBuilder);
    auto cmdReplyMsg = cmdReplyBuilder.done();
    rpc::CommandReply cmdReply(&cmdReplyMsg);
    auto cmdResult = cmdReply.getCommandReply();
    uassertStatusOK(getStatusFromCommandResult(cmdResult));

    // Unlike normal inserts, it is not an error to "insert" a duplicate index.
    long long n =
        cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt();
    CurOp::get(txn)->debug().ninserted += n;

    return {n};
}
+
+static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& wholeOp) {
+ // Currently this creates each index independently. We could pass multiple indexes to
+ // createIndexes, but there is a lot of complexity involved in doing it correctly. For one
+ // thing, createIndexes only takes indexes to a single collection, but this batch could include
+ // different collections. Additionally, the error handling is different: createIndexes is
+ // all-or-nothing while inserts are supposed to behave like a sequence that either skips over
+ // errors or stops at the first one. These could theoretically be worked around, but it doesn't
+ // seem worth it since users that want faster index builds should just use the createIndexes
+ // command rather than a legacy emulation.
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ // Creating an index can change the writeConcern. Make sure we set it back to what it was.
+ const auto oldWC = txn->getWriteConcern();
+ ON_BLOCK_EXIT([&] { txn->setWriteConcern(oldWC); });
+
+ WriteResult out;
+ for (auto&& spec : wholeOp.documents) {
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(createIndex(txn, wholeOp.ns, spec));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+ return out;
+}
+
/**
 * Inserts the documents in [begin, end) into 'ns' inside a single WriteUnitOfWork,
 * implicitly creating the collection if it does not exist. The lock is dropped and
 * re-acquired around collection creation, so the existence check loops until the
 * collection is seen under the lock.
 *
 * Throws (including WriteConflictException) on failure; retries are the caller's job.
 */
static void insertDocuments(OperationContext* txn,
                            const NamespaceString& ns,
                            std::vector<BSONObj>::const_iterator begin,
                            std::vector<BSONObj>::const_iterator end) {
    auto& curOp = *CurOp::get(txn);
    boost::optional<AutoGetCollection> collection;
    while (true) {
        txn->checkForInterrupt();
        collection.emplace(txn, ns, MODE_IX);
        if (collection->getCollection())
            break;

        collection.reset();  // unlock.
        makeCollection(txn, ns);
    }

    curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
    assertCanWrite_inlock(txn, ns);

    // Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can
    // react to oversized batches.
    WriteUnitOfWork wuow(txn);
    uassertStatusOK(collection->getCollection()->insertDocuments(
        txn, begin, end, &curOp.debug(), /*enforceQuota*/ true));
    wuow.commit();
}
+
/**
 * Returns true if caller should try to insert more documents. Does nothing else if batch is empty.
 *
 * Strategy: a multi-document batch is first attempted as one combined insert (one
 * WriteUnitOfWork). If that fails for any reason, the failure is discarded and each
 * document is retried individually inside its own WriteConflict retry loop, so errors
 * are attributed to the exact document that caused them.
 */
static bool insertBatchAndHandleErrors(OperationContext* txn,
                                       const InsertOp& wholeOp,
                                       const std::vector<BSONObj>& batch,
                                       LastOpFixer* lastOpFixer,
                                       WriteResult* out) {
    auto& curOp = *CurOp::get(txn);
    if (batch.size() > 1) {
        // First try doing it all together. If all goes well, this is all we need to do.
        try {
            lastOpFixer->startingOp();
            insertDocuments(txn, wholeOp.ns, batch.begin(), batch.end());
            lastOpFixer->finishedOpSuccessfully();
            globalOpCounters.gotInserts(batch.size());
            // One success result per document in the combined batch.
            std::fill_n(
                std::back_inserter(out->results), batch.size(), WriteResult::SingleResult{1});
            curOp.debug().ninserted += batch.size();
            return true;
        } catch (const DBException& ex) {
            // Ignore this failure and behave as-if we never tried to do the combined batch insert.
            // The loop below will handle reporting any non-transient errors.
        }
    }

    // Try to insert the batch one-at-a-time. This path is executed both for singular batches, and
    // for batches that failed all-at-once inserting.
    for (auto it = batch.begin(); it != batch.end(); ++it) {
        globalOpCounters.gotInsert();
        try {
            // WriteConflicts on a single document are transient; retry here rather than
            // failing the op.
            MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
                lastOpFixer->startingOp();
                insertDocuments(txn, wholeOp.ns, it, it + 1);
                lastOpFixer->finishedOpSuccessfully();
                out->results.emplace_back(WriteResult::SingleResult{1});
                curOp.debug().ninserted++;
            }
            MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", wholeOp.ns.ns());
        } catch (const DBException& ex) {
            bool canContinue = handleError(txn, ex, wholeOp, out);
            if (!canContinue)
                return false;
        }
    }

    return true;
}
+
+WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ auto& curOp = *CurOp::get(txn);
+ ON_BLOCK_EXIT([&] {
+ // This is the only part of finishCurOp we need to do for inserts because they reuse the
+ // top-level curOp. The rest is handled by the top-level entrypoint.
+ curOp.done();
+ Top::get(txn->getServiceContext())
+ .record(wholeOp.ns.ns(),
+ LogicalOp::opInsert,
+ 1 /* write locked*/,
+ curOp.totalTimeMicros(),
+ curOp.isCommand());
+
+ });
+
+ {
+ stdx::lock_guard<Client>(*txn->getClient());
+ curOp.setNS_inlock(wholeOp.ns.ns());
+ curOp.setLogicalOp_inlock(LogicalOp::opInsert);
+ curOp.ensureStarted();
+ curOp.debug().ninserted = 0;
+ }
+
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ if (wholeOp.ns.isSystemDotIndexes()) {
+ return performCreateIndexes(txn, wholeOp);
+ }
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.documents.size());
+
+ size_t bytesInBatch = 0;
+ std::vector<BSONObj> batch;
+ const size_t maxBatchSize = internalQueryExecYieldIterations / 2;
+ batch.reserve(std::min(wholeOp.documents.size(), maxBatchSize));
+
+ for (auto&& doc : wholeOp.documents) {
+ const bool isLastDoc = (&doc == &wholeOp.documents.back());
+ auto fixedDoc = fixDocumentForInsert(doc);
+ if (!fixedDoc.isOK()) {
+ // Handled after we insert anything in the batch to be sure we report errors in the
+ // correct order. In an ordered insert, if one of the docs ahead of us fails, we should
+ // behave as-if we never got to this document.
+ } else {
+ batch.push_back(fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()));
+ bytesInBatch += batch.back().objsize();
+ if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
+ continue; // Add more to batch before inserting.
+ }
+
+ bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out);
+ batch.clear(); // We won't need the current batch any more.
+ bytesInBatch = 0;
+
+ if (canContinue && !fixedDoc.isOK()) {
+ globalOpCounters.gotInsert();
+ canContinue = handleError(
+ txn,
+ UserException(fixedDoc.getStatus().code(), fixedDoc.getStatus().reason()),
+ wholeOp,
+ &out);
+ }
+
+ if (!canContinue)
+ break;
+ }
+
+ return out;
+}
+
/**
 * Executes one update operation against 'ns': parses the update, acquires the collection
 * (implicitly creating it when this is an upsert against a missing collection), runs the
 * update executor, and records stats. Returns {nMatchedOrInserted, nModified, upsertedId}.
 *
 * Throws on parse failure, NotMaster/stale shard version, or executor error.
 */
static WriteResult::SingleResult performSingleUpdateOp(OperationContext* txn,
                                                       const NamespaceString& ns,
                                                       const UpdateOp::SingleUpdate& op) {
    globalOpCounters.gotUpdate();
    auto& curOp = *CurOp::get(txn);
    {
        stdx::lock_guard<Client> lk(*txn->getClient());
        curOp.setNS_inlock(ns.ns());
        curOp.setNetworkOp_inlock(dbUpdate);
        curOp.setLogicalOp_inlock(LogicalOp::opUpdate);
        curOp.setQuery_inlock(op.query);
        curOp.ensureStarted();

        curOp.debug().query = op.query;
    }

    UpdateLifecycleImpl updateLifecycle(ns);
    UpdateRequest request(ns);
    request.setLifecycle(&updateLifecycle);
    request.setQuery(op.query);
    request.setUpdates(op.update);
    request.setMulti(op.multi);
    request.setUpsert(op.upsert);
    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);  // ParsedUpdate overrides this for $isolated.

    ParsedUpdate parsedUpdate(txn, &request);
    uassertStatusOK(parsedUpdate.parseRequest());

    ScopedTransaction scopedXact(txn, MODE_IX);
    boost::optional<AutoGetCollection> collection;
    // Loop because an upsert may need to drop the lock to create the missing collection,
    // after which another thread may have created (or dropped) it again.
    while (true) {
        txn->checkForInterrupt();
        collection.emplace(txn,
                           ns,
                           MODE_IX,  // DB is always IX, even if collection is X.
                           parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
        if (collection->getCollection() || !op.upsert)
            break;

        collection.reset();  // unlock.
        makeCollection(txn, ns);
    }

    if (collection->getDb()) {
        curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
    }

    assertCanWrite_inlock(txn, ns);

    auto exec = uassertStatusOK(
        getExecutorUpdate(txn, &curOp.debug(), collection->getCollection(), &parsedUpdate));
    uassertStatusOK(exec->executePlan());

    PlanSummaryStats summary;
    Explain::getSummaryStats(*exec, &summary);
    if (collection->getCollection()) {
        collection->getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
    }

    const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
    UpdateStage::recordUpdateStatsInOpDebug(updateStats, &curOp.debug());
    curOp.debug().setPlanSummaryMetrics(summary);
    UpdateResult res = UpdateStage::makeUpdateResult(updateStats);

    // An upsert that inserted counts as one "matched" document for reporting purposes.
    const bool didInsert = !res.upserted.isEmpty();
    const long long nMatchedOrInserted = didInsert ? 1 : res.numMatched;
    LastError::get(txn->getClient()).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);

    return {nMatchedOrInserted, res.numDocsModified, res.upserted};
}
+
+WriteResult performUpdates(OperationContext* txn, const UpdateOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.updates.size());
+ for (auto&& singleOp : wholeOp.updates) {
+ // TODO: don't create nested CurOp for legacy writes.
+ CurOp curOp(txn);
+ ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); });
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(performSingleUpdateOp(txn, wholeOp.ns, singleOp));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+
+ return out;
+}
+
/**
 * Executes one delete operation against 'ns': parses the delete, acquires the collection
 * (no implicit creation — deleting from a missing collection matches nothing), runs the
 * delete executor, and records stats. Returns the number of documents deleted.
 *
 * Throws on parse failure, NotMaster/stale shard version, or executor error.
 */
static WriteResult::SingleResult performSingleDeleteOp(OperationContext* txn,
                                                       const NamespaceString& ns,
                                                       const DeleteOp::SingleDelete& op) {
    globalOpCounters.gotDelete();
    auto& curOp = *CurOp::get(txn);
    {
        stdx::lock_guard<Client> lk(*txn->getClient());
        curOp.setNS_inlock(ns.ns());
        curOp.setNetworkOp_inlock(dbDelete);
        curOp.setLogicalOp_inlock(LogicalOp::opDelete);
        curOp.setQuery_inlock(op.query);
        curOp.ensureStarted();

        curOp.debug().query = op.query;
        curOp.debug().ndeleted = 0;
    }

    txn->checkForInterrupt();

    DeleteRequest request(ns);
    request.setQuery(op.query);
    request.setMulti(op.multi);
    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);  // ParsedDelete overrides this for $isolated.

    ParsedDelete parsedDelete(txn, &request);
    uassertStatusOK(parsedDelete.parseRequest());

    ScopedTransaction scopedXact(txn, MODE_IX);
    AutoGetCollection collection(txn,
                                 ns,
                                 MODE_IX,  // DB is always IX, even if collection is X.
                                 parsedDelete.isIsolated() ? MODE_X : MODE_IX);
    if (collection.getDb()) {
        curOp.raiseDbProfileLevel(collection.getDb()->getProfilingLevel());
    }

    assertCanWrite_inlock(txn, ns);

    auto exec = uassertStatusOK(
        getExecutorDelete(txn, &curOp.debug(), collection.getCollection(), &parsedDelete));
    uassertStatusOK(exec->executePlan());
    long long n = DeleteStage::getNumDeleted(*exec);
    curOp.debug().ndeleted = n;

    PlanSummaryStats summary;
    Explain::getSummaryStats(*exec, &summary);
    if (collection.getCollection()) {
        collection.getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
    }
    curOp.debug().setPlanSummaryMetrics(summary);
    LastError::get(txn->getClient()).recordDelete(n);

    return {n};
}
+
+WriteResult performDeletes(OperationContext* txn, const DeleteOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.deletes.size());
+ for (auto&& singleOp : wholeOp.deletes) {
+ // TODO: don't create nested CurOp for legacy writes.
+ CurOp curOp(txn);
+ ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); });
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(performSingleDeleteOp(txn, wholeOp.ns, singleOp));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+
+ return out;
+}
+
+} // namespace mongo