/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
#include "mongo/platform/basic.h"
#include <memory>
#include "mongo/base/checked_cast.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop_metrics.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/update.h"
#include "mongo/db/introspect.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
// Convention in this file: generic helpers go in the anonymous namespace. Helpers that are for a
// single type of operation are static functions defined above their caller.
namespace {
// Fail points checked by the write paths in this file. While one is active, the corresponding
// operation type (insert / update / remove) fails with an InternalError before it acquires its
// collection lock.
MONGO_FP_DECLARE(failAllInserts);
MONGO_FP_DECLARE(failAllUpdates);
MONGO_FP_DECLARE(failAllRemoves);
/**
 * Completes the bookkeeping for a finished write operation tracked by 'curOp'.
 *
 * Marks the CurOp as done, stores its execution time (in microseconds) in its OpDebug, records
 * CurOp metrics and Top usage, logs the operation when command debug logging is enabled or when
 * the op is slow and sampled, and profiles it when the CurOp requests profiling.
 *
 * Any DBException thrown along the way is logged and swallowed.
 */
void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
    try {
        curOp->done();
        long long executionTimeMicros =
            durationCount<Microseconds>(curOp->elapsedTimeExcludingPauses());
        curOp->debug().executionTimeMicros = executionTimeMicros;

        recordCurOpMetrics(opCtx);
        Top::get(opCtx->getServiceContext())
            .record(opCtx,
                    curOp->getNS(),
                    curOp->getLogicalOp(),
                    Top::LockType::WriteLocked,
                    durationCount<Microseconds>(curOp->elapsedTimeExcludingPauses()),
                    curOp->isCommand(),
                    curOp->getReadWriteType());

        if (!curOp->debug().exceptionInfo.isOK()) {
            LOG(3) << "Caught Assertion in " << redact(logicalOpToString(curOp->getLogicalOp()))
                   << ": " << curOp->debug().exceptionInfo.toString();
        }

        const bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand,
                                                                 logger::LogSeverity::Debug(1));
        const bool logSlow = executionTimeMicros > (serverGlobalParams.slowMS * 1000LL);

        // A sample rate of exactly 1.0 means "sample everything", so the PRNG is not consulted.
        const bool shouldSample = serverGlobalParams.sampleRate == 1.0
            ? true
            : opCtx->getClient()->getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate;

        if (logAll || (shouldSample && logSlow)) {
            Locker::LockerInfo lockerInfo;
            opCtx->lockState()->getLockerInfo(&lockerInfo);
            log() << curOp->debug().report(opCtx->getClient(), *curOp, lockerInfo.stats);
        }

        if (curOp->shouldDBProfile(shouldSample)) {
            profile(opCtx, CurOp::get(opCtx)->getNetworkOp());
        }
    } catch (const DBException& ex) {
        // We need to ignore all errors here. We don't want a successful op to fail because of a
        // failure to record stats. We also don't want to replace the error reported for an op that
        // is failing.
        log() << "Ignoring error from finishCurOp: " << redact(ex);
    }
}
/**
* Sets the Client's LastOp to the system OpTime if needed.
*/
class LastOpFixer {
public:
LastOpFixer(OperationContext* opCtx, const NamespaceString& ns)
: _opCtx(opCtx), _isOnLocalDb(ns.isLocal()) {}
~LastOpFixer() {
if (_needToFixLastOp && !_isOnLocalDb) {
// If this operation has already generated a new lastOp, don't bother setting it
// here. No-op updates will not generate a new lastOp, so we still need the
// guard to fire in that case. Operations on the local DB aren't replicated, so they
// don't need to bump the lastOp.
replClientInfo().setLastOpToSystemLastOpTime(_opCtx);
}
}
void startingOp() {
_needToFixLastOp = true;
_opTimeAtLastOpStart = replClientInfo().getLastOp();
}
void finishedOpSuccessfully() {
// If the op was succesful and bumped LastOp, we don't need to do it again. However, we
// still need to for no-ops and all failing ops.
_needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart);
}
private:
repl::ReplClientInfo& replClientInfo() {
return repl::ReplClientInfo::forClient(_opCtx->getClient());
}
OperationContext* const _opCtx;
bool _needToFixLastOp = true;
const bool _isOnLocalDb;
repl::OpTime _opTimeAtLastOpStart;
};
/**
 * Throws PrimarySteppedDown unless the replication coordinator accepts writes for 'ns', then
 * checks the shard version for 'ns' (which throws on failure).
 */
