/**
 *    Copyright (C) 2013 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite

#include "mongo/platform/basic.h"

#include "mongo/db/commands/write_commands/batch_executor.h"

#include <boost/scoped_ptr.hpp>
#include <memory>

#include "mongo/base/error_codes.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/instance.h"
#include "mongo/db/introspect.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/update.h"
#include "mongo/db/ops/delete_request.h"
#include "mongo/db/ops/parsed_delete.h"
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/insert.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/collection_metadata.h"
#include "mongo/s/d_state.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/write_ops/batched_upsert_detail.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {

    using boost::scoped_ptr;
    using std::auto_ptr;
    using std::endl;
    using std::string;
    using std::vector;

namespace {

    /**
     * Data structure to safely hold and clean up results of single write operations.
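     *
     * Illustrative usage, mirroring how the core write helpers later in this file
     * consume it:
     *
     *   WriteOpResult result;
     *   singleInsert( txn, docToInsert, collection, &result );
     *   if ( result.getError() ) {
     *       // error remains owned by the result until releaseError() is called
     *   }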
     */
    class WriteOpResult {
        MONGO_DISALLOW_COPYING(WriteOpResult);
    public:
        WriteOpResult() {}

        WriteOpStats& getStats() { return _stats; }

        WriteErrorDetail* getError() { return _error.get(); }
        WriteErrorDetail* releaseError() { return _error.release(); }
        void setError(WriteErrorDetail* error) { _error.reset(error); }

    private:
        WriteOpStats _stats;
        std::auto_ptr<WriteErrorDetail> _error;
    };

} // namespace

    // TODO: Determine queueing behavior we want here
    MONGO_EXPORT_SERVER_PARAMETER( queueForMigrationCommit, bool, true );

    using mongoutils::str::stream;

    WriteBatchExecutor::WriteBatchExecutor( OperationContext* txn,
                                            const WriteConcernOptions& wc,
                                            OpCounters* opCounters,
                                            LastError* le ) :
        _txn(txn),
        _defaultWriteConcern( wc ),
        _opCounters( opCounters ),
        _le( le ),
        _stats( new WriteBatchStats ) {
    }

    static WCErrorDetail* toWriteConcernError( const Status& wcStatus,
                                               const WriteConcernResult& wcResult ) {

        WCErrorDetail* wcError = new WCErrorDetail;

        wcError->setErrCode( wcStatus.code() );
        wcError->setErrMessage( wcStatus.reason() );
        if ( wcResult.wTimedOut )
            wcError->setErrInfo( BSON( "wtimeout" << true ) );

        return wcError;
    }

    static WriteErrorDetail* toWriteError( const Status& status ) {

        WriteErrorDetail* error = new WriteErrorDetail;

        // TODO: Complex transform here?
        error->setErrCode( status.code() );
        error->setErrMessage( status.reason() );

        return error;
    }

    static void toBatchError( const Status& status, BatchedCommandResponse* response ) {
        response->clear();
        response->setErrCode( status.code() );
        response->setErrMessage( status.reason() );
        response->setOk( false );
        dassert( response->isValid(NULL) );
    }

    static void noteInCriticalSection( WriteErrorDetail* staleError ) {
        BSONObjBuilder builder;
        if ( staleError->isErrInfoSet() )
            builder.appendElements( staleError->getErrInfo() );
        builder.append( "inCriticalSection", true );
        staleError->setErrInfo( builder.obj() );
    }

    // static
    Status WriteBatchExecutor::validateBatch( const BatchedCommandRequest& request ) {

        // Validate namespace
        const NamespaceString& nss = request.getNSS();
        if ( !nss.isValid() ) {
            return Status( ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace" );
        }

        // Make sure we can write to the namespace
        Status allowedStatus = userAllowedWriteNS( nss );
        if ( !allowedStatus.isOK() ) {
            return allowedStatus;
        }

        // Validate insert index requests
        // TODO: Push insert index requests through createIndex once all upgrade paths support it
        string errMsg;
        if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) {
            return Status( ErrorCodes::InvalidOptions, errMsg );
        }

        return Status::OK();
    }

    void WriteBatchExecutor::executeBatch( const BatchedCommandRequest& request,
                                           BatchedCommandResponse* response ) {

        // Validate namespace
        Status isValid = validateBatch(request);
        if (!isValid.isOK()) {
            toBatchError( isValid, response );
            return;
        }

        // Validate write concern
        // TODO: Lift write concern parsing out of this entirely
        WriteConcernOptions writeConcern;

        BSONObj wcDoc;
        if ( request.isWriteConcernSet() ) {
            wcDoc = request.getWriteConcern();
        }

        Status wcStatus = Status::OK();
        if ( wcDoc.isEmpty() ) {

            // The default write concern if empty is w : 1
            // Specifying w : 0 is/was allowed, but is interpreted identically to w : 1

            writeConcern = _defaultWriteConcern;

            if ( writeConcern.wNumNodes == 0 && writeConcern.wMode.empty() ) {
                writeConcern.wNumNodes = 1;
            }
        }
        else {
            wcStatus = writeConcern.parse( wcDoc );
        }

        if ( wcStatus.isOK() ) {
            wcStatus = validateWriteConcern( writeConcern );
        }

        if ( !wcStatus.isOK() ) {
            toBatchError( wcStatus, response );
            return;
        }
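
        // For orientation: an explicit write concern arrives as a subdocument of the
        // write command, e.g. (values illustrative)
        //   { insert: "coll", documents: [ ... ],
        //     writeConcern: { w: "majority", wtimeout: 5000 } }
        // while an absent or empty writeConcern falls back to the default handled above.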

        if ( writeConcern.syncMode == WriteConcernOptions::JOURNAL ||
             writeConcern.syncMode == WriteConcernOptions::FSYNC ) {
            _txn->recoveryUnit()->goingToAwaitCommit();
        }

        if ( request.sizeWriteOps() == 0u ) {
            toBatchError( Status( ErrorCodes::InvalidLength,
                                  "no write ops were included in the batch" ),
                          response );
            return;
        }

        // Validate batch size
        if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) {
            toBatchError( Status( ErrorCodes::InvalidLength,
                                  stream() << "exceeded maximum write batch size of "
                                           << BatchedCommandRequest::kMaxWriteBatchSize ),
                          response );
            return;
        }

        //
        // End validation
        //

        bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0
                        && writeConcern.syncMode == WriteConcernOptions::NONE;

        Timer commandTimer;

        OwnedPointerVector<WriteErrorDetail> writeErrorsOwned;
        vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector();

        OwnedPointerVector<BatchedUpsertDetail> upsertedOwned;
        vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector();

        //
        // Apply each batch item, possibly bulking some items together in the write lock.
        // Stops on error if batch is ordered.
        //

        bulkExecute( request, &upserted, &writeErrors );

        //
        // Try to enforce the write concern if everything succeeded (unordered or ordered)
        // OR if something succeeded and we're unordered.
        //

        auto_ptr<WCErrorDetail> wcError;
        bool needToEnforceWC = writeErrors.empty()
                               || ( !request.getOrdered()
                                    && writeErrors.size() < request.sizeWriteOps() );

        if ( needToEnforceWC ) {
            _txn->getCurOp()->setMessage( "waiting for write concern" );

            WriteConcernResult res;
            Status status = waitForWriteConcern( _txn,
                                                 writeConcern,
                                                 _txn->getClient()->getLastOp(),
                                                 &res );

            if ( !status.isOK() ) {
                wcError.reset( toWriteConcernError( status, res ) );
            }
        }

        //
        // Refresh metadata if needed
        //

        bool staleBatch = !writeErrors.empty()
                          && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;

        if ( staleBatch ) {

            const BatchedRequestMetadata* requestMetadata = request.getMetadata();
            dassert( requestMetadata );

            // Make sure our shard name is set or is the same as what was set previously
            if ( shardingState.setShardName( requestMetadata->getShardName() ) ) {

                //
                // First, we refresh metadata if we need to based on the requested version.
                //

                ChunkVersion latestShardVersion;
                shardingState.refreshMetadataIfNeeded( _txn,
                                                       request.getTargetingNS(),
                                                       requestMetadata->getShardVersion(),
                                                       &latestShardVersion );

                // Report if we're still changing our metadata
                // TODO: Better reporting per-collection
                if ( shardingState.inCriticalMigrateSection() ) {
                    noteInCriticalSection( writeErrors.back() );
                }

                if ( queueForMigrationCommit ) {

                    //
                    // Queue up for migration to end - this allows us to be sure that clients will
                    // not repeatedly try to refresh metadata that is not yet written to the config
                    // server.  Not necessary for correctness.
                    // Exposed as optional parameter to allow testing of queuing behavior with
                    // different network timings.
                    //

                    const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion();

                    //
                    // Only wait if we're an older version (in the current collection epoch) and
                    // we're not write compatible, implying that the current migration is affecting
                    // writes.
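                    // (Here "write compatible" means same collection epoch and same major
                    //  version; a migration commit bumps the major version, so a version
                    //  that is older but still write compatible does not need to wait.)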
                    //

                    if ( requestShardVersion.isOlderThan( latestShardVersion ) &&
                         !requestShardVersion.isWriteCompatibleWith( latestShardVersion ) ) {

                        while ( shardingState.inCriticalMigrateSection() ) {

                            log() << "write request to old shard version "
                                  << requestMetadata->getShardVersion().toString()
                                  << " waiting for migration commit" << endl;

                            shardingState.waitTillNotInCriticalSection( 10 /* secs */ );
                        }
                    }
                }
            }
            else {
                // If our shard name is stale, our version must have been stale as well
                dassert( writeErrors.size() == request.sizeWriteOps() );
            }
        }

        //
        // Construct response
        //

        response->setOk( true );

        if ( !silentWC ) {

            if ( upserted.size() ) {
                response->setUpsertDetails( upserted );
            }

            if ( writeErrors.size() ) {
                response->setErrDetails( writeErrors );
            }

            if ( wcError.get() ) {
                response->setWriteConcernError( wcError.release() );
            }

            repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
            const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
            if (replMode != repl::ReplicationCoordinator::modeNone) {
                response->setLastOp( _txn->getClient()->getLastOp() );
                if (replMode == repl::ReplicationCoordinator::modeReplSet) {
                    response->setElectionId(replCoord->getElectionId());
                }
            }

            // Set the stats for the response
            response->setN( _stats->numInserted + _stats->numUpserted + _stats->numMatched
                            + _stats->numDeleted );
            if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update )
                response->setNModified( _stats->numModified );
        }

        dassert( response->isValid( NULL ) );
    }

    // Translates write item type to wire protocol op code.
    // Helper for WriteBatchExecutor::applyWriteItem().
    static int getOpCode( BatchedCommandRequest::BatchType writeType ) {
        switch ( writeType ) {
        case BatchedCommandRequest::BatchType_Insert:
            return dbInsert;
        case BatchedCommandRequest::BatchType_Update:
            return dbUpdate;
        default:
            dassert( writeType == BatchedCommandRequest::BatchType_Delete );
            return dbDelete;
        }
        return 0;
    }

    static void buildStaleError( const ChunkVersion& shardVersionRecvd,
                                 const ChunkVersion& shardVersionWanted,
                                 WriteErrorDetail* error ) {

        // Write stale error to results
        error->setErrCode( ErrorCodes::StaleShardVersion );

        BSONObjBuilder infoB;
        shardVersionWanted.addToBSON( infoB, "vWanted" );
        error->setErrInfo( infoB.obj() );

        string errMsg = stream() << "stale shard version detected before write, received "
                                 << shardVersionRecvd.toString() << " but local version is "
                                 << shardVersionWanted.toString();
        error->setErrMessage( errMsg );
    }

    static bool checkShardVersion(OperationContext* txn,
                                  ShardingState* shardingState,
                                  const BatchedCommandRequest& request,
                                  WriteOpResult* result) {

        const NamespaceString& nss = request.getTargetingNSS();
        dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));

        ChunkVersion requestShardVersion =
            request.isMetadataSet() && request.getMetadata()->isShardVersionSet() ?
                request.getMetadata()->getShardVersion() : ChunkVersion::IGNORED();

        if ( shardingState->enabled() ) {

            CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() );

            if ( !ChunkVersion::isIgnoredVersion( requestShardVersion ) ) {

                ChunkVersion shardVersion = metadata ?
                    metadata->getShardVersion() : ChunkVersion::UNSHARDED();

                if ( !requestShardVersion.isWriteCompatibleWith( shardVersion ) ) {
                    result->setError(new WriteErrorDetail);
                    buildStaleError(requestShardVersion, shardVersion, result->getError());
                    return false;
                }
            }
        }

        return true;
    }

    static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) {
        if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(ns.db())) {
            WriteErrorDetail* errorDetail = new WriteErrorDetail;
            result->setError(errorDetail);
            errorDetail->setErrCode(ErrorCodes::NotMaster);
            errorDetail->setErrMessage("Not primary while writing to " + ns.toString());
            return false;
        }
        return true;
    }

    static void buildUniqueIndexError( const BSONObj& keyPattern,
                                       const BSONObj& indexPattern,
                                       WriteErrorDetail* error ) {
        error->setErrCode( ErrorCodes::CannotCreateIndex );
        string errMsg = stream() << "cannot create unique index over " << indexPattern
                                 << " with shard key pattern " << keyPattern;
        error->setErrMessage( errMsg );
    }

    static bool checkIndexConstraints(OperationContext* txn,
                                      ShardingState* shardingState,
                                      const BatchedCommandRequest& request,
                                      WriteOpResult* result) {

        const NamespaceString& nss = request.getTargetingNSS();
        dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));

        if ( !request.isUniqueIndexRequest() )
            return true;

        if ( shardingState->enabled() ) {

            CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() );

            if ( metadata ) {
                ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
                if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) {

                    result->setError(new WriteErrorDetail);
                    buildUniqueIndexError(metadata->getKeyPattern(),
                                          request.getIndexKeyPattern(),
                                          result->getError());

                    return false;
                }
            }
        }

        return true;
    }

    //
    // HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS
    //

    static void beginCurrentOp( CurOp* currentOp, Client* client, const BatchItemRef& currWrite ) {

        // Execute the write item as a child operation of the current operation.
        // This is not done by our callers

        // Set up the child op with more info
        HostAndPort remote = client->hasRemote() ?
            client->getRemote() : HostAndPort( "0.0.0.0", 0 );

        // TODO Modify CurOp "wrapped" constructor to take an opcode, so calling .reset()
        // is unneeded
        currentOp->reset( remote, getOpCode( currWrite.getRequest()->getBatchType() ) );
        currentOp->ensureStarted();
        currentOp->setNS( currWrite.getRequest()->getNS() );

        currentOp->debug().ns = currentOp->getNS();
        currentOp->debug().op = currentOp->getOp();

        if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
            currentOp->setQuery( currWrite.getDocument() );
            currentOp->debug().query = currWrite.getDocument();
            currentOp->debug().ninserted = 0;
        }
        else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
            currentOp->setQuery( currWrite.getUpdate()->getQuery() );
            currentOp->debug().query = currWrite.getUpdate()->getQuery();
            currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr();
            // Note: debug().nMatched, nModified and nmoved are set internally in update
        }
        else {
            dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
            currentOp->setQuery( currWrite.getDelete()->getQuery() );
            currentOp->debug().query = currWrite.getDelete()->getQuery();
            currentOp->debug().ndeleted = 0;
        }
    }

    void WriteBatchExecutor::incOpStats( const BatchItemRef& currWrite ) {
        if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
            _opCounters->gotInsert();
        }
        else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
            _opCounters->gotUpdate();
        }
        else {
            dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
            _opCounters->gotDelete();
        }
    }

    void WriteBatchExecutor::incWriteStats( const BatchItemRef& currWrite,
                                            const WriteOpStats& stats,
                                            const WriteErrorDetail* error,
                                            CurOp* currentOp ) {

        if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
            _stats->numInserted += stats.n;
            _le->nObjects = stats.n;
            currentOp->debug().ninserted += stats.n;
        }
        else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
            if ( stats.upsertedID.isEmpty() ) {
                _stats->numMatched += stats.n;
                _stats->numModified += stats.nModified;
            }
            else {
                ++_stats->numUpserted;
            }

            if ( !error ) {
                _le->recordUpdate( stats.upsertedID.isEmpty() && stats.n > 0,
                                   stats.n,
                                   stats.upsertedID );
            }
        }
        else {
            dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
            _stats->numDeleted += stats.n;
            if ( !error ) {
                _le->recordDelete( stats.n );
            }
            currentOp->debug().ndeleted += stats.n;
        }

        if (error && !_le->disabled) {
            _le->raiseError(error->getErrCode(), error->getErrMessage().c_str());
        }
    }

    static void finishCurrentOp( OperationContext* txn,
                                 CurOp* currentOp,
                                 WriteErrorDetail* opError ) {

        currentOp->done();
        int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis();
        currentOp->debug().recordStats();

        if (currentOp->getOp() == dbInsert) {
            // This is a wrapped operation, so make sure to count this part of the op
            // SERVER-13339: Properly fix the handling of context in the insert path.
            // Right now it caches client contexts in ExecInsertsState, unlike the
            // update and remove operations.
            currentOp->recordGlobalTime(txn->lockState()->isWriteLocked(),
                                        currentOp->totalTimeMicros());
        }

        if ( opError ) {
            currentOp->debug().exceptionInfo = ExceptionInfo( opError->getErrMessage(),
                                                              opError->getErrCode() );

            LOG(3) << " Caught Assertion in " << opToString( currentOp->getOp() )
                   << ", continuing " << causedBy( opError->getErrMessage() ) << endl;
        }

        bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite,
                                                           logger::LogSeverity::Debug(1));
        bool logSlow = executionTime
                       > ( serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs() );

        if ( logAll || logSlow ) {
            LOG(0) << currentOp->debug().report( *currentOp ) << endl;
        }

        if ( currentOp->shouldDBProfile( executionTime ) ) {
            profile( txn, *txn->getClient(), currentOp->getOp(), *currentOp );
        }
    }

    // END HELPERS

    //
    // CORE WRITE OPERATIONS (declaration)
    // These functions write to the database and return stats and zero or one of:
    // - page fault
    // - error
    //

    static void singleInsert( OperationContext* txn,
                              const BSONObj& docToInsert,
                              Collection* collection,
                              WriteOpResult* result );

    static void singleCreateIndex( OperationContext* txn,
                                   const BSONObj& indexDesc,
                                   WriteOpResult* result );

    static void multiUpdate( OperationContext* txn,
                             const BatchItemRef& updateItem,
                             WriteOpResult* result );

    static void multiRemove( OperationContext* txn,
                             const BatchItemRef& removeItem,
                             WriteOpResult* result );

    //
    // WRITE EXECUTION
    // In general, the exec* operations manage db lock state and stats before dispatching to the
    // core write operations, which are *only* responsible for performing a write and reporting
    // success or failure.
    //

    /**
     * Representation of the execution state of execInserts.  Used by a single
     * execution of execInserts in a single thread.
     */
    class WriteBatchExecutor::ExecInsertsState {
        MONGO_DISALLOW_COPYING(ExecInsertsState);
    public:
        /**
         * Constructs a new instance, for performing inserts described in "aRequest".
         */
        explicit ExecInsertsState(OperationContext* txn,
                                  const BatchedCommandRequest* aRequest);

        /**
         * Acquires the write lock and client context needed to perform the current write
         * operation.  Returns true on success, after which it is safe to use the "context"
         * and "collection" members.  It is safe to call this function if this instance
         * already holds the write lock.
         *
         * On failure, writeLock, context and collection will be NULL/clear.
         */
        bool lockAndCheck(WriteOpResult* result);

        /**
         * Releases the client context and write lock acquired by lockAndCheck.  Safe to call
         * regardless of whether or not this state object currently owns the lock.
         */
        void unlock();

        /**
         * Returns true if this executor has the lock on the target database.
         */
        bool hasLock() { return _writeLock.get(); }

        /**
         * Gets the lock-holding object.  Only valid if hasLock().
         */
        Lock::DBLock& getLock() { return *_writeLock; }

        /**
         * Gets the target collection for the batch operation.  Value is undefined
         * unless hasLock() is true.
         */
        Collection* getCollection() { return _collection; }

        OperationContext* txn;

        // Request object describing the inserts.
        const BatchedCommandRequest* request;

        // Index of the current insert operation to perform.
        size_t currIndex;

        // Translation of insert documents in "request" into insert-ready forms.  This vector
        // has a correspondence with elements of the "request", and "currIndex" is used to
        // index both.
        std::vector<StatusWith<BSONObj> > normalizedInserts;

    private:
        bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock=true);

        ScopedTransaction _transaction;
        // Guard object for the write lock on the target database.
        scoped_ptr<Lock::DBLock> _writeLock;
        scoped_ptr<Lock::CollectionLock> _collLock;

        // Context object on the target database.  Must appear after writeLock, so that it is
        // destroyed in proper order.
        scoped_ptr<Client::Context> _context;

        // Target collection.
        Collection* _collection;
    };

    void WriteBatchExecutor::bulkExecute( const BatchedCommandRequest& request,
                                          std::vector<BatchedUpsertDetail*>* upsertedIds,
                                          std::vector<WriteErrorDetail*>* errors ) {

        if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert ) {
            execInserts( request, errors );
        }
        else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update ) {
            for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {

                WriteErrorDetail* error = NULL;
                BSONObj upsertedId;
                execUpdate( BatchItemRef( &request, i ), &upsertedId, &error );

                if ( !upsertedId.isEmpty() ) {
                    BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail;
                    batchUpsertedId->setIndex( i );
                    batchUpsertedId->setUpsertedID( upsertedId );
                    upsertedIds->push_back( batchUpsertedId );
                }

                if ( error ) {
                    errors->push_back( error );
                    if ( request.getOrdered() )
                        break;
                }
            }
        }
        else {
            dassert( request.getBatchType() == BatchedCommandRequest::BatchType_Delete );
            for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {

                WriteErrorDetail* error = NULL;
                execRemove( BatchItemRef( &request, i ), &error );

                if ( error ) {
                    errors->push_back( error );
                    if ( request.getOrdered() )
                        break;
                }
            }
        }

        // Fill in stale version errors for unordered batches (update/delete can't do this on own)
        if ( !errors->empty() && !request.getOrdered() ) {

            const WriteErrorDetail* finalError = errors->back();

            if ( finalError->getErrCode() == ErrorCodes::StaleShardVersion ) {
                for ( size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++ ) {
                    WriteErrorDetail* dupStaleError = new WriteErrorDetail;
                    finalError->cloneTo( dupStaleError );
                    errors->push_back( dupStaleError );
                }
            }
        }
    }

    // Goes over the request and preprocesses normalized versions of all the inserts in the request
    static void normalizeInserts( const BatchedCommandRequest& request,
                                  vector<StatusWith<BSONObj> >* normalizedInserts ) {

        normalizedInserts->reserve(request.sizeWriteOps());
        for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) {
            BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt( i );
            StatusWith<BSONObj> normalInsert = fixDocumentForInsert( insertDoc );
            normalizedInserts->push_back( normalInsert );
            if ( request.getOrdered() && !normalInsert.isOK() )
                break;
        }
    }

    void WriteBatchExecutor::execInserts( const BatchedCommandRequest& request,
                                          std::vector<WriteErrorDetail*>* errors ) {

        // Theory of operation:
        //
        // Instantiates an ExecInsertsState, which represents all of the state involved in the
        // batch insert execution algorithm.  Most importantly, encapsulates the lock state.
        //
        // Every iteration of the loop in execInserts() processes one document insertion, by
        // calling insertOne() exactly once for a given value of state.currIndex.
        //
        // If the ExecInsertsState indicates that the requisite write locks are not held,
        // insertOne acquires them and performs lock-acquisition-time checks.  However, on
        // non-error execution, it does not release the locks.  Therefore, the yielding logic
        // in the while loop in execInserts() is solely responsible for lock release in the
        // non-error case.
        //
        // Internally, insertOne loops performing the single insert until it completes without
        // a PageFaultException, or until it fails with some kind of error.  Errors are mostly
        // propagated via the request->error field, but DBExceptions or std::exceptions may
        // escape, particularly on operation interruption.
        // These kinds of errors necessarily prevent further insertOne calls, and stop the
        // batch.  As a result, the only expected source of such exceptions is interruption.
        ExecInsertsState state(_txn, &request);
        normalizeInserts(request, &state.normalizedInserts);

        // Yield frequency is based on the same constants used by PlanYieldPolicy.
        ElapsedTracker elapsedTracker(internalQueryExecYieldIterations,
                                      internalQueryExecYieldPeriodMS);

        for (state.currIndex = 0;
             state.currIndex < state.request->sizeWriteOps();
             ++state.currIndex) {

            if (elapsedTracker.intervalHasElapsed()) {
                // Yield between inserts.
                if (state.hasLock()) {
                    // Release our locks. They get reacquired when insertOne() calls
                    // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO
                    // queues waiting on locks, there is no need to explicitly sleep or give up
                    // control of the processor here.
                    state.unlock();

                    // This releases any storage engine held locks/snapshots.
                    _txn->recoveryUnit()->commitAndRestart();
                }

                _txn->checkForInterrupt();
                elapsedTracker.resetLastTime();
            }

            WriteErrorDetail* error = NULL;
            execOneInsert(&state, &error);
            if (error) {
                errors->push_back(error);
                error->setIndex(state.currIndex);
                if (request.getOrdered())
                    return;
            }
        }
    }

    void WriteBatchExecutor::execUpdate( const BatchItemRef& updateItem,
                                         BSONObj* upsertedId,
                                         WriteErrorDetail** error ) {

        // BEGIN CURRENT OP
        CurOp currentOp( _txn->getClient(), _txn->getClient()->curop() );
        beginCurrentOp( &currentOp, _txn->getClient(), updateItem );
        incOpStats( updateItem );

        WriteOpResult result;
        multiUpdate( _txn, updateItem, &result );

        if ( !result.getStats().upsertedID.isEmpty() ) {
            *upsertedId = result.getStats().upsertedID;
        }
        // END CURRENT OP
        incWriteStats( updateItem, result.getStats(), result.getError(), &currentOp );
        finishCurrentOp( _txn, &currentOp, result.getError() );

        // End current transaction and release snapshot.
        _txn->recoveryUnit()->commitAndRestart();

        if ( result.getError() ) {
            result.getError()->setIndex( updateItem.getItemIndex() );
            *error = result.releaseError();
        }
    }

    void WriteBatchExecutor::execRemove( const BatchItemRef& removeItem,
                                         WriteErrorDetail** error ) {

        // Removes are similar to updates, but page faults are handled externally

        // BEGIN CURRENT OP
        CurOp currentOp( _txn->getClient(), _txn->getClient()->curop() );
        beginCurrentOp( &currentOp, _txn->getClient(), removeItem );
        incOpStats( removeItem );

        WriteOpResult result;
        multiRemove( _txn, removeItem, &result );

        // END CURRENT OP
        incWriteStats( removeItem, result.getStats(), result.getError(), &currentOp );
        finishCurrentOp( _txn, &currentOp, result.getError() );

        // End current transaction and release snapshot.
        _txn->recoveryUnit()->commitAndRestart();

        if ( result.getError() ) {
            result.getError()->setIndex( removeItem.getItemIndex() );
            *error = result.releaseError();
        }
    }

    //
    // IN-DB-LOCK CORE OPERATIONS
    //

    WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn,
                                                           const BatchedCommandRequest* aRequest) :
        txn(txn),
        request(aRequest),
        currIndex(0),
        _transaction(txn, MODE_IX),
        _collection(NULL) {
    }

    bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result,
                                                                 bool intentLock) {
        if (hasLock()) {
            // TODO: Client::Context legacy, needs to be removed
            txn->getCurOp()->enter(_context->ns(),
                                   _context->db() ?
                                       _context->db()->getProfilingLevel() : 0);
            return true;
        }

        if (request->isInsertIndexRequest())
            intentLock = false; // can't build indexes in intent mode

        invariant(!_context.get());
        const NamespaceString& nss = request->getNSS();
        _collLock.reset(); // give up locks if any
        _writeLock.reset();
        _writeLock.reset(new Lock::DBLock(txn->lockState(),
                                          nss.db(),
                                          intentLock ? MODE_IX : MODE_X));
        if (intentLock && dbHolder().get(txn, nss.db()) == NULL) {
            // Ensure exclusive lock in case the database doesn't yet exist
            _writeLock.reset();
            _writeLock.reset(new Lock::DBLock(txn->lockState(), nss.db(), MODE_X));
            intentLock = false;
        }
        _collLock.reset(new Lock::CollectionLock(txn->lockState(),
                                                 nss.ns(),
                                                 intentLock ? MODE_IX : MODE_X));

        if (!checkIsMasterForDatabase(nss, result)) {
            return false;
        }
        if (!checkShardVersion(txn, &shardingState, *request, result)) {
            return false;
        }
        if (!checkIndexConstraints(txn, &shardingState, *request, result)) {
            return false;
        }

        _context.reset();
        _context.reset(new Client::Context(txn, nss, false));

        Database* database = _context->db();
        dassert(database);
        _collection = database->getCollection(request->getTargetingNS());
        if (!_collection) {
            if (intentLock) {
                // try again with full X lock.
                unlock();
                return _lockAndCheckImpl(result, false);
            }

            WriteUnitOfWork wunit(txn);
            // Implicitly create if it doesn't exist
            _collection = database->createCollection(txn, request->getTargetingNS());
            if (!_collection) {
                result->setError(
                        toWriteError(Status(ErrorCodes::InternalError,
                                            "could not create collection " +
                                            request->getTargetingNS())));
                return false;
            }
            repl::logOp(txn,
                        "c",
                        (database->name() + ".$cmd").c_str(),
                        BSON("create" << nsToCollectionSubstring(request->getTargetingNS())));
            wunit.commit();
        }
        return true;
    }

    bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) {
        if (_lockAndCheckImpl(result))
            return true;
        unlock();
        return false;
    }

    void WriteBatchExecutor::ExecInsertsState::unlock() {
        _collection = NULL;
        _context.reset();
        _collLock.reset();
        _writeLock.reset();
    }

    static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) {
        invariant(state->currIndex < state->normalizedInserts.size());
        const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]);

        if (!normalizedInsert.isOK()) {
            result->setError(toWriteError(normalizedInsert.getStatus()));
            return;
        }

        const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() ?
            state->request->getInsertRequest()->getDocumentsAt( state->currIndex ) :
            normalizedInsert.getValue();

        try {
            if (!state->request->isInsertIndexRequest()) {
                if (state->lockAndCheck(result)) {
                    singleInsert(state->txn, insertDoc, state->getCollection(), result);
                }
            }
            else {
                singleCreateIndex(state->txn, insertDoc, result);
            }
        }
        catch (const DBException& ex) {
            Status status(ex.toStatus());
            if (ErrorCodes::isInterruption(status.code()))
                throw;
            result->setError(toWriteError(status));
        }

        // Errors release the write lock, as a matter of policy.
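        // (Releasing on error keeps the lock state simple: a subsequent insertOne()
        // call re-runs lockAndCheck() from scratch rather than reasoning about a
        // partially valid acquisition.)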
        if (result->getError()) {
            state->txn->recoveryUnit()->commitAndRestart();
            state->unlock();
        }
    }

    void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) {
        BatchItemRef currInsertItem(state->request, state->currIndex);
        CurOp currentOp( _txn->getClient(), _txn->getClient()->curop() );
        beginCurrentOp( &currentOp, _txn->getClient(), currInsertItem );
        incOpStats(currInsertItem);

        WriteOpResult result;
        insertOne(state, &result);

        incWriteStats(currInsertItem, result.getStats(), result.getError(), &currentOp);
        finishCurrentOp(_txn, &currentOp, result.getError());

        if (result.getError()) {
            *error = result.releaseError();
        }
    }

    /**
     * Perform a single insert into a collection.  Requires the insert be preprocessed and the
     * collection already has been created.
     *
     * Might fault or error, otherwise populates the result.
     */
    static void singleInsert( OperationContext* txn,
                              const BSONObj& docToInsert,
                              Collection* collection,
                              WriteOpResult* result ) {

        const string& insertNS = collection->ns().ns();
        invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX));

        WriteUnitOfWork wunit(txn);
        StatusWith<RecordId> status = collection->insertDocument( txn, docToInsert, true );

        if ( !status.isOK() ) {
            result->setError(toWriteError(status.getStatus()));
        }
        else {
            repl::logOp( txn, "i", insertNS.c_str(), docToInsert );
            result->getStats().n = 1;
            wunit.commit();
        }
    }

    /**
     * Perform a single index creation on a collection.  Requires the index descriptor be
     * preprocessed.
     *
     * Might fault or error, otherwise populates the result.
     */
    static void singleCreateIndex(OperationContext* txn,
                                  const BSONObj& indexDesc,
                                  WriteOpResult* result) {

        BSONElement nsElement = indexDesc["ns"];
        uassert(ErrorCodes::NoSuchKey,
                "Missing \"ns\" field in index description",
                !nsElement.eoo());
        uassert(ErrorCodes::TypeMismatch,
                str::stream() << "Expected \"ns\" field of index description to be a string, "
                                 "but found a " << typeName(nsElement.type()),
                nsElement.type() == String);

        const NamespaceString ns(nsElement.valueStringData());
        BSONObjBuilder cmdBuilder;
        cmdBuilder << "createIndexes" << ns.coll();
        cmdBuilder << "indexes" << BSON_ARRAY(indexDesc);
        BSONObj cmd = cmdBuilder.done();

        Command* createIndexesCmd = Command::findCommand("createIndexes");
        invariant(createIndexesCmd);
        std::string errmsg;
        BSONObjBuilder resultBuilder;
        const bool success = createIndexesCmd->run(
                txn,
                ns.db().toString(),
                cmd,
                0,
                errmsg,
                resultBuilder,
                false /* fromrepl */);
        Command::appendCommandStatus(resultBuilder, success, errmsg);
        BSONObj cmdResult = resultBuilder.done();
        uassertStatusOK(Command::getStatusFromCommandResult(cmdResult));

        const long long numIndexesBefore = cmdResult["numIndexesBefore"].safeNumberLong();
        const long long numIndexesAfter = cmdResult["numIndexesAfter"].safeNumberLong();
        if (numIndexesAfter - numIndexesBefore == 1) {
            result->getStats().n = 1;
        }
        else if (numIndexesAfter != 0 && numIndexesAfter != numIndexesBefore) {
            severe() << "Created multiple indexes while attempting to create only 1; "
                     << "numIndexesBefore = " << numIndexesBefore
                     << "; numIndexesAfter = " << numIndexesAfter;
            fassertFailed(28547);
        }
        else {
            result->getStats().n = 0;
        }
    }

    static void multiUpdate( OperationContext* txn,
                             const BatchItemRef& updateItem,
                             WriteOpResult* result ) {

        const NamespaceString nsString(updateItem.getRequest()->getNS());
        const bool isMulti = updateItem.getUpdate()->getMulti();
        UpdateRequest request(nsString);
        request.setQuery(updateItem.getUpdate()->getQuery());
        request.setUpdates(updateItem.getUpdate()->getUpdateExpr());
        request.setMulti(isMulti);
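        // (For orientation: a batched update item on the wire looks roughly like
        //  { q: <query>, u: <update expr>, multi: <bool>, upsert: <bool> }; these
        //  setters copy those fields onto the UpdateRequest.)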
        request.setUpsert(updateItem.getUpdate()->getUpsert());
        request.setUpdateOpLog(true);
        UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString());
        request.setLifecycle(&updateLifecycle);

        // Updates from the write commands path can yield.
        request.setYieldPolicy(PlanExecutor::YIELD_AUTO);

        int attempt = 0;
        bool createCollection = false;
        for ( int fakeLoop = 0; fakeLoop < 1; fakeLoop++ ) {

            ParsedUpdate parsedUpdate(txn, &request);
            Status status = parsedUpdate.parseRequest();
            if (!status.isOK()) {
                result->setError(toWriteError(status));
                return;
            }

            if ( createCollection ) {
                ScopedTransaction transaction(txn, MODE_IX);
                Lock::DBLock lk(txn->lockState(), nsString.db(), MODE_X);
                Client::Context ctx(txn, nsString.ns(), false /* don't check version */);
                Database* db = ctx.db();
                if ( db->getCollection( nsString.ns() ) ) {
                    // someone else beat us to it
                }
                else {
                    WriteUnitOfWork wuow(txn);
                    uassertStatusOK( userCreateNS( txn,
                                                   db,
                                                   nsString.ns(),
                                                   BSONObj(),
                                                   !request.isFromReplication() ) );
                    wuow.commit();
                }
            }

            ///////////////////////////////////////////
            ScopedTransaction transaction(txn, MODE_IX);
            Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
            Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX);
            ///////////////////////////////////////////

            if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result))
                return;

            Database* const db = dbHolder().get(txn, nsString.db());

            if (db == NULL) {
                if (createCollection) {
                    // we raced with someone, accept defeat
                    result->getStats().nModified = 0;
                    result->getStats().n = 0;
                    return;
                }

                // Database not yet created
                if (!request.isUpsert()) {
                    // not an upsert, no database, nothing to do
                    result->getStats().nModified = 0;
                    result->getStats().n = 0;
                    return;
                }

                // upsert, don't try to get a context as no MODE_X lock is held
                fakeLoop = -1;
                createCollection = true;
                continue;
            }

            Client::Context ctx(txn, nsString.ns(), false /* don't check version */);
            Collection* collection = db->getCollection(nsString.ns());

            if ( collection == NULL ) {
                if ( createCollection ) {
                    // we raced with someone, accept defeat
                    result->getStats().nModified = 0;
                    result->getStats().n = 0;
                    return;
                }

                if ( !request.isUpsert() ) {
                    // not an upsert, no collection, nothing to do
                    result->getStats().nModified = 0;
                    result->getStats().n = 0;
                    return;
                }

                // upsert, mark that we should create collection
                fakeLoop = -1;
                createCollection = true;
                continue;
            }

            OpDebug* debug = &txn->getCurOp()->debug();

            try {
                invariant(collection);
                PlanExecutor* rawExec;
                uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug,
                                                  &rawExec));
                boost::scoped_ptr<PlanExecutor> exec(rawExec);

                uassertStatusOK(exec->executePlan());
                UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug);

                const long long numDocsModified = res.numDocsModified;
                const long long numMatched = res.numMatched;
                const BSONObj resUpsertedID = res.upserted;

                // We have an _id from an insert
                const bool didInsert = !resUpsertedID.isEmpty();

                result->getStats().nModified = didInsert ? 0 : numDocsModified;
                result->getStats().n = didInsert ?
                    1 : numMatched;
                result->getStats().upsertedID = resUpsertedID;
            }
            catch ( const WriteConflictException& dle ) {
                debug->writeConflicts++;
                if ( isMulti ) {
                    log() << "Had WriteConflict during multi update, aborting";
                    throw;
                }
                createCollection = false;
                // RESTART LOOP
                fakeLoop = -1;
                txn->recoveryUnit()->commitAndRestart();
                WriteConflictException::logAndBackoff( attempt++, "update", nsString.ns() );
            }
            catch (const DBException& ex) {
                Status status = ex.toStatus();
                if (ErrorCodes::isInterruption(status.code())) {
                    throw;
                }
                result->setError(toWriteError(status));
            }
        }
    }

    /**
     * Perform a remove operation, which might remove multiple documents.  Dispatches to the
     * existing remove code, which currently does most of the work.
     *
     * Might fault or error, otherwise populates the result.
     */
    static void multiRemove( OperationContext* txn,
                             const BatchItemRef& removeItem,
                             WriteOpResult* result ) {

        const NamespaceString& nss = removeItem.getRequest()->getNSS();
        DeleteRequest request(nss);
        request.setQuery( removeItem.getDelete()->getQuery() );
        request.setMulti( removeItem.getDelete()->getLimit() != 1 );
        request.setUpdateOpLog(true);
        request.setGod( false );

        // Deletes running through the write commands path can yield.
        request.setYieldPolicy(PlanExecutor::YIELD_AUTO);

        int attempt = 1;
        while ( 1 ) {
            try {

                ParsedDelete parsedDelete(txn, &request);
                Status status = parsedDelete.parseRequest();
                if (!status.isOK()) {
                    result->setError(toWriteError(status));
                    return;
                }

                ScopedTransaction scopedXact(txn, MODE_IX);
                AutoGetDb autoDb(txn, nss.db(), MODE_IX);
                if (!autoDb.getDb()) {
                    break;
                }

                Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_IX);

                // Check version once we're locked

                if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) {
                    // Version error
                    return;
                }

                // Context once we're locked, to set more details in currentOp()
                // TODO: better constructor?
                Client::Context ctx(txn, nss.ns(), false /* don't check version */);

                PlanExecutor* rawExec;
                uassertStatusOK(getExecutorDelete(txn,
                                                  ctx.db()->getCollection(nss),
                                                  &parsedDelete,
                                                  &rawExec));
                boost::scoped_ptr<PlanExecutor> exec(rawExec);

                // Execute the delete and retrieve the number deleted.
                uassertStatusOK(exec->executePlan());
                result->getStats().n = DeleteStage::getNumDeleted(exec.get());

                break;
            }
            catch ( const WriteConflictException& dle ) {
                txn->getCurOp()->debug().writeConflicts++;
                WriteConflictException::logAndBackoff( attempt++, "delete", nss.ns() );
            }
            catch ( const DBException& ex ) {
                Status status = ex.toStatus();
                if (ErrorCodes::isInterruption(status.code())) {
                    throw;
                }
                result->setError(toWriteError(status));
                return;
            }
        }
    }

} // namespace mongo