summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Mathias Stearn <mathias@10gen.com> 2016-04-07 18:34:44 -0400
committer Mathias Stearn <mathias@10gen.com> 2016-04-21 18:58:41 -0400
commit d819ac65d1a0f941bd3e201f343ac04e252c4442 (patch)
tree a531131c01f3816094d34cd845cfd71aec2e3459
parent a7a593da31a944c90d7c5f0422eeee8264eb438d (diff)
download mongo-d819ac65d1a0f941bd3e201f343ac04e252c4442.tar.gz
SERVER-23128 Refactor mongod write operations
Now both write commands and legacy writes share an implementation.
-rw-r--r-- jstests/core/batch_write_command_insert.js 4
-rw-r--r-- jstests/gle/opcounters_legacy.js 6
-rw-r--r-- src/mongo/db/SConscript 2
-rw-r--r-- src/mongo/db/catalog/document_validation.h 14
-rw-r--r-- src/mongo/db/commands/write_commands/batch_executor.cpp 1284
-rw-r--r-- src/mongo/db/commands/write_commands/batch_executor.h 186
-rw-r--r-- src/mongo/db/commands/write_commands/write_commands.cpp 445
-rw-r--r-- src/mongo/db/commands/write_commands/write_commands.h 127
-rw-r--r-- src/mongo/db/instance.cpp 847
-rw-r--r-- src/mongo/db/lasterror.cpp 5
-rw-r--r-- src/mongo/db/lasterror.h 4
-rw-r--r-- src/mongo/db/ops/write_ops.h 1
-rw-r--r-- src/mongo/db/ops/write_ops_exec.cpp 630
-rw-r--r-- src/mongo/db/ops/write_ops_exec.h 85
-rw-r--r-- src/mongo/db/ops/write_ops_parsers.cpp 16
-rw-r--r-- src/mongo/db/ops/write_ops_parsers_test.cpp 20
-rw-r--r-- src/mongo/db/stats/counters.cpp 2
-rw-r--r-- src/mongo/db/stats/counters.h 2
-rw-r--r-- src/mongo/dbtests/query_stage_sort.cpp 12
19 files changed, 1202 insertions, 2490 deletions
diff --git a/jstests/core/batch_write_command_insert.js b/jstests/core/batch_write_command_insert.js
index 6b42cf08ebf..3405785ecce 100644
--- a/jstests/core/batch_write_command_insert.js
+++ b/jstests/core/batch_write_command_insert.js
@@ -285,7 +285,7 @@ request = {
documents: [{ns: "invalid." + coll.getName(), key: {x: 1}, name: "x_1", unique: true}]
};
result = coll.runCommand(request);
-assert(!result.ok, tojson(result));
+assert(!result.ok || (result.n == 0 && result.writeErrors.length == 1), tojson(result));
assert.eq(coll.getIndexes().length, 0);
//
@@ -296,7 +296,7 @@ request = {
documents: [{}]
};
result = coll.runCommand(request);
-assert(!result.ok, tojson(result));
+assert(!result.ok || (result.n == 0 && result.writeErrors.length == 1), tojson(result));
assert.eq(coll.getIndexes().length, 0);
//
diff --git a/jstests/gle/opcounters_legacy.js b/jstests/gle/opcounters_legacy.js
index c31494b6c01..b243b8bc076 100644
--- a/jstests/gle/opcounters_legacy.js
+++ b/jstests/gle/opcounters_legacy.js
@@ -46,20 +46,20 @@ opCounters = db.serverStatus().opcounters;
t.insert({_id: 0});
print(db.getLastError());
assert(db.getLastError());
-assert.eq(opCounters.insert + (isMongos ? 1 : 0), db.serverStatus().opcounters.insert);
+assert.eq(opCounters.insert + 1, db.serverStatus().opcounters.insert);
// Bulk insert, with error, continueOnError=false.
opCounters = db.serverStatus().opcounters;
t.insert([{_id: 3}, {_id: 3}, {_id: 4}]);
assert(db.getLastError());
-assert.eq(opCounters.insert + (isMongos ? 2 : 1), db.serverStatus().opcounters.insert);
+assert.eq(opCounters.insert + 2, db.serverStatus().opcounters.insert);
// Bulk insert, with error, continueOnError=true.
var continueOnErrorFlag = 1;
opCounters = db.serverStatus().opcounters;
t.insert([{_id: 5}, {_id: 5}, {_id: 6}], continueOnErrorFlag);
assert(db.getLastError());
-assert.eq(opCounters.insert + 2, db.serverStatus().opcounters.insert);
+assert.eq(opCounters.insert + (isMongos ? 2 : 3), db.serverStatus().opcounters.insert);
//
// 2. Update.
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 71e16230890..5b68bb4454f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -615,7 +615,6 @@ serverOnlyFiles = [
"commands/touch.cpp",
"commands/user_management_commands.cpp",
"commands/validate.cpp",
- "commands/write_commands/batch_executor.cpp",
"commands/write_commands/write_commands.cpp",
"curop_metrics.cpp",
"db_raii.cpp",
@@ -646,6 +645,7 @@ serverOnlyFiles = [
"ops/update.cpp",
"ops/update_lifecycle_impl.cpp",
"ops/update_result.cpp",
+ "ops/write_ops_exec.cpp",
"pipeline/document_source_cursor.cpp",
"pipeline/pipeline_d.cpp",
"prefetch.cpp",
diff --git a/src/mongo/db/catalog/document_validation.h b/src/mongo/db/catalog/document_validation.h
index 2bc8f8b4787..e5b0fc0555c 100644
--- a/src/mongo/db/catalog/document_validation.h
+++ b/src/mongo/db/catalog/document_validation.h
@@ -69,4 +69,18 @@ private:
OperationContext* const _txn;
const bool _initialState;
};
+
+/**
+ * Disables document validation while in scope if the constructor is passed true.
+ */
+class DisableDocumentValidationIfTrue {
+public:
+ DisableDocumentValidationIfTrue(OperationContext* txn, bool shouldDisableValidation) {
+ if (shouldDisableValidation)
+ _documentValidationDisabler.emplace(txn);
+ }
+
+private:
+ boost::optional<DisableDocumentValidation> _documentValidationDisabler;
+};
}
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
deleted file mode 100644
index 69bd166f4e5..00000000000
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ /dev/null
@@ -1,1284 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/commands/write_commands/batch_executor.h"
-
-#include <memory>
-
-#include "mongo/base/error_codes.h"
-#include "mongo/db/catalog/database.h"
-#include "mongo/db/catalog/database_holder.h"
-#include "mongo/db/catalog/document_validation.h"
-#include "mongo/db/catalog/index_create.h"
-#include "mongo/db/clientcursor.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop_metrics.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/delete.h"
-#include "mongo/db/exec/update.h"
-#include "mongo/db/instance.h"
-#include "mongo/db/introspect.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/op_observer.h"
-#include "mongo/db/ops/delete_request.h"
-#include "mongo/db/ops/insert.h"
-#include "mongo/db/ops/parsed_delete.h"
-#include "mongo/db/ops/parsed_update.h"
-#include "mongo/db/ops/update_lifecycle_impl.h"
-#include "mongo/db/query/get_executor.h"
-#include "mongo/db/query/plan_executor.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/query/query_knobs.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/repl_settings.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/db/s/collection_metadata.h"
-#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharded_connection_info.h"
-#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/stats/counters.h"
-#include "mongo/db/stats/top.h"
-#include "mongo/db/write_concern.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/stale_exception.h"
-#include "mongo/s/write_ops/batched_upsert_detail.h"
-#include "mongo/s/write_ops/write_error_detail.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/elapsed_tracker.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using str::stream;
-
-namespace {
-
-/**
- * Data structure to safely hold and clean up results of single write operations.
- */
-class WriteOpResult {
- MONGO_DISALLOW_COPYING(WriteOpResult);
-
-public:
- WriteOpResult() {}
-
- WriteOpStats& getStats() {
- return _stats;
- }
-
- WriteErrorDetail* getError() {
- return _error.get();
- }
- WriteErrorDetail* releaseError() {
- return _error.release();
- }
- void setError(WriteErrorDetail* error) {
- _error.reset(error);
- }
-
-private:
- WriteOpStats _stats;
- std::unique_ptr<WriteErrorDetail> _error;
-};
-
-WriteErrorDetail* toWriteError(const Status& status) {
- WriteErrorDetail* error = new WriteErrorDetail;
-
- // TODO: Complex transform here?
- error->setErrCode(status.code());
- error->setErrMessage(status.reason());
-
- return error;
-}
-
-void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setErrCode(status.code());
- response->setErrMessage(status.reason());
- response->setOk(false);
- dassert(response->isValid(NULL));
-}
-
-/**
- * Translates write item type to wire protocol op code. Helper for
- * WriteBatchExecutor::applyWriteItem().
- */
-NetworkOp getOpCode(const BatchItemRef& currWrite) {
- switch (currWrite.getRequest()->getBatchType()) {
- case BatchedCommandRequest::BatchType_Insert:
- return dbInsert;
- case BatchedCommandRequest::BatchType_Update:
- return dbUpdate;
- case BatchedCommandRequest::BatchType_Delete:
- return dbDelete;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-void buildStaleError(const ChunkVersion& shardVersionRecvd,
- const ChunkVersion& shardVersionWanted,
- WriteErrorDetail* error) {
- // Write stale error to results
- error->setErrCode(ErrorCodes::StaleShardVersion);
-
- BSONObjBuilder infoB;
- shardVersionWanted.addToBSON(infoB, "vWanted");
- error->setErrInfo(infoB.obj());
-
- string errMsg = stream() << "stale shard version detected before write, received "
- << shardVersionRecvd.toString() << " but local version is "
- << shardVersionWanted.toString();
- error->setErrMessage(errMsg);
-}
-
-bool checkShardVersion(OperationContext* txn,
- const BatchedCommandRequest& request,
- WriteOpResult* result) {
- const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS());
-
- try {
- css->checkShardVersionOrThrow(txn);
- return true;
- } catch (const StaleConfigException& e) {
- result->setError(new WriteErrorDetail());
- buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
- return false;
- }
-}
-
-} // namespace
-
-WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le)
- : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {}
-
-// static
-Status WriteBatchExecutor::validateBatch(const BatchedCommandRequest& request) {
- // Validate namespace
- const NamespaceString& nss = request.getNS();
- if (!nss.isValid()) {
- return Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace");
- }
-
- // Make sure we can write to the namespace
- Status allowedStatus = userAllowedWriteNS(nss);
- if (!allowedStatus.isOK()) {
- return allowedStatus;
- }
-
- // Validate insert index requests
- // TODO: Push insert index requests through createIndex once all upgrade paths support it
- string errMsg;
- if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) {
- return Status(ErrorCodes::InvalidOptions, errMsg);
- }
-
- return Status::OK();
-}
-
-void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request,
- BatchedCommandResponse* response) {
- // Validate namespace
- Status isValid = validateBatch(request);
- if (!isValid.isOK()) {
- toBatchError(isValid, response);
- return;
- }
-
- if (request.sizeWriteOps() == 0u) {
- toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"),
- response);
- return;
- }
-
- // Validate batch size
- if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) {
- toBatchError(Status(ErrorCodes::InvalidLength,
- stream() << "exceeded maximum write batch size of "
- << BatchedCommandRequest::kMaxWriteBatchSize),
- response);
- return;
- }
-
- //
- // End validation
- //
-
- const WriteConcernOptions& writeConcern = _txn->getWriteConcern();
- bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 &&
- (writeConcern.syncMode == WriteConcernOptions::SyncMode::NONE ||
- writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET);
-
- Timer commandTimer;
-
- OwnedPointerVector<WriteErrorDetail> writeErrorsOwned;
- vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector();
-
- OwnedPointerVector<BatchedUpsertDetail> upsertedOwned;
- vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector();
-
- //
- // Apply each batch item, possibly bulking some items together in the write lock.
- // Stops on error if batch is ordered.
- //
-
- bulkExecute(request, &upserted, &writeErrors);
-
- //
- // Refresh metadata if needed
- //
-
- const bool staleBatch =
- !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
-
- if (staleBatch) {
- const auto& oss = OperationShardingState::get(_txn);
-
- ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS());
- ShardingState::get(_txn)
- ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion);
- }
-
- //
- // Construct response
- //
-
- response->setOk(true);
-
- if (!silentWC) {
- if (upserted.size()) {
- response->setUpsertDetails(upserted);
- }
-
- if (writeErrors.size()) {
- response->setErrDetails(writeErrors);
- }
-
- repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
- const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
- if (replMode != repl::ReplicationCoordinator::modeNone) {
- response->setLastOp(repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp());
- if (replMode == repl::ReplicationCoordinator::modeReplSet) {
- response->setElectionId(replCoord->getElectionId());
- }
- }
-
- // Set the stats for the response
- response->setN(_stats->numInserted + _stats->numUpserted + _stats->numMatched +
- _stats->numDeleted);
- if (request.getBatchType() == BatchedCommandRequest::BatchType_Update)
- response->setNModified(_stats->numModified);
- }
-
- dassert(response->isValid(NULL));
-}
-
-static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) {
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) {
- WriteErrorDetail* errorDetail = new WriteErrorDetail;
- result->setError(errorDetail);
- errorDetail->setErrCode(ErrorCodes::NotMaster);
- errorDetail->setErrMessage("Not primary while writing to " + ns.toString());
- return false;
- }
- return true;
-}
-
-static void buildUniqueIndexError(const BSONObj& keyPattern,
- const BSONObj& indexPattern,
- WriteErrorDetail* error) {
- error->setErrCode(ErrorCodes::CannotCreateIndex);
- string errMsg = stream() << "cannot create unique index over " << indexPattern
- << " with shard key pattern " << keyPattern;
- error->setErrMessage(errMsg);
-}
-
-static bool checkIndexConstraints(OperationContext* txn,
- const BatchedCommandRequest& request,
- WriteOpResult* result) {
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- if (!request.isUniqueIndexRequest())
- return true;
-
- ShardingState* shardingState = ShardingState::get(txn);
- if (shardingState->enabled()) {
- auto metadata = shardingState->getCollectionMetadata(nss.ns());
- if (metadata) {
- ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
- if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) {
- result->setError(new WriteErrorDetail);
- buildUniqueIndexError(
- metadata->getKeyPattern(), request.getIndexKeyPattern(), result->getError());
-
- return false;
- }
- }
- }
-
- return true;
-}
-
-//
-// HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS
-//
-
-static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) {
- stdx::lock_guard<Client> lk(*txn->getClient());
- CurOp* const currentOp = CurOp::get(txn);
- currentOp->setNetworkOp_inlock(getOpCode(currWrite));
- currentOp->setLogicalOp_inlock(networkOpToLogicalOp(getOpCode(currWrite)));
- currentOp->ensureStarted();
- currentOp->setNS_inlock(currWrite.getRequest()->getNS().ns());
-
- if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- currentOp->setQuery_inlock(currWrite.getDocument());
- currentOp->debug().query = currWrite.getDocument();
- currentOp->debug().ninserted = 0;
- } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) {
- currentOp->setQuery_inlock(currWrite.getUpdate()->getQuery());
- currentOp->debug().query = currWrite.getUpdate()->getQuery();
- currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr();
- // Note: debug().nMatched, nModified and nmoved are set internally in update
- } else {
- dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete);
- currentOp->setQuery_inlock(currWrite.getDelete()->getQuery());
- currentOp->debug().query = currWrite.getDelete()->getQuery();
- currentOp->debug().ndeleted = 0;
- }
-}
-
-void WriteBatchExecutor::incWriteStats(const BatchedCommandRequest::BatchType opType,
- const WriteOpStats& stats,
- const WriteErrorDetail* error,
- CurOp* currentOp) {
- if (opType == BatchedCommandRequest::BatchType_Update) {
- if (stats.upsertedID.isEmpty()) {
- _stats->numMatched += stats.n;
- _stats->numModified += stats.nModified;
- } else {
- ++_stats->numUpserted;
- }
-
- if (!error) {
- _le->recordUpdate(stats.upsertedID.isEmpty() && stats.n > 0, stats.n, stats.upsertedID);
- }
- } else {
- dassert(opType == BatchedCommandRequest::BatchType_Delete);
- _stats->numDeleted += stats.n;
- if (!error) {
- _le->recordDelete(stats.n);
- }
- currentOp->debug().ndeleted += stats.n;
- }
-
- if (error) {
- _le->setLastError(error->getErrCode(), error->getErrMessage().c_str());
- }
-}
-
-static void logCurOpError(CurOp* currentOp, WriteErrorDetail* opError) {
- invariant(opError != nullptr);
- currentOp->debug().exceptionInfo =
- ExceptionInfo(opError->getErrMessage(), opError->getErrCode());
-
- LOG(3) << " Caught Assertion in " << networkOpToString(currentOp->getNetworkOp())
- << ", continuing " << causedBy(opError->getErrMessage());
-}
-
-static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) {
- CurOp* currentOp = CurOp::get(txn);
- currentOp->done();
- int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis();
- recordCurOpMetrics(txn);
- Top::get(txn->getClient()->getServiceContext())
- .record(currentOp->getNS(),
- currentOp->getLogicalOp(),
- 1, // "write locked"
- currentOp->totalTimeMicros(),
- currentOp->isCommand());
-
- if (opError)
- logCurOpError(currentOp, opError);
-
- bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand,
- logger::LogSeverity::Debug(1));
- bool logSlow = executionTime > (serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs());
-
- if (logAll || logSlow) {
- Locker::LockerInfo lockerInfo;
- txn->lockState()->getLockerInfo(&lockerInfo);
-
- LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats);
- }
-
- if (currentOp->shouldDBProfile(executionTime)) {
- profile(txn, CurOp::get(txn)->getNetworkOp());
- }
-}
-
-// END HELPERS
-
-//
-// CORE WRITE OPERATIONS (declaration)
-// These functions write to the database and return stats and zero or one of:
-// - page fault
-// - error
-//
-
-static void singleCreateIndex(OperationContext* txn,
- const BSONObj& indexDesc,
- WriteOpResult* result);
-
-static void multiUpdate(OperationContext* txn,
- const BatchItemRef& updateItem,
- WriteOpResult* result);
-
-static void multiRemove(OperationContext* txn,
- const BatchItemRef& removeItem,
- WriteOpResult* result);
-
-//
-// WRITE EXECUTION
-// In general, the exec* operations manage db lock state and stats before dispatching to the
-// core write operations, which are *only* responsible for performing a write and reporting
-// success or failure.
-//
-
-/**
- * Representation of the execution state of execInserts. Used by a single
- * execution of execInserts in a single thread.
- */
-class WriteBatchExecutor::ExecInsertsState {
- MONGO_DISALLOW_COPYING(ExecInsertsState);
-
-public:
- /**
- * Constructs a new instance, for performing inserts described in "aRequest".
- */
- explicit ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest);
-
- /**
- * Acquires the write lock and client context needed to perform the current write operation.
- * Returns true on success, after which it is safe to use the "context" and "collection"
- * members. It is safe to call this function if this instance already holds the write lock.
- *
- * On failure, writeLock, context and collection will be NULL/clear.
- */
- bool lockAndCheck(WriteOpResult* result);
-
- /**
- * Releases the client context and write lock acquired by lockAndCheck. Safe to call
- * regardless of whether or not this state object currently owns the lock.
- */
- void unlock();
-
- /**
- * Returns true if this executor has the lock on the target database.
- */
- bool hasLock() {
- return _dbLock.get();
- }
-
- /**
- * Gets the target collection for the batch operation. Value is undefined
- * unless hasLock() is true.
- */
- Collection* getCollection() {
- return _collection;
- }
-
- OperationContext* txn;
-
- // Request object describing the inserts.
- const BatchedCommandRequest* request;
-
- // Index of the current insert operation to perform.
- size_t currIndex = 0;
-
- // Translation of insert documents in "request" into insert-ready forms. This vector has a
- // correspondence with elements of the "request", and "currIndex" is used to
- // index both.
- std::vector<StatusWith<BSONObj>> normalizedInserts;
-
-private:
- bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock);
-
- ScopedTransaction _transaction;
- // Guard object for the write lock on the target database.
- std::unique_ptr<Lock::DBLock> _dbLock;
- std::unique_ptr<Lock::CollectionLock> _collLock;
-
- Database* _database = nullptr;
- Collection* _collection = nullptr;
-};
-
-void WriteBatchExecutor::bulkExecute(const BatchedCommandRequest& request,
- std::vector<BatchedUpsertDetail*>* upsertedIds,
- std::vector<WriteErrorDetail*>* errors) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (request.shouldBypassValidation()) {
- maybeDisableValidation.emplace(_txn);
- }
-
- if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) {
- execInserts(request, errors);
- } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) {
- for (size_t i = 0; i < request.sizeWriteOps(); i++) {
- if (i + 1 == request.sizeWriteOps()) {
- setupSynchronousCommit(_txn);
- }
-
- WriteErrorDetail* error = NULL;
- BSONObj upsertedId;
- execUpdate(BatchItemRef(&request, i), &upsertedId, &error);
-
- if (!upsertedId.isEmpty()) {
- BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail;
- batchUpsertedId->setIndex(i);
- batchUpsertedId->setUpsertedID(upsertedId);
- upsertedIds->push_back(batchUpsertedId);
- }
-
- if (error) {
- errors->push_back(error);
- if (request.getOrdered())
- break;
- }
- }
- } else {
- dassert(request.getBatchType() == BatchedCommandRequest::BatchType_Delete);
- for (size_t i = 0; i < request.sizeWriteOps(); i++) {
- if (i + 1 == request.sizeWriteOps()) {
- setupSynchronousCommit(_txn);
- }
-
- WriteErrorDetail* error = NULL;
- execRemove(BatchItemRef(&request, i), &error);
-
- if (error) {
- errors->push_back(error);
- if (request.getOrdered())
- break;
- }
- }
- }
-
- // Fill in stale version errors for unordered batches (update/delete can't do this on own)
- if (!errors->empty() && !request.getOrdered()) {
- const WriteErrorDetail* finalError = errors->back();
-
- if (finalError->getErrCode() == ErrorCodes::StaleShardVersion) {
- for (size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++) {
- WriteErrorDetail* dupStaleError = new WriteErrorDetail;
- finalError->cloneTo(dupStaleError);
- errors->push_back(dupStaleError);
- }
- }
- }
-}
-
-// Goes over the request and preprocesses normalized versions of all the inserts in the request
-static void normalizeInserts(const BatchedCommandRequest& request,
- vector<StatusWith<BSONObj>>* normalizedInserts) {
- normalizedInserts->reserve(request.sizeWriteOps());
- for (size_t i = 0; i < request.sizeWriteOps(); ++i) {
- BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt(i);
- StatusWith<BSONObj> normalInsert = fixDocumentForInsert(insertDoc);
- normalizedInserts->push_back(normalInsert);
- if (request.getOrdered() && !normalInsert.isOK())
- break;
- }
-}
-
-static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result);
-
-// Loops over the specified subset of the batch, processes one document at a time.
-// Returns a true to discontinue the insert, or false if not.
-bool WriteBatchExecutor::insertMany(WriteBatchExecutor::ExecInsertsState* state,
- size_t startIndex,
- size_t endIndex,
- CurOp* currentOp,
- std::vector<WriteErrorDetail*>* errors,
- bool ordered) {
- for (state->currIndex = startIndex; state->currIndex < endIndex; ++state->currIndex) {
- WriteOpResult result;
- BatchItemRef currInsertItem(state->request, state->currIndex);
- {
- stdx::lock_guard<Client> lk(*_txn->getClient());
- currentOp->setQuery_inlock(currInsertItem.getDocument());
- currentOp->debug().query = currInsertItem.getDocument();
- }
-
- _opCounters->gotInsert();
- // Internally, insertOne retries the single insert until it completes without a write
- // conflict exception, or until it fails with some kind of error. Errors are mostly
- // propagated via the request->error field, but DBExceptions or std::exceptions may escape,
- // particularly on operation interruption. These kinds of errors necessarily prevent
- // further insertOne calls, and stop the batch. As a result, the only expected source of
- // such exceptions are interruptions.
- insertOne(state, &result);
-
- uint64_t nInserted = result.getStats().n;
- _stats->numInserted += nInserted;
- currentOp->debug().ninserted += nInserted;
-
- const WriteErrorDetail* error = result.getError();
- if (error) {
- _le->setLastError(error->getErrCode(), error->getErrMessage().c_str());
- WriteErrorDetail* error = NULL;
- error = result.releaseError();
- errors->push_back(error);
- error->setIndex(state->currIndex);
- logCurOpError(CurOp::get(_txn), error);
- if (ordered)
- return true;
- } else {
- _le->recordInsert(nInserted);
- }
- }
- return false;
-}
-
-// Instantiates an ExecInsertsState, which represents all of the state for the batch.
-// Breaks out into manageably sized chunks for insertMany, between which we can yield.
-// Encapsulates the lock state.
-void WriteBatchExecutor::execInserts(const BatchedCommandRequest& request,
- std::vector<WriteErrorDetail*>* errors) {
- ExecInsertsState state(_txn, &request);
- normalizeInserts(request, &state.normalizedInserts);
-
- CurOp* currentOp;
- {
- stdx::lock_guard<Client> lk(*_txn->getClient());
- currentOp = CurOp::get(_txn);
- currentOp->setLogicalOp_inlock(LogicalOp::opInsert);
- currentOp->ensureStarted();
- currentOp->setNS_inlock(request.getNS().ns());
- currentOp->debug().ninserted = 0;
- }
-
- auto client = _txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- _txn);
-
- // If this is the local database, don't set last op.
- if (request.getNS().isLocal()) {
- lastOpSetterGuard.Dismiss();
- }
-
- int64_t chunkCount = 0;
- int64_t chunkBytes = 0;
- const int64_t chunkMaxCount = internalQueryExecYieldIterations / 2;
- size_t startIndex = 0;
- size_t maxIndex = state.request->sizeWriteOps() - 1;
-
- for (size_t i = 0; i <= maxIndex; ++i) {
- if (i == maxIndex)
- setupSynchronousCommit(_txn);
- state.currIndex = i;
- BatchItemRef currInsertItem(state.request, state.currIndex);
- chunkBytes += currInsertItem.getDocument().objsize();
- chunkCount++;
-
- if ((chunkCount >= chunkMaxCount) || (chunkBytes >= insertVectorMaxBytes) ||
- (i == maxIndex)) {
- bool stop;
- stop = insertMany(&state, startIndex, i + 1, currentOp, errors, request.getOrdered());
- startIndex = i + 1;
- chunkCount = 0;
- chunkBytes = 0;
-
- if (state.hasLock()) {
- // insertOne acquires the locks, but does not release them on non-error cases,
- // so we release them here. insertOne() reacquires them via lockAndCheck().
- state.unlock();
- // This releases any storage engine held locks/snapshots.
- _txn->recoveryUnit()->abandonSnapshot();
- // Since the lock manager guarantees FIFO queues waiting on locks,
- // there is no need to explicitly sleep or give up control of the processor here.
- }
- if (stop)
- break;
- }
- }
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the guard to
- // fire in that case.
- lastOpSetterGuard.Dismiss();
- }
-
- // TODO: Move Top and CurOp metrics management into an RAII object.
- currentOp->done();
- recordCurOpMetrics(_txn);
- Top::get(_txn->getClient()->getServiceContext())
- .record(currentOp->getNS(),
- currentOp->getLogicalOp(),
- 1, // "write locked"
- currentOp->totalTimeMicros(),
- currentOp->isCommand());
-}
-
-void WriteBatchExecutor::execUpdate(const BatchItemRef& updateItem,
- BSONObj* upsertedId,
- WriteErrorDetail** error) {
- // BEGIN CURRENT OP
- CurOp currentOp(_txn);
- beginCurrentOp(_txn, updateItem);
- _opCounters->gotUpdate();
-
- WriteOpResult result;
- multiUpdate(_txn, updateItem, &result);
-
- if (!result.getStats().upsertedID.isEmpty()) {
- *upsertedId = result.getStats().upsertedID;
- }
- // END CURRENT OP
- incWriteStats(updateItem.getOpType(), result.getStats(), result.getError(), &currentOp);
- finishCurrentOp(_txn, result.getError());
-
- // End current transaction and release snapshot.
- _txn->recoveryUnit()->abandonSnapshot();
-
- if (result.getError()) {
- result.getError()->setIndex(updateItem.getItemIndex());
- *error = result.releaseError();
- }
-}
-
-void WriteBatchExecutor::execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error) {
- // Removes are similar to updates, but page faults are handled externally
-
- // BEGIN CURRENT OP
- CurOp currentOp(_txn);
- beginCurrentOp(_txn, removeItem);
- _opCounters->gotDelete();
-
- WriteOpResult result;
- multiRemove(_txn, removeItem, &result);
-
- // END CURRENT OP
- incWriteStats(removeItem.getOpType(), result.getStats(), result.getError(), &currentOp);
- finishCurrentOp(_txn, result.getError());
-
- // End current transaction and release snapshot.
- _txn->recoveryUnit()->abandonSnapshot();
-
- if (result.getError()) {
- result.getError()->setIndex(removeItem.getItemIndex());
- *error = result.releaseError();
- }
-}
-
-//
-// IN-DB-LOCK CORE OPERATIONS
-//
-
-WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn,
- const BatchedCommandRequest* aRequest)
- : txn(txn), request(aRequest), _transaction(txn, MODE_IX) {}
-
-bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result,
- bool intentLock) {
- if (hasLock()) {
- CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel());
- return true;
- }
-
- if (request->isInsertIndexRequest())
- intentLock = false; // can't build indexes in intent mode
-
- const NamespaceString& nss = request->getNS();
- invariant(!_collLock);
- invariant(!_dbLock);
- _dbLock =
- stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), intentLock ? MODE_IX : MODE_X);
- _database = dbHolder().get(txn, nss.ns());
- if (intentLock && !_database) {
- // Ensure exclusive lock in case the database doesn't yet exist
- _dbLock.reset();
- _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X);
- intentLock = false;
- }
- _collLock = stdx::make_unique<Lock::CollectionLock>(
- txn->lockState(), nss.ns(), intentLock ? MODE_IX : MODE_X);
- if (!checkIsMasterForDatabase(nss, result)) {
- return false;
- }
- if (!checkShardVersion(txn, *request, result)) {
- return false;
- }
- if (!checkIndexConstraints(txn, *request, result)) {
- return false;
- }
-
- if (!_database) {
- invariant(!intentLock);
- _database = dbHolder().openDb(txn, nss.ns());
- }
- CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel());
- _collection = _database->getCollection(request->getTargetingNS());
- if (!_collection) {
- if (intentLock) {
- // try again with full X lock.
- unlock();
- return _lockAndCheckImpl(result, false);
- }
-
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(txn);
- // Implicitly create if it doesn't exist
- _collection = _database->createCollection(txn, request->getTargetingNS());
- if (!_collection) {
- result->setError(toWriteError(
- Status(ErrorCodes::InternalError,
- "could not create collection " + request->getTargetingNS())));
- return false;
- }
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", request->getTargetingNS());
- }
- return true;
-}
-
-bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) {
- if (_lockAndCheckImpl(result, true))
- return true;
- unlock();
- return false;
-}
-
-void WriteBatchExecutor::ExecInsertsState::unlock() {
- _collection = nullptr;
- _database = nullptr;
- _collLock.reset();
- _dbLock.reset();
-}
-
-static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) {
- // we have to be top level so we can retry
- OperationContext* txn = state->txn;
- invariant(!txn->lockState()->inAWriteUnitOfWork());
- invariant(state->currIndex < state->normalizedInserts.size());
-
- const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]);
-
- if (!normalizedInsert.isOK()) {
- result->setError(toWriteError(normalizedInsert.getStatus()));
- return;
- }
-
- const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty()
- ? state->request->getInsertRequest()->getDocumentsAt(state->currIndex)
- : normalizedInsert.getValue();
-
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- if (state->request->isInsertIndexRequest()) {
- singleCreateIndex(txn, insertDoc, result);
- } else {
- if (state->lockAndCheck(result)) {
- dassert(txn->lockState()->isCollectionLockedForMode(
- state->getCollection()->ns().ns(), MODE_IX));
-
- WriteUnitOfWork wunit(txn);
- Status status = state->getCollection()->insertDocument(
- txn, insertDoc, &CurOp::get(txn)->debug(), true);
-
- if (status.isOK()) {
- result->getStats().n++;
- wunit.commit();
- } else {
- result->setError(toWriteError(status));
- }
- }
- }
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index");
- } catch (const StaleConfigException& e) {
- result->setError(new WriteErrorDetail());
- buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
- } catch (const DBException& ex) {
- Status status(ex.toStatus());
- if (ErrorCodes::isInterruption(status.code()))
- throw;
- result->setError(toWriteError(status));
- }
-
- // Errors release the write lock, as a matter of policy.
- if (result->getError()) {
- txn->recoveryUnit()->abandonSnapshot();
- state->unlock();
- }
-}
-
-/**
- * Perform a single index creation on a collection. Requires the index descriptor be
- * preprocessed.
- *
- * Might fault or error, otherwise populates the result.
- */
-static void singleCreateIndex(OperationContext* txn,
- const BSONObj& indexDesc,
- WriteOpResult* result) {
- BSONElement nsElement = indexDesc["ns"];
- uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected \"ns\" field of index description to be a "
- "string, "
- "but found a " << typeName(nsElement.type()),
- nsElement.type() == String);
- const NamespaceString ns(nsElement.valueStringData());
- BSONObjBuilder cmdBuilder;
- cmdBuilder << "createIndexes" << ns.coll();
- cmdBuilder << "indexes" << BSON_ARRAY(indexDesc);
- BSONObj cmd = cmdBuilder.done();
- Command* createIndexesCmd = Command::findCommand("createIndexes");
- invariant(createIndexesCmd);
- std::string errmsg;
- BSONObjBuilder resultBuilder;
- const bool success =
- createIndexesCmd->run(txn, ns.db().toString(), cmd, 0, errmsg, resultBuilder);
- Command::appendCommandStatus(resultBuilder, success, errmsg);
- BSONObj cmdResult = resultBuilder.done();
- uassertStatusOK(getStatusFromCommandResult(cmdResult));
- result->getStats().n =
- cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt();
-}
-
-static void multiUpdate(OperationContext* txn,
- const BatchItemRef& updateItem,
- WriteOpResult* result) {
- const NamespaceString nsString(updateItem.getRequest()->getNS());
- const bool isMulti = updateItem.getUpdate()->getMulti();
- UpdateRequest request(nsString);
- request.setQuery(updateItem.getUpdate()->getQuery());
- request.setUpdates(updateItem.getUpdate()->getUpdateExpr());
- request.setMulti(isMulti);
- request.setUpsert(updateItem.getUpdate()->getUpsert());
- UpdateLifecycleImpl updateLifecycle(request.getNamespaceString());
- request.setLifecycle(&updateLifecycle);
-
- // Updates from the write commands path can yield.
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
-
- // If this is the local database, don't set last op.
- if (nsString.isLocal()) {
- lastOpSetterGuard.Dismiss();
- }
-
- int attempt = 0;
- bool createCollection = false;
- for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) {
- ParsedUpdate parsedUpdate(txn, &request);
- Status status = parsedUpdate.parseRequest();
- if (!status.isOK()) {
- result->setError(toWriteError(status));
- return;
- }
-
- if (createCollection) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X};
-
- if (!checkIsMasterForDatabase(nsString, result)) {
- return;
- }
-
- Database* const db = adb.getDb();
- if (db->getCollection(nsString.ns())) {
- // someone else beat us to it
- } else {
- WriteUnitOfWork wuow(txn);
- uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
- wuow.commit();
- }
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
- }
-
- ///////////////////////////////////////////
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- Lock::CollectionLock colLock(
- txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
- ///////////////////////////////////////////
-
- if (!checkIsMasterForDatabase(nsString, result)) {
- return;
- }
-
- if (!checkShardVersion(txn, *updateItem.getRequest(), result)) {
- return;
- }
-
- Database* const db = dbHolder().get(txn, nsString.db());
-
- if (db == NULL) {
- if (createCollection) {
- // we raced with some, accept defeat
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- // Database not yet created
- if (!request.isUpsert()) {
- // not an upsert, no database, nothing to do
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- // upsert, don't try to get a context as no MODE_X lock is held
- fakeLoop = -1;
- createCollection = true;
- continue;
- }
-
- CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel());
- Collection* collection = db->getCollection(nsString.ns());
-
- if (collection == NULL) {
- if (createCollection) {
- // we raced with some, accept defeat
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- if (!request.isUpsert()) {
- // not an upsert, no collection, nothing to do
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- // upsert, mark that we should create collection
- fakeLoop = -1;
- createCollection = true;
- continue;
- }
-
- OpDebug* debug = &CurOp::get(txn)->debug();
-
- try {
- invariant(collection);
- std::unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, debug, collection, &parsedUpdate));
-
- uassertStatusOK(exec->executePlan());
-
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
-
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, debug);
- debug->setPlanSummaryMetrics(summary);
-
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
-
- const long long numDocsModified = res.numDocsModified;
- const long long numMatched = res.numMatched;
- const BSONObj resUpsertedID = res.upserted;
-
- // We have an _id from an insert
- const bool didInsert = !resUpsertedID.isEmpty();
-
- result->getStats().nModified = numDocsModified;
- result->getStats().n = didInsert ? 1 : numMatched;
- result->getStats().upsertedID = resUpsertedID;
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the guard to
- // fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- } catch (const WriteConflictException&) {
- debug->writeConflicts++;
- if (isMulti) {
- log() << "Had WriteConflict during multi update, aborting";
- throw;
- }
-
- createCollection = false;
-
- // RESTART LOOP
- fakeLoop = -1;
- txn->recoveryUnit()->abandonSnapshot();
-
- WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns());
- } catch (const StaleConfigException& e) {
- result->setError(new WriteErrorDetail());
- buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
- } catch (const DBException& ex) {
- Status status = ex.toStatus();
- if (ErrorCodes::isInterruption(status.code())) {
- throw;
- }
- result->setError(toWriteError(status));
- }
- }
-}
-
-/**
- * Perform a remove operation, which might remove multiple documents. Dispatches to remove code
- * currently to do most of this.
- *
- * Might fault or error, otherwise populates the result.
- */
-static void multiRemove(OperationContext* txn,
- const BatchItemRef& removeItem,
- WriteOpResult* result) {
- const NamespaceString& nss = removeItem.getRequest()->getNS();
- DeleteRequest request(nss);
- request.setQuery(removeItem.getDelete()->getQuery());
- request.setMulti(removeItem.getDelete()->getLimit() != 1);
- request.setGod(false);
-
- // Deletes running through the write commands path can yield.
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
-
- // If this is the local database, don't set last op.
- if (nss.isLocal()) {
- lastOpSetterGuard.Dismiss();
- }
-
- OpDebug* opDebug = &CurOp::get(txn)->debug();
-
- int attempt = 1;
- while (1) {
- try {
- ParsedDelete parsedDelete(txn, &request);
- Status status = parsedDelete.parseRequest();
- if (!status.isOK()) {
- result->setError(toWriteError(status));
- return;
- }
-
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, nss.db(), MODE_IX);
- if (!autoDb.getDb()) {
- break;
- }
-
- CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel());
- Lock::CollectionLock collLock(
- txn->lockState(), nss.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX);
-
- // getExecutorDelete() also checks if writes are allowed.
- if (!checkIsMasterForDatabase(nss, result)) {
- return;
- }
- // Check version once we're locked
-
- if (!checkShardVersion(txn, *removeItem.getRequest(), result)) {
- // Version error
- return;
- }
-
- auto collection = autoDb.getDb()->getCollection(nss);
-
- std::unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorDelete(txn, opDebug, collection, &parsedDelete));
-
- // Execute the delete and retrieve the number deleted.
- uassertStatusOK(exec->executePlan());
- result->getStats().n = DeleteStage::getNumDeleted(*exec);
-
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- if (collection) {
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- }
-
- CurOp::get(txn)->debug().setPlanSummaryMetrics(summary);
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the guard to
- // fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- break;
- } catch (const WriteConflictException&) {
- CurOp::get(txn)->debug().writeConflicts++;
- WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns());
- } catch (const StaleConfigException& e) {
- result->setError(new WriteErrorDetail());
- buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError());
- return;
- } catch (const DBException& ex) {
- Status status = ex.toStatus();
- if (ErrorCodes::isInterruption(status.code())) {
- throw;
- }
- result->setError(toWriteError(status));
- return;
- }
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/batch_executor.h b/src/mongo/db/commands/write_commands/batch_executor.h
deleted file mode 100644
index 945adfca021..00000000000
--- a/src/mongo/db/commands/write_commands/batch_executor.h
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/db/ops/update_request.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/s/write_ops/batched_delete_document.h"
-#include "mongo/s/write_ops/batched_update_document.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-
-class BSONObjBuilder;
-class CurOp;
-class LastError;
-class OpCounters;
-class OperationContext;
-class WriteBatchStats;
-struct WriteOpStats;
-
-/**
- * An instance of WriteBatchExecutor is an object capable of issuing a write batch.
- */
-class WriteBatchExecutor {
- MONGO_DISALLOW_COPYING(WriteBatchExecutor);
-
-public:
- // State object used by private execInserts. TODO: Do not expose this type.
- class ExecInsertsState;
-
- WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le);
-
- /**
- * Issues writes with requested write concern. Fills response with errors if problems
- * occur.
- */
- void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response);
-
- const WriteBatchStats& getStats() const;
-
- /**
- * Does basic validation of the batch request. Returns a non-OK status if
- * any problems with the batch are found.
- */
- static Status validateBatch(const BatchedCommandRequest& request);
-
-private:
- /**
- * Executes the writes in the batch and returns upserted _ids and write errors.
- * Dispatches to one of the three functions below for DBLock, CurOp, and stats management.
- */
- void bulkExecute(const BatchedCommandRequest& request,
- std::vector<BatchedUpsertDetail*>* upsertedIds,
- std::vector<WriteErrorDetail*>* errors);
-
- /**
- * Inserts a subset of an insert batch.
- * Returns a true to discontinue the insert, or false if not.
- */
- bool insertMany(WriteBatchExecutor::ExecInsertsState* state,
- size_t startIndex,
- size_t endIndex,
- CurOp* currentOp,
- std::vector<WriteErrorDetail*>* errors,
- bool ordered);
-
- /**
- * Executes the inserts of an insert batch and returns the write errors.
- *
- * Internally uses the DBLock of the request namespace.
- * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple
- * times.
- */
- void execInserts(const BatchedCommandRequest& request, std::vector<WriteErrorDetail*>* errors);
-
- /**
- * Executes an update item (which may update many documents or upsert), and returns the
- * upserted _id on upsert or error on failure.
- *
- * Internally uses the DBLock of the update namespace.
- * May take the DBLock multiple times.
- */
- void execUpdate(const BatchItemRef& updateItem, BSONObj* upsertedId, WriteErrorDetail** error);
-
- /**
- * Executes a delete item (which may remove many documents) and returns an error on failure.
- *
- * Internally uses the DBLock of the delete namespace.
- * May take the DBLock multiple times.
- */
- void execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error);
-
- /**
- * Helper for incrementing stats after each individual write op.
- *
- * No lock requirements (though usually done inside write lock to make stats update look
- * atomic).
- */
- void incWriteStats(const BatchedCommandRequest::BatchType opType,
- const WriteOpStats& stats,
- const WriteErrorDetail* error,
- CurOp* currentOp);
-
- OperationContext* _txn;
-
- // OpCounters object to update - needed for stats reporting
- // Not owned here.
- OpCounters* _opCounters;
-
- // LastError object to use for preparing write results - needed for stats reporting
- // Not owned here.
- LastError* _le;
-
- // Stats
- std::unique_ptr<WriteBatchStats> _stats;
-};
-
-/**
- * Holds information about the result of a single write operation.
- */
-struct WriteOpStats {
- WriteOpStats() : n(0), nModified(0) {}
-
- void reset() {
- n = 0;
- nModified = 0;
- upsertedID = BSONObj();
- }
-
- // Num docs logically affected by this operation.
- int n;
-
- // Num docs actually modified by this operation, if applicable (update)
- int nModified;
-
- // _id of newly upserted document, if applicable (update)
- BSONObj upsertedID;
-};
-
-/**
- * Full stats accumulated by a write batch execution. Note that these stats do not directly
- * correspond to the stats accumulated in opCounters and LastError.
- */
-class WriteBatchStats {
-public:
- WriteBatchStats()
- : numInserted(0), numUpserted(0), numMatched(0), numModified(0), numDeleted(0) {}
-
- int numInserted;
- int numUpserted;
- int numMatched;
- int numModified;
- int numDeleted;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 78f5cde997c..dacfe9dab56 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -26,14 +26,12 @@
* it in the license file.
*/
-#include "mongo/db/commands/write_commands/write_commands.h"
-
#include "mongo/base/init.h"
#include "mongo/bson/mutable/document.h"
#include "mongo/bson/mutable/element.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
-#include "mongo/db/commands/write_commands/batch_executor.h"
+#include "mongo/db/commands.h"
#include "mongo/db/commands/write_commands/write_commands_common.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
@@ -43,12 +41,16 @@
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/get_executor.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/write_concern.h"
+#include "mongo/s/stale_exception.h"
namespace mongo {
@@ -57,20 +59,7 @@ using std::stringstream;
namespace {
-MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) {
- // Leaked intentionally: a Command registers itself when constructed.
- new CmdInsert();
- new CmdUpdate();
- new CmdDelete();
- return Status::OK();
-}
-
-} // namespace
-
-WriteCmd::WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType)
- : Command(name), _writeType(writeType) {}
-
-void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) {
+void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) {
namespace mmb = mutablebson;
mmb::Element root = cmdObj->root();
mmb::Element field = root.findFirstChildNamed(fieldName);
@@ -86,204 +75,326 @@ void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldN
}
}
-// Slaves can't perform writes.
-bool WriteCmd::slaveOk() const {
- return false;
-}
-
-
-bool WriteCmd::supportsWriteConcern(const BSONObj& cmd) const {
- return true;
-}
-
-Status WriteCmd::checkAuthForCommand(ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
- Status status(auth::checkAuthForWriteCommand(AuthorizationSession::get(client),
- _writeType,
- NamespaceString(parseNs(dbname, cmdObj)),
- cmdObj));
-
- // TODO: Remove this when we standardize GLE reporting from commands
+Status checkAuthForWriteCommand(ClientBasic* client,
+ BatchedCommandRequest::BatchType batchType,
+ NamespaceString ns,
+ const BSONObj& cmdObj) {
+ Status status =
+ auth::checkAuthForWriteCommand(AuthorizationSession::get(client), batchType, ns, cmdObj);
if (!status.isOK()) {
LastError::get(client).setLastError(status.code(), status.reason());
}
-
return status;
}
-// Write commands are counted towards their corresponding opcounters, not command opcounters.
-bool WriteCmd::shouldAffectCommandCounter() const {
- return false;
+bool shouldSkipOutput(OperationContext* txn) {
+ const WriteConcernOptions& writeConcern = txn->getWriteConcern();
+ return writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 &&
+ (writeConcern.syncMode == WriteConcernOptions::SyncMode::NONE ||
+ writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET);
}
-bool WriteCmd::run(OperationContext* txn,
- const string& dbName,
- BSONObj& cmdObj,
- int options,
- string& errMsg,
- BSONObjBuilder& result) {
- // Can't be run on secondaries.
- dassert(txn->writesAreReplicated());
- BatchedCommandRequest request(_writeType);
- BatchedCommandResponse response;
-
- if (!request.parseBSON(dbName, cmdObj, &errMsg) || !request.isValid(&errMsg)) {
- return appendCommandStatus(result, Status(ErrorCodes::FailedToParse, errMsg));
+enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields.
+void serializeReply(OperationContext* txn,
+ ReplyStyle replyStyle,
+ bool continueOnError,
+ size_t opsInBatch,
+ const WriteResult& result,
+ BSONObjBuilder* out) {
+ if (shouldSkipOutput(txn))
+ return;
+
+ long long n = 0;
+ long long nModified = 0;
+ std::vector<BSONObj> upsertInfo;
+ std::vector<BSONObj> errors;
+ BSONSizeTracker upsertInfoSizeTracker;
+ BSONSizeTracker errorsSizeTracker;
+
+ for (size_t i = 0; i < result.results.size(); i++) {
+ if (result.results[i].isOK()) {
+ const auto& opResult = result.results[i].getValue();
+ n += opResult.n; // Always there.
+ if (replyStyle == ReplyStyle::kUpdate) {
+ nModified += opResult.nModified;
+ if (!opResult.upsertedId.isEmpty()) {
+ BSONObjBuilder upsertedId(upsertInfoSizeTracker);
+ upsertedId.append("index", int(i));
+ upsertedId.appendAs(opResult.upsertedId.firstElement(), "_id");
+ upsertInfo.push_back(upsertedId.obj());
+ }
+ }
+ continue;
+ }
+
+ const auto& status = result.results[i].getStatus();
+ BSONObjBuilder error(errorsSizeTracker);
+ error.append("index", int(i));
+ error.append("code", int(status.code()));
+ error.append("errmsg", status.reason());
+ errors.push_back(error.obj());
}
- WriteBatchExecutor writeBatchExecutor(
- txn, &globalOpCounters, &LastError::get(txn->getClient()));
+ if (result.staleConfigException) {
+ // For ordered:false commands we need to duplicate the StaleConfig result for all ops
+ // after we stopped. result.results doesn't include the staleConfigException.
+ // See the comment on WriteResult::staleConfigException for more info.
+ int endIndex = continueOnError ? opsInBatch : result.results.size() + 1;
+ for (int i = result.results.size(); i < endIndex; i++) {
+ BSONObjBuilder error(errorsSizeTracker);
+ error.append("index", i);
+ error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
+ error.append("errmsg", result.staleConfigException->getInfo().msg);
+ {
+ BSONObjBuilder errInfo(error.subobjStart("errInfo"));
+ result.staleConfigException->getVersionWanted().addToBSON(errInfo, "vWanted");
+ }
+ errors.push_back(error.obj());
+ }
+ }
+
+ out->appendNumber("n", n);
+
+ if (replyStyle == ReplyStyle::kUpdate) {
+ out->appendNumber("nModified", nModified);
+ if (!upsertInfo.empty()) {
+ out->append("upserted", upsertInfo);
+ }
+ }
- writeBatchExecutor.executeBatch(request, &response);
+ if (!errors.empty()) {
+ out->append("writeErrors", errors);
+ }
- result.appendElements(response.toBSON());
- return response.getOk();
+ // writeConcernError field is handled by command processor.
+
+ {
+ // Undocumented repl fields that mongos depends on.
+ auto* replCoord = repl::ReplicationCoordinator::get(txn->getServiceContext());
+ const auto replMode = replCoord->getReplicationMode();
+ if (replMode != repl::ReplicationCoordinator::modeNone) {
+ const auto lastOp = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ if (lastOp.getTerm() == repl::OpTime::kUninitializedTerm) {
+ out->append("opTime", lastOp.getTimestamp());
+ } else {
+ lastOp.append(out, "opTime");
+ }
+
+ if (replMode == repl::ReplicationCoordinator::modeReplSet) {
+ out->append("electionId", replCoord->getElectionId());
+ }
+ }
+ }
}
-Status WriteCmd::explain(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- const rpc::ServerSelectionMetadata&,
- BSONObjBuilder* out) const {
- // For now we only explain update and delete write commands.
- if (BatchedCommandRequest::BatchType_Update != _writeType &&
- BatchedCommandRequest::BatchType_Delete != _writeType) {
- return Status(ErrorCodes::IllegalOperation,
- "Only update and delete write ops can be explained");
+class WriteCommand : public Command {
+public:
+ explicit WriteCommand(StringData name) : Command(name) {}
+
+ bool slaveOk() const final {
+ return false;
}
- // Parse the batch request.
- BatchedCommandRequest request(_writeType);
- std::string errMsg;
- if (!request.parseBSON(dbname, cmdObj, &errMsg) || !request.isValid(&errMsg)) {
- return Status(ErrorCodes::FailedToParse, errMsg);
+ bool shouldAffectCommandCounter() const final {
+ return false;
}
- // Do the validation of the batch that is shared with non-explained write batches.
- Status isValid = WriteBatchExecutor::validateBatch(request);
- if (!isValid.isOK()) {
- return isValid;
+ bool supportsWriteConcern(const BSONObj& cmd) const {
+ return true;
}
- // Explain must do one additional piece of validation: For now we only explain
- // singleton batches.
- if (request.sizeWriteOps() != 1u) {
- return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1");
+ bool run(OperationContext* txn,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) final {
+ try {
+ runImpl(txn, dbname, cmdObj, result);
+ return true;
+ } catch (const DBException& ex) {
+ LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
+ throw;
+ }
}
- ScopedTransaction scopedXact(txn, MODE_IX);
+ virtual void runImpl(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) = 0;
+};
- // Get a reference to the singleton batch item (it's the 0th item in the batch).
- BatchItemRef batchItem(&request, 0);
+} // namespace
- OpDebug* opDebug = &CurOp::get(txn)->debug();
+class CmdInsert final : public WriteCommand {
+public:
+ CmdInsert() : WriteCommand("insert") {}
- if (BatchedCommandRequest::BatchType_Update == _writeType) {
- // Create the update request.
- UpdateRequest updateRequest(request.getNS());
- updateRequest.setQuery(batchItem.getUpdate()->getQuery());
- updateRequest.setUpdates(batchItem.getUpdate()->getUpdateExpr());
- updateRequest.setMulti(batchItem.getUpdate()->getMulti());
- updateRequest.setUpsert(batchItem.getUpdate()->getUpsert());
- UpdateLifecycleImpl updateLifecycle(updateRequest.getNamespaceString());
- updateRequest.setLifecycle(&updateLifecycle);
- updateRequest.setExplain();
+ void redactForLogging(mutablebson::Document* cmdObj) final {
+ redactTooLongLog(cmdObj, "documents");
+ }
- // Explained updates can yield.
- updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ void help(stringstream& help) const final {
+ help << "insert documents";
+ }
- ParsedUpdate parsedUpdate(txn, &updateRequest);
- Status parseStatus = parsedUpdate.parseRequest();
- if (!parseStatus.isOK()) {
- return parseStatus;
- }
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) final {
+ return checkAuthForWriteCommand(client,
+ BatchedCommandRequest::BatchType_Insert,
+ NamespaceString(parseNs(dbname, cmdObj)),
+ cmdObj);
+ }
+
+ void runImpl(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) final {
+ const auto batch = parseInsertCommand(dbname, cmdObj);
+ const auto reply = performInserts(txn, batch);
+ serializeReply(txn,
+ ReplyStyle::kNotUpdate,
+ batch.continueOnError,
+ batch.documents.size(),
+ reply,
+ &result);
+ }
+} cmdInsert;
- // Explains of write commands are read-only, but we take write locks so
- // that timing info is more accurate.
- AutoGetDb autoDb(txn, request.getNS().db(), MODE_IX);
- Lock::CollectionLock colLock(txn->lockState(), request.getNS().ns(), MODE_IX);
+class CmdUpdate final : public WriteCommand {
+public:
+ CmdUpdate() : WriteCommand("update") {}
- // Get a pointer to the (possibly NULL) collection.
- Collection* collection = NULL;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(request.getNS());
- }
+ void redactForLogging(mutablebson::Document* cmdObj) final {
+ redactTooLongLog(cmdObj, "updates");
+ }
- std::unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, opDebug, collection, &parsedUpdate));
+ void help(stringstream& help) const final {
+ help << "update documents";
+ }
- // Explain the plan tree.
- Explain::explainStages(exec.get(), verbosity, out);
- return Status::OK();
- } else {
- invariant(BatchedCommandRequest::BatchType_Delete == _writeType);
-
- // Create the delete request.
- DeleteRequest deleteRequest(request.getNS());
- deleteRequest.setQuery(batchItem.getDelete()->getQuery());
- deleteRequest.setMulti(batchItem.getDelete()->getLimit() != 1);
- deleteRequest.setGod(false);
- deleteRequest.setExplain();
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) final {
+ return checkAuthForWriteCommand(client,
+ BatchedCommandRequest::BatchType_Update,
+ NamespaceString(parseNs(dbname, cmdObj)),
+ cmdObj);
+ }
- // Explained deletes can yield.
- deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ void runImpl(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) final {
+ const auto batch = parseUpdateCommand(dbname, cmdObj);
+ const auto reply = performUpdates(txn, batch);
+ serializeReply(
+ txn, ReplyStyle::kUpdate, batch.continueOnError, batch.updates.size(), reply, &result);
+ }
- ParsedDelete parsedDelete(txn, &deleteRequest);
- Status parseStatus = parsedDelete.parseRequest();
- if (!parseStatus.isOK()) {
- return parseStatus;
- }
+ // Explains an update command. The command is parsed as an update batch, which must hold
+ // exactly one statement (otherwise this uasserts with InvalidLength). That statement is
+ // turned into an UpdateRequest flagged for explain, and the resulting plan is reported
+ // through 'out' at the requested 'verbosity'. Parse and executor-construction failures
+ // are raised via uassertStatusOK rather than returned.
+ Status explain(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ const rpc::ServerSelectionMetadata&,
+ BSONObjBuilder* out) const final {
+ const auto batch = parseUpdateCommand(dbname, cmdObj);
+ uassert(ErrorCodes::InvalidLength,
+ "explained write batches must be of size 1",
+ batch.updates.size() == 1);
+
+ // Mirror the single parsed statement into an UpdateRequest marked for explain.
+ UpdateLifecycleImpl updateLifecycle(batch.ns);
+ UpdateRequest updateRequest(batch.ns);
+ updateRequest.setLifecycle(&updateLifecycle);
+ updateRequest.setQuery(batch.updates[0].query);
+ updateRequest.setUpdates(batch.updates[0].update);
+ updateRequest.setMulti(batch.updates[0].multi);
+ updateRequest.setUpsert(batch.updates[0].upsert);
+ updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ updateRequest.setExplain();
+
+ ParsedUpdate parsedUpdate(txn, &updateRequest);
+ uassertStatusOK(parsedUpdate.parseRequest());
// Explains of write commands are read-only, but we take write locks so that timing
// info is more accurate.
- AutoGetDb autoDb(txn, request.getNS().db(), MODE_IX);
- Lock::CollectionLock colLock(txn->lockState(), request.getNS().ns(), MODE_IX);
-
- // Get a pointer to the (possibly NULL) collection.
- Collection* collection = NULL;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(request.getNS());
- }
-
- std::unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorDelete(txn, opDebug, collection, &parsedDelete));
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetCollection collection(txn, batch.ns, MODE_IX);
- // Explain the plan tree.
+ auto exec = uassertStatusOK(getExecutorUpdate(
+ txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedUpdate));
Explain::explainStages(exec.get(), verbosity, out);
return Status::OK();
}
-}
+} cmdUpdate;
-CmdInsert::CmdInsert() : WriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {}
+// The "delete" write command. It is registered through the file-scope instance 'cmdDelete'
+// declared at the end of the class, and delegates parsing to parseDeleteCommand() and
+// execution to performDeletes().
+class CmdDelete final : public WriteCommand {
+public:
+ CmdDelete() : WriteCommand("delete") {}
-void CmdInsert::redactForLogging(mutablebson::Document* cmdObj) {
- redactTooLongLog(cmdObj, StringData("documents", StringData::LiteralTag()));
-}
+ // Applies redactTooLongLog() to the "deletes" field of the command before it is logged.
+ void redactForLogging(mutablebson::Document* cmdObj) final {
+ redactTooLongLog(cmdObj, "deletes");
+ }
-void CmdInsert::help(stringstream& help) const {
- help << "insert documents";
-}
+ void help(stringstream& help) const final {
+ help << "delete documents";
+ }
-CmdUpdate::CmdUpdate() : WriteCmd("update", BatchedCommandRequest::BatchType_Update) {}
+ // Authorizes the request as a delete batch against the namespace parsed from the command.
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) final {
+ return checkAuthForWriteCommand(client,
+ BatchedCommandRequest::BatchType_Delete,
+ NamespaceString(parseNs(dbname, cmdObj)),
+ cmdObj);
+ }
-void CmdUpdate::redactForLogging(mutablebson::Document* cmdObj) {
- redactTooLongLog(cmdObj, StringData("updates", StringData::LiteralTag()));
-}
+ // Parses the delete batch, executes it, and serializes the result into 'result' using the
+ // non-update reply style.
+ void runImpl(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) final {
+ const auto batch = parseDeleteCommand(dbname, cmdObj);
+ const auto reply = performDeletes(txn, batch);
+ serializeReply(txn,
+ ReplyStyle::kNotUpdate,
+ batch.continueOnError,
+ batch.deletes.size(),
+ reply,
+ &result);
+ }
-void CmdUpdate::help(stringstream& help) const {
- help << "update documents";
-}
+ // Explains a delete command. The batch must hold exactly one statement (otherwise this
+ // uasserts with InvalidLength); that statement is turned into a DeleteRequest flagged for
+ // explain and its plan is reported through 'out' at the requested 'verbosity'.
+ Status explain(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ const rpc::ServerSelectionMetadata&,
+ BSONObjBuilder* out) const final {
+ const auto batch = parseDeleteCommand(dbname, cmdObj);
+ uassert(ErrorCodes::InvalidLength,
+ "explained write batches must be of size 1",
+ batch.deletes.size() == 1);
+
+ DeleteRequest deleteRequest(batch.ns);
+ deleteRequest.setQuery(batch.deletes[0].query);
+ deleteRequest.setMulti(batch.deletes[0].multi);
+ deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ deleteRequest.setExplain();
-CmdDelete::CmdDelete() : WriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {}
+ ParsedDelete parsedDelete(txn, &deleteRequest);
+ uassertStatusOK(parsedDelete.parseRequest());
-void CmdDelete::redactForLogging(mutablebson::Document* cmdObj) {
- redactTooLongLog(cmdObj, StringData("deletes", StringData::LiteralTag()));
-}
+ // Explains of write commands are read-only, but we take write locks so that timing
+ // info is more accurate.
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetCollection collection(txn, batch.ns, MODE_IX);
-void CmdDelete::help(stringstream& help) const {
- help << "delete documents";
-}
+ // Explain the plan tree.
+ auto exec = uassertStatusOK(getExecutorDelete(
+ txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedDelete));
+ Explain::explainStages(exec.get(), verbosity, out);
+ return Status::OK();
+ }
+} cmdDelete;
} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/write_commands.h b/src/mongo/db/commands/write_commands/write_commands.h
deleted file mode 100644
index e01ade2b2ff..00000000000
--- a/src/mongo/db/commands/write_commands/write_commands.h
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <string>
-
-#include "mongo/db/commands.h"
-#include "mongo/db/client_basic.h"
-#include "mongo/s/write_ops/batched_command_request.h"
-
-namespace mongo {
-
-/**
- * Base class for write commands. Write commands support batch writes and write concern,
- * and return per-item error information. All write commands use the (non-virtual) entry
- * point WriteCmd::run().
- *
- * Command parsing is performed by the WriteBatch class (command syntax documented there),
- * and command execution is performed by the WriteBatchExecutor class.
- */
-class WriteCmd : public Command {
- MONGO_DISALLOW_COPYING(WriteCmd);
-
-public:
- virtual ~WriteCmd() {}
-
-protected:
- /**
- * Instantiates a command that can be invoked by "name", which will be capable of issuing
- * write batches of type "writeType", and will require privilege "action" to run.
- */
- WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType);
-
- // Full log of write command can be quite large.
- static void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName);
-
-private:
- virtual bool slaveOk() const;
-
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override;
-
- virtual Status checkAuthForCommand(ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj);
-
- virtual bool shouldAffectCommandCounter() const;
-
- // Write command entry point.
- virtual bool run(OperationContext* txn,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result);
-
- // Write commands can be explained.
- virtual Status explain(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- const rpc::ServerSelectionMetadata&,
- BSONObjBuilder* out) const;
-
- // Type of batch (e.g. insert).
- BatchedCommandRequest::BatchType _writeType;
-};
-
-class CmdInsert : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdInsert);
-
-public:
- CmdInsert();
- void redactForLogging(mutablebson::Document* cmdObj);
-
-private:
- virtual void help(std::stringstream& help) const;
-};
-
-class CmdUpdate : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdUpdate);
-
-public:
- CmdUpdate();
- void redactForLogging(mutablebson::Document* cmdObj);
-
-private:
- virtual void help(std::stringstream& help) const;
-};
-
-class CmdDelete : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdDelete);
-
-public:
- CmdDelete();
- void redactForLogging(mutablebson::Document* cmdObj);
-
-private:
- virtual void help(std::stringstream& help) const;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index c71fc2c814d..e9de5d5b127 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -40,10 +40,8 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/authz_manager_external_state_d.h"
-#include "mongo/db/background.h"
-#include "mongo/db/catalog/index_create.h"
#include "mongo/db/client.h"
-#include "mongo/db/clientcursor.h"
+#include "mongo/db/catalog/cursor_manager.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -53,10 +51,7 @@
#include "mongo/db/db.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/dbhelpers.h"
#include "mongo/db/dbmessage.h"
-#include "mongo/db/exec/delete.h"
-#include "mongo/db/exec/update.h"
#include "mongo/db/ftdc/ftdc_mongod.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/instance.h"
@@ -67,22 +62,11 @@
#include "mongo/db/mongod_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer.h"
-#include "mongo/db/ops/delete_request.h"
-#include "mongo/db/ops/insert.h"
-#include "mongo/db/ops/parsed_delete.h"
-#include "mongo/db/ops/parsed_update.h"
-#include "mongo/db/ops/update_driver.h"
-#include "mongo/db/ops/update_lifecycle_impl.h"
-#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/get_executor.h"
-#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/run_commands.h"
-#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
@@ -93,7 +77,6 @@
#include "mongo/platform/process_id.h"
#include "mongo/rpc/command_reply_builder.h"
#include "mongo/rpc/command_request.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/legacy_reply.h"
#include "mongo/rpc/legacy_reply_builder.h"
#include "mongo/rpc/legacy_request.h"
@@ -115,7 +98,6 @@
#include "mongo/util/time_support.h"
namespace mongo {
-
using logger::LogComponent;
using std::endl;
using std::hex;
@@ -126,6 +108,12 @@ using std::stringstream;
using std::unique_ptr;
using std::vector;
+string dbExecCommand;
+
+MONGO_FP_DECLARE(rsStopGetMore);
+
+namespace {
+
// for diaglog
inline void opread(Message& m) {
if (_diaglog.getLevel() & 2) {
@@ -139,25 +127,6 @@ inline void opwrite(Message& m) {
}
}
-void receivedKillCursors(OperationContext* txn, Message& m);
-
-void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-
-void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-
-void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op);
-
-bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop);
-
-int nloggedsome = 0;
-#define LOGWITHRATELIMIT if (++nloggedsome < 1000 || nloggedsome % 100 == 0)
-
-string dbExecCommand;
-
-MONGO_FP_DECLARE(rsStopGetMore);
-
-namespace {
-
unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() {
return stdx::make_unique<AuthzManagerExternalStateMongod>();
}
@@ -231,13 +200,11 @@ void beginCommandOp(OperationContext* txn, const NamespaceString& nss, const BSO
curop->setNS_inlock(nss.ns());
}
-} // namespace
-
-static void receivedCommand(OperationContext* txn,
- const NamespaceString& nss,
- Client& client,
- DbResponse& dbResponse,
- Message& message) {
+void receivedCommand(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& client,
+ DbResponse& dbResponse,
+ Message& message) {
invariant(nss.isCommand());
const int32_t responseToMsgId = message.header().getId();
@@ -282,8 +249,6 @@ static void receivedCommand(OperationContext* txn,
dbResponse.responseToMsgId = responseToMsgId;
}
-namespace {
-
void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) {
invariant(message.operation() == dbCommand);
@@ -381,14 +346,13 @@ void receivedPseudoCommand(OperationContext* txn,
receivedCommand(txn, interposedNss, client, dbResponse, interposed);
}
-} // namespace
-
-static void receivedQuery(OperationContext* txn,
- const NamespaceString& nss,
- Client& c,
- DbResponse& dbResponse,
- Message& m) {
+void receivedQuery(OperationContext* txn,
+ const NamespaceString& nss,
+ Client& c,
+ DbResponse& dbResponse,
+ Message& m) {
invariant(!nss.isCommand());
+ globalOpCounters.gotQuery();
int32_t responseToMsgId = m.header().getId();
@@ -420,6 +384,152 @@ static void receivedQuery(OperationContext* txn,
dbResponse.responseToMsgId = responseToMsgId;
}
+/**
+ * Handles a legacy kill-cursors message. LastError tracking is disabled for the operation,
+ * the cursor count 'n' read from the message is validated (non-zero, consistent with the
+ * message size, positive), and the cursor id array is handed to
+ * CursorManager::eraseCursorGlobalIfAuthorized(). A mismatch between the number of cursors
+ * reported found and 'n' is always logged; a full match is logged only at debug level 1.
+ */
+void receivedKillCursors(OperationContext* txn, Message& m) {
+    LastError::get(txn->getClient()).disable();
+
+    DbMessage killMsg(m);
+    int n = killMsg.pullInt();
+
+    uassert(13659, "sent 0 cursors to kill", n != 0);
+    massert(13658,
+            str::stream() << "bad kill cursors size: " << m.dataSize(),
+            m.dataSize() == 8 + (8 * n));
+    uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);
+
+    if (n > 2000) {
+        // Large batches are reported as a warning below 30000 ids and as an error otherwise;
+        // the verify() that follows rejects the latter case.
+        if (n < 30000) {
+            warning() << "receivedKillCursors, n=" << n << endl;
+        } else {
+            error() << "receivedKillCursors, n=" << n << endl;
+        }
+        verify(n < 30000);
+    }
+
+    const char* cursorIds = killMsg.getArray(n);
+    int numFound = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorIds);
+
+    const bool allFound = (numFound == n);
+    if (shouldLog(logger::LogSeverity::Debug(1)) || !allFound) {
+        LOG(allFound ? 1 : 0) << "killcursors: found " << numFound << " of " << n << endl;
+    }
+}
+
+/**
+ * Handles a legacy insert message: parses it, runs the insert authorization check (and its
+ * audit log entry) for every document, and then executes the whole batch through
+ * performInserts(). The first failed authorization check aborts the request via
+ * uassertStatusOK before anything is written.
+ */
+void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto insertOp = parseLegacyInsert(m);
+    invariant(insertOp.ns == nsString);
+
+    auto client = txn->getClient();
+    for (const auto& doc : insertOp.documents) {
+        Status authStatus = AuthorizationSession::get(client)->checkAuthForInsert(nsString, doc);
+        // The audit entry is written whether or not the check succeeded.
+        audit::logInsertAuthzCheck(client, nsString, doc, authStatus.code());
+        uassertStatusOK(authStatus);
+    }
+
+    performInserts(txn, insertOp);
+}
+
+/**
+ * Handles a legacy update message: parses it, authorizes and audits the first entry of the
+ * parsed batch, and then executes the batch through performUpdates(). A failed
+ * authorization check aborts the request via uassertStatusOK.
+ */
+void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto updateOp = parseLegacyUpdate(m);
+    invariant(updateOp.ns == nsString);
+
+    auto& stmt = updateOp.updates[0];
+    auto client = txn->getClient();
+
+    Status authStatus = AuthorizationSession::get(client)->checkAuthForUpdate(
+        nsString, stmt.query, stmt.update, stmt.upsert);
+    // The audit entry is written whether or not the check succeeded.
+    audit::logUpdateAuthzCheck(
+        client, nsString, stmt.query, stmt.update, stmt.upsert, stmt.multi, authStatus.code());
+    uassertStatusOK(authStatus);
+
+    performUpdates(txn, updateOp);
+}
+
+/**
+ * Handles a legacy delete message: parses it, authorizes and audits the first entry of the
+ * parsed batch, and then executes the batch through performDeletes(). A failed
+ * authorization check aborts the request via uassertStatusOK.
+ */
+void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m) {
+    auto deleteOp = parseLegacyDelete(m);
+    invariant(deleteOp.ns == nsString);
+
+    auto& stmt = deleteOp.deletes[0];
+    auto client = txn->getClient();
+
+    Status authStatus = AuthorizationSession::get(client)->checkAuthForDelete(nsString, stmt.query);
+    // The audit entry is written whether or not the check succeeded.
+    audit::logDeleteAuthzCheck(client, nsString, stmt.query, authStatus.code());
+    uassertStatusOK(authStatus);
+
+    performDeletes(txn, deleteOp);
+}
+
+/**
+ * Handles a legacy OP_GET_MORE message.
+ *
+ * Counts the operation, reads the namespace, 'ntoreturn' and cursor id from the message,
+ * records them on 'curop', and then (inside a try block) validates the namespace, runs the
+ * getMore authorization check with its audit entry, and calls getMore().
+ *
+ * Returns true when getMore() succeeded and 'dbresponse' holds its result. Returns false when
+ * an AssertionException was caught, in which case 'dbresponse' holds an error reply built
+ * with ResultFlag_ErrSet. A negative 'ntoreturn' uasserts (code 34419) before the try block,
+ * so that failure propagates to the caller instead of producing an error reply.
+ */
+bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) {
+    globalOpCounters.gotGetMore();
+    DbMessage d(m);
+
+    const char* ns = d.getns();
+    int ntoreturn = d.pullInt();
+    uassert(
+        34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
+    long long cursorid = d.pullInt64();
+
+    curop.debug().ntoreturn = ntoreturn;
+    curop.debug().cursorid = cursorid;
+
+    {
+        // The guard must be a named object. An unnamed temporary is destroyed at the end of
+        // its own statement, which would release the Client lock before setNS_inlock() runs.
+        stdx::lock_guard<Client> lk(*txn->getClient());
+        CurOp::get(txn)->setNS_inlock(ns);
+    }
+
+    bool exhaust = false;
+    QueryResult::View msgdata = 0;
+    bool isCursorAuthorized = false;
+
+    try {
+        const NamespaceString nsString(ns);
+        uassert(ErrorCodes::InvalidNamespace,
+                str::stream() << "Invalid ns [" << ns << "]",
+                nsString.isValid());
+
+        Status status = AuthorizationSession::get(txn->getClient())
+                            ->checkAuthForGetMore(nsString, cursorid, false);
+        audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
+        uassertStatusOK(status);
+
+        // Spin for as long as the rsStopGetMore fail point is active.
+        while (MONGO_FAIL_POINT(rsStopGetMore)) {
+            sleepmillis(0);
+        }
+
+        msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
+    } catch (AssertionException& e) {
+        if (isCursorAuthorized) {
+            // If a cursor with id 'cursorid' was authorized, it may have been advanced
+            // before an exception terminated processGetMore. Erase the ClientCursor
+            // because it may now be out of sync with the client's iteration state.
+            // SERVER-7952
+            // TODO Temporary code, see SERVER-4563 for a cleanup overview.
+            CursorManager::eraseCursorGlobal(txn, cursorid);
+        }
+
+        BSONObjBuilder err;
+        e.getInfo().append(err);
+        BSONObj errObj = err.done();
+
+        curop.debug().exceptionInfo = e.getInfo();
+
+        // Report the failure to the client as a single error document.
+        replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
+        curop.debug().responseLength = dbresponse.response.header().dataLen();
+        curop.debug().nreturned = 1;
+        return false;
+    }
+
+    dbresponse.response.setData(msgdata.view2ptr(), true);
+    curop.debug().responseLength = dbresponse.response.header().dataLen();
+    curop.debug().nreturned = msgdata.getNReturned();
+
+    dbresponse.responseToMsgId = m.header().getId();
+
+    if (exhaust) {
+        curop.debug().exhaust = true;
+        dbresponse.exhaustNS = ns;
+    }
+
+    return true;
+}
+
+} // namespace
+
// Mongod on win32 defines a value for this function. In all other executables it is NULL.
void (*reportEventToSystem)(const char* msg) = 0;
@@ -442,7 +552,9 @@ void assembleResponse(OperationContext* txn,
DbMessage dbmsg(m);
Client& c = *txn->getClient();
- if (!c.isInDirectClient()) {
+ if (c.isInDirectClient()) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork());
+ } else {
LastError::get(c).startRequest();
AuthorizationSession::get(c)->startRequest(txn);
@@ -486,33 +598,6 @@ void assembleResponse(OperationContext* txn,
opwrite(m);
}
- // Increment op counters.
- switch (op) {
- case dbQuery:
- if (!isCommand) {
- globalOpCounters.gotQuery();
- } else {
- // Command counting is deferred, since it is not known yet whether the command
- // needs counting.
- }
- break;
- case dbGetMore:
- globalOpCounters.gotGetMore();
- break;
- case dbInsert:
- // Insert counting is deferred, since it is not known yet whether the insert contains
- // multiple documents (each of which needs to be counted).
- break;
- case dbUpdate:
- globalOpCounters.gotUpdate();
- break;
- case dbDelete:
- globalOpCounters.gotDelete();
- break;
- default:
- break;
- }
-
CurOp& currentOp = *CurOp::get(txn);
{
stdx::lock_guard<Client> lk(*txn->getClient());
@@ -553,10 +638,8 @@ void assembleResponse(OperationContext* txn,
dbresponse.responseToMsgId = m.header().getId();
} else {
+ // The remaining operations do not return any response. They are fire-and-forget.
try {
- // The following operations all require authorization.
- // dbInsert, dbUpdate and dbDelete can be easily pre-authorized,
- // here, but dbKillCursors cannot.
if (op == dbKillCursors) {
currentOp.ensureStarted();
logThreshold = 10;
@@ -579,11 +662,11 @@ void assembleResponse(OperationContext* txn,
if (!nsString.isValid()) {
uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false);
} else if (op == dbInsert) {
- receivedInsert(txn, nsString, m, currentOp);
+ receivedInsert(txn, nsString, m);
} else if (op == dbUpdate) {
- receivedUpdate(txn, nsString, m, currentOp);
+ receivedUpdate(txn, nsString, m);
} else if (op == dbDelete) {
- receivedDelete(txn, nsString, m, currentOp);
+ receivedDelete(txn, nsString, m);
} else {
invariant(false);
}
@@ -630,592 +713,6 @@ void assembleResponse(OperationContext* txn,
recordCurOpMetrics(txn);
}
-void receivedKillCursors(OperationContext* txn, Message& m) {
- LastError::get(txn->getClient()).disable();
- DbMessage dbmessage(m);
- int n = dbmessage.pullInt();
-
- uassert(13659, "sent 0 cursors to kill", n != 0);
- massert(13658,
- str::stream() << "bad kill cursors size: " << m.dataSize(),
- m.dataSize() == 8 + (8 * n));
- uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);
-
- if (n > 2000) {
- (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl;
- verify(n < 30000);
- }
-
- const char* cursorArray = dbmessage.getArray(n);
-
- int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray);
-
- if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) {
- LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl;
- }
-}
-
-void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- uassertStatusOK(userAllowedWriteNS(nsString));
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
-
- auto updateOp = parseLegacyUpdate(m);
- auto& singleUpdate = updateOp.updates[0];
-
- uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize);
-
- op.debug().query = singleUpdate.query;
- {
- stdx::lock_guard<Client> lk(*client);
- op.setNS_inlock(nsString.ns());
- op.setQuery_inlock(singleUpdate.query);
- }
-
- Status status = AuthorizationSession::get(client)->checkAuthForUpdate(
- nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert);
- audit::logUpdateAuthzCheck(client,
- nsString,
- singleUpdate.query,
- singleUpdate.update,
- singleUpdate.upsert,
- singleUpdate.multi,
- status.code());
- uassertStatusOK(status);
-
- UpdateRequest request(nsString);
- request.setUpsert(singleUpdate.upsert);
- request.setMulti(singleUpdate.multi);
- request.setQuery(singleUpdate.query);
- request.setUpdates(singleUpdate.update);
- UpdateLifecycleImpl updateLifecycle(nsString);
- request.setLifecycle(&updateLifecycle);
-
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- int attempt = 1;
- while (1) {
- try {
- ParsedUpdate parsedUpdate(txn, &request);
- uassertStatusOK(parsedUpdate.parseRequest());
-
- // Tentatively take an intent lock, fix up if we need to create the collection
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- if (dbHolder().get(txn, nsString.db()) == NULL) {
- // If DB doesn't exist, don't implicitly create it in OldClientContext
- break;
- }
- Lock::CollectionLock collLock(
- txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString.ns());
-
- auto collection = ctx.db()->getCollection(nsString);
-
- // The common case: no implicit collection creation
- if (!singleUpdate.upsert || collection != NULL) {
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
-
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
-
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- if (collection) {
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- }
-
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug());
- op.debug().setPlanSummaryMetrics(summary);
-
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
-
- // for getlasterror
- size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U;
- LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- return;
- }
- break;
- } catch (const WriteConflictException& dle) {
- op.debug().writeConflicts++;
- if (singleUpdate.multi) {
- log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting";
- throw;
- }
- WriteConflictException::logAndBackoff(attempt++, "update", nsString.toString());
- }
- }
-
- // This is an upsert into a non-existing database, so need an exclusive lock
- // to avoid deadlock
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ParsedUpdate parsedUpdate(txn, &request);
- uassertStatusOK(parsedUpdate.parseRequest());
-
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
- OldClientContext ctx(txn, nsString.ns());
- uassert(ErrorCodes::NotMaster,
- str::stream() << "Not primary while performing update on " << nsString.ns(),
- repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
-
- Database* db = ctx.db();
- if (db->getCollection(nsString)) {
- // someone else beat us to it, that's ok
- // we might race while we unlock if someone drops
- // but that's ok, we'll just do nothing and error out
- } else {
- WriteUnitOfWork wuow(txn);
- uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
- wuow.commit();
- }
-
- auto collection = ctx.db()->getCollection(nsString);
- invariant(collection);
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate));
-
- // Run the plan and get stats out.
- uassertStatusOK(exec->executePlan());
-
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
-
- const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug());
- op.debug().setPlanSummaryMetrics(summary);
-
- UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
-
- size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U;
- LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
-}
-
-void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- uassertStatusOK(userAllowedWriteNS(nsString));
-
- auto deleteOp = parseLegacyDelete(m);
- auto& singleDelete = deleteOp.deletes[0];
-
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
-
- op.debug().query = singleDelete.query;
- {
- stdx::lock_guard<Client> lk(*client);
- op.setQuery_inlock(singleDelete.query);
- op.setNS_inlock(nsString.ns());
- }
-
- Status status =
- AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query);
- audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code());
- uassertStatusOK(status);
-
- DeleteRequest request(nsString);
- request.setQuery(singleDelete.query);
- request.setMulti(singleDelete.multi);
-
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- int attempt = 1;
- while (1) {
- try {
- ParsedDelete parsedDelete(txn, &request);
- uassertStatusOK(parsedDelete.parseRequest());
-
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
- if (!autoDb.getDb()) {
- break;
- }
-
- Lock::CollectionLock collLock(
- txn->lockState(), nsString.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX);
- OldClientContext ctx(txn, nsString.ns());
-
- auto collection = ctx.db()->getCollection(nsString);
-
- unique_ptr<PlanExecutor> exec =
- uassertStatusOK(getExecutorDelete(txn, &op.debug(), collection, &parsedDelete));
-
- // Run the plan and get the number of docs deleted.
- uassertStatusOK(exec->executePlan());
- long long n = DeleteStage::getNumDeleted(*exec);
- LastError::get(client).recordDelete(n);
- op.debug().ndeleted = n;
-
- PlanSummaryStats summary;
- Explain::getSummaryStats(*exec, &summary);
- if (collection) {
- collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
- }
- CurOp::get(txn)->debug().setPlanSummaryMetrics(summary);
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op updates will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
- break;
- } catch (const WriteConflictException& dle) {
- op.debug().writeConflicts++;
- WriteConflictException::logAndBackoff(attempt++, "delete", nsString.toString());
- }
- }
-}
-
-bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) {
- DbMessage d(m);
-
- const char* ns = d.getns();
- int ntoreturn = d.pullInt();
- uassert(
- 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0);
- long long cursorid = d.pullInt64();
-
- curop.debug().ntoreturn = ntoreturn;
- curop.debug().cursorid = cursorid;
-
- {
- stdx::lock_guard<Client>(*txn->getClient());
- CurOp::get(txn)->setNS_inlock(ns);
- }
-
- bool exhaust = false;
- QueryResult::View msgdata = 0;
- bool isCursorAuthorized = false;
-
- try {
- const NamespaceString nsString(ns);
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid ns [" << ns << "]",
- nsString.isValid());
-
- Status status = AuthorizationSession::get(txn->getClient())
- ->checkAuthForGetMore(nsString, cursorid, false);
- audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code());
- uassertStatusOK(status);
-
- while (MONGO_FAIL_POINT(rsStopGetMore)) {
- sleepmillis(0);
- }
-
- msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
- } catch (AssertionException& e) {
- if (isCursorAuthorized) {
- // If a cursor with id 'cursorid' was authorized, it may have been advanced
- // before an exception terminated processGetMore. Erase the ClientCursor
- // because it may now be out of sync with the client's iteration state.
- // SERVER-7952
- // TODO Temporary code, see SERVER-4563 for a cleanup overview.
- CursorManager::eraseCursorGlobal(txn, cursorid);
- }
-
- BSONObjBuilder err;
- e.getInfo().append(err);
- BSONObj errObj = err.done();
-
- curop.debug().exceptionInfo = e.getInfo();
-
- replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj);
- curop.debug().responseLength = dbresponse.response.header().dataLen();
- curop.debug().nreturned = 1;
- return false;
- }
-
- dbresponse.response.setData(msgdata.view2ptr(), true);
- curop.debug().responseLength = dbresponse.response.header().dataLen();
- curop.debug().nreturned = msgdata.getNReturned();
-
- dbresponse.responseToMsgId = m.header().getId();
-
- if (exhaust) {
- curop.debug().exhaust = true;
- dbresponse.exhaustNS = ns;
- }
-
- return true;
-}
-
-void insertMultiSingletons(OperationContext* txn,
- OldClientContext& ctx,
- bool keepGoing,
- StringData ns,
- CurOp& op,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end) {
- for (vector<BSONObj>::iterator it = begin; it != end; it++) {
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wouw(txn);
- Collection* collection = ctx.db()->getCollection(ns);
- if (!collection) {
- collection = ctx.db()->createCollection(txn, ns);
- invariant(collection);
- }
-
- uassertStatusOK(collection->insertDocument(txn, *it, &op.debug(), true));
- wouw.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", ns);
-
- globalOpCounters.incInsertInWriteLock(1);
- op.debug().ninserted++;
-
- } catch (const UserException& ex) {
- if (!keepGoing)
- throw;
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- }
- }
-}
-
-void insertMultiVector(OperationContext* txn,
- OldClientContext& ctx,
- bool keepGoing,
- StringData ns,
- CurOp& op,
- vector<BSONObj>::iterator begin,
- vector<BSONObj>::iterator end) {
- try {
- WriteUnitOfWork wunit(txn);
- Collection* collection = ctx.db()->getCollection(ns);
- if (!collection) {
- collection = ctx.db()->createCollection(txn, ns);
- invariant(collection);
- }
-
- uassertStatusOK(collection->insertDocuments(txn, begin, end, &op.debug(), true, false));
- wunit.commit();
-
- int inserted = end - begin;
- globalOpCounters.incInsertInWriteLock(inserted);
- op.debug().ninserted = inserted;
- } catch (UserException&) {
- txn->recoveryUnit()->abandonSnapshot();
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end);
- } catch (WriteConflictException&) {
- CurOp::get(txn)->debug().writeConflicts++;
- txn->recoveryUnit()->abandonSnapshot();
- WriteConflictException::logAndBackoff(0, "insert", ns);
- insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end);
- }
-}
-
-NOINLINE_DECL void insertMulti(OperationContext* txn,
- OldClientContext& ctx,
- const InsertOp& insertOp,
- CurOp& op) {
- std::vector<BSONObj> docs;
- docs.reserve(insertOp.documents.size());
- for (auto&& doc : insertOp.documents) {
- // TODO don't fail yet on invalid documents. They should be treated like other errors.
- BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc));
- docs.push_back(fixed.isEmpty() ? doc : std::move(fixed));
- }
-
- vector<BSONObj>::iterator chunkBegin = docs.begin();
- int64_t chunkCount = 0;
- int64_t chunkSize = 0;
-
- auto client = txn->getClient();
- auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp();
- ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client),
- &repl::ReplClientInfo::setLastOpToSystemLastOpTime,
- txn);
-
- for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) {
- chunkSize += (*it).objsize();
- // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient,
- // but smaller chunk sizes allow yielding to other threads and lower chance of WCEs
- if ((++chunkCount >= internalQueryExecYieldIterations / 2) ||
- (chunkSize >= insertVectorMaxBytes)) {
- if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure
- insertMultiSingletons(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
- else
- insertMultiVector(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1);
- chunkBegin = it + 1;
- chunkCount = 0;
- chunkSize = 0;
- }
- }
- if (chunkBegin != docs.end())
- insertMultiVector(
- txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end());
-
- if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) {
- // If this operation has already generated a new lastOp, don't bother setting it
- // here. No-op inserts will not generate a new lastOp, so we still need the
- // guard to fire in that case.
- lastOpSetterGuard.Dismiss();
- }
-}
-
-static void convertSystemIndexInsertsToCommands(DbMessage& d, BSONArrayBuilder* allCmdsBuilder) {
- while (d.moreJSObjs()) {
- BSONObj spec = d.nextJsObj();
- BSONElement indexNsElement = spec["ns"];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "Missing \"ns\" field while inserting into " << d.getns(),
- !indexNsElement.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected \"ns\" field to have type String, not "
- << typeName(indexNsElement.type()) << " while inserting into "
- << d.getns(),
- indexNsElement.type() == String);
- const StringData nsToIndex(indexNsElement.valueStringData());
- BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart());
- cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex);
- BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes"));
- while (true) {
- BSONObjBuilder specBuilder(specArrayBuilder.subobjStart());
- BSONElement specNsElement = spec["ns"];
- if ((specNsElement.type() != String) ||
- (specNsElement.valueStringData() != nsToIndex)) {
- break;
- }
- for (BSONObjIterator iter(spec); iter.more();) {
- BSONElement element = iter.next();
- if (element.fieldNameStringData() != "ns") {
- specBuilder.append(element);
- }
- }
- if (!d.moreJSObjs()) {
- break;
- }
- spec = d.nextJsObj();
- }
- }
-}
-
-static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) {
- BSONArrayBuilder allCmdsBuilder;
- try {
- convertSystemIndexInsertsToCommands(d, &allCmdsBuilder);
- } catch (const DBException& ex) {
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- curOp.debug().exceptionInfo = ex.getInfo();
- return;
- }
- BSONArray allCmds(allCmdsBuilder.done());
- Command* createIndexesCmd = Command::findCommand("createIndexes");
- invariant(createIndexesCmd);
- const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError;
- for (BSONObjIterator iter(allCmds); iter.more();) {
- try {
- BSONObj cmdObj = iter.next().Obj();
-
- rpc::LegacyRequestBuilder requestBuilder{};
- auto indexNs = NamespaceString(d.getns());
- auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db())
- .setCommandName("createIndexes")
- .setCommandArgs(cmdObj)
- .setMetadata(rpc::makeEmptyMetadata())
- .done();
- rpc::LegacyRequest cmdRequest{&cmdRequestMsg};
- rpc::LegacyReplyBuilder cmdReplyBuilder{};
- Command::execCommand(txn, createIndexesCmd, cmdRequest, &cmdReplyBuilder);
- auto cmdReplyMsg = cmdReplyBuilder.done();
- rpc::LegacyReply cmdReply{&cmdReplyMsg};
- uassertStatusOK(getStatusFromCommandResult(cmdReply.getCommandReply()));
- } catch (const DBException& ex) {
- LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
- curOp.debug().exceptionInfo = ex.getInfo();
- if (!keepGoing) {
- return;
- }
- }
- }
-}
-
-bool _receivedInsert(OperationContext* txn,
- const NamespaceString& nsString,
- const InsertOp& insertOp,
- CurOp& op,
- bool checkCollection) {
- // CONCURRENCY TODO: is being read locked in big log sufficient here?
- // writelock is used to synchronize stepdowns w/ writes
- uassert(
- 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString));
-
- OldClientContext ctx(txn, insertOp.ns.ns());
- if (checkCollection && !ctx.db()->getCollection(nsString))
- return false;
- insertMulti(txn, ctx, insertOp, op);
- return true;
-}
-
-void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) {
- {
- stdx::lock_guard<Client>(*txn->getClient());
- CurOp::get(txn)->setNS_inlock(nsString.ns());
- }
-
- uassertStatusOK(userAllowedWriteNS(nsString.ns()));
- if (nsString.isSystemDotIndexes()) {
- DbMessage d(m);
- insertSystemIndexes(txn, d, op);
- return;
- }
-
- auto insertOp = parseLegacyInsert(m);
-
- for (const auto& obj : insertOp.documents) {
- // Check auth for insert.
- Status status =
- AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj);
- audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code());
- uassertStatusOK(status);
- }
-
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- // OldClientContext may implicitly create a database, so check existence
- if (dbHolder().get(txn, nsString.db()) != NULL) {
- if (_receivedInsert(txn, nsString, insertOp, op, true))
- return;
- }
- }
-
- // Collection didn't exist so try again with MODE_X
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X);
-
- _receivedInsert(txn, nsString, insertOp, op, false);
-}
-
// ----- BEGIN Diaglog -----
DiagLog::DiagLog() : f(0), level(0) {}
diff --git a/src/mongo/db/lasterror.cpp b/src/mongo/db/lasterror.cpp
index 624329c4ddd..60d0e59acc3 100644
--- a/src/mongo/db/lasterror.cpp
+++ b/src/mongo/db/lasterror.cpp
@@ -54,11 +54,6 @@ void LastError::setLastError(int code, std::string msg) {
_msg = std::move(msg);
}
-void LastError::recordInsert(long long nObjects) {
- reset(true);
- _nObjects = nObjects;
-}
-
void LastError::recordUpdate(bool updateObjects, long long nObjects, BSONObj upsertedId) {
reset(true);
_nObjects = nObjects;
diff --git a/src/mongo/db/lasterror.h b/src/mongo/db/lasterror.h
index e6f65e4aed7..27ce978d2b6 100644
--- a/src/mongo/db/lasterror.h
+++ b/src/mongo/db/lasterror.h
@@ -65,8 +65,6 @@ public:
*/
void setLastError(int code, std::string msg);
- void recordInsert(long long nObjects);
-
void recordUpdate(bool updateObjects, long long nObjects, BSONObj upsertedId);
void recordDelete(long long nDeleted);
@@ -84,7 +82,7 @@ public:
bool isValid() const {
return _valid;
}
- int const getNPrev() const {
+ int getNPrev() const {
return _nPrev;
}
diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h
index 267693bc559..24cfe22c54b 100644
--- a/src/mongo/db/ops/write_ops.h
+++ b/src/mongo/db/ops/write_ops.h
@@ -44,7 +44,6 @@ namespace mongo {
*/
struct ParsedWriteOp {
NamespaceString ns;
- boost::optional<BSONObj> writeConcern;
bool bypassDocumentValidation = false;
bool continueOnError = false;
};
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
new file mode 100644
index 00000000000..a995d1b25eb
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -0,0 +1,630 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/db/audit.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_metrics.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/delete.h"
+#include "mongo/db/exec/update.h"
+#include "mongo/db/introspect.h"
+#include "mongo/db/lasterror.h"
+#include "mongo/db/ops/delete_request.h"
+#include "mongo/db/ops/insert.h"
+#include "mongo/db/ops/parsed_delete.h"
+#include "mongo/db/ops/parsed_update.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
+#include "mongo/db/ops/update_request.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/query/get_executor.h"
+#include "mongo/db/query/plan_summary_stats.h"
+#include "mongo/db/query/query_knobs.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/db/stats/top.h"
+#include "mongo/db/write_concern.h"
+#include "mongo/rpc/command_reply.h"
+#include "mongo/rpc/command_reply_builder.h"
+#include "mongo/rpc/command_request.h"
+#include "mongo/rpc/command_request_builder.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/stale_exception.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+// Convention in this file: generic helpers go in the anonymous namespace. Helpers that are for a
+// single type of operation are static functions defined above their caller.
+namespace {
+
+/**
+ * Finalizes 'curOp' after a write: marks it done, stores its execution time in its OpDebug,
+ * records it in the curOp metrics and in Top, and then logs and/or profiles it when warranted.
+ *
+ * Any DBException thrown along the way is logged and swallowed, so calling this never changes the
+ * outcome of the write that is being finished.
+ */
+void finishCurOp(OperationContext* txn, CurOp* curOp) {
+ try {
+ curOp->done();
+ int executionTimeMs = curOp->totalTimeMillis();
+ curOp->debug().executionTime = executionTimeMs;
+
+ recordCurOpMetrics(txn);
+ Top::get(txn->getServiceContext())
+ .record(curOp->getNS(),
+ curOp->getLogicalOp(),
+ 1, // "write locked"
+ curOp->totalTimeMicros(),
+ curOp->isCommand());
+
+ if (!curOp->debug().exceptionInfo.empty()) {
+ LOG(3) << "Caught Assertion in " << logicalOpToString(curOp->getLogicalOp()) << ": "
+ << curOp->debug().exceptionInfo.toString();
+ }
+
+ // Report the op if debug level 1 is enabled for the command log component, or if it took
+ // longer than slowMS plus whatever latency the op itself declared as expected.
+ const bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand,
+ logger::LogSeverity::Debug(1));
+ const bool logSlow =
+ executionTimeMs > (serverGlobalParams.slowMS + curOp->getExpectedLatencyMs());
+
+ if (logAll || logSlow) {
+ Locker::LockerInfo lockerInfo;
+ txn->lockState()->getLockerInfo(&lockerInfo);
+ log() << curOp->debug().report(*curOp, lockerInfo.stats);
+ }
+
+ if (curOp->shouldDBProfile(executionTimeMs)) {
+ // NOTE(review): this reads the network op from CurOp::get(txn) rather than from the
+ // 'curOp' argument. The callers in this file pass the CurOp they just constructed on
+ // 'txn', so presumably both are the same object -- confirm before relying on it.
+ profile(txn, CurOp::get(txn)->getNetworkOp());
+ }
+ } catch (const DBException& ex) {
+ // We need to ignore all errors here. We don't want a successful op to fail because of a
+ // failure to record stats. We also don't want to replace the error reported for an op that
+ // is failing.
+ log() << "Ignoring error from finishCurOp: " << ex.toString();
+ }
+}
+
+/**
+ * Sets the Client's LastOp to the system OpTime if needed.
+ *
+ * Call startingOp() before each individual write and finishedOpSuccessfully() once that write has
+ * completed without throwing. When the fixer is destroyed it calls setLastOpToSystemLastOpTime()
+ * unless the namespace is on the local database, or the most recent write finished successfully
+ * and changed the client's LastOp by itself.
+ */
+class LastOpFixer {
+public:
+ LastOpFixer(OperationContext* txn, const NamespaceString& ns)
+ : _txn(txn), _isOnLocalDb(ns.isLocal()) {}
+
+ // The destructor has a side effect, so a copy would repeat it. Nothing in this file copies a
+ // LastOpFixer; make that a compile-time guarantee.
+ LastOpFixer(const LastOpFixer&) = delete;
+ LastOpFixer& operator=(const LastOpFixer&) = delete;
+
+ ~LastOpFixer() {
+ if (_needToFixLastOp && !_isOnLocalDb) {
+ // If this operation has already generated a new lastOp, don't bother setting it
+ // here. No-op updates will not generate a new lastOp, so we still need the
+ // guard to fire in that case. Operations on the local DB aren't replicated, so they
+ // don't need to bump the lastOp.
+ replClientInfo().setLastOpToSystemLastOpTime(_txn);
+ }
+ }
+
+ /**
+ * Remembers the client's current LastOp and assumes a fix-up is needed until
+ * finishedOpSuccessfully() says otherwise.
+ */
+ void startingOp() {
+ _needToFixLastOp = true;
+ _opTimeAtLastOpStart = replClientInfo().getLastOp();
+ }
+
+ /**
+ * Clears the fix-up flag if the client's LastOp changed since startingOp().
+ */
+ void finishedOpSuccessfully() {
+ // If the op was successful and bumped LastOp, we don't need to do it again. However, we
+ // still need to for no-ops and all failing ops.
+ _needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart);
+ }
+
+private:
+ repl::ReplClientInfo& replClientInfo() {
+ return repl::ReplClientInfo::forClient(_txn->getClient());
+ }
+
+ OperationContext* const _txn;
+ bool _needToFixLastOp = true;
+ const bool _isOnLocalDb;
+ repl::OpTime _opTimeAtLastOpStart;
+};
+
+/**
+ * Throws unless a write to 'ns' may proceed right now.
+ *
+ * First uasserts (NotMaster) that the replication coordinator accepts writes for 'ns', then asks
+ * the collection's sharding state to check the shard version, which throws on a mismatch. Every
+ * caller in this file invokes it after constructing its AutoGetCollection / AutoGetOrCreateDb.
+ */
+void assertCanWrite_inlock(OperationContext* txn, const NamespaceString& ns) {
+ uassert(ErrorCodes::NotMaster,
+ str::stream() << "Not primary while writing to " << ns.ns(),
+ repl::ReplicationCoordinator::get(txn->getServiceContext())->canAcceptWritesFor(ns));
+ CollectionShardingState::get(txn, ns)->checkShardVersionOrThrow(txn);
+}
+
+/**
+ * Implicitly creates collection 'ns' (with empty options) if it does not exist yet.
+ *
+ * Takes the database in MODE_X via AutoGetOrCreateDb, re-checks that writes to 'ns' are allowed,
+ * and creates the collection inside its own WriteUnitOfWork. The whole body sits inside a
+ * write-conflict retry loop. Throws if the write check or userCreateNS() fails.
+ */
+void makeCollection(OperationContext* txn, const NamespaceString& ns) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ AutoGetOrCreateDb db(txn, ns.db(), MODE_X);
+ assertCanWrite_inlock(txn, ns);
+ if (!db.getDb()->getCollection(ns.ns())) { // someone else may have beat us to it.
+ WriteUnitOfWork wuow(txn);
+ uassertStatusOK(userCreateNS(txn, db.getDb(), ns.ns(), BSONObj()));
+ wuow.commit();
+ }
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "implicit collection creation", ns.ns());
+}
+
+/**
+ * Records 'ex' as the failure of one write in 'wholeOp' and decides whether the batch goes on.
+ *
+ * Always stores the error in the client's LastError and in the current op's exceptionInfo. Then:
+ * - interruption errors are rethrown with a bare 'throw;';
+ * - stale sharding errors notify ShardingState, store a copy of the exception in
+ * out->staleConfigException and return false;
+ * - everything else appends ex.toStatus() to out->results and returns wholeOp.continueOnError.
+ *
+ * Returns true if the operation can continue.
+ *
+ * NOTE(review): the bare 'throw;' rethrows the exception currently being handled, so it is only
+ * well-defined when this is called from inside a catch block. performInserts() also calls this
+ * outside of any catch block with an exception built from a fixDocumentForInsert() status;
+ * that is presumably safe because such a status is never an interruption code -- confirm.
+ */
+bool handleError(OperationContext* txn,
+ const DBException& ex,
+ const ParsedWriteOp& wholeOp,
+ WriteResult* out) {
+ LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg);
+ auto& curOp = *CurOp::get(txn);
+ curOp.debug().exceptionInfo = ex.getInfo();
+
+ if (ErrorCodes::isInterruption(ErrorCodes::Error(ex.getCode()))) {
+ throw; // These have always failed the whole batch.
+ }
+
+ if (ErrorCodes::isStaleShardingError(ErrorCodes::Error(ex.getCode()))) {
+ auto staleConfigException = dynamic_cast<const SendStaleConfigException*>(&ex);
+ if (!staleConfigException) {
+ // We need to get extra info off of the SCE, but some common patterns can result in the
+ // exception being converted to a Status then rethrown as a UserException, losing the
+ // info we need. It would be a bug if this happens so we want to detect it in testing,
+ // but it isn't severe enough that we should bring down the server if it happens in
+ // production.
+ dassert(staleConfigException);
+ msgassertedNoTrace(35475,
+ str::stream()
+ << "Got a StaleConfig error but exception was the wrong type: "
+ << demangleName(typeid(ex)));
+ }
+
+ ShardingState::get(txn)
+ ->onStaleShardVersion(txn, wholeOp.ns, staleConfigException->getVersionReceived());
+ out->staleConfigException =
+ stdx::make_unique<SendStaleConfigException>(*staleConfigException);
+ return false;
+ }
+
+ out->results.emplace_back(ex.toStatus());
+ return wholeOp.continueOnError;
+}
+
+} // namespace
+
+/**
+ * Emulates a legacy insert of one index description into 'systemIndexes' by running the
+ * createIndexes command for it.
+ *
+ * 'spec' must carry a string "ns" field naming a collection in the same database as
+ * 'systemIndexes'; otherwise this uasserts (NoSuchKey, TypeMismatch or InvalidOptions). A failed
+ * command result is also thrown.
+ *
+ * Returns a result whose 'n' is numIndexesAfter - numIndexesBefore from the command reply, and
+ * adds that amount to the current op's ninserted counter.
+ */
+static WriteResult::SingleResult createIndex(OperationContext* txn,
+ const NamespaceString& systemIndexes,
+ const BSONObj& spec) {
+ BSONElement nsElement = spec["ns"];
+ uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo());
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected \"ns\" field of index description to be a "
+ "string, "
+ "but found a " << typeName(nsElement.type()),
+ nsElement.type() == String);
+ const NamespaceString ns(nsElement.valueStringData());
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot create an index on " << ns.ns() << " with an insert to "
+ << systemIndexes.ns(),
+ ns.db() == systemIndexes.db());
+
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder << "createIndexes" << ns.coll();
+ cmdBuilder << "indexes" << BSON_ARRAY(spec);
+ BSONObj cmd = cmdBuilder.done();
+
+ rpc::CommandRequestBuilder requestBuilder;
+ auto cmdRequestMsg = requestBuilder.setDatabase(ns.db())
+ .setCommandName("createIndexes")
+ .setCommandArgs(cmd)
+ .setMetadata(rpc::makeEmptyMetadata())
+ .done();
+ rpc::CommandRequest cmdRequest(&cmdRequestMsg);
+ rpc::CommandReplyBuilder cmdReplyBuilder;
+ // Fail with a diagnosable invariant rather than a null dereference if the command is not
+ // registered.
+ Command* const createIndexesCmd = Command::findCommand("createIndexes");
+ invariant(createIndexesCmd);
+ createIndexesCmd->run(txn, cmdRequest, &cmdReplyBuilder);
+ auto cmdReplyMsg = cmdReplyBuilder.done();
+ rpc::CommandReply cmdReply(&cmdReplyMsg);
+ auto cmdResult = cmdReply.getCommandReply();
+ uassertStatusOK(getStatusFromCommandResult(cmdResult));
+
+ // Unlike normal inserts, it is not an error to "insert" a duplicate index.
+ long long n =
+ cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt();
+ CurOp::get(txn)->debug().ninserted += n;
+
+ return {n};
+}
+
+/**
+ * Handles an insert whose target is a system.indexes namespace by creating one index per
+ * document in 'wholeOp.documents', in order.
+ *
+ * Each success appends a SingleResult to the returned WriteResult. Each DBException goes through
+ * handleError(), and the loop stops as soon as handleError() reports that it cannot continue. The
+ * OperationContext's write concern is restored to its entry value on exit.
+ */
+static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& wholeOp) {
+ // Currently this creates each index independently. We could pass multiple indexes to
+ // createIndexes, but there is a lot of complexity involved in doing it correctly. For one
+ // thing, createIndexes only takes indexes to a single collection, but this batch could include
+ // different collections. Additionally, the error handling is different: createIndexes is
+ // all-or-nothing while inserts are supposed to behave like a sequence that either skips over
+ // errors or stops at the first one. These could theoretically be worked around, but it doesn't
+ // seem worth it since users that want faster index builds should just use the createIndexes
+ // command rather than a legacy emulation.
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ // Creating an index can change the writeConcern. Make sure we set it back to what it was.
+ const auto oldWC = txn->getWriteConcern();
+ ON_BLOCK_EXIT([&] { txn->setWriteConcern(oldWC); });
+
+ WriteResult out;
+ for (auto&& spec : wholeOp.documents) {
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(createIndex(txn, wholeOp.ns, spec));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+ return out;
+}
+
+/**
+ * Inserts the documents in [begin, end) into 'ns' within a single WriteUnitOfWork, implicitly
+ * creating the collection first if it does not exist.
+ *
+ * Throws on interruption, if writes to 'ns' are not currently allowed, or if the insert itself
+ * fails. Write conflicts are not retried here.
+ */
+static void insertDocuments(OperationContext* txn,
+ const NamespaceString& ns,
+ std::vector<BSONObj>::const_iterator begin,
+ std::vector<BSONObj>::const_iterator end) {
+ auto& curOp = *CurOp::get(txn);
+ boost::optional<AutoGetCollection> collection;
+ // Keep re-acquiring the collection in MODE_IX until it exists. makeCollection() is called with
+ // the AutoGetCollection released, so the existence check is repeated afterwards.
+ while (true) {
+ txn->checkForInterrupt();
+ collection.emplace(txn, ns, MODE_IX);
+ if (collection->getCollection())
+ break;
+
+ collection.reset(); // unlock.
+ makeCollection(txn, ns);
+ }
+
+ curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
+ assertCanWrite_inlock(txn, ns);
+
+ // Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can
+ // react to oversized batches.
+ WriteUnitOfWork wuow(txn);
+ uassertStatusOK(collection->getCollection()->insertDocuments(
+ txn, begin, end, &curOp.debug(), /*enforceQuota*/ true));
+ wuow.commit();
+}
+
+/**
+ * Inserts 'batch' into wholeOp.ns and appends one entry per attempted document to out->results.
+ *
+ * A batch with more than one document is first attempted as a single insert. If that throws any
+ * DBException, the exception is discarded and every document is inserted individually instead,
+ * each inside a write-conflict retry loop, with failures routed through handleError().
+ *
+ * Returns true if caller should try to insert more documents. Does nothing else if batch is empty.
+ */
+static bool insertBatchAndHandleErrors(OperationContext* txn,
+ const InsertOp& wholeOp,
+ const std::vector<BSONObj>& batch,
+ LastOpFixer* lastOpFixer,
+ WriteResult* out) {
+ auto& curOp = *CurOp::get(txn);
+ if (batch.size() > 1) {
+ // First try doing it all together. If all goes well, this is all we need to do.
+ try {
+ lastOpFixer->startingOp();
+ insertDocuments(txn, wholeOp.ns, batch.begin(), batch.end());
+ lastOpFixer->finishedOpSuccessfully();
+ globalOpCounters.gotInserts(batch.size());
+ std::fill_n(
+ std::back_inserter(out->results), batch.size(), WriteResult::SingleResult{1});
+ curOp.debug().ninserted += batch.size();
+ return true;
+ } catch (const DBException& ex) {
+ // Ignore this failure and behave as-if we never tried to do the combined batch insert.
+ // The loop below will handle reporting any non-transient errors.
+ }
+ }
+
+ // Try to insert the batch one-at-a-time. This path is executed both for singular batches, and
+ // for batches that failed all-at-once inserting.
+ for (auto it = batch.begin(); it != batch.end(); ++it) {
+ // The op counter is bumped per attempted document, before knowing whether it succeeds.
+ globalOpCounters.gotInsert();
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ lastOpFixer->startingOp();
+ insertDocuments(txn, wholeOp.ns, it, it + 1);
+ lastOpFixer->finishedOpSuccessfully();
+ out->results.emplace_back(WriteResult::SingleResult{1});
+ curOp.debug().ninserted++;
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", wholeOp.ns.ns());
+ } catch (const DBException& ex) {
+ bool canContinue = handleError(txn, ex, wholeOp, out);
+ if (!canContinue)
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Performs every insert in 'wholeOp' and returns one result per attempted document.
+ *
+ * Must not be called inside a WriteUnitOfWork. Inserts into a system.indexes namespace are
+ * redirected to performCreateIndexes(). Otherwise documents are run through
+ * fixDocumentForInsert() and grouped into batches that are flushed once they hold
+ * internalQueryExecYieldIterations / 2 documents, reach insertVectorMaxBytes bytes, hit the last
+ * document, or meet a document that failed fixing. Processing stops at the first point where
+ * insertBatchAndHandleErrors() or handleError() reports that the operation cannot continue.
+ *
+ * Throws if userAllowedWriteNS() rejects the namespace.
+ */
+WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ auto& curOp = *CurOp::get(txn);
+ ON_BLOCK_EXIT([&] {
+ // This is the only part of finishCurOp we need to do for inserts because they reuse the
+ // top-level curOp. The rest is handled by the top-level entrypoint.
+ curOp.done();
+ Top::get(txn->getServiceContext())
+ .record(wholeOp.ns.ns(),
+ LogicalOp::opInsert,
+ 1 /* write locked*/,
+ curOp.totalTimeMicros(),
+ curOp.isCommand());
+
+ });
+
+ {
+ // The guard must be a named object: an unnamed temporary would be destroyed at the end of
+ // its own statement and the _inlock setters below would run without the Client lock.
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ curOp.setNS_inlock(wholeOp.ns.ns());
+ curOp.setLogicalOp_inlock(LogicalOp::opInsert);
+ curOp.ensureStarted();
+ curOp.debug().ninserted = 0;
+ }
+
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ if (wholeOp.ns.isSystemDotIndexes()) {
+ return performCreateIndexes(txn, wholeOp);
+ }
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.documents.size());
+
+ size_t bytesInBatch = 0;
+ std::vector<BSONObj> batch;
+ const size_t maxBatchSize = internalQueryExecYieldIterations / 2;
+ batch.reserve(std::min(wholeOp.documents.size(), maxBatchSize));
+
+ for (auto&& doc : wholeOp.documents) {
+ const bool isLastDoc = (&doc == &wholeOp.documents.back());
+ auto fixedDoc = fixDocumentForInsert(doc);
+ if (!fixedDoc.isOK()) {
+ // Handled after we insert anything in the batch to be sure we report errors in the
+ // correct order. In an ordered insert, if one of the docs ahead of us fails, we should
+ // behave as-if we never got to this document.
+ } else {
+ // An empty result from fixDocumentForInsert() means the original document is used.
+ batch.push_back(fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()));
+ bytesInBatch += batch.back().objsize();
+ if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
+ continue; // Add more to batch before inserting.
+ }
+
+ bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out);
+ batch.clear(); // We won't need the current batch any more.
+ bytesInBatch = 0;
+
+ // Only now report the document that failed fixing, so its error comes after the results
+ // of the documents that preceded it.
+ if (canContinue && !fixedDoc.isOK()) {
+ globalOpCounters.gotInsert();
+ canContinue = handleError(
+ txn,
+ UserException(fixedDoc.getStatus().code(), fixedDoc.getStatus().reason()),
+ wholeOp,
+ &out);
+ }
+
+ if (!canContinue)
+ break;
+ }
+
+ return out;
+}
+
+/**
+ * Executes one update statement against 'ns'.
+ *
+ * Bumps the update op counter, fills in the current CurOp, parses the request, takes the
+ * collection lock (MODE_X for isolated updates, MODE_IX otherwise), runs the update executor and
+ * records the outcome in the CurOp's OpDebug and the client's LastError.
+ *
+ * Returns {documents matched (or 1 if an upsert inserted), documents modified, upserted id}.
+ * Throws on parse failure, interruption, when writes to 'ns' are not allowed, or when the
+ * executor fails.
+ */
+static WriteResult::SingleResult performSingleUpdateOp(OperationContext* txn,
+ const NamespaceString& ns,
+ const UpdateOp::SingleUpdate& op) {
+ globalOpCounters.gotUpdate();
+ auto& curOp = *CurOp::get(txn);
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ curOp.setNS_inlock(ns.ns());
+ curOp.setNetworkOp_inlock(dbUpdate);
+ curOp.setLogicalOp_inlock(LogicalOp::opUpdate);
+ curOp.setQuery_inlock(op.query);
+ curOp.ensureStarted();
+
+ curOp.debug().query = op.query;
+ }
+
+ UpdateLifecycleImpl updateLifecycle(ns);
+ UpdateRequest request(ns);
+ request.setLifecycle(&updateLifecycle);
+ request.setQuery(op.query);
+ request.setUpdates(op.update);
+ request.setMulti(op.multi);
+ request.setUpsert(op.upsert);
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedUpdate overrides this for $isolated.
+
+ ParsedUpdate parsedUpdate(txn, &request);
+ uassertStatusOK(parsedUpdate.parseRequest());
+
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ boost::optional<AutoGetCollection> collection;
+ // Only an upsert needs the collection to exist; in that case create it and re-acquire.
+ while (true) {
+ txn->checkForInterrupt();
+ collection.emplace(txn,
+ ns,
+ MODE_IX, // DB is always IX, even if collection is X.
+ parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
+ if (collection->getCollection() || !op.upsert)
+ break;
+
+ collection.reset(); // unlock.
+ makeCollection(txn, ns);
+ }
+
+ if (collection->getDb()) {
+ curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel());
+ }
+
+ assertCanWrite_inlock(txn, ns);
+
+ auto exec = uassertStatusOK(
+ getExecutorUpdate(txn, &curOp.debug(), collection->getCollection(), &parsedUpdate));
+ uassertStatusOK(exec->executePlan());
+
+ PlanSummaryStats summary;
+ Explain::getSummaryStats(*exec, &summary);
+ if (collection->getCollection()) {
+ collection->getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
+ }
+
+ const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
+ UpdateStage::recordUpdateStatsInOpDebug(updateStats, &curOp.debug());
+ curOp.debug().setPlanSummaryMetrics(summary);
+ UpdateResult res = UpdateStage::makeUpdateResult(updateStats);
+
+ // A non-empty 'upserted' means the update inserted a document; that counts as n == 1.
+ const bool didInsert = !res.upserted.isEmpty();
+ const long long nMatchedOrInserted = didInsert ? 1 : res.numMatched;
+ LastError::get(txn->getClient()).recordUpdate(res.existing, nMatchedOrInserted, res.upserted);
+
+ return {nMatchedOrInserted, res.numDocsModified, res.upserted};
+}
+
+/**
+ * Performs every update in 'wholeOp', in order, and returns one result per attempted update.
+ *
+ * Must not be called inside a WriteUnitOfWork. Each update runs under its own nested CurOp, which
+ * is finished via finishCurOp() when the iteration ends. A DBException from a single update goes
+ * through handleError(); the loop stops when handleError() reports that it cannot continue.
+ *
+ * Throws if userAllowedWriteNS() rejects the namespace.
+ */
+WriteResult performUpdates(OperationContext* txn, const UpdateOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.updates.size());
+ for (auto&& singleOp : wholeOp.updates) {
+ // TODO: don't create nested CurOp for legacy writes.
+ CurOp curOp(txn);
+ ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); });
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(performSingleUpdateOp(txn, wholeOp.ns, singleOp));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+
+ return out;
+}
+
+/**
+ * Executes one delete statement against 'ns'.
+ *
+ * Bumps the delete op counter, fills in the current CurOp, parses the request, takes the
+ * collection lock (MODE_X for isolated deletes, MODE_IX otherwise), runs the delete executor and
+ * records the number of deleted documents in the CurOp's OpDebug and the client's LastError.
+ * Unlike the update path, no collection is created here.
+ *
+ * Returns a result whose 'n' is the number of documents deleted. Throws on interruption, parse
+ * failure, when writes to 'ns' are not allowed, or when the executor fails.
+ */
+static WriteResult::SingleResult performSingleDeleteOp(OperationContext* txn,
+ const NamespaceString& ns,
+ const DeleteOp::SingleDelete& op) {
+ globalOpCounters.gotDelete();
+ auto& curOp = *CurOp::get(txn);
+ {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ curOp.setNS_inlock(ns.ns());
+ curOp.setNetworkOp_inlock(dbDelete);
+ curOp.setLogicalOp_inlock(LogicalOp::opDelete);
+ curOp.setQuery_inlock(op.query);
+ curOp.ensureStarted();
+
+ curOp.debug().query = op.query;
+ curOp.debug().ndeleted = 0;
+ }
+
+ txn->checkForInterrupt();
+
+ DeleteRequest request(ns);
+ request.setQuery(op.query);
+ request.setMulti(op.multi);
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated.
+
+ ParsedDelete parsedDelete(txn, &request);
+ uassertStatusOK(parsedDelete.parseRequest());
+
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetCollection collection(txn,
+ ns,
+ MODE_IX, // DB is always IX, even if collection is X.
+ parsedDelete.isIsolated() ? MODE_X : MODE_IX);
+ if (collection.getDb()) {
+ curOp.raiseDbProfileLevel(collection.getDb()->getProfilingLevel());
+ }
+
+ assertCanWrite_inlock(txn, ns);
+
+ auto exec = uassertStatusOK(
+ getExecutorDelete(txn, &curOp.debug(), collection.getCollection(), &parsedDelete));
+ uassertStatusOK(exec->executePlan());
+ long long n = DeleteStage::getNumDeleted(*exec);
+ curOp.debug().ndeleted = n;
+
+ PlanSummaryStats summary;
+ Explain::getSummaryStats(*exec, &summary);
+ if (collection.getCollection()) {
+ collection.getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed);
+ }
+ curOp.debug().setPlanSummaryMetrics(summary);
+ LastError::get(txn->getClient()).recordDelete(n);
+
+ return {n};
+}
+
+/**
+ * Performs every delete in 'wholeOp', in order, and returns one result per attempted delete.
+ *
+ * Must not be called inside a WriteUnitOfWork. Each delete runs under its own nested CurOp, which
+ * is finished via finishCurOp() when the iteration ends. A DBException from a single delete goes
+ * through handleError(); the loop stops when handleError() reports that it cannot continue.
+ *
+ * Throws if userAllowedWriteNS() rejects the namespace.
+ */
+WriteResult performDeletes(OperationContext* txn, const DeleteOp& wholeOp) {
+ invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ uassertStatusOK(userAllowedWriteNS(wholeOp.ns));
+
+ DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation);
+ LastOpFixer lastOpFixer(txn, wholeOp.ns);
+
+ WriteResult out;
+ out.results.reserve(wholeOp.deletes.size());
+ for (auto&& singleOp : wholeOp.deletes) {
+ // TODO: don't create nested CurOp for legacy writes.
+ CurOp curOp(txn);
+ ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); });
+ try {
+ lastOpFixer.startingOp();
+ out.results.emplace_back(performSingleDeleteOp(txn, wholeOp.ns, singleOp));
+ lastOpFixer.finishedOpSuccessfully();
+ } catch (const DBException& ex) {
+ const bool canContinue = handleError(txn, ex, wholeOp, &out);
+ if (!canContinue)
+ break;
+ }
+ }
+
+ return out;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
new file mode 100644
index 00000000000..37ab8dd3ee1
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/s/stale_exception.h"
+
+namespace mongo {
+
+/**
+ * The outcome of a batch of writes: one SingleResult (or error Status) per op that was processed.
+ */
+struct WriteResult {
+ struct SingleResult {
+ int64_t n;  // For deletes this appears to be DeleteStage::getNumDeleted() (see `return {n}` in the delete path).
+ int64_t nModified;  // Not set explicitly by the delete path's `return {n}`; presumably an update-only count - confirm.
+ BSONObj upsertedId; // Non-empty if something was upserted.
+ };
+
+ /**
+ * Maps 1-to-1 to single ops in request. May be shorter than input if there are errors.
+ *
+ * staleConfigException should be considered appended to this if it is non-null.
+ */
+ std::vector<StatusWith<SingleResult>> results;
+
+ /**
+ * If non-null, the SendStaleConfigException that was encountered while processing the op after
+ * the last op reported in results. Processing always stops at the first SCE and nothing is
+ * placed in results for the op that triggered it. The whole exception is copied here because it
+ * contains additional data not included in the Status.
+ *
+ * TODO convert to std::unique_ptr once we are on MSVC2015.
+ */
+ std::shared_ptr<SendStaleConfigException> staleConfigException;  // NOTE(review): <memory> is not among this header's includes; confirm it arrives transitively.
+};
+
+
+/**
+ * Performs a batch of inserts, updates, or deletes.
+ *
+ * These functions handle all of the work of doing the writes, including locking, incrementing
+ * counters, managing CurOp, and of course actually doing the write. Waiting for the writeConcern is
+ * *not* handled by these functions and is expected to be done by the caller if needed.
+ *
+ * LastError is updated for failures of individual writes, but not for batch errors reported by an
+ * exception being thrown from these functions. Callers are responsible for managing LastError in
+ * that case. This should generally be combined with LastError handling from parse failures.
+ */
+WriteResult performInserts(OperationContext* txn, const InsertOp& op);
+WriteResult performUpdates(OperationContext* txn, const UpdateOp& op);
+WriteResult performDeletes(OperationContext* txn, const DeleteOp& op);  // Definition asserts (invariant) the caller is not inside a WriteUnitOfWork.
+
+} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 8e01ee43eaa..6a7102d4dc1 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -95,10 +95,7 @@ void parseWriteCommand(StringData dbName,
}
const StringData fieldName = field.fieldNameStringData();
- if (fieldName == "writeConcern") {
- checkType(Object, field);
- op->writeConcern = field.Obj();
- } else if (fieldName == "bypassDocumentValidation") {
+ if (fieldName == "bypassDocumentValidation") {
checkType(Bool, field);
op->bypassDocumentValidation = field.Bool();
} else if (fieldName == "ordered") {
@@ -108,7 +105,8 @@ void parseWriteCommand(StringData dbName,
haveUniqueField = true;
*uniqueField = field;
} else if (fieldName[0] != '$') {
- std::initializer_list<StringData> ignoredFields = {"maxTimeMS", "shardVersion"};
+ std::initializer_list<StringData> ignoredFields = {
+ "writeConcern", "maxTimeMS", "shardVersion"};
uassert(ErrorCodes::FailedToParse,
str::stream() << "Unknown option to " << cmd.firstElementFieldName()
<< " command: " << fieldName,
@@ -134,6 +132,14 @@ InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd) {
op.documents.push_back(doc.Obj());
}
checkOpCountForCommand(op.documents.size());
+
+ if (op.ns.isSystemDotIndexes()) {
+ // This is only for consistency with sharding.
+ uassert(ErrorCodes::InvalidLength,
+ "Insert commands to system.indexes are limited to a single insert",
+ op.documents.size() == 1);
+ }
+
return op;
}
diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp
index 04b3982762e..5d5f030e54c 100644
--- a/src/mongo/db/ops/write_ops_parsers_test.cpp
+++ b/src/mongo/db/ops/write_ops_parsers_test.cpp
@@ -34,16 +34,6 @@
namespace mongo {
-TEST(CommandWriteOpsParsers, CommonFields_WriteConcern) {
- auto writeConcern = BSON("w" << 2);
- auto cmd = BSON("insert"
- << "bar"
- << "documents" << BSON_ARRAY(BSONObj()) << "writeConcern" << writeConcern);
- auto op = parseInsertCommand("foo", cmd);
- ASSERT(bool(op.writeConcern));
- ASSERT_EQ(*op.writeConcern, writeConcern);
-}
-
TEST(CommandWriteOpsParsers, CommonFields_BypassDocumentValidation) {
for (bool bypassDocumentValidation : {true, false}) {
auto cmd = BSON("insert"
@@ -70,7 +60,7 @@ TEST(CommandWriteOpsParsers, CommonFields_IgnoredFields) {
auto cmd = BSON("insert"
<< "bar"
<< "documents" << BSON_ARRAY(BSONObj()) << "maxTimeMS" << 1000 << "shardVersion"
- << BSONObj());
+ << BSONObj() << "writeConcern" << BSONObj());
parseInsertCommand("foo", cmd);
}
@@ -102,7 +92,6 @@ TEST(CommandWriteOpsParsers, SingleInsert) {
auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj));
const auto op = parseInsertCommand(ns.db(), cmd);
ASSERT_EQ(op.ns.ns(), ns.ns());
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT(!op.continueOnError);
ASSERT_EQ(op.documents.size(), 1u);
@@ -122,7 +111,6 @@ TEST(CommandWriteOpsParsers, RealMultiInsert) {
auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj0 << obj1));
const auto op = parseInsertCommand(ns.db(), cmd);
ASSERT_EQ(op.ns.ns(), ns.ns());
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT(!op.continueOnError);
ASSERT_EQ(op.documents.size(), 2u);
@@ -141,7 +129,6 @@ TEST(CommandWriteOpsParsers, Update) {
<< upsert << "multi" << multi)));
auto op = parseUpdateCommand(ns.db(), cmd);
ASSERT_EQ(op.ns.ns(), ns.ns());
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, false);
ASSERT_EQ(op.updates.size(), 1u);
@@ -161,7 +148,6 @@ TEST(CommandWriteOpsParsers, Remove) {
<< BSON_ARRAY(BSON("q" << query << "limit" << (multi ? 0 : 1))));
auto op = parseDeleteCommand(ns.db(), cmd);
ASSERT_EQ(op.ns.ns(), ns.ns());
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, false);
ASSERT_EQ(op.deletes.size(), 1u);
@@ -235,7 +221,6 @@ TEST(LegacyWriteOpsParsers, SingleInsert) {
client.insert(ns, obj, continueOnError ? InsertOption_ContinueOnError : 0);
const auto op = parseLegacyInsert(client.message);
ASSERT_EQ(op.ns.ns(), ns);
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, continueOnError);
ASSERT_EQ(op.documents.size(), 1u);
@@ -263,7 +248,6 @@ TEST(LegacyWriteOpsParsers, RealMultiInsert) {
client.insert(ns, {obj0, obj1}, continueOnError ? InsertOption_ContinueOnError : 0);
const auto op = parseLegacyInsert(client.message);
ASSERT_EQ(op.ns.ns(), ns);
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, continueOnError);
ASSERT_EQ(op.documents.size(), 2u);
@@ -282,7 +266,6 @@ TEST(LegacyWriteOpsParsers, Update) {
client.update(ns, query, update, upsert, multi);
const auto op = parseLegacyUpdate(client.message);
ASSERT_EQ(op.ns.ns(), ns);
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, false);
ASSERT_EQ(op.updates.size(), 1u);
@@ -302,7 +285,6 @@ TEST(LegacyWriteOpsParsers, Remove) {
client.remove(ns, query, multi ? 0 : RemoveOption_JustOne);
const auto op = parseLegacyDelete(client.message);
ASSERT_EQ(op.ns.ns(), ns);
- ASSERT(!op.writeConcern);
ASSERT(!op.bypassDocumentValidation);
ASSERT_EQ(op.continueOnError, false);
ASSERT_EQ(op.deletes.size(), 1u);
diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp
index 7a7c7d7dd1f..61b5be912dd 100644
--- a/src/mongo/db/stats/counters.cpp
+++ b/src/mongo/db/stats/counters.cpp
@@ -43,7 +43,7 @@ using std::endl;
OpCounters::OpCounters() {}
-void OpCounters::incInsertInWriteLock(int n) {
+void OpCounters::gotInserts(int n) {
RARELY _checkWrap();
_insert.fetchAndAdd(n);
}
diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h
index 717724b8465..cae0cb0ad16 100644
--- a/src/mongo/db/stats/counters.h
+++ b/src/mongo/db/stats/counters.h
@@ -45,7 +45,7 @@ namespace mongo {
class OpCounters {
public:
OpCounters();
- void incInsertInWriteLock(int n);
+ void gotInserts(int n);
void gotInsert();
void gotQuery();
void gotUpdate();
diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp
index fabf10ba05b..25d391ac6e8 100644
--- a/src/mongo/dbtests/query_stage_sort.cpp
+++ b/src/mongo/dbtests/query_stage_sort.cpp
@@ -319,11 +319,7 @@ public:
wuow.commit();
}
- {
- WriteUnitOfWork wuow(&_txn);
- fillData();
- wuow.commit();
- }
+ fillData();
// The data we're going to later invalidate.
set<RecordId> recordIds;
@@ -432,11 +428,7 @@ public:
wuow.commit();
}
- {
- WriteUnitOfWork wuow(&_txn);
- fillData();
- wuow.commit();
- }
+ fillData();
// The data we're going to later invalidate.
set<RecordId> recordIds;