void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& ns) {
    auto replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext());
    const bool canAcceptWrites = replCoord->canAcceptWritesFor(opCtx, ns);
    uassert(ErrorCodes::PrimarySteppedDown,
            str::stream() << "Not primary while writing to " << ns.ns(),
            canAcceptWrites);

    CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx);
}
/**
 * Implicitly creates the collection 'ns' (with default options) if it does not exist yet.
 *
 * The whole attempt runs inside writeConflictRetry and holds the database in MODE_X.
 */
void makeCollection(OperationContext* opCtx, const NamespaceString& ns) {
    writeConflictRetry(opCtx, "implicit collection creation", ns.ns(), [&] {
        AutoGetOrCreateDb autoDb(opCtx, ns.db(), MODE_X);
        assertCanWrite_inlock(opCtx, ns);

        if (autoDb.getDb()->getCollection(opCtx, ns)) {
            // Someone else may have beat us to it; nothing left to create.
            return;
        }

        WriteUnitOfWork wuow(opCtx);
        uassertStatusOK(userCreateNS(opCtx, autoDb.getDb(), ns.ns(), BSONObj()));
        wuow.commit();
    });
}
/**
 * Records the failure described by 'ex' and decides whether the enclosing batch may go on.
 *
 * Must be called from inside a catch handler: interruption errors are propagated with a bare
 * 'throw;', which rethrows the exception currently being handled.
 *
 * - The error is always stored in the client's LastError and in the current CurOp's OpDebug.
 * - Interruption errors are rethrown.
 * - Stale sharding errors are saved in 'out->staleConfigException' and stop the batch.
 * - Any other error is appended to 'out->results'; the batch continues only when 'wholeOp' is
 *   unordered.
 *
 * Returns true if the operation can continue.
 */
bool handleError(OperationContext* opCtx,
                 const DBException& ex,
                 const NamespaceString& nss,
                 const write_ops::WriteCommandBase& wholeOp,
                 WriteResult* out) {
    LastError::get(opCtx->getClient()).setLastError(ex.code(), ex.reason());
    auto& curOp = *CurOp::get(opCtx);
    curOp.debug().exceptionInfo = ex.toStatus();

    if (ErrorCodes::isInterruption(ex.code())) {
        throw;  // These have always failed the whole batch.
    }

    if (ErrorCodes::isStaleShardingError(ex.code())) {
        auto staleConfigException = dynamic_cast<const StaleConfigException*>(&ex);
        if (!staleConfigException) {
            // We need to get extra info off of the SCE, but some common patterns can result in the
            // exception being converted to a Status then rethrown as a AssertionException, losing
            // the info we need. It would be a bug if this happens so we want to detect it in
            // testing, but it isn't severe enough that we should bring down the server if it
            // happens in production.
            dassert(staleConfigException);
            msgasserted(35475,
                        str::stream()
                            << "Got a StaleConfig error but exception was the wrong type: "
                            << demangleName(typeid(ex)));
        }

        if (!opCtx->getClient()->isInDirectClient()) {
            ShardingState::get(opCtx)
                ->onStaleShardVersion(opCtx, nss, staleConfigException->getVersionReceived())
                .transitional_ignore();
        }
        out->staleConfigException = stdx::make_unique<StaleConfigException>(*staleConfigException);
        return false;
    }

    out->results.emplace_back(ex.toStatus());
    return !wholeOp.getOrdered();
}
/**
 * Emulates a legacy insert into 'systemIndexes' by running the createIndexes command for the
 * index described by 'spec'.
 *
 * 'spec' must contain a string "ns" field naming a collection in the same database as
 * 'systemIndexes'; otherwise this throws. The returned result's N is the number of indexes the
 * command reports as added, which is also added to the current op's 'ninserted' counter.
 */
