/**
 *    Copyright (C) 2013 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite #include "mongo/platform/basic.h" #include "mongo/db/commands/write_commands/batch_executor.h" #include #include "mongo/base/error_codes.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop_metrics.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/delete.h" #include "mongo/db/exec/update.h" #include "mongo/db/instance.h" #include "mongo/db/introspect.h" #include "mongo/db/lasterror.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/delete_request.h" #include "mongo/db/ops/insert.h" #include "mongo/db/ops/parsed_delete.h" #include "mongo/db/ops/parsed_update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/write_concern.h" #include "mongo/s/collection_metadata.h" #include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_upsert_detail.h" #include "mongo/s/write_ops/write_error_detail.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using 
std::endl; using std::string; using std::unique_ptr; using std::vector; namespace { /** * Data structure to safely hold and clean up results of single write operations. */ class WriteOpResult { MONGO_DISALLOW_COPYING(WriteOpResult); public: WriteOpResult() {} WriteOpStats& getStats() { return _stats; } WriteErrorDetail* getError() { return _error.get(); } WriteErrorDetail* releaseError() { return _error.release(); } void setError(WriteErrorDetail* error) { _error.reset(error); } private: WriteOpStats _stats; std::unique_ptr _error; }; } // namespace // TODO: Determine queueing behavior we want here MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true); using mongoutils::str::stream; WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le) : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {} static WCErrorDetail* toWriteConcernError(const Status& wcStatus, const WriteConcernResult& wcResult) { WCErrorDetail* wcError = new WCErrorDetail; wcError->setErrCode(wcStatus.code()); wcError->setErrMessage(wcStatus.reason()); if (wcResult.wTimedOut) wcError->setErrInfo(BSON("wtimeout" << true)); return wcError; } static WriteErrorDetail* toWriteError(const Status& status) { WriteErrorDetail* error = new WriteErrorDetail; // TODO: Complex transform here? 
error->setErrCode(status.code()); error->setErrMessage(status.reason()); return error; } static void toBatchError(const Status& status, BatchedCommandResponse* response) { response->clear(); response->setErrCode(status.code()); response->setErrMessage(status.reason()); response->setOk(false); dassert(response->isValid(NULL)); } static void noteInCriticalSection(WriteErrorDetail* staleError) { BSONObjBuilder builder; if (staleError->isErrInfoSet()) builder.appendElements(staleError->getErrInfo()); builder.append("inCriticalSection", true); staleError->setErrInfo(builder.obj()); } // static Status WriteBatchExecutor::validateBatch(const BatchedCommandRequest& request) { // Validate namespace const NamespaceString& nss = request.getNSS(); if (!nss.isValid()) { return Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace"); } // Make sure we can write to the namespace Status allowedStatus = userAllowedWriteNS(nss); if (!allowedStatus.isOK()) { return allowedStatus; } // Validate insert index requests // TODO: Push insert index requests through createIndex once all upgrade paths support it string errMsg; if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) { return Status(ErrorCodes::InvalidOptions, errMsg); } return Status::OK(); } void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response) { // Validate namespace Status isValid = validateBatch(request); if (!isValid.isOK()) { toBatchError(isValid, response); return; } if (request.sizeWriteOps() == 0u) { toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"), response); return; } // Validate batch size if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) { toBatchError(Status(ErrorCodes::InvalidLength, stream() << "exceeded maximum write batch size of " << BatchedCommandRequest::kMaxWriteBatchSize), response); return; } // // End validation // const WriteConcernOptions& 
writeConcern = _txn->getWriteConcern(); bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 && writeConcern.syncMode == WriteConcernOptions::NONE; Timer commandTimer; OwnedPointerVector writeErrorsOwned; vector& writeErrors = writeErrorsOwned.mutableVector(); OwnedPointerVector upsertedOwned; vector& upserted = upsertedOwned.mutableVector(); // // Apply each batch item, possibly bulking some items together in the write lock. // Stops on error if batch is ordered. // bulkExecute(request, &upserted, &writeErrors); // // Try to enforce the write concern if everything succeeded (unordered or ordered) // OR if something succeeded and we're unordered. // unique_ptr wcError; bool needToEnforceWC = writeErrors.empty() || (!request.getOrdered() && writeErrors.size() < request.sizeWriteOps()); if (needToEnforceWC) { { stdx::lock_guard lk(*_txn->getClient()); CurOp::get(_txn)->setMessage_inlock("waiting for write concern"); } WriteConcernResult res; Status status = waitForWriteConcern( _txn, repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp(), &res); if (!status.isOK()) { wcError.reset(toWriteConcernError(status, res)); } } // // Refresh metadata if needed // bool staleBatch = !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; if (staleBatch) { const BatchedRequestMetadata* requestMetadata = request.getMetadata(); dassert(requestMetadata); // Make sure our shard name is set or is the same as what was set previously if (shardingState.setShardName(requestMetadata->getShardName())) { // // First, we refresh metadata if we need to based on the requested version. 
// ChunkVersion latestShardVersion; shardingState.refreshMetadataIfNeeded(_txn, request.getTargetingNS(), requestMetadata->getShardVersion(), &latestShardVersion); // Report if we're still changing our metadata // TODO: Better reporting per-collection if (shardingState.inCriticalMigrateSection()) { noteInCriticalSection(writeErrors.back()); } if (queueForMigrationCommit) { // // Queue up for migration to end - this allows us to be sure that clients will // not repeatedly try to refresh metadata that is not yet written to the config // server. Not necessary for correctness. // Exposed as optional parameter to allow testing of queuing behavior with // different network timings. // const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion(); // // Only wait if we're an older version (in the current collection epoch) and // we're not write compatible, implying that the current migration is affecting // writes. // if (requestShardVersion.isOlderThan(latestShardVersion) && !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) { while (shardingState.inCriticalMigrateSection()) { log() << "write request to old shard version " << requestMetadata->getShardVersion().toString() << " waiting for migration commit" << endl; shardingState.waitTillNotInCriticalSection(10 /* secs */); } } } } else { // If our shard name is stale, our version must have been stale as well dassert(writeErrors.size() == request.sizeWriteOps()); } } // // Construct response // response->setOk(true); if (!silentWC) { if (upserted.size()) { response->setUpsertDetails(upserted); } if (writeErrors.size()) { response->setErrDetails(writeErrors); } if (wcError.get()) { response->setWriteConcernError(wcError.release()); } repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode(); if (replMode != repl::ReplicationCoordinator::modeNone) { response->setLastOp( 
repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp().getTimestamp()); if (replMode == repl::ReplicationCoordinator::modeReplSet) { response->setElectionId(replCoord->getElectionId()); } } // Set the stats for the response response->setN(_stats->numInserted + _stats->numUpserted + _stats->numMatched + _stats->numDeleted); if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) response->setNModified(_stats->numModified); } dassert(response->isValid(NULL)); } // Translates write item type to wire protocol op code. // Helper for WriteBatchExecutor::applyWriteItem(). static int getOpCode(const BatchItemRef& currWrite) { switch (currWrite.getRequest()->getBatchType()) { case BatchedCommandRequest::BatchType_Insert: return dbInsert; case BatchedCommandRequest::BatchType_Update: return dbUpdate; case BatchedCommandRequest::BatchType_Delete: return dbDelete; default: MONGO_UNREACHABLE; } } static void buildStaleError(const ChunkVersion& shardVersionRecvd, const ChunkVersion& shardVersionWanted, WriteErrorDetail* error) { // Write stale error to results error->setErrCode(ErrorCodes::StaleShardVersion); BSONObjBuilder infoB; shardVersionWanted.addToBSON(infoB, "vWanted"); error->setErrInfo(infoB.obj()); string errMsg = stream() << "stale shard version detected before write, received " << shardVersionRecvd.toString() << " but local version is " << shardVersionWanted.toString(); error->setErrMessage(errMsg); } static bool checkShardVersion(OperationContext* txn, ShardingState* shardingState, const BatchedCommandRequest& request, WriteOpResult* result) { const NamespaceString& nss = request.getTargetingNSS(); dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); ChunkVersion requestShardVersion = request.isMetadataSet() && request.getMetadata()->isShardVersionSet() ? 
request.getMetadata()->getShardVersion() : ChunkVersion::IGNORED(); if (shardingState->enabled()) { CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); if (!ChunkVersion::isIgnoredVersion(requestShardVersion)) { ChunkVersion shardVersion = metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); if (!requestShardVersion.isWriteCompatibleWith(shardVersion)) { result->setError(new WriteErrorDetail); buildStaleError(requestShardVersion, shardVersion, result->getError()); return false; } } } return true; } static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) { if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) { WriteErrorDetail* errorDetail = new WriteErrorDetail; result->setError(errorDetail); errorDetail->setErrCode(ErrorCodes::NotMaster); errorDetail->setErrMessage("Not primary while writing to " + ns.toString()); return false; } return true; } static void buildUniqueIndexError(const BSONObj& keyPattern, const BSONObj& indexPattern, WriteErrorDetail* error) { error->setErrCode(ErrorCodes::CannotCreateIndex); string errMsg = stream() << "cannot create unique index over " << indexPattern << " with shard key pattern " << keyPattern; error->setErrMessage(errMsg); } static bool checkIndexConstraints(OperationContext* txn, ShardingState* shardingState, const BatchedCommandRequest& request, WriteOpResult* result) { const NamespaceString& nss = request.getTargetingNSS(); dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); if (!request.isUniqueIndexRequest()) return true; if (shardingState->enabled()) { CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); if (metadata) { ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) { result->setError(new WriteErrorDetail); buildUniqueIndexError( metadata->getKeyPattern(), request.getIndexKeyPattern(), 
result->getError()); return false; } } } return true; } // // HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS // static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) { stdx::lock_guard lk(*txn->getClient()); CurOp* const currentOp = CurOp::get(txn); currentOp->setOp_inlock(getOpCode(currWrite)); currentOp->ensureStarted(); currentOp->setNS_inlock(currWrite.getRequest()->getNS()); currentOp->debug().ns = currentOp->getNS(); currentOp->debug().op = currentOp->getOp(); if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { currentOp->setQuery_inlock(currWrite.getDocument()); currentOp->debug().query = currWrite.getDocument(); currentOp->debug().ninserted = 0; } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { currentOp->setQuery_inlock(currWrite.getUpdate()->getQuery()); currentOp->debug().query = currWrite.getUpdate()->getQuery(); currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr(); // Note: debug().nMatched, nModified and nmoved are set internally in update } else { dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); currentOp->setQuery_inlock(currWrite.getDelete()->getQuery()); currentOp->debug().query = currWrite.getDelete()->getQuery(); currentOp->debug().ndeleted = 0; } } void WriteBatchExecutor::incOpStats(const BatchItemRef& currWrite) { if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { _opCounters->gotInsert(); } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { _opCounters->gotUpdate(); } else { dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); _opCounters->gotDelete(); } } void WriteBatchExecutor::incWriteStats(const BatchItemRef& currWrite, const WriteOpStats& stats, const WriteErrorDetail* error, CurOp* currentOp) { if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { _stats->numInserted += stats.n; currentOp->debug().ninserted += stats.n; if (!error) { 
_le->recordInsert(stats.n); } } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { if (stats.upsertedID.isEmpty()) { _stats->numMatched += stats.n; _stats->numModified += stats.nModified; } else { ++_stats->numUpserted; } if (!error) { _le->recordUpdate(stats.upsertedID.isEmpty() && stats.n > 0, stats.n, stats.upsertedID); } } else { dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); _stats->numDeleted += stats.n; if (!error) { _le->recordDelete(stats.n); } currentOp->debug().ndeleted += stats.n; } if (error) { _le->setLastError(error->getErrCode(), error->getErrMessage().c_str()); } } static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) { CurOp* currentOp = CurOp::get(txn); currentOp->done(); int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis(); recordCurOpMetrics(txn); Top::get(txn->getClient()->getServiceContext()) .record(currentOp->getNS(), currentOp->getOp(), 1, // "write locked" currentOp->totalTimeMicros(), currentOp->isCommand()); if (opError) { currentOp->debug().exceptionInfo = ExceptionInfo(opError->getErrMessage(), opError->getErrCode()); LOG(3) << " Caught Assertion in " << opToString(currentOp->getOp()) << ", continuing " << causedBy(opError->getErrMessage()) << endl; } bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite, logger::LogSeverity::Debug(1)); bool logSlow = executionTime > (serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs()); if (logAll || logSlow) { Locker::LockerInfo lockerInfo; txn->lockState()->getLockerInfo(&lockerInfo); LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats); } if (currentOp->shouldDBProfile(executionTime)) { profile(txn, CurOp::get(txn)->getOp()); } } // END HELPERS // // CORE WRITE OPERATIONS (declaration) // These functions write to the database and return stats and zero or one of: // - page fault // - error // static void singleInsert(OperationContext* txn, 
const BSONObj& docToInsert, Collection* collection, WriteOpResult* result); static void singleCreateIndex(OperationContext* txn, const BSONObj& indexDesc, WriteOpResult* result); static void multiUpdate(OperationContext* txn, const BatchItemRef& updateItem, WriteOpResult* result); static void multiRemove(OperationContext* txn, const BatchItemRef& removeItem, WriteOpResult* result); // // WRITE EXECUTION // In general, the exec* operations manage db lock state and stats before dispatching to the // core write operations, which are *only* responsible for performing a write and reporting // success or failure. // /** * Representation of the execution state of execInserts. Used by a single * execution of execInserts in a single thread. */ class WriteBatchExecutor::ExecInsertsState { MONGO_DISALLOW_COPYING(ExecInsertsState); public: /** * Constructs a new instance, for performing inserts described in "aRequest". */ explicit ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest); /** * Acquires the write lock and client context needed to perform the current write operation. * Returns true on success, after which it is safe to use the "context" and "collection" * members. It is safe to call this function if this instance already holds the write lock. * * On failure, writeLock, context and collection will be NULL/clear. */ bool lockAndCheck(WriteOpResult* result); /** * Releases the client context and write lock acquired by lockAndCheck. Safe to call * regardless of whether or not this state object currently owns the lock. */ void unlock(); /** * Returns true if this executor has the lock on the target database. */ bool hasLock() { return _dbLock.get(); } /** * Gets the target collection for the batch operation. Value is undefined * unless hasLock() is true. */ Collection* getCollection() { return _collection; } OperationContext* txn; // Request object describing the inserts. 
const BatchedCommandRequest* request; // Index of the current insert operation to perform. size_t currIndex = 0; // Translation of insert documents in "request" into insert-ready forms. This vector has a // correspondence with elements of the "request", and "currIndex" is used to // index both. std::vector> normalizedInserts; private: bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock); ScopedTransaction _transaction; // Guard object for the write lock on the target database. std::unique_ptr _dbLock; std::unique_ptr _collLock; Database* _database = nullptr; Collection* _collection = nullptr; }; void WriteBatchExecutor::bulkExecute(const BatchedCommandRequest& request, std::vector* upsertedIds, std::vector* errors) { boost::optional maybeDisableValidation; if (request.shouldBypassValidation()) { maybeDisableValidation.emplace(_txn); } if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) { execInserts(request, errors); } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) { for (size_t i = 0; i < request.sizeWriteOps(); i++) { if (i + 1 == request.sizeWriteOps()) { setupSynchronousCommit(_txn); } WriteErrorDetail* error = NULL; BSONObj upsertedId; execUpdate(BatchItemRef(&request, i), &upsertedId, &error); if (!upsertedId.isEmpty()) { BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail; batchUpsertedId->setIndex(i); batchUpsertedId->setUpsertedID(upsertedId); upsertedIds->push_back(batchUpsertedId); } if (error) { errors->push_back(error); if (request.getOrdered()) break; } } } else { dassert(request.getBatchType() == BatchedCommandRequest::BatchType_Delete); for (size_t i = 0; i < request.sizeWriteOps(); i++) { if (i + 1 == request.sizeWriteOps()) { setupSynchronousCommit(_txn); } WriteErrorDetail* error = NULL; execRemove(BatchItemRef(&request, i), &error); if (error) { errors->push_back(error); if (request.getOrdered()) break; } } } // Fill in stale version errors for unordered batches (update/delete 
can't do this on own) if (!errors->empty() && !request.getOrdered()) { const WriteErrorDetail* finalError = errors->back(); if (finalError->getErrCode() == ErrorCodes::StaleShardVersion) { for (size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++) { WriteErrorDetail* dupStaleError = new WriteErrorDetail; finalError->cloneTo(dupStaleError); errors->push_back(dupStaleError); } } } } // Goes over the request and preprocesses normalized versions of all the inserts in the request static void normalizeInserts(const BatchedCommandRequest& request, vector>* normalizedInserts) { normalizedInserts->reserve(request.sizeWriteOps()); for (size_t i = 0; i < request.sizeWriteOps(); ++i) { BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt(i); StatusWith normalInsert = fixDocumentForInsert(insertDoc); normalizedInserts->push_back(normalInsert); if (request.getOrdered() && !normalInsert.isOK()) break; } } void WriteBatchExecutor::execInserts(const BatchedCommandRequest& request, std::vector* errors) { // Theory of operation: // // Instantiates an ExecInsertsState, which represents all of the state involved in the batch // insert execution algorithm. Most importantly, encapsulates the lock state. // // Every iteration of the loop in execInserts() processes one document insertion, by calling // insertOne() exactly once for a given value of state.currIndex. // // If the ExecInsertsState indicates that the requisite write locks are not held, insertOne // acquires them and performs lock-acquisition-time checks. However, on non-error // execution, it does not release the locks. Therefore, the yielding logic in the while // loop in execInserts() is solely responsible for lock release in the non-error case. // // Internally, insertOne loops performing the single insert until it completes without a // PageFaultException, or until it fails with some kind of error. 
Errors are mostly // propagated via the request->error field, but DBExceptions or std::exceptions may escape, // particularly on operation interruption. These kinds of errors necessarily prevent // further insertOne calls, and stop the batch. As a result, the only expected source of // such exceptions are interruptions. ExecInsertsState state(_txn, &request); normalizeInserts(request, &state.normalizedInserts); ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); if (info) { if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) { info->setVersion(request.getTargetingNS(), request.getMetadata()->getShardVersion()); } else { info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED()); } } // Yield frequency is based on the same constants used by PlanYieldPolicy. ElapsedTracker elapsedTracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS); for (state.currIndex = 0; state.currIndex < state.request->sizeWriteOps(); ++state.currIndex) { if (state.currIndex + 1 == state.request->sizeWriteOps()) { setupSynchronousCommit(_txn); } if (elapsedTracker.intervalHasElapsed()) { // Yield between inserts. if (state.hasLock()) { // Release our locks. They get reacquired when insertOne() calls // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO // queues waiting on locks, there is no need to explicitly sleep or give up // control of the processor here. state.unlock(); // This releases any storage engine held locks/snapshots. 
_txn->recoveryUnit()->abandonSnapshot(); } _txn->checkForInterrupt(); elapsedTracker.resetLastTime(); } WriteErrorDetail* error = NULL; execOneInsert(&state, &error); if (error) { errors->push_back(error); error->setIndex(state.currIndex); if (request.getOrdered()) return; } } } void WriteBatchExecutor::execUpdate(const BatchItemRef& updateItem, BSONObj* upsertedId, WriteErrorDetail** error) { // BEGIN CURRENT OP CurOp currentOp(_txn); beginCurrentOp(_txn, updateItem); incOpStats(updateItem); ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); if (info) { auto rootRequest = updateItem.getRequest(); if (!updateItem.getUpdate()->getMulti() && rootRequest->isMetadataSet() && rootRequest->getMetadata()->isShardVersionSet()) { info->setVersion(rootRequest->getTargetingNS(), rootRequest->getMetadata()->getShardVersion()); } else { info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); } } WriteOpResult result; multiUpdate(_txn, updateItem, &result); if (!result.getStats().upsertedID.isEmpty()) { *upsertedId = result.getStats().upsertedID; } // END CURRENT OP incWriteStats(updateItem, result.getStats(), result.getError(), ¤tOp); finishCurrentOp(_txn, result.getError()); // End current transaction and release snapshot. 
_txn->recoveryUnit()->abandonSnapshot(); if (result.getError()) { result.getError()->setIndex(updateItem.getItemIndex()); *error = result.releaseError(); } } void WriteBatchExecutor::execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error) { // Removes are similar to updates, but page faults are handled externally // BEGIN CURRENT OP CurOp currentOp(_txn); beginCurrentOp(_txn, removeItem); incOpStats(removeItem); ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); if (info) { auto rootRequest = removeItem.getRequest(); if (removeItem.getDelete()->getLimit() == 1 && rootRequest->isMetadataSet() && rootRequest->getMetadata()->isShardVersionSet()) { info->setVersion(rootRequest->getTargetingNS(), rootRequest->getMetadata()->getShardVersion()); } else { info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); } } WriteOpResult result; multiRemove(_txn, removeItem, &result); // END CURRENT OP incWriteStats(removeItem, result.getStats(), result.getError(), ¤tOp); finishCurrentOp(_txn, result.getError()); // End current transaction and release snapshot. _txn->recoveryUnit()->abandonSnapshot(); if (result.getError()) { result.getError()->setIndex(removeItem.getItemIndex()); *error = result.releaseError(); } } // // IN-DB-LOCK CORE OPERATIONS // WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest) : txn(txn), request(aRequest), _transaction(txn, MODE_IX) {} bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result, bool intentLock) { if (hasLock()) { CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); return true; } if (request->isInsertIndexRequest()) intentLock = false; // can't build indexes in intent mode const NamespaceString& nss = request->getNSS(); invariant(!_collLock); invariant(!_dbLock); _dbLock = stdx::make_unique(txn->lockState(), nss.db(), intentLock ? 
MODE_IX : MODE_X); _database = dbHolder().get(txn, nss.ns()); if (intentLock && !_database) { // Ensure exclusive lock in case the database doesn't yet exist _dbLock.reset(); _dbLock = stdx::make_unique(txn->lockState(), nss.db(), MODE_X); intentLock = false; } _collLock = stdx::make_unique( txn->lockState(), nss.ns(), intentLock ? MODE_IX : MODE_X); if (!checkIsMasterForDatabase(nss, result)) { return false; } if (!checkShardVersion(txn, &shardingState, *request, result)) { return false; } if (!checkIndexConstraints(txn, &shardingState, *request, result)) { return false; } if (!_database) { invariant(!intentLock); _database = dbHolder().openDb(txn, nss.ns()); } CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); _collection = _database->getCollection(request->getTargetingNS()); if (!_collection) { if (intentLock) { // try again with full X lock. unlock(); return _lockAndCheckImpl(result, false); } WriteUnitOfWork wunit(txn); // Implicitly create if it doesn't exist _collection = _database->createCollection(txn, request->getTargetingNS()); if (!_collection) { result->setError( toWriteError(Status(ErrorCodes::InternalError, "could not create collection " + request->getTargetingNS()))); return false; } wunit.commit(); } return true; } bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) { if (_lockAndCheckImpl(result, true)) return true; unlock(); return false; } void WriteBatchExecutor::ExecInsertsState::unlock() { _collection = nullptr; _database = nullptr; _collLock.reset(); _dbLock.reset(); } static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) { // we have to be top level so we can retry invariant(!state->txn->lockState()->inAWriteUnitOfWork()); invariant(state->currIndex < state->normalizedInserts.size()); const StatusWith& normalizedInsert(state->normalizedInserts[state->currIndex]); if (!normalizedInsert.isOK()) { result->setError(toWriteError(normalizedInsert.getStatus())); 
return; } const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() ? state->request->getInsertRequest()->getDocumentsAt(state->currIndex) : normalizedInsert.getValue(); int attempt = 0; while (true) { try { if (!state->request->isInsertIndexRequest()) { if (state->lockAndCheck(result)) { singleInsert(state->txn, insertDoc, state->getCollection(), result); } } else { singleCreateIndex(state->txn, insertDoc, result); } break; } catch (const WriteConflictException& wce) { state->unlock(); CurOp::get(state->txn)->debug().writeConflicts++; state->txn->recoveryUnit()->abandonSnapshot(); WriteConflictException::logAndBackoff( attempt++, "insert", state->getCollection() ? state->getCollection()->ns().ns() : "index"); } catch (const StaleConfigException& staleExcep) { result->setError(new WriteErrorDetail); result->getError()->setErrCode(ErrorCodes::StaleShardVersion); buildStaleError( staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); break; } catch (const DBException& ex) { Status status(ex.toStatus()); if (ErrorCodes::isInterruption(status.code())) throw; result->setError(toWriteError(status)); break; } } // Errors release the write lock, as a matter of policy. if (result->getError()) { state->txn->recoveryUnit()->abandonSnapshot(); state->unlock(); } } void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) { BatchItemRef currInsertItem(state->request, state->currIndex); CurOp currentOp(_txn); beginCurrentOp(_txn, currInsertItem); incOpStats(currInsertItem); WriteOpResult result; insertOne(state, &result); incWriteStats(currInsertItem, result.getStats(), result.getError(), ¤tOp); finishCurrentOp(_txn, result.getError()); if (result.getError()) { *error = result.releaseError(); } } /** * Perform a single insert into a collection. Requires the insert be preprocessed and the * collection already has been created. * * Might fault or error, otherwise populates the result. 
*/ static void singleInsert(OperationContext* txn, const BSONObj& docToInsert, Collection* collection, WriteOpResult* result) { const string& insertNS = collection->ns().ns(); invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX)); WriteUnitOfWork wunit(txn); StatusWith status = collection->insertDocument(txn, docToInsert, true); if (!status.isOK()) { result->setError(toWriteError(status.getStatus())); } else { result->getStats().n = 1; wunit.commit(); } } /** * Perform a single index creation on a collection. Requires the index descriptor be * preprocessed. * * Might fault or error, otherwise populates the result. */ static void singleCreateIndex(OperationContext* txn, const BSONObj& indexDesc, WriteOpResult* result) { BSONElement nsElement = indexDesc["ns"]; uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo()); uassert(ErrorCodes::TypeMismatch, str::stream() << "Expected \"ns\" field of index description to be a " "string, " "but found a " << typeName(nsElement.type()), nsElement.type() == String); const NamespaceString ns(nsElement.valueStringData()); BSONObjBuilder cmdBuilder; cmdBuilder << "createIndexes" << ns.coll(); cmdBuilder << "indexes" << BSON_ARRAY(indexDesc); BSONObj cmd = cmdBuilder.done(); Command* createIndexesCmd = Command::findCommand("createIndexes"); invariant(createIndexesCmd); std::string errmsg; BSONObjBuilder resultBuilder; const bool success = createIndexesCmd->run(txn, ns.db().toString(), cmd, 0, errmsg, resultBuilder); Command::appendCommandStatus(resultBuilder, success, errmsg); BSONObj cmdResult = resultBuilder.done(); uassertStatusOK(Command::getStatusFromCommandResult(cmdResult)); result->getStats().n = cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt(); } static void multiUpdate(OperationContext* txn, const BatchItemRef& updateItem, WriteOpResult* result) { const NamespaceString nsString(updateItem.getRequest()->getNS()); const bool isMulti 
= updateItem.getUpdate()->getMulti(); UpdateRequest request(nsString); request.setQuery(updateItem.getUpdate()->getQuery()); request.setUpdates(updateItem.getUpdate()->getUpdateExpr()); request.setMulti(isMulti); request.setUpsert(updateItem.getUpdate()->getUpsert()); UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString()); request.setLifecycle(&updateLifecycle); // Updates from the write commands path can yield. request.setYieldPolicy(PlanExecutor::YIELD_AUTO); int attempt = 0; bool createCollection = false; for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) { ParsedUpdate parsedUpdate(txn, &request); Status status = parsedUpdate.parseRequest(); if (!status.isOK()) { result->setError(toWriteError(status)); return; } if (createCollection) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X}; if (!checkIsMasterForDatabase(nsString, result)) { return; } Database* const db = adb.getDb(); if (db->getCollection(nsString.ns())) { // someone else beat us to it } else { WriteUnitOfWork wuow(txn); uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); wuow.commit(); } } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); } /////////////////////////////////////////// ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); Lock::CollectionLock colLock( txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? 
MODE_X : MODE_IX); /////////////////////////////////////////// if (!checkIsMasterForDatabase(nsString, result)) { return; } if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result)) return; Database* const db = dbHolder().get(txn, nsString.db()); if (db == NULL) { if (createCollection) { // we raced with some, accept defeat result->getStats().nModified = 0; result->getStats().n = 0; return; } // Database not yet created if (!request.isUpsert()) { // not an upsert, no database, nothing to do result->getStats().nModified = 0; result->getStats().n = 0; return; } // upsert, don't try to get a context as no MODE_X lock is held fakeLoop = -1; createCollection = true; continue; } CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel()); Collection* collection = db->getCollection(nsString.ns()); if (collection == NULL) { if (createCollection) { // we raced with some, accept defeat result->getStats().nModified = 0; result->getStats().n = 0; return; } if (!request.isUpsert()) { // not an upsert, no collection, nothing to do result->getStats().nModified = 0; result->getStats().n = 0; return; } // upsert, mark that we should create collection fakeLoop = -1; createCollection = true; continue; } OpDebug* debug = &CurOp::get(txn)->debug(); try { invariant(collection); PlanExecutor* rawExec; uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec)); std::unique_ptr exec(rawExec); uassertStatusOK(exec->executePlan()); UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug); const long long numDocsModified = res.numDocsModified; const long long numMatched = res.numMatched; const BSONObj resUpsertedID = res.upserted; // We have an _id from an insert const bool didInsert = !resUpsertedID.isEmpty(); result->getStats().nModified = didInsert ? 0 : numDocsModified; result->getStats().n = didInsert ? 
1 : numMatched; result->getStats().upsertedID = resUpsertedID; } catch (const WriteConflictException& dle) { debug->writeConflicts++; if (isMulti) { log() << "Had WriteConflict during multi update, aborting"; throw; } createCollection = false; // RESTART LOOP fakeLoop = -1; txn->recoveryUnit()->abandonSnapshot(); WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns()); } catch (const StaleConfigException& staleExcep) { result->setError(new WriteErrorDetail); result->getError()->setErrCode(ErrorCodes::StaleShardVersion); buildStaleError( staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); } catch (const DBException& ex) { Status status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { throw; } result->setError(toWriteError(status)); } } } /** * Perform a remove operation, which might remove multiple documents. Dispatches to remove code * currently to do most of this. * * Might fault or error, otherwise populates the result. */ static void multiRemove(OperationContext* txn, const BatchItemRef& removeItem, WriteOpResult* result) { const NamespaceString& nss = removeItem.getRequest()->getNSS(); DeleteRequest request(nss); request.setQuery(removeItem.getDelete()->getQuery()); request.setMulti(removeItem.getDelete()->getLimit() != 1); request.setGod(false); // Deletes running through the write commands path can yield. request.setYieldPolicy(PlanExecutor::YIELD_AUTO); int attempt = 1; while (1) { try { ParsedDelete parsedDelete(txn, &request); Status status = parsedDelete.parseRequest(); if (!status.isOK()) { result->setError(toWriteError(status)); return; } ScopedTransaction scopedXact(txn, MODE_IX); AutoGetDb autoDb(txn, nss.db(), MODE_IX); if (!autoDb.getDb()) { break; } CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel()); Lock::CollectionLock collLock( txn->lockState(), nss.ns(), parsedDelete.isIsolated() ? 
MODE_X : MODE_IX); // getExecutorDelete() also checks if writes are allowed. if (!checkIsMasterForDatabase(nss, result)) { return; } // Check version once we're locked if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) { // Version error return; } PlanExecutor* rawExec; uassertStatusOK(getExecutorDelete( txn, autoDb.getDb()->getCollection(nss), &parsedDelete, &rawExec)); std::unique_ptr exec(rawExec); // Execute the delete and retrieve the number deleted. uassertStatusOK(exec->executePlan()); result->getStats().n = DeleteStage::getNumDeleted(exec.get()); break; } catch (const WriteConflictException& dle) { CurOp::get(txn)->debug().writeConflicts++; WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns()); } catch (const StaleConfigException& staleExcep) { result->setError(new WriteErrorDetail); result->getError()->setErrCode(ErrorCodes::StaleShardVersion); buildStaleError( staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); return; } catch (const DBException& ex) { Status status = ex.toStatus(); if (ErrorCodes::isInterruption(status.code())) { throw; } result->setError(toWriteError(status)); return; } } } } // namespace mongo