diff options
author | Mathias Stearn <mathias@10gen.com> | 2016-04-07 18:34:44 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2016-04-21 18:58:41 -0400 |
commit | d819ac65d1a0f941bd3e201f343ac04e252c4442 (patch) | |
tree | a531131c01f3816094d34cd845cfd71aec2e3459 | |
parent | a7a593da31a944c90d7c5f0422eeee8264eb438d (diff) | |
download | mongo-d819ac65d1a0f941bd3e201f343ac04e252c4442.tar.gz |
SERVER-23128 Refactor mongod write operations
Now both write commands and legacy writes share an implementation.
-rw-r--r-- | jstests/core/batch_write_command_insert.js | 4 | ||||
-rw-r--r-- | jstests/gle/opcounters_legacy.js | 6 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/catalog/document_validation.h | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 1284 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.h | 186 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 445 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.h | 127 | ||||
-rw-r--r-- | src/mongo/db/instance.cpp | 847 | ||||
-rw-r--r-- | src/mongo/db/lasterror.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/lasterror.h | 4 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops.h | 1 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 630 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.h | 85 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers_test.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/stats/counters.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/stats/counters.h | 2 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_sort.cpp | 12 |
19 files changed, 1202 insertions, 2490 deletions
diff --git a/jstests/core/batch_write_command_insert.js b/jstests/core/batch_write_command_insert.js index 6b42cf08ebf..3405785ecce 100644 --- a/jstests/core/batch_write_command_insert.js +++ b/jstests/core/batch_write_command_insert.js @@ -285,7 +285,7 @@ request = { documents: [{ns: "invalid." + coll.getName(), key: {x: 1}, name: "x_1", unique: true}] }; result = coll.runCommand(request); -assert(!result.ok, tojson(result)); +assert(!result.ok || (result.n == 0 && result.writeErrors.length == 1), tojson(result)); assert.eq(coll.getIndexes().length, 0); // @@ -296,7 +296,7 @@ request = { documents: [{}] }; result = coll.runCommand(request); -assert(!result.ok, tojson(result)); +assert(!result.ok || (result.n == 0 && result.writeErrors.length == 1), tojson(result)); assert.eq(coll.getIndexes().length, 0); // diff --git a/jstests/gle/opcounters_legacy.js b/jstests/gle/opcounters_legacy.js index c31494b6c01..b243b8bc076 100644 --- a/jstests/gle/opcounters_legacy.js +++ b/jstests/gle/opcounters_legacy.js @@ -46,20 +46,20 @@ opCounters = db.serverStatus().opcounters; t.insert({_id: 0}); print(db.getLastError()); assert(db.getLastError()); -assert.eq(opCounters.insert + (isMongos ? 1 : 0), db.serverStatus().opcounters.insert); +assert.eq(opCounters.insert + 1, db.serverStatus().opcounters.insert); // Bulk insert, with error, continueOnError=false. opCounters = db.serverStatus().opcounters; t.insert([{_id: 3}, {_id: 3}, {_id: 4}]); assert(db.getLastError()); -assert.eq(opCounters.insert + (isMongos ? 2 : 1), db.serverStatus().opcounters.insert); +assert.eq(opCounters.insert + 2, db.serverStatus().opcounters.insert); // Bulk insert, with error, continueOnError=true. var continueOnErrorFlag = 1; opCounters = db.serverStatus().opcounters; t.insert([{_id: 5}, {_id: 5}, {_id: 6}], continueOnErrorFlag); assert(db.getLastError()); -assert.eq(opCounters.insert + 2, db.serverStatus().opcounters.insert); +assert.eq(opCounters.insert + (isMongos ? 2 : 3), db.serverStatus().opcounters.insert); // // 2. Update. diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 71e16230890..5b68bb4454f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -615,7 +615,6 @@ serverOnlyFiles = [ "commands/touch.cpp", "commands/user_management_commands.cpp", "commands/validate.cpp", - "commands/write_commands/batch_executor.cpp", "commands/write_commands/write_commands.cpp", "curop_metrics.cpp", "db_raii.cpp", @@ -646,6 +645,7 @@ serverOnlyFiles = [ "ops/update.cpp", "ops/update_lifecycle_impl.cpp", "ops/update_result.cpp", + "ops/write_ops_exec.cpp", "pipeline/document_source_cursor.cpp", "pipeline/pipeline_d.cpp", "prefetch.cpp", diff --git a/src/mongo/db/catalog/document_validation.h b/src/mongo/db/catalog/document_validation.h index 2bc8f8b4787..e5b0fc0555c 100644 --- a/src/mongo/db/catalog/document_validation.h +++ b/src/mongo/db/catalog/document_validation.h @@ -69,4 +69,18 @@ private: OperationContext* const _txn; const bool _initialState; }; + +/** + * Disables document validation while in scope if the constructor is passed true. + */ +class DisableDocumentValidationIfTrue { +public: + DisableDocumentValidationIfTrue(OperationContext* txn, bool shouldDisableValidation) { + if (shouldDisableValidation) + _documentValidationDisabler.emplace(txn); + } + +private: + boost::optional<DisableDocumentValidation> _documentValidationDisabler; +}; } diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp deleted file mode 100644 index 69bd166f4e5..00000000000 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ /dev/null @@ -1,1284 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite - -#include "mongo/platform/basic.h" - -#include "mongo/db/commands/write_commands/batch_executor.h" - -#include <memory> - -#include "mongo/base/error_codes.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/catalog/index_create.h" -#include "mongo/db/clientcursor.h" -#include "mongo/db/commands.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop_metrics.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/exec/delete.h" -#include "mongo/db/exec/update.h" -#include "mongo/db/instance.h" -#include "mongo/db/introspect.h" -#include "mongo/db/lasterror.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/op_observer.h" -#include "mongo/db/ops/delete_request.h" -#include "mongo/db/ops/insert.h" -#include "mongo/db/ops/parsed_delete.h" -#include "mongo/db/ops/parsed_update.h" -#include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/query/get_executor.h" -#include "mongo/db/query/plan_executor.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/query/query_knobs.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/repl_settings.h" -#include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/server_parameters.h" -#include "mongo/db/s/collection_metadata.h" -#include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/sharded_connection_info.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/stats/counters.h" -#include "mongo/db/stats/top.h" -#include "mongo/db/write_concern.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/s/stale_exception.h" -#include "mongo/s/write_ops/batched_upsert_detail.h" -#include "mongo/s/write_ops/write_error_detail.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/elapsed_tracker.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { - -using std::string; -using std::unique_ptr; -using std::vector; -using str::stream; - -namespace { - -/** - * Data structure to safely hold and clean up results of single write operations. - */ -class WriteOpResult { - MONGO_DISALLOW_COPYING(WriteOpResult); - -public: - WriteOpResult() {} - - WriteOpStats& getStats() { - return _stats; - } - - WriteErrorDetail* getError() { - return _error.get(); - } - WriteErrorDetail* releaseError() { - return _error.release(); - } - void setError(WriteErrorDetail* error) { - _error.reset(error); - } - -private: - WriteOpStats _stats; - std::unique_ptr<WriteErrorDetail> _error; -}; - -WriteErrorDetail* toWriteError(const Status& status) { - WriteErrorDetail* error = new WriteErrorDetail; - - // TODO: Complex transform here? - error->setErrCode(status.code()); - error->setErrMessage(status.reason()); - - return error; -} - -void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setErrCode(status.code()); - response->setErrMessage(status.reason()); - response->setOk(false); - dassert(response->isValid(NULL)); -} - -/** - * Translates write item type to wire protocol op code. Helper for - * WriteBatchExecutor::applyWriteItem(). - */ -NetworkOp getOpCode(const BatchItemRef& currWrite) { - switch (currWrite.getRequest()->getBatchType()) { - case BatchedCommandRequest::BatchType_Insert: - return dbInsert; - case BatchedCommandRequest::BatchType_Update: - return dbUpdate; - case BatchedCommandRequest::BatchType_Delete: - return dbDelete; - default: - MONGO_UNREACHABLE; - } -} - -void buildStaleError(const ChunkVersion& shardVersionRecvd, - const ChunkVersion& shardVersionWanted, - WriteErrorDetail* error) { - // Write stale error to results - error->setErrCode(ErrorCodes::StaleShardVersion); - - BSONObjBuilder infoB; - shardVersionWanted.addToBSON(infoB, "vWanted"); - error->setErrInfo(infoB.obj()); - - string errMsg = stream() << "stale shard version detected before write, received " - << shardVersionRecvd.toString() << " but local version is " - << shardVersionWanted.toString(); - error->setErrMessage(errMsg); -} - -bool checkShardVersion(OperationContext* txn, - const BatchedCommandRequest& request, - WriteOpResult* result) { - const auto& css = CollectionShardingState::get(txn, request.getTargetingNSS()); - - try { - css->checkShardVersionOrThrow(txn); - return true; - } catch (const StaleConfigException& e) { - result->setError(new WriteErrorDetail()); - buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); - return false; - } -} - -} // namespace - -WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le) - : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {} - -// static -Status WriteBatchExecutor::validateBatch(const BatchedCommandRequest& request) { - // Validate namespace - const NamespaceString& nss = request.getNS(); - if (!nss.isValid()) { - return Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace"); - } - - // Make sure we can write to the namespace - Status allowedStatus = userAllowedWriteNS(nss); - if (!allowedStatus.isOK()) { - return allowedStatus; - } - - // Validate insert index requests - // TODO: Push insert index requests through createIndex once all upgrade paths support it - string errMsg; - if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) { - return Status(ErrorCodes::InvalidOptions, errMsg); - } - - return Status::OK(); -} - -void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, - BatchedCommandResponse* response) { - // Validate namespace - Status isValid = validateBatch(request); - if (!isValid.isOK()) { - toBatchError(isValid, response); - return; - } - - if (request.sizeWriteOps() == 0u) { - toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"), - response); - return; - } - - // Validate batch size - if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) { - toBatchError(Status(ErrorCodes::InvalidLength, - stream() << "exceeded maximum write batch size of " - << BatchedCommandRequest::kMaxWriteBatchSize), - response); - return; - } - - // - // End validation - // - - const WriteConcernOptions& writeConcern = _txn->getWriteConcern(); - bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 && - (writeConcern.syncMode == WriteConcernOptions::SyncMode::NONE || - writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET); - - Timer commandTimer; - - OwnedPointerVector<WriteErrorDetail> writeErrorsOwned; - vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector(); - - OwnedPointerVector<BatchedUpsertDetail> upsertedOwned; - vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector(); - - // - // Apply each batch item, possibly bulking some items together in the write lock. - // Stops on error if batch is ordered. - // - - bulkExecute(request, &upserted, &writeErrors); - - // - // Refresh metadata if needed - // - - const bool staleBatch = - !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; - - if (staleBatch) { - const auto& oss = OperationShardingState::get(_txn); - - ChunkVersion requestedShardVersion = oss.getShardVersion(request.getTargetingNSS()); - ShardingState::get(_txn) - ->onStaleShardVersion(_txn, request.getTargetingNSS(), requestedShardVersion); - } - - // - // Construct response - // - - response->setOk(true); - - if (!silentWC) { - if (upserted.size()) { - response->setUpsertDetails(upserted); - } - - if (writeErrors.size()) { - response->setErrDetails(writeErrors); - } - - repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); - const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode(); - if (replMode != repl::ReplicationCoordinator::modeNone) { - response->setLastOp(repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp()); - if (replMode == repl::ReplicationCoordinator::modeReplSet) { - response->setElectionId(replCoord->getElectionId()); - } - } - - // Set the stats for the response - response->setN(_stats->numInserted + _stats->numUpserted + _stats->numMatched + - _stats->numDeleted); - if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) - response->setNModified(_stats->numModified); - } - - dassert(response->isValid(NULL)); -} - -static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) { - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) { - WriteErrorDetail* errorDetail = new WriteErrorDetail; - result->setError(errorDetail); - errorDetail->setErrCode(ErrorCodes::NotMaster); - errorDetail->setErrMessage("Not primary while writing to " + ns.toString()); - return false; - } - return true; -} - -static void buildUniqueIndexError(const BSONObj& keyPattern, - const BSONObj& indexPattern, - WriteErrorDetail* error) { - error->setErrCode(ErrorCodes::CannotCreateIndex); - string errMsg = stream() << "cannot create unique index over " << indexPattern - << " with shard key pattern " << keyPattern; - error->setErrMessage(errMsg); -} - -static bool checkIndexConstraints(OperationContext* txn, - const BatchedCommandRequest& request, - WriteOpResult* result) { - const NamespaceString& nss = request.getTargetingNSS(); - dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - - if (!request.isUniqueIndexRequest()) - return true; - - ShardingState* shardingState = ShardingState::get(txn); - if (shardingState->enabled()) { - auto metadata = shardingState->getCollectionMetadata(nss.ns()); - if (metadata) { - ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); - if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) { - result->setError(new WriteErrorDetail); - buildUniqueIndexError( - metadata->getKeyPattern(), request.getIndexKeyPattern(), result->getError()); - - return false; - } - } - } - - return true; -} - -// -// HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS -// - -static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) { - stdx::lock_guard<Client> lk(*txn->getClient()); - CurOp* const currentOp = CurOp::get(txn); - currentOp->setNetworkOp_inlock(getOpCode(currWrite)); - currentOp->setLogicalOp_inlock(networkOpToLogicalOp(getOpCode(currWrite))); - currentOp->ensureStarted(); - currentOp->setNS_inlock(currWrite.getRequest()->getNS().ns()); - - if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { - currentOp->setQuery_inlock(currWrite.getDocument()); - currentOp->debug().query = currWrite.getDocument(); - currentOp->debug().ninserted = 0; - } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { - currentOp->setQuery_inlock(currWrite.getUpdate()->getQuery()); - currentOp->debug().query = currWrite.getUpdate()->getQuery(); - currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr(); - // Note: debug().nMatched, nModified and nmoved are set internally in update - } else { - dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); - currentOp->setQuery_inlock(currWrite.getDelete()->getQuery()); - currentOp->debug().query = currWrite.getDelete()->getQuery(); - currentOp->debug().ndeleted = 0; - } -} - -void WriteBatchExecutor::incWriteStats(const BatchedCommandRequest::BatchType opType, - const WriteOpStats& stats, - const WriteErrorDetail* error, - CurOp* currentOp) { - if (opType == BatchedCommandRequest::BatchType_Update) { - if (stats.upsertedID.isEmpty()) { - _stats->numMatched += stats.n; - _stats->numModified += stats.nModified; - } else { - ++_stats->numUpserted; - } - - if (!error) { - _le->recordUpdate(stats.upsertedID.isEmpty() && stats.n > 0, stats.n, stats.upsertedID); - } - } else { - dassert(opType == BatchedCommandRequest::BatchType_Delete); - _stats->numDeleted += stats.n; - if (!error) { - _le->recordDelete(stats.n); - } - currentOp->debug().ndeleted += stats.n; - } - - if (error) { - _le->setLastError(error->getErrCode(), error->getErrMessage().c_str()); - } -} - -static void logCurOpError(CurOp* currentOp, WriteErrorDetail* opError) { - invariant(opError != nullptr); - currentOp->debug().exceptionInfo = - ExceptionInfo(opError->getErrMessage(), opError->getErrCode()); - - LOG(3) << " Caught Assertion in " << networkOpToString(currentOp->getNetworkOp()) - << ", continuing " << causedBy(opError->getErrMessage()); -} - -static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) { - CurOp* currentOp = CurOp::get(txn); - currentOp->done(); - int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis(); - recordCurOpMetrics(txn); - Top::get(txn->getClient()->getServiceContext()) - .record(currentOp->getNS(), - currentOp->getLogicalOp(), - 1, // "write locked" - currentOp->totalTimeMicros(), - currentOp->isCommand()); - - if (opError) - logCurOpError(currentOp, opError); - - bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand, - logger::LogSeverity::Debug(1)); - bool logSlow = executionTime > (serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs()); - - if (logAll || logSlow) { - Locker::LockerInfo lockerInfo; - txn->lockState()->getLockerInfo(&lockerInfo); - - LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats); - } - - if (currentOp->shouldDBProfile(executionTime)) { - profile(txn, CurOp::get(txn)->getNetworkOp()); - } -} - -// END HELPERS - -// -// CORE WRITE OPERATIONS (declaration) -// These functions write to the database and return stats and zero or one of: -// - page fault -// - error -// - -static void singleCreateIndex(OperationContext* txn, - const BSONObj& indexDesc, - WriteOpResult* result); - -static void multiUpdate(OperationContext* txn, - const BatchItemRef& updateItem, - WriteOpResult* result); - -static void multiRemove(OperationContext* txn, - const BatchItemRef& removeItem, - WriteOpResult* result); - -// -// WRITE EXECUTION -// In general, the exec* operations manage db lock state and stats before dispatching to the -// core write operations, which are *only* responsible for performing a write and reporting -// success or failure. -// - -/** - * Representation of the execution state of execInserts. Used by a single - * execution of execInserts in a single thread. - */ -class WriteBatchExecutor::ExecInsertsState { - MONGO_DISALLOW_COPYING(ExecInsertsState); - -public: - /** - * Constructs a new instance, for performing inserts described in "aRequest". - */ - explicit ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest); - - /** - * Acquires the write lock and client context needed to perform the current write operation. - * Returns true on success, after which it is safe to use the "context" and "collection" - * members. It is safe to call this function if this instance already holds the write lock. - * - * On failure, writeLock, context and collection will be NULL/clear. - */ - bool lockAndCheck(WriteOpResult* result); - - /** - * Releases the client context and write lock acquired by lockAndCheck. Safe to call - * regardless of whether or not this state object currently owns the lock. - */ - void unlock(); - - /** - * Returns true if this executor has the lock on the target database. - */ - bool hasLock() { - return _dbLock.get(); - } - - /** - * Gets the target collection for the batch operation. Value is undefined - * unless hasLock() is true. - */ - Collection* getCollection() { - return _collection; - } - - OperationContext* txn; - - // Request object describing the inserts. - const BatchedCommandRequest* request; - - // Index of the current insert operation to perform. - size_t currIndex = 0; - - // Translation of insert documents in "request" into insert-ready forms. This vector has a - // correspondence with elements of the "request", and "currIndex" is used to - // index both. - std::vector<StatusWith<BSONObj>> normalizedInserts; - -private: - bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock); - - ScopedTransaction _transaction; - // Guard object for the write lock on the target database. - std::unique_ptr<Lock::DBLock> _dbLock; - std::unique_ptr<Lock::CollectionLock> _collLock; - - Database* _database = nullptr; - Collection* _collection = nullptr; -}; - -void WriteBatchExecutor::bulkExecute(const BatchedCommandRequest& request, - std::vector<BatchedUpsertDetail*>* upsertedIds, - std::vector<WriteErrorDetail*>* errors) { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (request.shouldBypassValidation()) { - maybeDisableValidation.emplace(_txn); - } - - if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) { - execInserts(request, errors); - } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) { - for (size_t i = 0; i < request.sizeWriteOps(); i++) { - if (i + 1 == request.sizeWriteOps()) { - setupSynchronousCommit(_txn); - } - - WriteErrorDetail* error = NULL; - BSONObj upsertedId; - execUpdate(BatchItemRef(&request, i), &upsertedId, &error); - - if (!upsertedId.isEmpty()) { - BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail; - batchUpsertedId->setIndex(i); - batchUpsertedId->setUpsertedID(upsertedId); - upsertedIds->push_back(batchUpsertedId); - } - - if (error) { - errors->push_back(error); - if (request.getOrdered()) - break; - } - } - } else { - dassert(request.getBatchType() == BatchedCommandRequest::BatchType_Delete); - for (size_t i = 0; i < request.sizeWriteOps(); i++) { - if (i + 1 == request.sizeWriteOps()) { - setupSynchronousCommit(_txn); - } - - WriteErrorDetail* error = NULL; - execRemove(BatchItemRef(&request, i), &error); - - if (error) { - errors->push_back(error); - if (request.getOrdered()) - break; - } - } - } - - // Fill in stale version errors for unordered batches (update/delete can't do this on own) - if (!errors->empty() && !request.getOrdered()) { - const WriteErrorDetail* finalError = errors->back(); - - if (finalError->getErrCode() == ErrorCodes::StaleShardVersion) { - for (size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++) { - WriteErrorDetail* dupStaleError = new WriteErrorDetail; - finalError->cloneTo(dupStaleError); - errors->push_back(dupStaleError); - } - } - } -} - -// Goes over the request and preprocesses normalized versions of all the inserts in the request -static void normalizeInserts(const BatchedCommandRequest& request, - vector<StatusWith<BSONObj>>* normalizedInserts) { - normalizedInserts->reserve(request.sizeWriteOps()); - for (size_t i = 0; i < request.sizeWriteOps(); ++i) { - BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt(i); - StatusWith<BSONObj> normalInsert = fixDocumentForInsert(insertDoc); - normalizedInserts->push_back(normalInsert); - if (request.getOrdered() && !normalInsert.isOK()) - break; - } -} - -static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result); - -// Loops over the specified subset of the batch, processes one document at a time. -// Returns a true to discontinue the insert, or false if not. -bool WriteBatchExecutor::insertMany(WriteBatchExecutor::ExecInsertsState* state, - size_t startIndex, - size_t endIndex, - CurOp* currentOp, - std::vector<WriteErrorDetail*>* errors, - bool ordered) { - for (state->currIndex = startIndex; state->currIndex < endIndex; ++state->currIndex) { - WriteOpResult result; - BatchItemRef currInsertItem(state->request, state->currIndex); - { - stdx::lock_guard<Client> lk(*_txn->getClient()); - currentOp->setQuery_inlock(currInsertItem.getDocument()); - currentOp->debug().query = currInsertItem.getDocument(); - } - - _opCounters->gotInsert(); - // Internally, insertOne retries the single insert until it completes without a write - // conflict exception, or until it fails with some kind of error. Errors are mostly - // propagated via the request->error field, but DBExceptions or std::exceptions may escape, - // particularly on operation interruption. These kinds of errors necessarily prevent - // further insertOne calls, and stop the batch. As a result, the only expected source of - // such exceptions are interruptions. - insertOne(state, &result); - - uint64_t nInserted = result.getStats().n; - _stats->numInserted += nInserted; - currentOp->debug().ninserted += nInserted; - - const WriteErrorDetail* error = result.getError(); - if (error) { - _le->setLastError(error->getErrCode(), error->getErrMessage().c_str()); - WriteErrorDetail* error = NULL; - error = result.releaseError(); - errors->push_back(error); - error->setIndex(state->currIndex); - logCurOpError(CurOp::get(_txn), error); - if (ordered) - return true; - } else { - _le->recordInsert(nInserted); - } - } - return false; -} - -// Instantiates an ExecInsertsState, which represents all of the state for the batch. -// Breaks out into manageably sized chunks for insertMany, between which we can yield. -// Encapsulates the lock state. -void WriteBatchExecutor::execInserts(const BatchedCommandRequest& request, - std::vector<WriteErrorDetail*>* errors) { - ExecInsertsState state(_txn, &request); - normalizeInserts(request, &state.normalizedInserts); - - CurOp* currentOp; - { - stdx::lock_guard<Client> lk(*_txn->getClient()); - currentOp = CurOp::get(_txn); - currentOp->setLogicalOp_inlock(LogicalOp::opInsert); - currentOp->ensureStarted(); - currentOp->setNS_inlock(request.getNS().ns()); - currentOp->debug().ninserted = 0; - } - - auto client = _txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - _txn); - - // If this is the local database, don't set last op. - if (request.getNS().isLocal()) { - lastOpSetterGuard.Dismiss(); - } - - int64_t chunkCount = 0; - int64_t chunkBytes = 0; - const int64_t chunkMaxCount = internalQueryExecYieldIterations / 2; - size_t startIndex = 0; - size_t maxIndex = state.request->sizeWriteOps() - 1; - - for (size_t i = 0; i <= maxIndex; ++i) { - if (i == maxIndex) - setupSynchronousCommit(_txn); - state.currIndex = i; - BatchItemRef currInsertItem(state.request, state.currIndex); - chunkBytes += currInsertItem.getDocument().objsize(); - chunkCount++; - - if ((chunkCount >= chunkMaxCount) || (chunkBytes >= insertVectorMaxBytes) || - (i == maxIndex)) { - bool stop; - stop = insertMany(&state, startIndex, i + 1, currentOp, errors, request.getOrdered()); - startIndex = i + 1; - chunkCount = 0; - chunkBytes = 0; - - if (state.hasLock()) { - // insertOne acquires the locks, but does not release them on non-error cases, - // so we release them here. insertOne() reacquires them via lockAndCheck(). - state.unlock(); - // This releases any storage engine held locks/snapshots. - _txn->recoveryUnit()->abandonSnapshot(); - // Since the lock manager guarantees FIFO queues waiting on locks, - // there is no need to explicitly sleep or give up control of the processor here. - } - if (stop) - break; - } - } - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the guard to - // fire in that case. - lastOpSetterGuard.Dismiss(); - } - - // TODO: Move Top and CurOp metrics management into an RAII object. - currentOp->done(); - recordCurOpMetrics(_txn); - Top::get(_txn->getClient()->getServiceContext()) - .record(currentOp->getNS(), - currentOp->getLogicalOp(), - 1, // "write locked" - currentOp->totalTimeMicros(), - currentOp->isCommand()); -} - -void WriteBatchExecutor::execUpdate(const BatchItemRef& updateItem, - BSONObj* upsertedId, - WriteErrorDetail** error) { - // BEGIN CURRENT OP - CurOp currentOp(_txn); - beginCurrentOp(_txn, updateItem); - _opCounters->gotUpdate(); - - WriteOpResult result; - multiUpdate(_txn, updateItem, &result); - - if (!result.getStats().upsertedID.isEmpty()) { - *upsertedId = result.getStats().upsertedID; - } - // END CURRENT OP - incWriteStats(updateItem.getOpType(), result.getStats(), result.getError(), ¤tOp); - finishCurrentOp(_txn, result.getError()); - - // End current transaction and release snapshot. - _txn->recoveryUnit()->abandonSnapshot(); - - if (result.getError()) { - result.getError()->setIndex(updateItem.getItemIndex()); - *error = result.releaseError(); - } -} - -void WriteBatchExecutor::execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error) { - // Removes are similar to updates, but page faults are handled externally - - // BEGIN CURRENT OP - CurOp currentOp(_txn); - beginCurrentOp(_txn, removeItem); - _opCounters->gotDelete(); - - WriteOpResult result; - multiRemove(_txn, removeItem, &result); - - // END CURRENT OP - incWriteStats(removeItem.getOpType(), result.getStats(), result.getError(), ¤tOp); - finishCurrentOp(_txn, result.getError()); - - // End current transaction and release snapshot. - _txn->recoveryUnit()->abandonSnapshot(); - - if (result.getError()) { - result.getError()->setIndex(removeItem.getItemIndex()); - *error = result.releaseError(); - } -} - -// -// IN-DB-LOCK CORE OPERATIONS -// - -WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn, - const BatchedCommandRequest* aRequest) - : txn(txn), request(aRequest), _transaction(txn, MODE_IX) {} - -bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result, - bool intentLock) { - if (hasLock()) { - CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); - return true; - } - - if (request->isInsertIndexRequest()) - intentLock = false; // can't build indexes in intent mode - - const NamespaceString& nss = request->getNS(); - invariant(!_collLock); - invariant(!_dbLock); - _dbLock = - stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), intentLock ? MODE_IX : MODE_X); - _database = dbHolder().get(txn, nss.ns()); - if (intentLock && !_database) { - // Ensure exclusive lock in case the database doesn't yet exist - _dbLock.reset(); - _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X); - intentLock = false; - } - _collLock = stdx::make_unique<Lock::CollectionLock>( - txn->lockState(), nss.ns(), intentLock ? MODE_IX : MODE_X); - if (!checkIsMasterForDatabase(nss, result)) { - return false; - } - if (!checkShardVersion(txn, *request, result)) { - return false; - } - if (!checkIndexConstraints(txn, *request, result)) { - return false; - } - - if (!_database) { - invariant(!intentLock); - _database = dbHolder().openDb(txn, nss.ns()); - } - CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); - _collection = _database->getCollection(request->getTargetingNS()); - if (!_collection) { - if (intentLock) { - // try again with full X lock. - unlock(); - return _lockAndCheckImpl(result, false); - } - - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(txn); - // Implicitly create if it doesn't exist - _collection = _database->createCollection(txn, request->getTargetingNS()); - if (!_collection) { - result->setError(toWriteError( - Status(ErrorCodes::InternalError, - "could not create collection " + request->getTargetingNS()))); - return false; - } - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", request->getTargetingNS()); - } - return true; -} - -bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) { - if (_lockAndCheckImpl(result, true)) - return true; - unlock(); - return false; -} - -void WriteBatchExecutor::ExecInsertsState::unlock() { - _collection = nullptr; - _database = nullptr; - _collLock.reset(); - _dbLock.reset(); -} - -static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) { - // we have to be top level so we can retry - OperationContext* txn = state->txn; - invariant(!txn->lockState()->inAWriteUnitOfWork()); - invariant(state->currIndex < state->normalizedInserts.size()); - - const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]); - - if (!normalizedInsert.isOK()) { - result->setError(toWriteError(normalizedInsert.getStatus())); - return; - } - - const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() - ? state->request->getInsertRequest()->getDocumentsAt(state->currIndex) - : normalizedInsert.getValue(); - - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - if (state->request->isInsertIndexRequest()) { - singleCreateIndex(txn, insertDoc, result); - } else { - if (state->lockAndCheck(result)) { - dassert(txn->lockState()->isCollectionLockedForMode( - state->getCollection()->ns().ns(), MODE_IX)); - - WriteUnitOfWork wunit(txn); - Status status = state->getCollection()->insertDocument( - txn, insertDoc, &CurOp::get(txn)->debug(), true); - - if (status.isOK()) { - result->getStats().n++; - wunit.commit(); - } else { - result->setError(toWriteError(status)); - } - } - } - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index"); - } catch (const StaleConfigException& e) { - result->setError(new WriteErrorDetail()); - buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); - } catch (const DBException& ex) { - Status status(ex.toStatus()); - if (ErrorCodes::isInterruption(status.code())) - throw; - result->setError(toWriteError(status)); - } - - // Errors release the write lock, as a matter of policy. - if (result->getError()) { - txn->recoveryUnit()->abandonSnapshot(); - state->unlock(); - } -} - -/** - * Perform a single index creation on a collection. Requires the index descriptor be - * preprocessed. - * - * Might fault or error, otherwise populates the result. - */ -static void singleCreateIndex(OperationContext* txn, - const BSONObj& indexDesc, - WriteOpResult* result) { - BSONElement nsElement = indexDesc["ns"]; - uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo()); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "Expected \"ns\" field of index description to be a " - "string, " - "but found a " << typeName(nsElement.type()), - nsElement.type() == String); - const NamespaceString ns(nsElement.valueStringData()); - BSONObjBuilder cmdBuilder; - cmdBuilder << "createIndexes" << ns.coll(); - cmdBuilder << "indexes" << BSON_ARRAY(indexDesc); - BSONObj cmd = cmdBuilder.done(); - Command* createIndexesCmd = Command::findCommand("createIndexes"); - invariant(createIndexesCmd); - std::string errmsg; - BSONObjBuilder resultBuilder; - const bool success = - createIndexesCmd->run(txn, ns.db().toString(), cmd, 0, errmsg, resultBuilder); - Command::appendCommandStatus(resultBuilder, success, errmsg); - BSONObj cmdResult = resultBuilder.done(); - uassertStatusOK(getStatusFromCommandResult(cmdResult)); - result->getStats().n = - cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt(); -} - -static void multiUpdate(OperationContext* txn, - const BatchItemRef& updateItem, - WriteOpResult* result) { - const NamespaceString nsString(updateItem.getRequest()->getNS()); - const bool isMulti = updateItem.getUpdate()->getMulti(); - UpdateRequest request(nsString); - request.setQuery(updateItem.getUpdate()->getQuery()); - request.setUpdates(updateItem.getUpdate()->getUpdateExpr()); - request.setMulti(isMulti); - request.setUpsert(updateItem.getUpdate()->getUpsert()); - UpdateLifecycleImpl updateLifecycle(request.getNamespaceString()); - request.setLifecycle(&updateLifecycle); - - // Updates from the write commands path can yield. - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - // If this is the local database, don't set last op. - if (nsString.isLocal()) { - lastOpSetterGuard.Dismiss(); - } - - int attempt = 0; - bool createCollection = false; - for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) { - ParsedUpdate parsedUpdate(txn, &request); - Status status = parsedUpdate.parseRequest(); - if (!status.isOK()) { - result->setError(toWriteError(status)); - return; - } - - if (createCollection) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X}; - - if (!checkIsMasterForDatabase(nsString, result)) { - return; - } - - Database* const db = adb.getDb(); - if (db->getCollection(nsString.ns())) { - // someone else beat us to it - } else { - WriteUnitOfWork wuow(txn); - uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); - wuow.commit(); - } - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); - } - - /////////////////////////////////////////// - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - Lock::CollectionLock colLock( - txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX); - /////////////////////////////////////////// - - if (!checkIsMasterForDatabase(nsString, result)) { - return; - } - - if (!checkShardVersion(txn, *updateItem.getRequest(), result)) { - return; - } - - Database* const db = dbHolder().get(txn, nsString.db()); - - if (db == NULL) { - if (createCollection) { - // we raced with some, accept defeat - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - // Database not yet created - if (!request.isUpsert()) { - // not an upsert, no database, nothing to do - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - // upsert, don't try to get a context as no MODE_X lock is held - fakeLoop = -1; - createCollection = true; - continue; - } - - CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel()); - Collection* collection = db->getCollection(nsString.ns()); - - if (collection == NULL) { - if (createCollection) { - // we raced with some, accept defeat - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - if (!request.isUpsert()) { - // not an upsert, no collection, nothing to do - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - // upsert, mark that we should create collection - fakeLoop = -1; - createCollection = true; - continue; - } - - OpDebug* debug = &CurOp::get(txn)->debug(); - - try { - invariant(collection); - std::unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, debug, collection, &parsedUpdate)); - - uassertStatusOK(exec->executePlan()); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - - const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); - UpdateStage::recordUpdateStatsInOpDebug(updateStats, debug); - debug->setPlanSummaryMetrics(summary); - - UpdateResult res = UpdateStage::makeUpdateResult(updateStats); - - const long long numDocsModified = res.numDocsModified; - const long long numMatched = res.numMatched; - const BSONObj resUpsertedID = res.upserted; - - // We have an _id from an insert - const bool didInsert = !resUpsertedID.isEmpty(); - - result->getStats().nModified = numDocsModified; - result->getStats().n = didInsert ? 1 : numMatched; - result->getStats().upsertedID = resUpsertedID; - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the guard to - // fire in that case. - lastOpSetterGuard.Dismiss(); - } - } catch (const WriteConflictException&) { - debug->writeConflicts++; - if (isMulti) { - log() << "Had WriteConflict during multi update, aborting"; - throw; - } - - createCollection = false; - - // RESTART LOOP - fakeLoop = -1; - txn->recoveryUnit()->abandonSnapshot(); - - WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns()); - } catch (const StaleConfigException& e) { - result->setError(new WriteErrorDetail()); - buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); - } catch (const DBException& ex) { - Status status = ex.toStatus(); - if (ErrorCodes::isInterruption(status.code())) { - throw; - } - result->setError(toWriteError(status)); - } - } -} - -/** - * Perform a remove operation, which might remove multiple documents. Dispatches to remove code - * currently to do most of this. - * - * Might fault or error, otherwise populates the result. - */ -static void multiRemove(OperationContext* txn, - const BatchItemRef& removeItem, - WriteOpResult* result) { - const NamespaceString& nss = removeItem.getRequest()->getNS(); - DeleteRequest request(nss); - request.setQuery(removeItem.getDelete()->getQuery()); - request.setMulti(removeItem.getDelete()->getLimit() != 1); - request.setGod(false); - - // Deletes running through the write commands path can yield. - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - // If this is the local database, don't set last op. - if (nss.isLocal()) { - lastOpSetterGuard.Dismiss(); - } - - OpDebug* opDebug = &CurOp::get(txn)->debug(); - - int attempt = 1; - while (1) { - try { - ParsedDelete parsedDelete(txn, &request); - Status status = parsedDelete.parseRequest(); - if (!status.isOK()) { - result->setError(toWriteError(status)); - return; - } - - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, nss.db(), MODE_IX); - if (!autoDb.getDb()) { - break; - } - - CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel()); - Lock::CollectionLock collLock( - txn->lockState(), nss.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX); - - // getExecutorDelete() also checks if writes are allowed. - if (!checkIsMasterForDatabase(nss, result)) { - return; - } - // Check version once we're locked - - if (!checkShardVersion(txn, *removeItem.getRequest(), result)) { - // Version error - return; - } - - auto collection = autoDb.getDb()->getCollection(nss); - - std::unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorDelete(txn, opDebug, collection, &parsedDelete)); - - // Execute the delete and retrieve the number deleted. - uassertStatusOK(exec->executePlan()); - result->getStats().n = DeleteStage::getNumDeleted(*exec); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - if (collection) { - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - } - - CurOp::get(txn)->debug().setPlanSummaryMetrics(summary); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the guard to - // fire in that case. - lastOpSetterGuard.Dismiss(); - } - break; - } catch (const WriteConflictException&) { - CurOp::get(txn)->debug().writeConflicts++; - WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns()); - } catch (const StaleConfigException& e) { - result->setError(new WriteErrorDetail()); - buildStaleError(e.getVersionReceived(), e.getVersionWanted(), result->getError()); - return; - } catch (const DBException& ex) { - Status status = ex.toStatus(); - if (ErrorCodes::isInterruption(status.code())) { - throw; - } - result->setError(toWriteError(status)); - return; - } - } -} - -} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/batch_executor.h b/src/mongo/db/commands/write_commands/batch_executor.h deleted file mode 100644 index 945adfca021..00000000000 --- a/src/mongo/db/commands/write_commands/batch_executor.h +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> - -#include "mongo/base/disallow_copying.h" -#include "mongo/db/ops/update_request.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/s/write_ops/batched_delete_document.h" -#include "mongo/s/write_ops/batched_update_document.h" -#include "mongo/util/mongoutils/str.h" - -namespace mongo { - -class BSONObjBuilder; -class CurOp; -class LastError; -class OpCounters; -class OperationContext; -class WriteBatchStats; -struct WriteOpStats; - -/** - * An instance of WriteBatchExecutor is an object capable of issuing a write batch. - */ -class WriteBatchExecutor { - MONGO_DISALLOW_COPYING(WriteBatchExecutor); - -public: - // State object used by private execInserts. TODO: Do not expose this type. - class ExecInsertsState; - - WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le); - - /** - * Issues writes with requested write concern. Fills response with errors if problems - * occur. - */ - void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response); - - const WriteBatchStats& getStats() const; - - /** - * Does basic validation of the batch request. Returns a non-OK status if - * any problems with the batch are found. - */ - static Status validateBatch(const BatchedCommandRequest& request); - -private: - /** - * Executes the writes in the batch and returns upserted _ids and write errors. - * Dispatches to one of the three functions below for DBLock, CurOp, and stats management. - */ - void bulkExecute(const BatchedCommandRequest& request, - std::vector<BatchedUpsertDetail*>* upsertedIds, - std::vector<WriteErrorDetail*>* errors); - - /** - * Inserts a subset of an insert batch. - * Returns a true to discontinue the insert, or false if not. - */ - bool insertMany(WriteBatchExecutor::ExecInsertsState* state, - size_t startIndex, - size_t endIndex, - CurOp* currentOp, - std::vector<WriteErrorDetail*>* errors, - bool ordered); - - /** - * Executes the inserts of an insert batch and returns the write errors. - * - * Internally uses the DBLock of the request namespace. - * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple - * times. - */ - void execInserts(const BatchedCommandRequest& request, std::vector<WriteErrorDetail*>* errors); - - /** - * Executes an update item (which may update many documents or upsert), and returns the - * upserted _id on upsert or error on failure. - * - * Internally uses the DBLock of the update namespace. - * May take the DBLock multiple times. - */ - void execUpdate(const BatchItemRef& updateItem, BSONObj* upsertedId, WriteErrorDetail** error); - - /** - * Executes a delete item (which may remove many documents) and returns an error on failure. - * - * Internally uses the DBLock of the delete namespace. - * May take the DBLock multiple times. - */ - void execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error); - - /** - * Helper for incrementing stats after each individual write op. - * - * No lock requirements (though usually done inside write lock to make stats update look - * atomic). - */ - void incWriteStats(const BatchedCommandRequest::BatchType opType, - const WriteOpStats& stats, - const WriteErrorDetail* error, - CurOp* currentOp); - - OperationContext* _txn; - - // OpCounters object to update - needed for stats reporting - // Not owned here. - OpCounters* _opCounters; - - // LastError object to use for preparing write results - needed for stats reporting - // Not owned here. - LastError* _le; - - // Stats - std::unique_ptr<WriteBatchStats> _stats; -}; - -/** - * Holds information about the result of a single write operation. - */ -struct WriteOpStats { - WriteOpStats() : n(0), nModified(0) {} - - void reset() { - n = 0; - nModified = 0; - upsertedID = BSONObj(); - } - - // Num docs logically affected by this operation. - int n; - - // Num docs actually modified by this operation, if applicable (update) - int nModified; - - // _id of newly upserted document, if applicable (update) - BSONObj upsertedID; -}; - -/** - * Full stats accumulated by a write batch execution. Note that these stats do not directly - * correspond to the stats accumulated in opCounters and LastError. - */ -class WriteBatchStats { -public: - WriteBatchStats() - : numInserted(0), numUpserted(0), numMatched(0), numModified(0), numDeleted(0) {} - - int numInserted; - int numUpserted; - int numMatched; - int numModified; - int numDeleted; -}; - -} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 78f5cde997c..dacfe9dab56 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -26,14 +26,12 @@ * it in the license file. */ -#include "mongo/db/commands/write_commands/write_commands.h" - #include "mongo/base/init.h" #include "mongo/bson/mutable/document.h" #include "mongo/bson/mutable/element.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" -#include "mongo/db/commands/write_commands/batch_executor.h" +#include "mongo/db/commands.h" #include "mongo/db/commands/write_commands/write_commands_common.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" @@ -43,12 +41,16 @@ #include "mongo/db/ops/parsed_delete.h" #include "mongo/db/ops/parsed_update.h" #include "mongo/db/ops/update_lifecycle_impl.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" #include "mongo/db/write_concern.h" +#include "mongo/s/stale_exception.h" namespace mongo { @@ -57,20 +59,7 @@ using std::stringstream; namespace { -MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) { - // Leaked intentionally: a Command registers itself when constructed. - new CmdInsert(); - new CmdUpdate(); - new CmdDelete(); - return Status::OK(); -} - -} // namespace - -WriteCmd::WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType) - : Command(name), _writeType(writeType) {} - -void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) { +void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) { namespace mmb = mutablebson; mmb::Element root = cmdObj->root(); mmb::Element field = root.findFirstChildNamed(fieldName); @@ -86,204 +75,326 @@ void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldN } } -// Slaves can't perform writes. -bool WriteCmd::slaveOk() const { - return false; -} - - -bool WriteCmd::supportsWriteConcern(const BSONObj& cmd) const { - return true; -} - -Status WriteCmd::checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { - Status status(auth::checkAuthForWriteCommand(AuthorizationSession::get(client), - _writeType, - NamespaceString(parseNs(dbname, cmdObj)), - cmdObj)); - - // TODO: Remove this when we standardize GLE reporting from commands +Status checkAuthForWriteCommand(ClientBasic* client, + BatchedCommandRequest::BatchType batchType, + NamespaceString ns, + const BSONObj& cmdObj) { + Status status = + auth::checkAuthForWriteCommand(AuthorizationSession::get(client), batchType, ns, cmdObj); if (!status.isOK()) { LastError::get(client).setLastError(status.code(), status.reason()); } - return status; } -// Write commands are counted towards their corresponding opcounters, not command opcounters. -bool WriteCmd::shouldAffectCommandCounter() const { - return false; +bool shouldSkipOutput(OperationContext* txn) { + const WriteConcernOptions& writeConcern = txn->getWriteConcern(); + return writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 && + (writeConcern.syncMode == WriteConcernOptions::SyncMode::NONE || + writeConcern.syncMode == WriteConcernOptions::SyncMode::UNSET); } -bool WriteCmd::run(OperationContext* txn, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errMsg, - BSONObjBuilder& result) { - // Can't be run on secondaries. - dassert(txn->writesAreReplicated()); - BatchedCommandRequest request(_writeType); - BatchedCommandResponse response; - - if (!request.parseBSON(dbName, cmdObj, &errMsg) || !request.isValid(&errMsg)) { - return appendCommandStatus(result, Status(ErrorCodes::FailedToParse, errMsg)); +enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields. +void serializeReply(OperationContext* txn, + ReplyStyle replyStyle, + bool continueOnError, + size_t opsInBatch, + const WriteResult& result, + BSONObjBuilder* out) { + if (shouldSkipOutput(txn)) + return; + + long long n = 0; + long long nModified = 0; + std::vector<BSONObj> upsertInfo; + std::vector<BSONObj> errors; + BSONSizeTracker upsertInfoSizeTracker; + BSONSizeTracker errorsSizeTracker; + + for (size_t i = 0; i < result.results.size(); i++) { + if (result.results[i].isOK()) { + const auto& opResult = result.results[i].getValue(); + n += opResult.n; // Always there. + if (replyStyle == ReplyStyle::kUpdate) { + nModified += opResult.nModified; + if (!opResult.upsertedId.isEmpty()) { + BSONObjBuilder upsertedId(upsertInfoSizeTracker); + upsertedId.append("index", int(i)); + upsertedId.appendAs(opResult.upsertedId.firstElement(), "_id"); + upsertInfo.push_back(upsertedId.obj()); + } + } + continue; + } + + const auto& status = result.results[i].getStatus(); + BSONObjBuilder error(errorsSizeTracker); + error.append("index", int(i)); + error.append("code", int(status.code())); + error.append("errmsg", status.reason()); + errors.push_back(error.obj()); } - WriteBatchExecutor writeBatchExecutor( - txn, &globalOpCounters, &LastError::get(txn->getClient())); + if (result.staleConfigException) { + // For ordered:false commands we need to duplicate the StaleConfig result for all ops + // after we stopped. result.results doesn't include the staleConfigException. + // See the comment on WriteResult::staleConfigException for more info. + int endIndex = continueOnError ? opsInBatch : result.results.size() + 1; + for (int i = result.results.size(); i < endIndex; i++) { + BSONObjBuilder error(errorsSizeTracker); + error.append("index", i); + error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception! + error.append("errmsg", result.staleConfigException->getInfo().msg); + { + BSONObjBuilder errInfo(error.subobjStart("errInfo")); + result.staleConfigException->getVersionWanted().addToBSON(errInfo, "vWanted"); + } + errors.push_back(error.obj()); + } + } + + out->appendNumber("n", n); + + if (replyStyle == ReplyStyle::kUpdate) { + out->appendNumber("nModified", nModified); + if (!upsertInfo.empty()) { + out->append("upserted", upsertInfo); + } + } - writeBatchExecutor.executeBatch(request, &response); + if (!errors.empty()) { + out->append("writeErrors", errors); + } - result.appendElements(response.toBSON()); - return response.getOk(); + // writeConcernError field is handled by command processor. + + { + // Undocumented repl fields that mongos depends on. + auto* replCoord = repl::ReplicationCoordinator::get(txn->getServiceContext()); + const auto replMode = replCoord->getReplicationMode(); + if (replMode != repl::ReplicationCoordinator::modeNone) { + const auto lastOp = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + if (lastOp.getTerm() == repl::OpTime::kUninitializedTerm) { + out->append("opTime", lastOp.getTimestamp()); + } else { + lastOp.append(out, "opTime"); + } + + if (replMode == repl::ReplicationCoordinator::modeReplSet) { + out->append("electionId", replCoord->getElectionId()); + } + } + } } -Status WriteCmd::explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata&, - BSONObjBuilder* out) const { - // For now we only explain update and delete write commands. - if (BatchedCommandRequest::BatchType_Update != _writeType && - BatchedCommandRequest::BatchType_Delete != _writeType) { - return Status(ErrorCodes::IllegalOperation, - "Only update and delete write ops can be explained"); +class WriteCommand : public Command { +public: + explicit WriteCommand(StringData name) : Command(name) {} + + bool slaveOk() const final { + return false; } - // Parse the batch request. - BatchedCommandRequest request(_writeType); - std::string errMsg; - if (!request.parseBSON(dbname, cmdObj, &errMsg) || !request.isValid(&errMsg)) { - return Status(ErrorCodes::FailedToParse, errMsg); + bool shouldAffectCommandCounter() const final { + return false; } - // Do the validation of the batch that is shared with non-explained write batches. - Status isValid = WriteBatchExecutor::validateBatch(request); - if (!isValid.isOK()) { - return isValid; + bool supportsWriteConcern(const BSONObj& cmd) const { + return true; } - // Explain must do one additional piece of validation: For now we only explain - // singleton batches. - if (request.sizeWriteOps() != 1u) { - return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1"); + bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) final { + try { + runImpl(txn, dbname, cmdObj, result); + return true; + } catch (const DBException& ex) { + LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); + throw; + } } - ScopedTransaction scopedXact(txn, MODE_IX); + virtual void runImpl(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) = 0; +}; - // Get a reference to the singleton batch item (it's the 0th item in the batch). - BatchItemRef batchItem(&request, 0); +} // namespace - OpDebug* opDebug = &CurOp::get(txn)->debug(); +class CmdInsert final : public WriteCommand { +public: + CmdInsert() : WriteCommand("insert") {} - if (BatchedCommandRequest::BatchType_Update == _writeType) { - // Create the update request. - UpdateRequest updateRequest(request.getNS()); - updateRequest.setQuery(batchItem.getUpdate()->getQuery()); - updateRequest.setUpdates(batchItem.getUpdate()->getUpdateExpr()); - updateRequest.setMulti(batchItem.getUpdate()->getMulti()); - updateRequest.setUpsert(batchItem.getUpdate()->getUpsert()); - UpdateLifecycleImpl updateLifecycle(updateRequest.getNamespaceString()); - updateRequest.setLifecycle(&updateLifecycle); - updateRequest.setExplain(); + void redactForLogging(mutablebson::Document* cmdObj) final { + redactTooLongLog(cmdObj, "documents"); + } - // Explained updates can yield. - updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); + void help(stringstream& help) const final { + help << "insert documents"; + } - ParsedUpdate parsedUpdate(txn, &updateRequest); - Status parseStatus = parsedUpdate.parseRequest(); - if (!parseStatus.isOK()) { - return parseStatus; - } + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) final { + return checkAuthForWriteCommand(client, + BatchedCommandRequest::BatchType_Insert, + NamespaceString(parseNs(dbname, cmdObj)), + cmdObj); + } + + void runImpl(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) final { + const auto batch = parseInsertCommand(dbname, cmdObj); + const auto reply = performInserts(txn, batch); + serializeReply(txn, + ReplyStyle::kNotUpdate, + batch.continueOnError, + batch.documents.size(), + reply, + &result); + } +} cmdInsert; - // Explains of write commands are read-only, but we take write locks so - // that timing info is more accurate. - AutoGetDb autoDb(txn, request.getNS().db(), MODE_IX); - Lock::CollectionLock colLock(txn->lockState(), request.getNS().ns(), MODE_IX); +class CmdUpdate final : public WriteCommand { +public: + CmdUpdate() : WriteCommand("update") {} - // Get a pointer to the (possibly NULL) collection. - Collection* collection = NULL; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(request.getNS()); - } + void redactForLogging(mutablebson::Document* cmdObj) final { + redactTooLongLog(cmdObj, "updates"); + } - std::unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, opDebug, collection, &parsedUpdate)); + void help(stringstream& help) const final { + help << "update documents"; + } - // Explain the plan tree. - Explain::explainStages(exec.get(), verbosity, out); - return Status::OK(); - } else { - invariant(BatchedCommandRequest::BatchType_Delete == _writeType); - - // Create the delete request. - DeleteRequest deleteRequest(request.getNS()); - deleteRequest.setQuery(batchItem.getDelete()->getQuery()); - deleteRequest.setMulti(batchItem.getDelete()->getLimit() != 1); - deleteRequest.setGod(false); - deleteRequest.setExplain(); + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) final { + return checkAuthForWriteCommand(client, + BatchedCommandRequest::BatchType_Update, + NamespaceString(parseNs(dbname, cmdObj)), + cmdObj); + } - // Explained deletes can yield. - deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); + void runImpl(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) final { + const auto batch = parseUpdateCommand(dbname, cmdObj); + const auto reply = performUpdates(txn, batch); + serializeReply( + txn, ReplyStyle::kUpdate, batch.continueOnError, batch.updates.size(), reply, &result); + } - ParsedDelete parsedDelete(txn, &deleteRequest); - Status parseStatus = parsedDelete.parseRequest(); - if (!parseStatus.isOK()) { - return parseStatus; - } + Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata&, + BSONObjBuilder* out) const final { + const auto batch = parseUpdateCommand(dbname, cmdObj); + uassert(ErrorCodes::InvalidLength, + "explained write batches must be of size 1", + batch.updates.size() == 1); + + UpdateLifecycleImpl updateLifecycle(batch.ns); + UpdateRequest updateRequest(batch.ns); + updateRequest.setLifecycle(&updateLifecycle); + updateRequest.setQuery(batch.updates[0].query); + updateRequest.setUpdates(batch.updates[0].update); + updateRequest.setMulti(batch.updates[0].multi); + updateRequest.setUpsert(batch.updates[0].upsert); + updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); + updateRequest.setExplain(); + + ParsedUpdate parsedUpdate(txn, &updateRequest); + uassertStatusOK(parsedUpdate.parseRequest()); // Explains of write commands are read-only, but we take write locks so that timing // info is more accurate. - AutoGetDb autoDb(txn, request.getNS().db(), MODE_IX); - Lock::CollectionLock colLock(txn->lockState(), request.getNS().ns(), MODE_IX); - - // Get a pointer to the (possibly NULL) collection. - Collection* collection = NULL; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(request.getNS()); - } - - std::unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorDelete(txn, opDebug, collection, &parsedDelete)); + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetCollection collection(txn, batch.ns, MODE_IX); - // Explain the plan tree. + auto exec = uassertStatusOK(getExecutorUpdate( + txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedUpdate)); Explain::explainStages(exec.get(), verbosity, out); return Status::OK(); } -} +} cmdUpdate; -CmdInsert::CmdInsert() : WriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {} +class CmdDelete final : public WriteCommand { +public: + CmdDelete() : WriteCommand("delete") {} -void CmdInsert::redactForLogging(mutablebson::Document* cmdObj) { - redactTooLongLog(cmdObj, StringData("documents", StringData::LiteralTag())); -} + void redactForLogging(mutablebson::Document* cmdObj) final { + redactTooLongLog(cmdObj, "deletes"); + } -void CmdInsert::help(stringstream& help) const { - help << "insert documents"; -} + void help(stringstream& help) const final { + help << "delete documents"; + } -CmdUpdate::CmdUpdate() : WriteCmd("update", BatchedCommandRequest::BatchType_Update) {} + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) final { + return checkAuthForWriteCommand(client, + BatchedCommandRequest::BatchType_Delete, + NamespaceString(parseNs(dbname, cmdObj)), + cmdObj); + } -void CmdUpdate::redactForLogging(mutablebson::Document* cmdObj) { - redactTooLongLog(cmdObj, StringData("updates", StringData::LiteralTag())); -} + void runImpl(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) final { + const auto batch = parseDeleteCommand(dbname, cmdObj); + const auto reply = performDeletes(txn, batch); + serializeReply(txn, + ReplyStyle::kNotUpdate, + batch.continueOnError, + batch.deletes.size(), + reply, + &result); + } -void CmdUpdate::help(stringstream& help) const { - help << "update documents"; -} + Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + const rpc::ServerSelectionMetadata&, + BSONObjBuilder* out) const final { + const auto batch = parseDeleteCommand(dbname, cmdObj); + uassert(ErrorCodes::InvalidLength, + "explained write batches must be of size 1", + batch.deletes.size() == 1); + + DeleteRequest deleteRequest(batch.ns); + deleteRequest.setQuery(batch.deletes[0].query); + deleteRequest.setMulti(batch.deletes[0].multi); + deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); + deleteRequest.setExplain(); -CmdDelete::CmdDelete() : WriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {} + ParsedDelete parsedDelete(txn, &deleteRequest); + uassertStatusOK(parsedDelete.parseRequest()); -void CmdDelete::redactForLogging(mutablebson::Document* cmdObj) { - redactTooLongLog(cmdObj, StringData("deletes", StringData::LiteralTag())); -} + // Explains of write commands are read-only, but we take write locks so that timing + // info is more accurate. + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetCollection collection(txn, batch.ns, MODE_IX); -void CmdDelete::help(stringstream& help) const { - help << "delete documents"; -} + // Explain the plan tree. + auto exec = uassertStatusOK(getExecutorDelete( + txn, &CurOp::get(txn)->debug(), collection.getCollection(), &parsedDelete)); + Explain::explainStages(exec.get(), verbosity, out); + return Status::OK(); + } +} cmdDelete; } // namespace mongo diff --git a/src/mongo/db/commands/write_commands/write_commands.h b/src/mongo/db/commands/write_commands/write_commands.h deleted file mode 100644 index e01ade2b2ff..00000000000 --- a/src/mongo/db/commands/write_commands/write_commands.h +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Copyright (C) 2013 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <string> - -#include "mongo/db/commands.h" -#include "mongo/db/client_basic.h" -#include "mongo/s/write_ops/batched_command_request.h" - -namespace mongo { - -/** - * Base class for write commands. Write commands support batch writes and write concern, - * and return per-item error information. All write commands use the (non-virtual) entry - * point WriteCmd::run(). - * - * Command parsing is performed by the WriteBatch class (command syntax documented there), - * and command execution is performed by the WriteBatchExecutor class. - */ -class WriteCmd : public Command { - MONGO_DISALLOW_COPYING(WriteCmd); - -public: - virtual ~WriteCmd() {} - -protected: - /** - * Instantiates a command that can be invoked by "name", which will be capable of issuing - * write batches of type "writeType", and will require privilege "action" to run. - */ - WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType); - - // Full log of write command can be quite large. - static void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName); - -private: - virtual bool slaveOk() const; - - virtual bool supportsWriteConcern(const BSONObj& cmd) const override; - - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj); - - virtual bool shouldAffectCommandCounter() const; - - // Write command entry point. - virtual bool run(OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result); - - // Write commands can be explained. - virtual Status explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - const rpc::ServerSelectionMetadata&, - BSONObjBuilder* out) const; - - // Type of batch (e.g. insert). - BatchedCommandRequest::BatchType _writeType; -}; - -class CmdInsert : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdInsert); - -public: - CmdInsert(); - void redactForLogging(mutablebson::Document* cmdObj); - -private: - virtual void help(std::stringstream& help) const; -}; - -class CmdUpdate : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdUpdate); - -public: - CmdUpdate(); - void redactForLogging(mutablebson::Document* cmdObj); - -private: - virtual void help(std::stringstream& help) const; -}; - -class CmdDelete : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdDelete); - -public: - CmdDelete(); - void redactForLogging(mutablebson::Document* cmdObj); - -private: - virtual void help(std::stringstream& help) const; -}; - -} // namespace mongo diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index c71fc2c814d..e9de5d5b127 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -40,10 +40,8 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/authz_manager_external_state_d.h" -#include "mongo/db/background.h" -#include "mongo/db/catalog/index_create.h" #include "mongo/db/client.h" -#include "mongo/db/clientcursor.h" +#include "mongo/db/catalog/cursor_manager.h" #include "mongo/db/commands.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -53,10 +51,7 @@ #include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/dbhelpers.h" #include "mongo/db/dbmessage.h" -#include "mongo/db/exec/delete.h" -#include "mongo/db/exec/update.h" #include "mongo/db/ftdc/ftdc_mongod.h" #include "mongo/db/global_timestamp.h" #include "mongo/db/instance.h" @@ -67,22 +62,11 @@ #include "mongo/db/mongod_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" -#include "mongo/db/ops/delete_request.h" -#include "mongo/db/ops/insert.h" -#include "mongo/db/ops/parsed_delete.h" -#include "mongo/db/ops/parsed_update.h" -#include "mongo/db/ops/update_driver.h" -#include "mongo/db/ops/update_lifecycle_impl.h" -#include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" -#include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/run_commands.h" -#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" @@ -93,7 +77,6 @@ #include "mongo/platform/process_id.h" #include "mongo/rpc/command_reply_builder.h" #include "mongo/rpc/command_request.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/legacy_reply.h" #include "mongo/rpc/legacy_reply_builder.h" #include "mongo/rpc/legacy_request.h" @@ -115,7 +98,6 @@ #include "mongo/util/time_support.h" namespace mongo { - using logger::LogComponent; using std::endl; using std::hex; @@ -126,6 +108,12 @@ using std::stringstream; using std::unique_ptr; using std::vector; +string dbExecCommand; + +MONGO_FP_DECLARE(rsStopGetMore); + +namespace { + // for diaglog inline void opread(Message& m) { if (_diaglog.getLevel() & 2) { @@ -139,25 +127,6 @@ inline void opwrite(Message& m) { } } -void receivedKillCursors(OperationContext* txn, Message& m); - -void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op); - -bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop); - -int nloggedsome = 0; -#define LOGWITHRATELIMIT if (++nloggedsome < 1000 || nloggedsome % 100 == 0) - -string dbExecCommand; - -MONGO_FP_DECLARE(rsStopGetMore); - -namespace { - unique_ptr<AuthzManagerExternalState> createAuthzManagerExternalStateMongod() { return stdx::make_unique<AuthzManagerExternalStateMongod>(); } @@ -231,13 +200,11 @@ void beginCommandOp(OperationContext* txn, const NamespaceString& nss, const BSO curop->setNS_inlock(nss.ns()); } -} // namespace - -static void receivedCommand(OperationContext* txn, - const NamespaceString& nss, - Client& client, - DbResponse& dbResponse, - Message& message) { +void receivedCommand(OperationContext* txn, + const NamespaceString& nss, + Client& client, + DbResponse& dbResponse, + Message& message) { invariant(nss.isCommand()); const int32_t responseToMsgId = message.header().getId(); @@ -282,8 +249,6 @@ static void receivedCommand(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } -namespace { - void receivedRpc(OperationContext* txn, Client& client, DbResponse& dbResponse, Message& message) { invariant(message.operation() == dbCommand); @@ -381,14 +346,13 @@ void receivedPseudoCommand(OperationContext* txn, receivedCommand(txn, interposedNss, client, dbResponse, interposed); } -} // namespace - -static void receivedQuery(OperationContext* txn, - const NamespaceString& nss, - Client& c, - DbResponse& dbResponse, - Message& m) { +void receivedQuery(OperationContext* txn, + const NamespaceString& nss, + Client& c, + DbResponse& dbResponse, + Message& m) { invariant(!nss.isCommand()); + globalOpCounters.gotQuery(); int32_t responseToMsgId = m.header().getId(); @@ -420,6 +384,152 @@ static void receivedQuery(OperationContext* txn, dbResponse.responseToMsgId = responseToMsgId; } +void receivedKillCursors(OperationContext* txn, Message& m) { + LastError::get(txn->getClient()).disable(); + DbMessage dbmessage(m); + int n = dbmessage.pullInt(); + + uassert(13659, "sent 0 cursors to kill", n != 0); + massert(13658, + str::stream() << "bad kill cursors size: " << m.dataSize(), + m.dataSize() == 8 + (8 * n)); + uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); + + if (n > 2000) { + (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl; + verify(n < 30000); + } + + const char* cursorArray = dbmessage.getArray(n); + + int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); + + if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { + LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl; + } +} + +void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto insertOp = parseLegacyInsert(m); + invariant(insertOp.ns == nsString); + for (const auto& obj : insertOp.documents) { + Status status = + AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); + audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); + uassertStatusOK(status); + } + performInserts(txn, insertOp); +} + +void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto updateOp = parseLegacyUpdate(m); + auto& singleUpdate = updateOp.updates[0]; + invariant(updateOp.ns == nsString); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForUpdate( + nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); + audit::logUpdateAuthzCheck(txn->getClient(), + nsString, + singleUpdate.query, + singleUpdate.update, + singleUpdate.upsert, + singleUpdate.multi, + status.code()); + uassertStatusOK(status); + + performUpdates(txn, updateOp); +} + +void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m) { + auto deleteOp = parseLegacyDelete(m); + auto& singleDelete = deleteOp.deletes[0]; + invariant(deleteOp.ns == nsString); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForDelete(nsString, singleDelete.query); + audit::logDeleteAuthzCheck(txn->getClient(), nsString, singleDelete.query, status.code()); + uassertStatusOK(status); + + performDeletes(txn, deleteOp); +} + +bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) { + globalOpCounters.gotGetMore(); + DbMessage d(m); + + const char* ns = d.getns(); + int ntoreturn = d.pullInt(); + uassert( + 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); + long long cursorid = d.pullInt64(); + + curop.debug().ntoreturn = ntoreturn; + curop.debug().cursorid = cursorid; + + { + stdx::lock_guard<Client>(*txn->getClient()); + CurOp::get(txn)->setNS_inlock(ns); + } + + bool exhaust = false; + QueryResult::View msgdata = 0; + bool isCursorAuthorized = false; + + try { + const NamespaceString nsString(ns); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid ns [" << ns << "]", + nsString.isValid()); + + Status status = AuthorizationSession::get(txn->getClient()) + ->checkAuthForGetMore(nsString, cursorid, false); + audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code()); + uassertStatusOK(status); + + while (MONGO_FAIL_POINT(rsStopGetMore)) { + sleepmillis(0); + } + + msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + } catch (AssertionException& e) { + if (isCursorAuthorized) { + // If a cursor with id 'cursorid' was authorized, it may have been advanced + // before an exception terminated processGetMore. Erase the ClientCursor + // because it may now be out of sync with the client's iteration state. + // SERVER-7952 + // TODO Temporary code, see SERVER-4563 for a cleanup overview. + CursorManager::eraseCursorGlobal(txn, cursorid); + } + + BSONObjBuilder err; + e.getInfo().append(err); + BSONObj errObj = err.done(); + + curop.debug().exceptionInfo = e.getInfo(); + + replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = 1; + return false; + } + + dbresponse.response.setData(msgdata.view2ptr(), true); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = msgdata.getNReturned(); + + dbresponse.responseToMsgId = m.header().getId(); + + if (exhaust) { + curop.debug().exhaust = true; + dbresponse.exhaustNS = ns; + } + + return true; +} + +} // namespace + // Mongod on win32 defines a value for this function. In all other executables it is NULL. void (*reportEventToSystem)(const char* msg) = 0; @@ -442,7 +552,9 @@ void assembleResponse(OperationContext* txn, DbMessage dbmsg(m); Client& c = *txn->getClient(); - if (!c.isInDirectClient()) { + if (c.isInDirectClient()) { + invariant(!txn->lockState()->inAWriteUnitOfWork()); + } else { LastError::get(c).startRequest(); AuthorizationSession::get(c)->startRequest(txn); @@ -486,33 +598,6 @@ void assembleResponse(OperationContext* txn, opwrite(m); } - // Increment op counters. - switch (op) { - case dbQuery: - if (!isCommand) { - globalOpCounters.gotQuery(); - } else { - // Command counting is deferred, since it is not known yet whether the command - // needs counting. - } - break; - case dbGetMore: - globalOpCounters.gotGetMore(); - break; - case dbInsert: - // Insert counting is deferred, since it is not known yet whether the insert contains - // multiple documents (each of which needs to be counted). - break; - case dbUpdate: - globalOpCounters.gotUpdate(); - break; - case dbDelete: - globalOpCounters.gotDelete(); - break; - default: - break; - } - CurOp& currentOp = *CurOp::get(txn); { stdx::lock_guard<Client> lk(*txn->getClient()); @@ -553,10 +638,8 @@ void assembleResponse(OperationContext* txn, dbresponse.responseToMsgId = m.header().getId(); } else { + // The remaining operations do not return any response. They are fire-and-forget. try { - // The following operations all require authorization. - // dbInsert, dbUpdate and dbDelete can be easily pre-authorized, - // here, but dbKillCursors cannot. if (op == dbKillCursors) { currentOp.ensureStarted(); logThreshold = 10; @@ -579,11 +662,11 @@ void assembleResponse(OperationContext* txn, if (!nsString.isValid()) { uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); } else if (op == dbInsert) { - receivedInsert(txn, nsString, m, currentOp); + receivedInsert(txn, nsString, m); } else if (op == dbUpdate) { - receivedUpdate(txn, nsString, m, currentOp); + receivedUpdate(txn, nsString, m); } else if (op == dbDelete) { - receivedDelete(txn, nsString, m, currentOp); + receivedDelete(txn, nsString, m); } else { invariant(false); } @@ -630,592 +713,6 @@ void assembleResponse(OperationContext* txn, recordCurOpMetrics(txn); } -void receivedKillCursors(OperationContext* txn, Message& m) { - LastError::get(txn->getClient()).disable(); - DbMessage dbmessage(m); - int n = dbmessage.pullInt(); - - uassert(13659, "sent 0 cursors to kill", n != 0); - massert(13658, - str::stream() << "bad kill cursors size: " << m.dataSize(), - m.dataSize() == 8 + (8 * n)); - uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); - - if (n > 2000) { - (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n << endl; - verify(n < 30000); - } - - const char* cursorArray = dbmessage.getArray(n); - - int found = CursorManager::eraseCursorGlobalIfAuthorized(txn, n, cursorArray); - - if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { - LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n << endl; - } -} - -void receivedUpdate(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - uassertStatusOK(userAllowedWriteNS(nsString)); - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - auto updateOp = parseLegacyUpdate(m); - auto& singleUpdate = updateOp.updates[0]; - - uassert(10055, "update object too large", singleUpdate.update.objsize() <= BSONObjMaxUserSize); - - op.debug().query = singleUpdate.query; - { - stdx::lock_guard<Client> lk(*client); - op.setNS_inlock(nsString.ns()); - op.setQuery_inlock(singleUpdate.query); - } - - Status status = AuthorizationSession::get(client)->checkAuthForUpdate( - nsString, singleUpdate.query, singleUpdate.update, singleUpdate.upsert); - audit::logUpdateAuthzCheck(client, - nsString, - singleUpdate.query, - singleUpdate.update, - singleUpdate.upsert, - singleUpdate.multi, - status.code()); - uassertStatusOK(status); - - UpdateRequest request(nsString); - request.setUpsert(singleUpdate.upsert); - request.setMulti(singleUpdate.multi); - request.setQuery(singleUpdate.query); - request.setUpdates(singleUpdate.update); - UpdateLifecycleImpl updateLifecycle(nsString); - request.setLifecycle(&updateLifecycle); - - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 1; - while (1) { - try { - ParsedUpdate parsedUpdate(txn, &request); - uassertStatusOK(parsedUpdate.parseRequest()); - - // Tentatively take an intent lock, fix up if we need to create the collection - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - if (dbHolder().get(txn, nsString.db()) == NULL) { - // If DB doesn't exist, don't implicitly create it in OldClientContext - break; - } - Lock::CollectionLock collLock( - txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX); - OldClientContext ctx(txn, nsString.ns()); - - auto collection = ctx.db()->getCollection(nsString); - - // The common case: no implicit collection creation - if (!singleUpdate.upsert || collection != NULL) { - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); - - // Run the plan and get stats out. - uassertStatusOK(exec->executePlan()); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - if (collection) { - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - } - - const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); - UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug()); - op.debug().setPlanSummaryMetrics(summary); - - UpdateResult res = UpdateStage::makeUpdateResult(updateStats); - - // for getlasterror - size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U; - LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - return; - } - break; - } catch (const WriteConflictException& dle) { - op.debug().writeConflicts++; - if (singleUpdate.multi) { - log(LogComponent::kWrite) << "Had WriteConflict during multi update, aborting"; - throw; - } - WriteConflictException::logAndBackoff(attempt++, "update", nsString.toString()); - } - } - - // This is an upsert into a non-existing database, so need an exclusive lock - // to avoid deadlock - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ParsedUpdate parsedUpdate(txn, &request); - uassertStatusOK(parsedUpdate.parseRequest()); - - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - OldClientContext ctx(txn, nsString.ns()); - uassert(ErrorCodes::NotMaster, - str::stream() << "Not primary while performing update on " << nsString.ns(), - repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - - Database* db = ctx.db(); - if (db->getCollection(nsString)) { - // someone else beat us to it, that's ok - // we might race while we unlock if someone drops - // but that's ok, we'll just do nothing and error out - } else { - WriteUnitOfWork wuow(txn); - uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); - wuow.commit(); - } - - auto collection = ctx.db()->getCollection(nsString); - invariant(collection); - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorUpdate(txn, &op.debug(), collection, &parsedUpdate)); - - // Run the plan and get stats out. - uassertStatusOK(exec->executePlan()); - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - - const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); - UpdateStage::recordUpdateStatsInOpDebug(updateStats, &op.debug()); - op.debug().setPlanSummaryMetrics(summary); - - UpdateResult res = UpdateStage::makeUpdateResult(updateStats); - - size_t nMatchedOrInserted = res.upserted.isEmpty() ? res.numMatched : 1U; - LastError::get(client).recordUpdate(res.existing, nMatchedOrInserted, res.upserted); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); -} - -void receivedDelete(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - uassertStatusOK(userAllowedWriteNS(nsString)); - - auto deleteOp = parseLegacyDelete(m); - auto& singleDelete = deleteOp.deletes[0]; - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - op.debug().query = singleDelete.query; - { - stdx::lock_guard<Client> lk(*client); - op.setQuery_inlock(singleDelete.query); - op.setNS_inlock(nsString.ns()); - } - - Status status = - AuthorizationSession::get(client)->checkAuthForDelete(nsString, singleDelete.query); - audit::logDeleteAuthzCheck(client, nsString, singleDelete.query, status.code()); - uassertStatusOK(status); - - DeleteRequest request(nsString); - request.setQuery(singleDelete.query); - request.setMulti(singleDelete.multi); - - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 1; - while (1) { - try { - ParsedDelete parsedDelete(txn, &request); - uassertStatusOK(parsedDelete.parseRequest()); - - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, nsString.db(), MODE_IX); - if (!autoDb.getDb()) { - break; - } - - Lock::CollectionLock collLock( - txn->lockState(), nsString.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX); - OldClientContext ctx(txn, nsString.ns()); - - auto collection = ctx.db()->getCollection(nsString); - - unique_ptr<PlanExecutor> exec = - uassertStatusOK(getExecutorDelete(txn, &op.debug(), collection, &parsedDelete)); - - // Run the plan and get the number of docs deleted. - uassertStatusOK(exec->executePlan()); - long long n = DeleteStage::getNumDeleted(*exec); - LastError::get(client).recordDelete(n); - op.debug().ndeleted = n; - - PlanSummaryStats summary; - Explain::getSummaryStats(*exec, &summary); - if (collection) { - collection->infoCache()->notifyOfQuery(txn, summary.indexesUsed); - } - CurOp::get(txn)->debug().setPlanSummaryMetrics(summary); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op updates will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } - break; - } catch (const WriteConflictException& dle) { - op.debug().writeConflicts++; - WriteConflictException::logAndBackoff(attempt++, "delete", nsString.toString()); - } - } -} - -bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop) { - DbMessage d(m); - - const char* ns = d.getns(); - int ntoreturn = d.pullInt(); - uassert( - 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); - long long cursorid = d.pullInt64(); - - curop.debug().ntoreturn = ntoreturn; - curop.debug().cursorid = cursorid; - - { - stdx::lock_guard<Client>(*txn->getClient()); - CurOp::get(txn)->setNS_inlock(ns); - } - - bool exhaust = false; - QueryResult::View msgdata = 0; - bool isCursorAuthorized = false; - - try { - const NamespaceString nsString(ns); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid ns [" << ns << "]", - nsString.isValid()); - - Status status = AuthorizationSession::get(txn->getClient()) - ->checkAuthForGetMore(nsString, cursorid, false); - audit::logGetMoreAuthzCheck(txn->getClient(), nsString, cursorid, status.code()); - uassertStatusOK(status); - - while (MONGO_FAIL_POINT(rsStopGetMore)) { - sleepmillis(0); - } - - msgdata = getMore(txn, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); - } catch (AssertionException& e) { - if (isCursorAuthorized) { - // If a cursor with id 'cursorid' was authorized, it may have been advanced - // before an exception terminated processGetMore. Erase the ClientCursor - // because it may now be out of sync with the client's iteration state. - // SERVER-7952 - // TODO Temporary code, see SERVER-4563 for a cleanup overview. - CursorManager::eraseCursorGlobal(txn, cursorid); - } - - BSONObjBuilder err; - e.getInfo().append(err); - BSONObj errObj = err.done(); - - curop.debug().exceptionInfo = e.getInfo(); - - replyToQuery(ResultFlag_ErrSet, m, dbresponse, errObj); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = 1; - return false; - } - - dbresponse.response.setData(msgdata.view2ptr(), true); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = msgdata.getNReturned(); - - dbresponse.responseToMsgId = m.header().getId(); - - if (exhaust) { - curop.debug().exhaust = true; - dbresponse.exhaustNS = ns; - } - - return true; -} - -void insertMultiSingletons(OperationContext* txn, - OldClientContext& ctx, - bool keepGoing, - StringData ns, - CurOp& op, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end) { - for (vector<BSONObj>::iterator it = begin; it != end; it++) { - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wouw(txn); - Collection* collection = ctx.db()->getCollection(ns); - if (!collection) { - collection = ctx.db()->createCollection(txn, ns); - invariant(collection); - } - - uassertStatusOK(collection->insertDocument(txn, *it, &op.debug(), true)); - wouw.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", ns); - - globalOpCounters.incInsertInWriteLock(1); - op.debug().ninserted++; - - } catch (const UserException& ex) { - if (!keepGoing) - throw; - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - } - } -} - -void insertMultiVector(OperationContext* txn, - OldClientContext& ctx, - bool keepGoing, - StringData ns, - CurOp& op, - vector<BSONObj>::iterator begin, - vector<BSONObj>::iterator end) { - try { - WriteUnitOfWork wunit(txn); - Collection* collection = ctx.db()->getCollection(ns); - if (!collection) { - collection = ctx.db()->createCollection(txn, ns); - invariant(collection); - } - - uassertStatusOK(collection->insertDocuments(txn, begin, end, &op.debug(), true, false)); - wunit.commit(); - - int inserted = end - begin; - globalOpCounters.incInsertInWriteLock(inserted); - op.debug().ninserted = inserted; - } catch (UserException&) { - txn->recoveryUnit()->abandonSnapshot(); - insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end); - } catch (WriteConflictException&) { - CurOp::get(txn)->debug().writeConflicts++; - txn->recoveryUnit()->abandonSnapshot(); - WriteConflictException::logAndBackoff(0, "insert", ns); - insertMultiSingletons(txn, ctx, keepGoing, ns, op, begin, end); - } -} - -NOINLINE_DECL void insertMulti(OperationContext* txn, - OldClientContext& ctx, - const InsertOp& insertOp, - CurOp& op) { - std::vector<BSONObj> docs; - docs.reserve(insertOp.documents.size()); - for (auto&& doc : insertOp.documents) { - // TODO don't fail yet on invalid documents. They should be treated like other errors. - BSONObj fixed = uassertStatusOK(fixDocumentForInsert(doc)); - docs.push_back(fixed.isEmpty() ? doc : std::move(fixed)); - } - - vector<BSONObj>::iterator chunkBegin = docs.begin(); - int64_t chunkCount = 0; - int64_t chunkSize = 0; - - auto client = txn->getClient(); - auto lastOpAtOperationStart = repl::ReplClientInfo::forClient(client).getLastOp(); - ScopeGuard lastOpSetterGuard = MakeObjGuard(repl::ReplClientInfo::forClient(client), - &repl::ReplClientInfo::setLastOpToSystemLastOpTime, - txn); - - for (vector<BSONObj>::iterator it = docs.begin(); it != docs.end(); it++) { - chunkSize += (*it).objsize(); - // Limit chunk size, actual size chosen is a tradeoff: larger sizes are more efficient, - // but smaller chunk sizes allow yielding to other threads and lower chance of WCEs - if ((++chunkCount >= internalQueryExecYieldIterations / 2) || - (chunkSize >= insertVectorMaxBytes)) { - if (it == chunkBegin) // there is only one doc to process, so avoid retry on failure - insertMultiSingletons( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); - else - insertMultiVector( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, it + 1); - chunkBegin = it + 1; - chunkCount = 0; - chunkSize = 0; - } - } - if (chunkBegin != docs.end()) - insertMultiVector( - txn, ctx, insertOp.continueOnError, insertOp.ns.ns(), op, chunkBegin, docs.end()); - - if (repl::ReplClientInfo::forClient(client).getLastOp() != lastOpAtOperationStart) { - // If this operation has already generated a new lastOp, don't bother setting it - // here. No-op inserts will not generate a new lastOp, so we still need the - // guard to fire in that case. - lastOpSetterGuard.Dismiss(); - } -} - -static void convertSystemIndexInsertsToCommands(DbMessage& d, BSONArrayBuilder* allCmdsBuilder) { - while (d.moreJSObjs()) { - BSONObj spec = d.nextJsObj(); - BSONElement indexNsElement = spec["ns"]; - uassert(ErrorCodes::NoSuchKey, - str::stream() << "Missing \"ns\" field while inserting into " << d.getns(), - !indexNsElement.eoo()); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "Expected \"ns\" field to have type String, not " - << typeName(indexNsElement.type()) << " while inserting into " - << d.getns(), - indexNsElement.type() == String); - const StringData nsToIndex(indexNsElement.valueStringData()); - BSONObjBuilder cmdObjBuilder(allCmdsBuilder->subobjStart()); - cmdObjBuilder << "createIndexes" << nsToCollectionSubstring(nsToIndex); - BSONArrayBuilder specArrayBuilder(cmdObjBuilder.subarrayStart("indexes")); - while (true) { - BSONObjBuilder specBuilder(specArrayBuilder.subobjStart()); - BSONElement specNsElement = spec["ns"]; - if ((specNsElement.type() != String) || - (specNsElement.valueStringData() != nsToIndex)) { - break; - } - for (BSONObjIterator iter(spec); iter.more();) { - BSONElement element = iter.next(); - if (element.fieldNameStringData() != "ns") { - specBuilder.append(element); - } - } - if (!d.moreJSObjs()) { - break; - } - spec = d.nextJsObj(); - } - } -} - -static void insertSystemIndexes(OperationContext* txn, DbMessage& d, CurOp& curOp) { - BSONArrayBuilder allCmdsBuilder; - try { - convertSystemIndexInsertsToCommands(d, &allCmdsBuilder); - } catch (const DBException& ex) { - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - curOp.debug().exceptionInfo = ex.getInfo(); - return; - } - BSONArray allCmds(allCmdsBuilder.done()); - Command* createIndexesCmd = Command::findCommand("createIndexes"); - invariant(createIndexesCmd); - const bool keepGoing = d.reservedField() & InsertOption_ContinueOnError; - for (BSONObjIterator iter(allCmds); iter.more();) { - try { - BSONObj cmdObj = iter.next().Obj(); - - rpc::LegacyRequestBuilder requestBuilder{}; - auto indexNs = NamespaceString(d.getns()); - auto cmdRequestMsg = requestBuilder.setDatabase(indexNs.db()) - .setCommandName("createIndexes") - .setCommandArgs(cmdObj) - .setMetadata(rpc::makeEmptyMetadata()) - .done(); - rpc::LegacyRequest cmdRequest{&cmdRequestMsg}; - rpc::LegacyReplyBuilder cmdReplyBuilder{}; - Command::execCommand(txn, createIndexesCmd, cmdRequest, &cmdReplyBuilder); - auto cmdReplyMsg = cmdReplyBuilder.done(); - rpc::LegacyReply cmdReply{&cmdReplyMsg}; - uassertStatusOK(getStatusFromCommandResult(cmdReply.getCommandReply())); - } catch (const DBException& ex) { - LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); - curOp.debug().exceptionInfo = ex.getInfo(); - if (!keepGoing) { - return; - } - } - } -} - -bool _receivedInsert(OperationContext* txn, - const NamespaceString& nsString, - const InsertOp& insertOp, - CurOp& op, - bool checkCollection) { - // CONCURRENCY TODO: is being read locked in big log sufficient here? - // writelock is used to synchronize stepdowns w/ writes - uassert( - 10058, "not master", repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString)); - - OldClientContext ctx(txn, insertOp.ns.ns()); - if (checkCollection && !ctx.db()->getCollection(nsString)) - return false; - insertMulti(txn, ctx, insertOp, op); - return true; -} - -void receivedInsert(OperationContext* txn, const NamespaceString& nsString, Message& m, CurOp& op) { - { - stdx::lock_guard<Client>(*txn->getClient()); - CurOp::get(txn)->setNS_inlock(nsString.ns()); - } - - uassertStatusOK(userAllowedWriteNS(nsString.ns())); - if (nsString.isSystemDotIndexes()) { - DbMessage d(m); - insertSystemIndexes(txn, d, op); - return; - } - - auto insertOp = parseLegacyInsert(m); - - for (const auto& obj : insertOp.documents) { - // Check auth for insert. - Status status = - AuthorizationSession::get(txn->getClient())->checkAuthForInsert(nsString, obj); - audit::logInsertAuthzCheck(txn->getClient(), nsString, obj, status.code()); - uassertStatusOK(status); - } - - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - // OldClientContext may implicitly create a database, so check existence - if (dbHolder().get(txn, nsString.db()) != NULL) { - if (_receivedInsert(txn, nsString, insertOp, op, true)) - return; - } - } - - // Collection didn't exist so try again with MODE_X - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_X); - - _receivedInsert(txn, nsString, insertOp, op, false); -} - // ----- BEGIN Diaglog ----- DiagLog::DiagLog() : f(0), level(0) {} diff --git a/src/mongo/db/lasterror.cpp b/src/mongo/db/lasterror.cpp index 624329c4ddd..60d0e59acc3 100644 --- a/src/mongo/db/lasterror.cpp +++ b/src/mongo/db/lasterror.cpp @@ -54,11 +54,6 @@ void LastError::setLastError(int code, std::string msg) { _msg = std::move(msg); } -void LastError::recordInsert(long long nObjects) { - reset(true); - _nObjects = nObjects; -} - void LastError::recordUpdate(bool updateObjects, long long nObjects, BSONObj upsertedId) { reset(true); _nObjects = nObjects; diff --git a/src/mongo/db/lasterror.h b/src/mongo/db/lasterror.h index e6f65e4aed7..27ce978d2b6 100644 --- a/src/mongo/db/lasterror.h +++ b/src/mongo/db/lasterror.h @@ -65,8 +65,6 @@ public: */ void setLastError(int code, std::string msg); - void recordInsert(long long nObjects); - void recordUpdate(bool updateObjects, long long nObjects, BSONObj upsertedId); void recordDelete(long long nDeleted); @@ -84,7 +82,7 @@ public: bool isValid() const { return _valid; } - int const getNPrev() const { + int getNPrev() const { return _nPrev; } diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h index 267693bc559..24cfe22c54b 100644 --- a/src/mongo/db/ops/write_ops.h +++ b/src/mongo/db/ops/write_ops.h @@ -44,7 +44,6 @@ namespace mongo { */ struct ParsedWriteOp { NamespaceString ns; - boost::optional<BSONObj> writeConcern; bool bypassDocumentValidation = false; bool continueOnError = false; }; diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp new file mode 100644 index 00000000000..a995d1b25eb --- /dev/null +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -0,0 +1,630 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/base/checked_cast.h" +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/commands.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop_metrics.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/exec/delete.h" +#include "mongo/db/exec/update.h" +#include "mongo/db/introspect.h" +#include "mongo/db/lasterror.h" +#include "mongo/db/ops/delete_request.h" +#include "mongo/db/ops/insert.h" +#include "mongo/db/ops/parsed_delete.h" +#include "mongo/db/ops/parsed_update.h" +#include "mongo/db/ops/update_lifecycle_impl.h" +#include "mongo/db/ops/update_request.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/query/get_executor.h" +#include "mongo/db/query/plan_summary_stats.h" +#include "mongo/db/query/query_knobs.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/stats/counters.h" +#include "mongo/db/stats/top.h" +#include "mongo/db/write_concern.h" +#include "mongo/rpc/command_reply.h" +#include "mongo/rpc/command_reply_builder.h" +#include "mongo/rpc/command_request.h" +#include "mongo/rpc/command_request_builder.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/stale_exception.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +// Convention in this file: generic helpers go in the anonymous namespace. Helpers that are for a +// single type of operation are static functions defined above their caller. +namespace { + +void finishCurOp(OperationContext* txn, CurOp* curOp) { + try { + curOp->done(); + int executionTimeMs = curOp->totalTimeMillis(); + curOp->debug().executionTime = executionTimeMs; + + recordCurOpMetrics(txn); + Top::get(txn->getServiceContext()) + .record(curOp->getNS(), + curOp->getLogicalOp(), + 1, // "write locked" + curOp->totalTimeMicros(), + curOp->isCommand()); + + if (!curOp->debug().exceptionInfo.empty()) { + LOG(3) << "Caught Assertion in " << logicalOpToString(curOp->getLogicalOp()) << ": " + << curOp->debug().exceptionInfo.toString(); + } + + const bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kCommand, + logger::LogSeverity::Debug(1)); + const bool logSlow = + executionTimeMs > (serverGlobalParams.slowMS + curOp->getExpectedLatencyMs()); + + if (logAll || logSlow) { + Locker::LockerInfo lockerInfo; + txn->lockState()->getLockerInfo(&lockerInfo); + log() << curOp->debug().report(*curOp, lockerInfo.stats); + } + + if (curOp->shouldDBProfile(executionTimeMs)) { + profile(txn, CurOp::get(txn)->getNetworkOp()); + } + } catch (const DBException& ex) { + // We need to ignore all errors here. We don't want a successful op to fail because of a + // failure to record stats. We also don't want to replace the error reported for an op that + // is failing. + log() << "Ignoring error from finishCurOp: " << ex.toString(); + } +} + +/** + * Sets the Client's LastOp to the system OpTime if needed. + */ +class LastOpFixer { +public: + LastOpFixer(OperationContext* txn, const NamespaceString& ns) + : _txn(txn), _isOnLocalDb(ns.isLocal()) {} + + ~LastOpFixer() { + if (_needToFixLastOp && !_isOnLocalDb) { + // If this operation has already generated a new lastOp, don't bother setting it + // here. No-op updates will not generate a new lastOp, so we still need the + // guard to fire in that case. Operations on the local DB aren't replicated, so they + // don't need to bump the lastOp. + replClientInfo().setLastOpToSystemLastOpTime(_txn); + } + } + + void startingOp() { + _needToFixLastOp = true; + _opTimeAtLastOpStart = replClientInfo().getLastOp(); + } + + void finishedOpSuccessfully() { + // If the op was succesful and bumped LastOp, we don't need to do it again. However, we + // still need to for no-ops and all failing ops. + _needToFixLastOp = (replClientInfo().getLastOp() == _opTimeAtLastOpStart); + } + +private: + repl::ReplClientInfo& replClientInfo() { + return repl::ReplClientInfo::forClient(_txn->getClient()); + } + + OperationContext* const _txn; + bool _needToFixLastOp = true; + const bool _isOnLocalDb; + repl::OpTime _opTimeAtLastOpStart; +}; + +void assertCanWrite_inlock(OperationContext* txn, const NamespaceString& ns) { + uassert(ErrorCodes::NotMaster, + str::stream() << "Not primary while writing to " << ns.ns(), + repl::ReplicationCoordinator::get(txn->getServiceContext())->canAcceptWritesFor(ns)); + CollectionShardingState::get(txn, ns)->checkShardVersionOrThrow(txn); +} + +void makeCollection(OperationContext* txn, const NamespaceString& ns) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + AutoGetOrCreateDb db(txn, ns.db(), MODE_X); + assertCanWrite_inlock(txn, ns); + if (!db.getDb()->getCollection(ns.ns())) { // someone else may have beat us to it. + WriteUnitOfWork wuow(txn); + uassertStatusOK(userCreateNS(txn, db.getDb(), ns.ns(), BSONObj())); + wuow.commit(); + } + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "implicit collection creation", ns.ns()); +} + +/** + * Returns true if the operation can continue. + */ +bool handleError(OperationContext* txn, + const DBException& ex, + const ParsedWriteOp& wholeOp, + WriteResult* out) { + LastError::get(txn->getClient()).setLastError(ex.getCode(), ex.getInfo().msg); + auto& curOp = *CurOp::get(txn); + curOp.debug().exceptionInfo = ex.getInfo(); + + if (ErrorCodes::isInterruption(ErrorCodes::Error(ex.getCode()))) { + throw; // These have always failed the whole batch. + } + + if (ErrorCodes::isStaleShardingError(ErrorCodes::Error(ex.getCode()))) { + auto staleConfigException = dynamic_cast<const SendStaleConfigException*>(&ex); + if (!staleConfigException) { + // We need to get extra info off of the SCE, but some common patterns can result in the + // exception being converted to a Status then rethrown as a UserException, losing the + // info we need. It would be a bug if this happens so we want to detect it in testing, + // but it isn't severe enough that we should bring down the server if it happens in + // production. + dassert(staleConfigException); + msgassertedNoTrace(35475, + str::stream() + << "Got a StaleConfig error but exception was the wrong type: " + << demangleName(typeid(ex))); + } + + ShardingState::get(txn) + ->onStaleShardVersion(txn, wholeOp.ns, staleConfigException->getVersionReceived()); + out->staleConfigException = + stdx::make_unique<SendStaleConfigException>(*staleConfigException); + return false; + } + + out->results.emplace_back(ex.toStatus()); + return wholeOp.continueOnError; +} + +} // namespace + +static WriteResult::SingleResult createIndex(OperationContext* txn, + const NamespaceString& systemIndexes, + const BSONObj& spec) { + BSONElement nsElement = spec["ns"]; + uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo()); + uassert(ErrorCodes::TypeMismatch, + str::stream() << "Expected \"ns\" field of index description to be a " + "string, " + "but found a " << typeName(nsElement.type()), + nsElement.type() == String); + const NamespaceString ns(nsElement.valueStringData()); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "Cannot create an index on " << ns.ns() << " with an insert to " + << systemIndexes.ns(), + ns.db() == systemIndexes.db()); + + BSONObjBuilder cmdBuilder; + cmdBuilder << "createIndexes" << ns.coll(); + cmdBuilder << "indexes" << BSON_ARRAY(spec); + BSONObj cmd = cmdBuilder.done(); + + rpc::CommandRequestBuilder requestBuilder; + auto cmdRequestMsg = requestBuilder.setDatabase(ns.db()) + .setCommandName("createIndexes") + .setCommandArgs(cmd) + .setMetadata(rpc::makeEmptyMetadata()) + .done(); + rpc::CommandRequest cmdRequest(&cmdRequestMsg); + rpc::CommandReplyBuilder cmdReplyBuilder; + Command::findCommand("createIndexes")->run(txn, cmdRequest, &cmdReplyBuilder); + auto cmdReplyMsg = cmdReplyBuilder.done(); + rpc::CommandReply cmdReply(&cmdReplyMsg); + auto cmdResult = cmdReply.getCommandReply(); + uassertStatusOK(getStatusFromCommandResult(cmdResult)); + + // Unlike normal inserts, it is not an error to "insert" a duplicate index. + long long n = + cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt(); + CurOp::get(txn)->debug().ninserted += n; + + return {n}; +} + +static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& wholeOp) { + // Currently this creates each index independently. We could pass multiple indexes to + // createIndexes, but there is a lot of complexity involved in doing it correctly. For one + // thing, createIndexes only takes indexes to a single collection, but this batch could include + // different collections. Additionally, the error handling is different: createIndexes is + // all-or-nothing while inserts are supposed to behave like a sequence that either skips over + // errors or stops at the first one. These could theoretically be worked around, but it doesn't + // seem worth it since users that want faster index builds should just use the createIndexes + // command rather than a legacy emulation. + LastOpFixer lastOpFixer(txn, wholeOp.ns); + + // Creating an index can change the writeConcern. Make sure we set it back to what it was. + const auto oldWC = txn->getWriteConcern(); + ON_BLOCK_EXIT([&] { txn->setWriteConcern(oldWC); }); + + WriteResult out; + for (auto&& spec : wholeOp.documents) { + try { + lastOpFixer.startingOp(); + out.results.emplace_back(createIndex(txn, wholeOp.ns, spec)); + lastOpFixer.finishedOpSuccessfully(); + } catch (const DBException& ex) { + const bool canContinue = handleError(txn, ex, wholeOp, &out); + if (!canContinue) + break; + } + } + return out; +} + +static void insertDocuments(OperationContext* txn, + const NamespaceString& ns, + std::vector<BSONObj>::const_iterator begin, + std::vector<BSONObj>::const_iterator end) { + auto& curOp = *CurOp::get(txn); + boost::optional<AutoGetCollection> collection; + while (true) { + txn->checkForInterrupt(); + collection.emplace(txn, ns, MODE_IX); + if (collection->getCollection()) + break; + + collection.reset(); // unlock. + makeCollection(txn, ns); + } + + curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel()); + assertCanWrite_inlock(txn, ns); + + // Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can + // react to oversized batches. + WriteUnitOfWork wuow(txn); + uassertStatusOK(collection->getCollection()->insertDocuments( + txn, begin, end, &curOp.debug(), /*enforceQuota*/ true)); + wuow.commit(); +} + +/** + * Returns true if caller should try to insert more documents. Does nothing else if batch is empty. + */ +static bool insertBatchAndHandleErrors(OperationContext* txn, + const InsertOp& wholeOp, + const std::vector<BSONObj>& batch, + LastOpFixer* lastOpFixer, + WriteResult* out) { + auto& curOp = *CurOp::get(txn); + if (batch.size() > 1) { + // First try doing it all together. If all goes well, this is all we need to do. + try { + lastOpFixer->startingOp(); + insertDocuments(txn, wholeOp.ns, batch.begin(), batch.end()); + lastOpFixer->finishedOpSuccessfully(); + globalOpCounters.gotInserts(batch.size()); + std::fill_n( + std::back_inserter(out->results), batch.size(), WriteResult::SingleResult{1}); + curOp.debug().ninserted += batch.size(); + return true; + } catch (const DBException& ex) { + // Ignore this failure and behave as-if we never tried to do the combined batch insert. + // The loop below will handle reporting any non-transient errors. + } + } + + // Try to insert the batch one-at-a-time. This path is executed both for singular batches, and + // for batches that failed all-at-once inserting. + for (auto it = batch.begin(); it != batch.end(); ++it) { + globalOpCounters.gotInsert(); + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + lastOpFixer->startingOp(); + insertDocuments(txn, wholeOp.ns, it, it + 1); + lastOpFixer->finishedOpSuccessfully(); + out->results.emplace_back(WriteResult::SingleResult{1}); + curOp.debug().ninserted++; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "insert", wholeOp.ns.ns()); + } catch (const DBException& ex) { + bool canContinue = handleError(txn, ex, wholeOp, out); + if (!canContinue) + return false; + } + } + + return true; +} + +WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) { + invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries. + auto& curOp = *CurOp::get(txn); + ON_BLOCK_EXIT([&] { + // This is the only part of finishCurOp we need to do for inserts because they reuse the + // top-level curOp. The rest is handled by the top-level entrypoint. + curOp.done(); + Top::get(txn->getServiceContext()) + .record(wholeOp.ns.ns(), + LogicalOp::opInsert, + 1 /* write locked*/, + curOp.totalTimeMicros(), + curOp.isCommand()); + + }); + + { + stdx::lock_guard<Client>(*txn->getClient()); + curOp.setNS_inlock(wholeOp.ns.ns()); + curOp.setLogicalOp_inlock(LogicalOp::opInsert); + curOp.ensureStarted(); + curOp.debug().ninserted = 0; + } + + uassertStatusOK(userAllowedWriteNS(wholeOp.ns)); + + if (wholeOp.ns.isSystemDotIndexes()) { + return performCreateIndexes(txn, wholeOp); + } + + DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation); + LastOpFixer lastOpFixer(txn, wholeOp.ns); + + WriteResult out; + out.results.reserve(wholeOp.documents.size()); + + size_t bytesInBatch = 0; + std::vector<BSONObj> batch; + const size_t maxBatchSize = internalQueryExecYieldIterations / 2; + batch.reserve(std::min(wholeOp.documents.size(), maxBatchSize)); + + for (auto&& doc : wholeOp.documents) { + const bool isLastDoc = (&doc == &wholeOp.documents.back()); + auto fixedDoc = fixDocumentForInsert(doc); + if (!fixedDoc.isOK()) { + // Handled after we insert anything in the batch to be sure we report errors in the + // correct order. In an ordered insert, if one of the docs ahead of us fails, we should + // behave as-if we never got to this document. + } else { + batch.push_back(fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue())); + bytesInBatch += batch.back().objsize(); + if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) + continue; // Add more to batch before inserting. + } + + bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out); + batch.clear(); // We won't need the current batch any more. + bytesInBatch = 0; + + if (canContinue && !fixedDoc.isOK()) { + globalOpCounters.gotInsert(); + canContinue = handleError( + txn, + UserException(fixedDoc.getStatus().code(), fixedDoc.getStatus().reason()), + wholeOp, + &out); + } + + if (!canContinue) + break; + } + + return out; +} + +static WriteResult::SingleResult performSingleUpdateOp(OperationContext* txn, + const NamespaceString& ns, + const UpdateOp::SingleUpdate& op) { + globalOpCounters.gotUpdate(); + auto& curOp = *CurOp::get(txn); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + curOp.setNS_inlock(ns.ns()); + curOp.setNetworkOp_inlock(dbUpdate); + curOp.setLogicalOp_inlock(LogicalOp::opUpdate); + curOp.setQuery_inlock(op.query); + curOp.ensureStarted(); + + curOp.debug().query = op.query; + } + + UpdateLifecycleImpl updateLifecycle(ns); + UpdateRequest request(ns); + request.setLifecycle(&updateLifecycle); + request.setQuery(op.query); + request.setUpdates(op.update); + request.setMulti(op.multi); + request.setUpsert(op.upsert); + request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedUpdate overrides this for $isolated. + + ParsedUpdate parsedUpdate(txn, &request); + uassertStatusOK(parsedUpdate.parseRequest()); + + ScopedTransaction scopedXact(txn, MODE_IX); + boost::optional<AutoGetCollection> collection; + while (true) { + txn->checkForInterrupt(); + collection.emplace(txn, + ns, + MODE_IX, // DB is always IX, even if collection is X. + parsedUpdate.isIsolated() ? MODE_X : MODE_IX); + if (collection->getCollection() || !op.upsert) + break; + + collection.reset(); // unlock. + makeCollection(txn, ns); + } + + if (collection->getDb()) { + curOp.raiseDbProfileLevel(collection->getDb()->getProfilingLevel()); + } + + assertCanWrite_inlock(txn, ns); + + auto exec = uassertStatusOK( + getExecutorUpdate(txn, &curOp.debug(), collection->getCollection(), &parsedUpdate)); + uassertStatusOK(exec->executePlan()); + + PlanSummaryStats summary; + Explain::getSummaryStats(*exec, &summary); + if (collection->getCollection()) { + collection->getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed); + } + + const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get()); + UpdateStage::recordUpdateStatsInOpDebug(updateStats, &curOp.debug()); + curOp.debug().setPlanSummaryMetrics(summary); + UpdateResult res = UpdateStage::makeUpdateResult(updateStats); + + const bool didInsert = !res.upserted.isEmpty(); + const long long nMatchedOrInserted = didInsert ? 1 : res.numMatched; + LastError::get(txn->getClient()).recordUpdate(res.existing, nMatchedOrInserted, res.upserted); + + return {nMatchedOrInserted, res.numDocsModified, res.upserted}; +} + +WriteResult performUpdates(OperationContext* txn, const UpdateOp& wholeOp) { + invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries. + uassertStatusOK(userAllowedWriteNS(wholeOp.ns)); + + DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation); + LastOpFixer lastOpFixer(txn, wholeOp.ns); + + WriteResult out; + out.results.reserve(wholeOp.updates.size()); + for (auto&& singleOp : wholeOp.updates) { + // TODO: don't create nested CurOp for legacy writes. + CurOp curOp(txn); + ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); }); + try { + lastOpFixer.startingOp(); + out.results.emplace_back(performSingleUpdateOp(txn, wholeOp.ns, singleOp)); + lastOpFixer.finishedOpSuccessfully(); + } catch (const DBException& ex) { + const bool canContinue = handleError(txn, ex, wholeOp, &out); + if (!canContinue) + break; + } + } + + return out; +} + +static WriteResult::SingleResult performSingleDeleteOp(OperationContext* txn, + const NamespaceString& ns, + const DeleteOp::SingleDelete& op) { + globalOpCounters.gotDelete(); + auto& curOp = *CurOp::get(txn); + { + stdx::lock_guard<Client> lk(*txn->getClient()); + curOp.setNS_inlock(ns.ns()); + curOp.setNetworkOp_inlock(dbDelete); + curOp.setLogicalOp_inlock(LogicalOp::opDelete); + curOp.setQuery_inlock(op.query); + curOp.ensureStarted(); + + curOp.debug().query = op.query; + curOp.debug().ndeleted = 0; + } + + txn->checkForInterrupt(); + + DeleteRequest request(ns); + request.setQuery(op.query); + request.setMulti(op.multi); + request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated. + + ParsedDelete parsedDelete(txn, &request); + uassertStatusOK(parsedDelete.parseRequest()); + + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetCollection collection(txn, + ns, + MODE_IX, // DB is always IX, even if collection is X. + parsedDelete.isIsolated() ? MODE_X : MODE_IX); + if (collection.getDb()) { + curOp.raiseDbProfileLevel(collection.getDb()->getProfilingLevel()); + } + + assertCanWrite_inlock(txn, ns); + + auto exec = uassertStatusOK( + getExecutorDelete(txn, &curOp.debug(), collection.getCollection(), &parsedDelete)); + uassertStatusOK(exec->executePlan()); + long long n = DeleteStage::getNumDeleted(*exec); + curOp.debug().ndeleted = n; + + PlanSummaryStats summary; + Explain::getSummaryStats(*exec, &summary); + if (collection.getCollection()) { + collection.getCollection()->infoCache()->notifyOfQuery(txn, summary.indexesUsed); + } + curOp.debug().setPlanSummaryMetrics(summary); + LastError::get(txn->getClient()).recordDelete(n); + + return {n}; +} + +WriteResult performDeletes(OperationContext* txn, const DeleteOp& wholeOp) { + invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries. + uassertStatusOK(userAllowedWriteNS(wholeOp.ns)); + + DisableDocumentValidationIfTrue docValidationDisabler(txn, wholeOp.bypassDocumentValidation); + LastOpFixer lastOpFixer(txn, wholeOp.ns); + + WriteResult out; + out.results.reserve(wholeOp.deletes.size()); + for (auto&& singleOp : wholeOp.deletes) { + // TODO: don't create nested CurOp for legacy writes. + CurOp curOp(txn); + ON_BLOCK_EXIT([&] { finishCurOp(txn, &curOp); }); + try { + lastOpFixer.startingOp(); + out.results.emplace_back(performSingleDeleteOp(txn, wholeOp.ns, singleOp)); + lastOpFixer.finishedOpSuccessfully(); + } catch (const DBException& ex) { + const bool canContinue = handleError(txn, ex, wholeOp, &out); + if (!canContinue) + break; + } + } + + return out; +} + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h new file mode 100644 index 00000000000..37ab8dd3ee1 --- /dev/null +++ b/src/mongo/db/ops/write_ops_exec.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/base/status_with.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/s/stale_exception.h" + +namespace mongo { + +/** + * The result of performing a single write, possibly within a batch. + */ +struct WriteResult { + struct SingleResult { + int64_t n; + int64_t nModified; + BSONObj upsertedId; // Non-empty if something was upserted. + }; + + /** + * Maps 1-to-1 to single ops in request. May be shorter than input if there are errors. + * + * staleConfigException should be considered appended to this if it is non-null. + */ + std::vector<StatusWith<SingleResult>> results; + + /** + * If non-null, the SendStaleConfigException that was encountered while processing the op after + * the last op reported in results. Processing always stops at the first SCE and nothing is + * placed in results for the op that triggered it. The whole exception is copied here because it + * contains additional data not included in the Status. + * + * TODO convert to std::unique_ptr once we are on MSVC2015. + */ + std::shared_ptr<SendStaleConfigException> staleConfigException; +}; + + +/** + * Performs a batch of inserts, updates, or deletes. + * + * These functions handle all of the work of doing the writes, including locking, incrementing + * counters, managing CurOp, and of course actually doing the write. Waiting for the writeConcern is + * *not* handled by these functions and is expected to be done by the caller if needed. + * + * LastError is updated for failures of individual writes, but not for batch errors reported by an + * exception being thrown from these functions. Callers are responsible for managing LastError in + * that case. This should generally be combined with LastError handling from parse failures. + */ +WriteResult performInserts(OperationContext* txn, const InsertOp& op); +WriteResult performUpdates(OperationContext* txn, const UpdateOp& op); +WriteResult performDeletes(OperationContext* txn, const DeleteOp& op); + +} // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index 8e01ee43eaa..6a7102d4dc1 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -95,10 +95,7 @@ void parseWriteCommand(StringData dbName, } const StringData fieldName = field.fieldNameStringData(); - if (fieldName == "writeConcern") { - checkType(Object, field); - op->writeConcern = field.Obj(); - } else if (fieldName == "bypassDocumentValidation") { + if (fieldName == "bypassDocumentValidation") { checkType(Bool, field); op->bypassDocumentValidation = field.Bool(); } else if (fieldName == "ordered") { @@ -108,7 +105,8 @@ void parseWriteCommand(StringData dbName, haveUniqueField = true; *uniqueField = field; } else if (fieldName[0] != '$') { - std::initializer_list<StringData> ignoredFields = {"maxTimeMS", "shardVersion"}; + std::initializer_list<StringData> ignoredFields = { + "writeConcern", "maxTimeMS", "shardVersion"}; uassert(ErrorCodes::FailedToParse, str::stream() << "Unknown option to " << cmd.firstElementFieldName() << " command: " << fieldName, @@ -134,6 +132,14 @@ InsertOp parseInsertCommand(StringData dbName, const BSONObj& cmd) { op.documents.push_back(doc.Obj()); } checkOpCountForCommand(op.documents.size()); + + if (op.ns.isSystemDotIndexes()) { + // This is only for consistency with sharding. + uassert(ErrorCodes::InvalidLength, + "Insert commands to system.indexes are limited to a single insert", + op.documents.size() == 1); + } + return op; } diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp index 04b3982762e..5d5f030e54c 100644 --- a/src/mongo/db/ops/write_ops_parsers_test.cpp +++ b/src/mongo/db/ops/write_ops_parsers_test.cpp @@ -34,16 +34,6 @@ namespace mongo { -TEST(CommandWriteOpsParsers, CommonFields_WriteConcern) { - auto writeConcern = BSON("w" << 2); - auto cmd = BSON("insert" - << "bar" - << "documents" << BSON_ARRAY(BSONObj()) << "writeConcern" << writeConcern); - auto op = parseInsertCommand("foo", cmd); - ASSERT(bool(op.writeConcern)); - ASSERT_EQ(*op.writeConcern, writeConcern); -} - TEST(CommandWriteOpsParsers, CommonFields_BypassDocumentValidation) { for (bool bypassDocumentValidation : {true, false}) { auto cmd = BSON("insert" @@ -70,7 +60,7 @@ TEST(CommandWriteOpsParsers, CommonFields_IgnoredFields) { auto cmd = BSON("insert" << "bar" << "documents" << BSON_ARRAY(BSONObj()) << "maxTimeMS" << 1000 << "shardVersion" - << BSONObj()); + << BSONObj() << "writeConcern" << BSONObj()); parseInsertCommand("foo", cmd); } @@ -102,7 +92,6 @@ TEST(CommandWriteOpsParsers, SingleInsert) { auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj)); const auto op = parseInsertCommand(ns.db(), cmd); ASSERT_EQ(op.ns.ns(), ns.ns()); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT(!op.continueOnError); ASSERT_EQ(op.documents.size(), 1u); @@ -122,7 +111,6 @@ TEST(CommandWriteOpsParsers, RealMultiInsert) { auto cmd = BSON("insert" << ns.coll() << "documents" << BSON_ARRAY(obj0 << obj1)); const auto op = parseInsertCommand(ns.db(), cmd); ASSERT_EQ(op.ns.ns(), ns.ns()); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT(!op.continueOnError); ASSERT_EQ(op.documents.size(), 2u); @@ -141,7 +129,6 @@ TEST(CommandWriteOpsParsers, Update) { << upsert << "multi" << multi))); auto op = parseUpdateCommand(ns.db(), cmd); ASSERT_EQ(op.ns.ns(), ns.ns()); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, false); ASSERT_EQ(op.updates.size(), 1u); @@ -161,7 +148,6 @@ TEST(CommandWriteOpsParsers, Remove) { << BSON_ARRAY(BSON("q" << query << "limit" << (multi ? 0 : 1)))); auto op = parseDeleteCommand(ns.db(), cmd); ASSERT_EQ(op.ns.ns(), ns.ns()); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, false); ASSERT_EQ(op.deletes.size(), 1u); @@ -235,7 +221,6 @@ TEST(LegacyWriteOpsParsers, SingleInsert) { client.insert(ns, obj, continueOnError ? InsertOption_ContinueOnError : 0); const auto op = parseLegacyInsert(client.message); ASSERT_EQ(op.ns.ns(), ns); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, continueOnError); ASSERT_EQ(op.documents.size(), 1u); @@ -263,7 +248,6 @@ TEST(LegacyWriteOpsParsers, RealMultiInsert) { client.insert(ns, {obj0, obj1}, continueOnError ? InsertOption_ContinueOnError : 0); const auto op = parseLegacyInsert(client.message); ASSERT_EQ(op.ns.ns(), ns); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, continueOnError); ASSERT_EQ(op.documents.size(), 2u); @@ -282,7 +266,6 @@ TEST(LegacyWriteOpsParsers, Update) { client.update(ns, query, update, upsert, multi); const auto op = parseLegacyUpdate(client.message); ASSERT_EQ(op.ns.ns(), ns); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, false); ASSERT_EQ(op.updates.size(), 1u); @@ -302,7 +285,6 @@ TEST(LegacyWriteOpsParsers, Remove) { client.remove(ns, query, multi ? 0 : RemoveOption_JustOne); const auto op = parseLegacyDelete(client.message); ASSERT_EQ(op.ns.ns(), ns); - ASSERT(!op.writeConcern); ASSERT(!op.bypassDocumentValidation); ASSERT_EQ(op.continueOnError, false); ASSERT_EQ(op.deletes.size(), 1u); diff --git a/src/mongo/db/stats/counters.cpp b/src/mongo/db/stats/counters.cpp index 7a7c7d7dd1f..61b5be912dd 100644 --- a/src/mongo/db/stats/counters.cpp +++ b/src/mongo/db/stats/counters.cpp @@ -43,7 +43,7 @@ using std::endl; OpCounters::OpCounters() {} -void OpCounters::incInsertInWriteLock(int n) { +void OpCounters::gotInserts(int n) { RARELY _checkWrap(); _insert.fetchAndAdd(n); } diff --git a/src/mongo/db/stats/counters.h b/src/mongo/db/stats/counters.h index 717724b8465..cae0cb0ad16 100644 --- a/src/mongo/db/stats/counters.h +++ b/src/mongo/db/stats/counters.h @@ -45,7 +45,7 @@ namespace mongo { class OpCounters { public: OpCounters(); - void incInsertInWriteLock(int n); + void gotInserts(int n); void gotInsert(); void gotQuery(); void gotUpdate(); diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index fabf10ba05b..25d391ac6e8 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -319,11 +319,7 @@ public: wuow.commit(); } - { - WriteUnitOfWork wuow(&_txn); - fillData(); - wuow.commit(); - } + fillData(); // The data we're going to later invalidate. set<RecordId> recordIds; @@ -432,11 +428,7 @@ public: wuow.commit(); } - { - WriteUnitOfWork wuow(&_txn); - fillData(); - wuow.commit(); - } + fillData(); // The data we're going to later invalidate. set<RecordId> recordIds; |