SingleWriteResult createIndex(OperationContext* opCtx,
                              const NamespaceString& systemIndexes,
                              const BSONObj& spec) {
    const BSONElement nsField = spec["ns"];
    uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsField.eoo());
    uassert(ErrorCodes::TypeMismatch,
            str::stream()
                << "Expected \"ns\" field of index description to be a string, but found a "
                << typeName(nsField.type()),
            nsField.type() == String);

    const NamespaceString targetNss(nsField.valueStringData());
    uassert(ErrorCodes::InvalidOptions,
            str::stream() << "Cannot create an index on " << targetNss.ns()
                          << " with an insert to "
                          << systemIndexes.ns(),
            targetNss.db() == systemIndexes.db());

    BSONObjBuilder createIndexesCmd;
    createIndexesCmd << "createIndexes" << targetNss.coll() << "indexes" << BSON_ARRAY(spec);

    auto reply = Command::runCommandDirectly(
        opCtx, OpMsgRequest::fromDBAndBody(systemIndexes.db(), createIndexesCmd.obj()));
    uassertStatusOK(getStatusFromCommandResult(reply));

    // Unlike normal inserts, it is not an error to "insert" a duplicate index.
    const int numIndexesAfter = reply["numIndexesAfter"].numberInt();
    const int numIndexesBefore = reply["numIndexesBefore"].numberInt();
    const long long numCreated = numIndexesAfter - numIndexesBefore;
    CurOp::get(opCtx)->debug().ninserted += numCreated;

    SingleWriteResult writeResult;
    writeResult.setN(numCreated);
    return writeResult;
}
/**
 * Handles an insert whose target namespace is system.indexes by creating one index per document
 * in 'wholeOp', collecting one result (or error) per index spec.
 */
WriteResult performCreateIndexes(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
    // Currently this creates each index independently. We could pass multiple indexes to
    // createIndexes, but there is a lot of complexity involved in doing it correctly. For one
    // thing, createIndexes only takes indexes to a single collection, but this batch could include
    // different collections. Additionally, the error handling is different: createIndexes is
    // all-or-nothing while inserts are supposed to behave like a sequence that either skips over
    // errors or stops at the first one. These could theoretically be worked around, but it doesn't
    // seem worth it since users that want faster index builds should just use the createIndexes
    // command rather than a legacy emulation.
    LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());
    WriteResult writeResult;

    for (auto&& indexSpec : wholeOp.getDocuments()) {
        try {
            lastOpFixer.startingOp();
            writeResult.results.emplace_back(
                createIndex(opCtx, wholeOp.getNamespace(), indexSpec));
            lastOpFixer.finishedOpSuccessfully();
        } catch (const DBException& ex) {
            // handleError() reports whether the remaining specs may still be attempted.
            if (!handleError(opCtx,
                             ex,
                             wholeOp.getNamespace(),
                             wholeOp.getWriteCommandBase(),
                             &writeResult)) {
                break;
            }
        }
    }

    return writeResult;
}
void insertDocuments(OperationContext* opCtx,
Collection* collection,
std::vector::iterator begin,
std::vector::iterator end) {
// Intentionally not using writeConflictRetry. That is handled by the caller so it can react to
// oversized batches.
WriteUnitOfWork wuow(opCtx);
// Acquire optimes and fill them in for each item in the batch.
// This must only be done for doc-locking storage engines, which are allowed to insert oplog
// documents out-of-timestamp-order. For other storage engines, the oplog entries must be
// physically written in timestamp order, so we defer optime assignment until the oplog is about
// to be written.
auto batchSize = std::distance(begin, end);
if (supportsDocLocking()) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
if (!replCoord->isOplogDisabledFor(opCtx, collection->ns())) {
// Populate 'slots' with new optimes for each insert.
// This also notifies the storage engine of each new timestamp.
auto oplogSlots = repl::getNextOpTimes(opCtx, batchSize);
auto slot = oplogSlots.begin();
for (auto it = begin; it != end; it++) {
it->oplogSlot = *slot++;
}
}
}
uassertStatusOK(collection->insertDocuments(
opCtx, begin, end, &CurOp::get(opCtx)->debug(), /*enforceQuota*/ true));
wuow.commit();
}
/**
* Returns true if caller should try to insert more documents. Does nothing else if batch is empty.
*/
bool insertBatchAndHandleErrors(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
std::vector& batch,
LastOpFixer* lastOpFixer,
WriteResult* out) {
if (batch.empty())
return true;
auto& curOp = *CurOp::get(opCtx);
boost::optional collection;
auto acquireCollection = [&] {
while (true) {
opCtx->checkForInterrupt();
if (MONGO_FAIL_POINT(failAllInserts)) {
uasserted(ErrorCodes::InternalError, "failAllInserts failpoint active!");
}
collection.emplace(opCtx, wholeOp.getNamespace(), MODE_IX);
if (collection->getCollection())
break;
collection.reset(); // unlock.
makeCollection(opCtx, wholeOp.getNamespace());
}
curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
assertCanWrite_inlock(opCtx, wholeOp.getNamespace());
};
try {
acquireCollection();
if (!collection->getCollection()->isCapped() && batch.size() > 1) {
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
lastOpFixer->startingOp();
insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());
lastOpFixer->finishedOpSuccessfully();
globalOpCounters.gotInserts(batch.size());
SingleWriteResult result;
result.setN(1);
std::fill_n(std::back_inserter(out->results), batch.size(), std::move(result));
curOp.debug().ninserted += batch.size();
return true;
}
} catch (const DBException&) {
collection.reset();
// Ignore this failure and behave as-if we never tried to do the combined batch insert.
// The loop below will handle reporting any non-transient errors.
}
// Try to insert the batch one-at-a-time. This path is executed both for singular batches, and
// for batches that failed all-at-once inserting.
for (auto it = batch.begin(); it != batch.end(); ++it) {
globalOpCounters.gotInsert();
try {
writeConflictRetry(opCtx, "insert", wholeOp.getNamespace().ns(), [&] {
try {
if (!collection)
acquireCollection();
lastOpFixer->startingOp();
insertDocuments(opCtx, collection->getCollection(), it, it + 1);
lastOpFixer->finishedOpSuccessfully();
SingleWriteResult result;
result.setN(1);
out->results.emplace_back(std::move(result));
curOp.debug().ninserted++;
} catch (...) {
// Release the lock following any error. Among other things, this ensures that
// we don't sleep in the WCE retry loop with the lock held.
collection.reset();
throw;
}
});
} catch (const DBException& ex) {
bool canContinue =
handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), out);
if (!canContinue)
return false;
}
}
return true;
}
template
StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) {
return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex)
: kUninitializedStmtId;
}
/**
 * Builds the result reported for an insert or delete statement that is skipped because it was
 * already executed: N is 1 and NModified is 0.
 */
SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
    SingleWriteResult retryResult;
    retryResult.setN(1);
    retryResult.setNModified(0);
    return retryResult;
}
} // namespace
/**
 * Executes the insert command 'wholeOp' and returns one result per processed document.
 *
 * Must not be called inside a WriteUnitOfWork; this function does its own retries. Inserts into
 * system.indexes are redirected to performCreateIndexes(). Otherwise documents are validated
 * with fixDocumentForInsert(), grouped into batches bounded by 'internalInsertMaxBatchSize' and
 * 'insertVectorMaxBytes', and handed to insertBatchAndHandleErrors(). Statements that the session
 * reports as already executed are answered with a canned retry result instead of being
 * re-inserted.
 *
 * Throws if the namespace is not a legal write target; per-document failures are reported in the
 * returned WriteResult.
 */
WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
    invariant(!opCtx->lockState()->inAWriteUnitOfWork());  // Does own retries.
    auto& curOp = *CurOp::get(opCtx);
    ON_BLOCK_EXIT([&] {
        // This is the only part of finishCurOp we need to do for inserts because they reuse the
        // top-level curOp. The rest is handled by the top-level entrypoint.
        curOp.done();
        Top::get(opCtx->getServiceContext())
            .record(opCtx,
                    wholeOp.getNamespace().ns(),
                    LogicalOp::opInsert,
                    Top::LockType::WriteLocked,
                    durationCount<Microseconds>(curOp.elapsedTimeExcludingPauses()),
                    curOp.isCommand(),
                    curOp.getReadWriteType());
    });

    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        curOp.setNS_inlock(wholeOp.getNamespace().ns());
        curOp.setLogicalOp_inlock(LogicalOp::opInsert);
        curOp.ensureStarted();
        curOp.debug().ninserted = 0;
    }

    uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));

    if (wholeOp.getNamespace().isSystemDotIndexes()) {
        return performCreateIndexes(opCtx, wholeOp);
    }

    DisableDocumentValidationIfTrue docValidationDisabler(
        opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
    LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());

    WriteResult out;
    out.results.reserve(wholeOp.getDocuments().size());

    // NOTE(review): 'stmtIdIndex' only advances for documents that pass fixDocumentForInsert(), so
    // a rejected document shifts the statement ids of the documents after it - confirm intended.
    size_t stmtIdIndex = 0;
    size_t bytesInBatch = 0;
    std::vector<InsertStatement> batch;
    const size_t maxBatchSize = internalInsertMaxBatchSize.load();
    batch.reserve(std::min(wholeOp.getDocuments().size(), maxBatchSize));

    for (auto&& doc : wholeOp.getDocuments()) {
        const bool isLastDoc = (&doc == &wholeOp.getDocuments().back());
        auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc);
        if (!fixedDoc.isOK()) {
            // Handled after we insert anything in the batch to be sure we report errors in the
            // correct order. In an ordered insert, if one of the docs ahead of us fails, we should
            // behave as-if we never got to this document.
        } else {
            const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
            if (opCtx->getTxnNumber()) {
                auto session = OperationContextSession::get(opCtx);
                if (session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(),
                                                                     stmtId)) {
                    // NOTE(review): this 'continue' skips the batch flush below. If documents are
                    // pending in 'batch' and this is the last document, they appear never to be
                    // inserted, and the retry result is recorded ahead of theirs - confirm.
                    out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
                    continue;
                }
            }

            // An empty fixed document means the original needed no changes.
            BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
            batch.emplace_back(stmtId, toInsert);
            bytesInBatch += batch.back().doc.objsize();
            if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
                continue;  // Add more to batch before inserting.
        }

        bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);
        batch.clear();  // We won't need the current batch any more.
        bytesInBatch = 0;

        if (canContinue && !fixedDoc.isOK()) {
            globalOpCounters.gotInsert();
            try {
                // Convert the bad status into an exception so that it takes the same
                // handleError() path as every other per-document failure.
                uassertStatusOK(fixedDoc.getStatus());
                MONGO_UNREACHABLE;
            } catch (const DBException& ex) {
                canContinue = handleError(
                    opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out);
            }
        }

        if (!canContinue)
            break;
    }

    return out;
}
/**
 * Executes the single update statement 'op' against 'ns' and returns its result.
 *
 * Rejects multi-updates when the operation carries a transaction number. For upserts the target
 * collection is created implicitly when missing. The result's N is 1 for an upsert that inserted
 * a document and the number of matched documents otherwise; NModified and the upserted id are
 * taken from the update stage's stats.
 *
 * Throws on any failure; the caller is responsible for translating exceptions into results.
 */
static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
                                               const NamespaceString& ns,
                                               StmtId stmtId,
                                               const write_ops::UpdateOpEntry& op) {
    uassert(ErrorCodes::InvalidOptions,
            "Cannot use (or request) retryable writes with multi=true",
            !(opCtx->getTxnNumber() && op.getMulti()));

    globalOpCounters.gotUpdate();
    auto& curOp = *CurOp::get(opCtx);
    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        curOp.setNS_inlock(ns.ns());
        curOp.setNetworkOp_inlock(dbUpdate);
        curOp.setLogicalOp_inlock(LogicalOp::opUpdate);
        curOp.setOpDescription_inlock(op.toBSON());
        curOp.ensureStarted();
    }

    UpdateLifecycleImpl updateLifecycle(ns);
    UpdateRequest request(ns);
    request.setLifecycle(&updateLifecycle);
    request.setQuery(op.getQ());
    request.setUpdates(op.getU());
    request.setCollation(write_ops::collationOf(op));
    request.setStmtId(stmtId);
    request.setArrayFilters(write_ops::arrayFiltersOf(op));
    request.setMulti(op.getMulti());
    request.setUpsert(op.getUpsert());
    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);  // ParsedUpdate overrides this for $isolated.

    ParsedUpdate parsedUpdate(opCtx, &request);
    uassertStatusOK(parsedUpdate.parseRequest());

    // Holds the collection lock while engaged; reset() releases it.
    boost::optional<AutoGetCollection> collection;
    while (true) {
        opCtx->checkForInterrupt();
        if (MONGO_FAIL_POINT(failAllUpdates)) {
            uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
        }

        collection.emplace(opCtx,
                           ns,
                           MODE_IX,  // DB is always IX, even if collection is X.
                           parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
        // Only an upsert needs the collection to exist; a plain update proceeds without it.
        if (collection->getCollection() || !op.getUpsert())
            break;

        collection.reset();  // unlock.
        makeCollection(opCtx, ns);
    }

    if (collection->getDb()) {
        curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
    }

    assertCanWrite_inlock(opCtx, ns);

    auto exec = uassertStatusOK(
        getExecutorUpdate(opCtx, &curOp.debug(), collection->getCollection(), &parsedUpdate));

    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
    }

    uassertStatusOK(exec->executePlan());

    PlanSummaryStats summary;
    Explain::getSummaryStats(*exec, &summary);
    if (collection->getCollection()) {
        collection->getCollection()->infoCache()->notifyOfQuery(opCtx, summary.indexesUsed);
    }

    if (curOp.shouldDBProfile()) {
        BSONObjBuilder execStatsBob;
        Explain::getWinningPlanStats(exec.get(), &execStatsBob);
        curOp.debug().execStats = execStatsBob.obj();
    }

    const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
    UpdateStage::recordUpdateStatsInOpDebug(updateStats, &curOp.debug());
    curOp.debug().setPlanSummaryMetrics(summary);
    UpdateResult res = UpdateStage::makeUpdateResult(updateStats);

    // A non-empty 'upserted' means the update inserted a document, which counts as one match.
    const bool didInsert = !res.upserted.isEmpty();
    const long long nMatchedOrInserted = didInsert ? 1 : res.numMatched;
    LastError::get(opCtx->getClient()).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);

    SingleWriteResult result;
    result.setN(nMatchedOrInserted);
    result.setNModified(res.numDocsModified);
    result.setUpsertedId(res.upserted);

    return result;
}
/**
 * Executes every update statement in 'wholeOp' and returns one result per processed statement.
 *
 * Must not be called inside a WriteUnitOfWork; this function does its own retries. When the
 * operation carries a transaction number, a statement for which the session returns a previously
 * recorded entry is answered from that entry instead of being executed again. Every executed
 * statement runs under its own nested CurOp, which is finalized by finishCurOp().
 *
 * Throws if the namespace is not a legal write target; per-statement failures go through
 * handleError(), which also decides whether the remaining statements are attempted.
 */
WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp) {
    invariant(!opCtx->lockState()->inAWriteUnitOfWork());  // Does own retries.
    uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));

    DisableDocumentValidationIfTrue docValidationDisabler(
        opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
    LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());

    size_t stmtIdIndex = 0;
    WriteResult out;
    out.results.reserve(wholeOp.getUpdates().size());

    for (auto&& singleOp : wholeOp.getUpdates()) {
        const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
        if (opCtx->getTxnNumber()) {
            auto session = OperationContextSession::get(opCtx);
            if (auto entry =
                    session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
                out.results.emplace_back(parseOplogEntryForUpdate(*entry));
                continue;
            }
        }

        // TODO: don't create nested CurOp for legacy writes.
        // Add Command pointer to the nested CurOp.
        auto& parentCurOp = *CurOp::get(opCtx);
        Command* cmd = parentCurOp.getCommand();
        CurOp curOp(opCtx);
        {
            stdx::lock_guard<Client> lk(*opCtx->getClient());
            curOp.setCommand_inlock(cmd);
        }
        // Runs at the end of every iteration, including when leaving through 'break'.
        ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });
        try {
            lastOpFixer.startingOp();
            out.results.emplace_back(
                performSingleUpdateOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));
            lastOpFixer.finishedOpSuccessfully();
        } catch (const DBException& ex) {
            const bool canContinue =
                handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out);
            if (!canContinue)
                break;
        }
    }

    return out;
}
/**
 * Executes the single delete statement 'op' against 'ns' and returns a result whose N is the
 * number of documents deleted.
 *
 * Rejects multi-deletes (limit=0) when the operation carries a transaction number. Unlike the
 * update path, a missing collection is never created here.
 *
 * Throws on any failure; the caller is responsible for translating exceptions into results.
 */
static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
                                               const NamespaceString& ns,
                                               StmtId stmtId,
                                               const write_ops::DeleteOpEntry& op) {
    uassert(ErrorCodes::InvalidOptions,
            "Cannot use (or request) retryable writes with limit=0",
            !(opCtx->getTxnNumber() && op.getMulti()));

    globalOpCounters.gotDelete();
    auto& curOp = *CurOp::get(opCtx);
    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        curOp.setNS_inlock(ns.ns());
        curOp.setNetworkOp_inlock(dbDelete);
        curOp.setLogicalOp_inlock(LogicalOp::opDelete);
        curOp.setOpDescription_inlock(op.toBSON());
        curOp.ensureStarted();
    }

    curOp.debug().ndeleted = 0;

    DeleteRequest request(ns);
    request.setQuery(op.getQ());
    request.setCollation(write_ops::collationOf(op));
    request.setMulti(op.getMulti());
    request.setYieldPolicy(PlanExecutor::YIELD_AUTO);  // ParsedDelete overrides this for $isolated.
    request.setStmtId(stmtId);

    ParsedDelete parsedDelete(opCtx, &request);
    uassertStatusOK(parsedDelete.parseRequest());

    opCtx->checkForInterrupt();

    if (MONGO_FAIL_POINT(failAllRemoves)) {
        uasserted(ErrorCodes::InternalError, "failAllRemoves failpoint active!");
    }

    AutoGetCollection collection(opCtx,
                                 ns,
                                 MODE_IX,  // DB is always IX, even if collection is X.
                                 parsedDelete.isIsolated() ? MODE_X : MODE_IX);
    if (collection.getDb()) {
        curOp.raiseDbProfileLevel(collection.getDb()->getProfilingLevel());
    }

    assertCanWrite_inlock(opCtx, ns);

    auto exec = uassertStatusOK(
        getExecutorDelete(opCtx, &curOp.debug(), collection.getCollection(), &parsedDelete));

    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
    }

    uassertStatusOK(exec->executePlan());
    long long n = DeleteStage::getNumDeleted(*exec);
    curOp.debug().ndeleted = n;

    PlanSummaryStats summary;
    Explain::getSummaryStats(*exec, &summary);
    if (collection.getCollection()) {
        collection.getCollection()->infoCache()->notifyOfQuery(opCtx, summary.indexesUsed);
    }
    curOp.debug().setPlanSummaryMetrics(summary);

    if (curOp.shouldDBProfile()) {
        BSONObjBuilder execStatsBob;
        Explain::getWinningPlanStats(exec.get(), &execStatsBob);
        curOp.debug().execStats = execStatsBob.obj();
    }

    LastError::get(opCtx->getClient()).recordDelete(n);

    SingleWriteResult result;
    result.setN(n);
    return result;
}
/**
 * Executes every delete statement in 'wholeOp' and returns one result per processed statement.
 *
 * Must not be called inside a WriteUnitOfWork; this function does its own retries. When the
 * operation carries a transaction number, statements that the session reports as already
 * executed are answered with a canned retry result instead of being executed again. Every
 * executed statement runs under its own nested CurOp, which is finalized by finishCurOp().
 *
 * Throws if the namespace is not a legal write target; per-statement failures go through
 * handleError(), which also decides whether the remaining statements are attempted.
 */
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& wholeOp) {
    invariant(!opCtx->lockState()->inAWriteUnitOfWork());  // Does own retries.
    uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));

    DisableDocumentValidationIfTrue docValidationDisabler(
        opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
    LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());

    size_t stmtIdIndex = 0;
    WriteResult out;
    out.results.reserve(wholeOp.getDeletes().size());

    for (auto&& singleOp : wholeOp.getDeletes()) {
        const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
        if (opCtx->getTxnNumber()) {
            auto session = OperationContextSession::get(opCtx);
            if (session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), stmtId)) {
                out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
                continue;
            }
        }

        // TODO: don't create nested CurOp for legacy writes.
        // Add Command pointer to the nested CurOp.
        auto& parentCurOp = *CurOp::get(opCtx);
        Command* cmd = parentCurOp.getCommand();
        CurOp curOp(opCtx);
        {
            stdx::lock_guard<Client> lk(*opCtx->getClient());
            curOp.setCommand_inlock(cmd);
        }
        // Runs at the end of every iteration, including when leaving through 'break'.
        ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });
        try {
            lastOpFixer.startingOp();
            out.results.emplace_back(
                performSingleDeleteOp(opCtx, wholeOp.getNamespace(), stmtId, singleOp));
            lastOpFixer.finishedOpSuccessfully();
        } catch (const DBException& ex) {
            const bool canContinue =
                handleError(opCtx, ex, wholeOp.getNamespace(), wholeOp.getWriteCommandBase(), &out);
            if (!canContinue)
                break;
        }
    }

    return out;
}
} // namespace mongo