diff options
Diffstat (limited to 'src/mongo/db/commands/write_commands')
6 files changed, 1532 insertions, 1644 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 0b277ddfa56..2087b5bd2f4 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -83,303 +83,290 @@ namespace mongo { - using std::endl; - using std::string; - using std::unique_ptr; - using std::vector; - - namespace { - - /** - * Data structure to safely hold and clean up results of single write operations. - */ - class WriteOpResult { - MONGO_DISALLOW_COPYING(WriteOpResult); - public: - WriteOpResult() {} - - WriteOpStats& getStats() { return _stats; } - - WriteErrorDetail* getError() { return _error.get(); } - WriteErrorDetail* releaseError() { return _error.release(); } - void setError(WriteErrorDetail* error) { _error.reset(error); } - - private: - WriteOpStats _stats; - std::unique_ptr<WriteErrorDetail> _error; - }; - - } // namespace - - // TODO: Determine queueing behavior we want here - MONGO_EXPORT_SERVER_PARAMETER( queueForMigrationCommit, bool, true ); - - using mongoutils::str::stream; - - WriteBatchExecutor::WriteBatchExecutor( OperationContext* txn, - OpCounters* opCounters, - LastError* le ) : - _txn(txn), - _opCounters( opCounters ), - _le( le ), - _stats( new WriteBatchStats ) { - } +using std::endl; +using std::string; +using std::unique_ptr; +using std::vector; - static WCErrorDetail* toWriteConcernError( const Status& wcStatus, - const WriteConcernResult& wcResult ) { +namespace { - WCErrorDetail* wcError = new WCErrorDetail; +/** + * Data structure to safely hold and clean up results of single write operations. + */ +class WriteOpResult { + MONGO_DISALLOW_COPYING(WriteOpResult); - wcError->setErrCode( wcStatus.code() ); - wcError->setErrMessage( wcStatus.reason() ); - if ( wcResult.wTimedOut ) - wcError->setErrInfo( BSON( "wtimeout" << true ) ); +public: + WriteOpResult() {} - return wcError; + WriteOpStats& getStats() { + return _stats; } - static WriteErrorDetail* toWriteError( const Status& status ) { - - WriteErrorDetail* error = new WriteErrorDetail; - - // TODO: Complex transform here? - error->setErrCode( status.code() ); - error->setErrMessage( status.reason() ); - - return error; + WriteErrorDetail* getError() { + return _error.get(); } - - static void toBatchError( const Status& status, BatchedCommandResponse* response ) { - response->clear(); - response->setErrCode( status.code() ); - response->setErrMessage( status.reason() ); - response->setOk( false ); - dassert( response->isValid(NULL) ); + WriteErrorDetail* releaseError() { + return _error.release(); } - - static void noteInCriticalSection( WriteErrorDetail* staleError ) { - BSONObjBuilder builder; - if ( staleError->isErrInfoSet() ) - builder.appendElements( staleError->getErrInfo() ); - builder.append( "inCriticalSection", true ); - staleError->setErrInfo( builder.obj() ); + void setError(WriteErrorDetail* error) { + _error.reset(error); } - // static - Status WriteBatchExecutor::validateBatch( const BatchedCommandRequest& request ) { - - // Validate namespace - const NamespaceString& nss = request.getNSS(); - if ( !nss.isValid() ) { - return Status( ErrorCodes::InvalidNamespace, - nss.ns() + " is not a valid namespace" ); - } - - // Make sure we can write to the namespace - Status allowedStatus = userAllowedWriteNS( nss ); - if ( !allowedStatus.isOK() ) { - return allowedStatus; - } - - // Validate insert index requests - // TODO: Push insert index requests through createIndex once all upgrade paths support it - string errMsg; - if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) { - return Status( ErrorCodes::InvalidOptions, errMsg ); - } - - return Status::OK(); +private: + WriteOpStats _stats; + std::unique_ptr<WriteErrorDetail> _error; +}; + +} // namespace + +// TODO: Determine queueing behavior we want here +MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true); + +using mongoutils::str::stream; + +WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le) + : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {} + +static WCErrorDetail* toWriteConcernError(const Status& wcStatus, + const WriteConcernResult& wcResult) { + WCErrorDetail* wcError = new WCErrorDetail; + + wcError->setErrCode(wcStatus.code()); + wcError->setErrMessage(wcStatus.reason()); + if (wcResult.wTimedOut) + wcError->setErrInfo(BSON("wtimeout" << true)); + + return wcError; +} + +static WriteErrorDetail* toWriteError(const Status& status) { + WriteErrorDetail* error = new WriteErrorDetail; + + // TODO: Complex transform here? + error->setErrCode(status.code()); + error->setErrMessage(status.reason()); + + return error; +} + +static void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); + dassert(response->isValid(NULL)); +} + +static void noteInCriticalSection(WriteErrorDetail* staleError) { + BSONObjBuilder builder; + if (staleError->isErrInfoSet()) + builder.appendElements(staleError->getErrInfo()); + builder.append("inCriticalSection", true); + staleError->setErrInfo(builder.obj()); +} + +// static +Status WriteBatchExecutor::validateBatch(const BatchedCommandRequest& request) { + // Validate namespace + const NamespaceString& nss = request.getNSS(); + if (!nss.isValid()) { + return Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace"); } - void WriteBatchExecutor::executeBatch( const BatchedCommandRequest& request, - BatchedCommandResponse* response ) { + // Make sure we can write to the namespace + Status allowedStatus = userAllowedWriteNS(nss); + if (!allowedStatus.isOK()) { + return allowedStatus; + } - // Validate namespace - Status isValid = validateBatch(request); - if (!isValid.isOK()) { - toBatchError( isValid, response ); - return; - } + // Validate insert index requests + // TODO: Push insert index requests through createIndex once all upgrade paths support it + string errMsg; + if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) { + return Status(ErrorCodes::InvalidOptions, errMsg); + } - if ( request.sizeWriteOps() == 0u ) { - toBatchError( Status( ErrorCodes::InvalidLength, - "no write ops were included in the batch" ), - response ); - return; - } + return Status::OK(); +} - // Validate batch size - if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) { - toBatchError( Status( ErrorCodes::InvalidLength, - stream() << "exceeded maximum write batch size of " - << BatchedCommandRequest::kMaxWriteBatchSize ), - response ); - return; - } +void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, + BatchedCommandResponse* response) { + // Validate namespace + Status isValid = validateBatch(request); + if (!isValid.isOK()) { + toBatchError(isValid, response); + return; + } - // - // End validation - // + if (request.sizeWriteOps() == 0u) { + toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"), + response); + return; + } - const WriteConcernOptions& writeConcern = _txn->getWriteConcern(); - bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 - && writeConcern.syncMode == WriteConcernOptions::NONE; + // Validate batch size + if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) { + toBatchError(Status(ErrorCodes::InvalidLength, + stream() << "exceeded maximum write batch size of " + << BatchedCommandRequest::kMaxWriteBatchSize), + response); + return; + } - Timer commandTimer; + // + // End validation + // - OwnedPointerVector<WriteErrorDetail> writeErrorsOwned; - vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector(); + const WriteConcernOptions& writeConcern = _txn->getWriteConcern(); + bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 && + writeConcern.syncMode == WriteConcernOptions::NONE; - OwnedPointerVector<BatchedUpsertDetail> upsertedOwned; - vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector(); + Timer commandTimer; - // - // Apply each batch item, possibly bulking some items together in the write lock. - // Stops on error if batch is ordered. - // + OwnedPointerVector<WriteErrorDetail> writeErrorsOwned; + vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector(); - bulkExecute( request, &upserted, &writeErrors ); + OwnedPointerVector<BatchedUpsertDetail> upsertedOwned; + vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector(); - // - // Try to enforce the write concern if everything succeeded (unordered or ordered) - // OR if something succeeded and we're unordered. - // + // + // Apply each batch item, possibly bulking some items together in the write lock. + // Stops on error if batch is ordered. + // - unique_ptr<WCErrorDetail> wcError; - bool needToEnforceWC = writeErrors.empty() - || ( !request.getOrdered() - && writeErrors.size() < request.sizeWriteOps() ); + bulkExecute(request, &upserted, &writeErrors); - if ( needToEnforceWC ) { - { - stdx::lock_guard<Client> lk(*_txn->getClient()); - CurOp::get(_txn)->setMessage_inlock( "waiting for write concern" ); - } + // + // Try to enforce the write concern if everything succeeded (unordered or ordered) + // OR if something succeeded and we're unordered. + // - WriteConcernResult res; - Status status = waitForWriteConcern( - _txn, - repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp(), - &res); + unique_ptr<WCErrorDetail> wcError; + bool needToEnforceWC = writeErrors.empty() || + (!request.getOrdered() && writeErrors.size() < request.sizeWriteOps()); - if ( !status.isOK() ) { - wcError.reset( toWriteConcernError( status, res ) ); - } + if (needToEnforceWC) { + { + stdx::lock_guard<Client> lk(*_txn->getClient()); + CurOp::get(_txn)->setMessage_inlock("waiting for write concern"); } - // - // Refresh metadata if needed - // + WriteConcernResult res; + Status status = waitForWriteConcern( + _txn, repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp(), &res); - bool staleBatch = !writeErrors.empty() - && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; - - if ( staleBatch ) { + if (!status.isOK()) { + wcError.reset(toWriteConcernError(status, res)); + } + } - const BatchedRequestMetadata* requestMetadata = request.getMetadata(); - dassert( requestMetadata ); + // + // Refresh metadata if needed + // - // Make sure our shard name is set or is the same as what was set previously - if ( shardingState.setShardName( requestMetadata->getShardName() ) ) { + bool staleBatch = + !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; + + if (staleBatch) { + const BatchedRequestMetadata* requestMetadata = request.getMetadata(); + dassert(requestMetadata); + + // Make sure our shard name is set or is the same as what was set previously + if (shardingState.setShardName(requestMetadata->getShardName())) { + // + // First, we refresh metadata if we need to based on the requested version. + // + + ChunkVersion latestShardVersion; + shardingState.refreshMetadataIfNeeded(_txn, + request.getTargetingNS(), + requestMetadata->getShardVersion(), + &latestShardVersion); + + // Report if we're still changing our metadata + // TODO: Better reporting per-collection + if (shardingState.inCriticalMigrateSection()) { + noteInCriticalSection(writeErrors.back()); + } + if (queueForMigrationCommit) { // - // First, we refresh metadata if we need to based on the requested version. + // Queue up for migration to end - this allows us to be sure that clients will + // not repeatedly try to refresh metadata that is not yet written to the config + // server. Not necessary for correctness. + // Exposed as optional parameter to allow testing of queuing behavior with + // different network timings. // - ChunkVersion latestShardVersion; - shardingState.refreshMetadataIfNeeded( _txn, - request.getTargetingNS(), - requestMetadata->getShardVersion(), - &latestShardVersion ); - - // Report if we're still changing our metadata - // TODO: Better reporting per-collection - if ( shardingState.inCriticalMigrateSection() ) { - noteInCriticalSection( writeErrors.back() ); - } - - if ( queueForMigrationCommit ) { - - // - // Queue up for migration to end - this allows us to be sure that clients will - // not repeatedly try to refresh metadata that is not yet written to the config - // server. Not necessary for correctness. - // Exposed as optional parameter to allow testing of queuing behavior with - // different network timings. - // - - const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion(); + const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion(); - // - // Only wait if we're an older version (in the current collection epoch) and - // we're not write compatible, implying that the current migration is affecting - // writes. - // - - if ( requestShardVersion.isOlderThan( latestShardVersion ) && - !requestShardVersion.isWriteCompatibleWith( latestShardVersion ) ) { - - while ( shardingState.inCriticalMigrateSection() ) { + // + // Only wait if we're an older version (in the current collection epoch) and + // we're not write compatible, implying that the current migration is affecting + // writes. + // - log() << "write request to old shard version " - << requestMetadata->getShardVersion().toString() - << " waiting for migration commit" << endl; + if (requestShardVersion.isOlderThan(latestShardVersion) && + !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) { + while (shardingState.inCriticalMigrateSection()) { + log() << "write request to old shard version " + << requestMetadata->getShardVersion().toString() + << " waiting for migration commit" << endl; - shardingState.waitTillNotInCriticalSection( 10 /* secs */); - } + shardingState.waitTillNotInCriticalSection(10 /* secs */); } } } - else { - // If our shard name is stale, our version must have been stale as well - dassert( writeErrors.size() == request.sizeWriteOps() ); - } + } else { + // If our shard name is stale, our version must have been stale as well + dassert(writeErrors.size() == request.sizeWriteOps()); } + } - // - // Construct response - // - - response->setOk( true ); + // + // Construct response + // - if ( !silentWC ) { + response->setOk(true); - if ( upserted.size() ) { - response->setUpsertDetails( upserted ); - } + if (!silentWC) { + if (upserted.size()) { + response->setUpsertDetails(upserted); + } - if ( writeErrors.size() ) { - response->setErrDetails( writeErrors ); - } + if (writeErrors.size()) { + response->setErrDetails(writeErrors); + } - if ( wcError.get() ) { - response->setWriteConcernError( wcError.release() ); - } + if (wcError.get()) { + response->setWriteConcernError(wcError.release()); + } - repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); - const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode(); - if (replMode != repl::ReplicationCoordinator::modeNone) { - response->setLastOp(repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp() - .getTimestamp()); - if (replMode == repl::ReplicationCoordinator::modeReplSet) { - response->setElectionId(replCoord->getElectionId()); - } + repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); + const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode(); + if (replMode != repl::ReplicationCoordinator::modeNone) { + response->setLastOp( + repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp().getTimestamp()); + if (replMode == repl::ReplicationCoordinator::modeReplSet) { + response->setElectionId(replCoord->getElectionId()); } - - // Set the stats for the response - response->setN( _stats->numInserted + _stats->numUpserted + _stats->numMatched - + _stats->numDeleted ); - if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update ) - response->setNModified( _stats->numModified ); } - dassert( response->isValid( NULL ) ); + // Set the stats for the response + response->setN(_stats->numInserted + _stats->numUpserted + _stats->numMatched + + _stats->numDeleted); + if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) + response->setNModified(_stats->numModified); } - // Translates write item type to wire protocol op code. - // Helper for WriteBatchExecutor::applyWriteItem(). - static int getOpCode(const BatchItemRef& currWrite) { - switch (currWrite.getRequest()->getBatchType()) { + dassert(response->isValid(NULL)); +} + +// Translates write item type to wire protocol op code. +// Helper for WriteBatchExecutor::applyWriteItem(). +static int getOpCode(const BatchItemRef& currWrite) { + switch (currWrite.getRequest()->getBatchType()) { case BatchedCommandRequest::BatchType_Insert: return dbInsert; case BatchedCommandRequest::BatchType_Update: @@ -388,1068 +375,990 @@ namespace mongo { return dbDelete; default: MONGO_UNREACHABLE; + } +} + +static void buildStaleError(const ChunkVersion& shardVersionRecvd, + const ChunkVersion& shardVersionWanted, + WriteErrorDetail* error) { + // Write stale error to results + error->setErrCode(ErrorCodes::StaleShardVersion); + + BSONObjBuilder infoB; + shardVersionWanted.addToBSON(infoB, "vWanted"); + error->setErrInfo(infoB.obj()); + + string errMsg = stream() << "stale shard version detected before write, received " + << shardVersionRecvd.toString() << " but local version is " + << shardVersionWanted.toString(); + error->setErrMessage(errMsg); +} + +static bool checkShardVersion(OperationContext* txn, + ShardingState* shardingState, + const BatchedCommandRequest& request, + WriteOpResult* result) { + const NamespaceString& nss = request.getTargetingNSS(); + dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); + + ChunkVersion requestShardVersion = + request.isMetadataSet() && request.getMetadata()->isShardVersionSet() + ? request.getMetadata()->getShardVersion() + : ChunkVersion::IGNORED(); + + if (shardingState->enabled()) { + CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); + + if (!ChunkVersion::isIgnoredVersion(requestShardVersion)) { + ChunkVersion shardVersion = + metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); + + if (!requestShardVersion.isWriteCompatibleWith(shardVersion)) { + result->setError(new WriteErrorDetail); + buildStaleError(requestShardVersion, shardVersion, result->getError()); + return false; + } } } - static void buildStaleError( const ChunkVersion& shardVersionRecvd, - const ChunkVersion& shardVersionWanted, - WriteErrorDetail* error ) { - - // Write stale error to results - error->setErrCode( ErrorCodes::StaleShardVersion ); + return true; +} - BSONObjBuilder infoB; - shardVersionWanted.addToBSON( infoB, "vWanted" ); - error->setErrInfo( infoB.obj() ); - - string errMsg = stream() << "stale shard version detected before write, received " - << shardVersionRecvd.toString() << " but local version is " - << shardVersionWanted.toString(); - error->setErrMessage( errMsg ); +static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) { + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) { + WriteErrorDetail* errorDetail = new WriteErrorDetail; + result->setError(errorDetail); + errorDetail->setErrCode(ErrorCodes::NotMaster); + errorDetail->setErrMessage("Not primary while writing to " + ns.toString()); + return false; } - - static bool checkShardVersion(OperationContext* txn, + return true; +} + +static void buildUniqueIndexError(const BSONObj& keyPattern, + const BSONObj& indexPattern, + WriteErrorDetail* error) { + error->setErrCode(ErrorCodes::CannotCreateIndex); + string errMsg = stream() << "cannot create unique index over " << indexPattern + << " with shard key pattern " << keyPattern; + error->setErrMessage(errMsg); +} + +static bool checkIndexConstraints(OperationContext* txn, ShardingState* shardingState, const BatchedCommandRequest& request, WriteOpResult* result) { + const NamespaceString& nss = request.getTargetingNSS(); + dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - const NamespaceString& nss = request.getTargetingNSS(); - dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - - ChunkVersion requestShardVersion = - request.isMetadataSet() && request.getMetadata()->isShardVersionSet() ? - request.getMetadata()->getShardVersion() : ChunkVersion::IGNORED(); - - if ( shardingState->enabled() ) { - - CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() ); + if (!request.isUniqueIndexRequest()) + return true; - if ( !ChunkVersion::isIgnoredVersion( requestShardVersion ) ) { + if (shardingState->enabled()) { + CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); - ChunkVersion shardVersion = - metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED(); + if (metadata) { + ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); + if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) { + result->setError(new WriteErrorDetail); + buildUniqueIndexError( + metadata->getKeyPattern(), request.getIndexKeyPattern(), result->getError()); - if ( !requestShardVersion.isWriteCompatibleWith( shardVersion ) ) { - result->setError(new WriteErrorDetail); - buildStaleError(requestShardVersion, shardVersion, result->getError()); - return false; - } + return false; } } - - return true; - } - - static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) { - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) { - WriteErrorDetail* errorDetail = new WriteErrorDetail; - result->setError(errorDetail); - errorDetail->setErrCode(ErrorCodes::NotMaster); - errorDetail->setErrMessage("Not primary while writing to " + ns.toString()); - return false; - } - return true; } - static void buildUniqueIndexError( const BSONObj& keyPattern, - const BSONObj& indexPattern, - WriteErrorDetail* error ) { - error->setErrCode( ErrorCodes::CannotCreateIndex ); - string errMsg = stream() << "cannot create unique index over " << indexPattern - << " with shard key pattern " << keyPattern; - error->setErrMessage( errMsg ); + return true; +} + +// +// HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS +// + +static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) { + stdx::lock_guard<Client> lk(*txn->getClient()); + CurOp* const currentOp = CurOp::get(txn); + currentOp->setOp_inlock(getOpCode(currWrite)); + currentOp->ensureStarted(); + currentOp->setNS_inlock(currWrite.getRequest()->getNS()); + + currentOp->debug().ns = currentOp->getNS(); + currentOp->debug().op = currentOp->getOp(); + + if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { + currentOp->setQuery_inlock(currWrite.getDocument()); + currentOp->debug().query = currWrite.getDocument(); + currentOp->debug().ninserted = 0; + } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { + currentOp->setQuery_inlock(currWrite.getUpdate()->getQuery()); + currentOp->debug().query = currWrite.getUpdate()->getQuery(); + currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr(); + // Note: debug().nMatched, nModified and nmoved are set internally in update + } else { + dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); + currentOp->setQuery_inlock(currWrite.getDelete()->getQuery()); + currentOp->debug().query = currWrite.getDelete()->getQuery(); + currentOp->debug().ndeleted = 0; } - - static bool checkIndexConstraints(OperationContext* txn, - ShardingState* shardingState, - const BatchedCommandRequest& request, - WriteOpResult* result) { - - const NamespaceString& nss = request.getTargetingNSS(); - dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX)); - - if ( !request.isUniqueIndexRequest() ) - return true; - - if ( shardingState->enabled() ) { - - CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() ); - - if ( metadata ) { - ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); - if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) { - - result->setError(new WriteErrorDetail); - buildUniqueIndexError(metadata->getKeyPattern(), - request.getIndexKeyPattern(), - result->getError()); - - return false; - } - } - } - - return true; +} + +void WriteBatchExecutor::incOpStats(const BatchItemRef& currWrite) { + if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { + _opCounters->gotInsert(); + } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { + _opCounters->gotUpdate(); + } else { + dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); + _opCounters->gotDelete(); } - - // - // HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS - // - - static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) { - - stdx::lock_guard<Client> lk(*txn->getClient()); - CurOp* const currentOp = CurOp::get(txn); - currentOp->setOp_inlock(getOpCode(currWrite)); - currentOp->ensureStarted(); - currentOp->setNS_inlock( currWrite.getRequest()->getNS() ); - - currentOp->debug().ns = currentOp->getNS(); - currentOp->debug().op = currentOp->getOp(); - - if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) { - currentOp->setQuery_inlock( currWrite.getDocument() ); - currentOp->debug().query = currWrite.getDocument(); - currentOp->debug().ninserted = 0; +} + +void WriteBatchExecutor::incWriteStats(const BatchItemRef& currWrite, + const WriteOpStats& stats, + const WriteErrorDetail* error, + CurOp* currentOp) { + if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) { + _stats->numInserted += stats.n; + currentOp->debug().ninserted += stats.n; + if (!error) { + _le->recordInsert(stats.n); } - else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) { - currentOp->setQuery_inlock( currWrite.getUpdate()->getQuery() ); - currentOp->debug().query = currWrite.getUpdate()->getQuery(); - currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr(); - // Note: debug().nMatched, nModified and nmoved are set internally in update + } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) { + if (stats.upsertedID.isEmpty()) { + _stats->numMatched += stats.n; + _stats->numModified += stats.nModified; + } else { + ++_stats->numUpserted; } - else { - dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete ); - currentOp->setQuery_inlock( currWrite.getDelete()->getQuery() ); - currentOp->debug().query = currWrite.getDelete()->getQuery(); - currentOp->debug().ndeleted = 0; - } - - } - void WriteBatchExecutor::incOpStats( const BatchItemRef& currWrite ) { - - if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) { - _opCounters->gotInsert(); - } - else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) { - _opCounters->gotUpdate(); + if (!error) { + _le->recordUpdate(stats.upsertedID.isEmpty() && stats.n > 0, stats.n, stats.upsertedID); } - else { - dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete ); - _opCounters->gotDelete(); + } else { + dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete); + _stats->numDeleted += stats.n; + if (!error) { + _le->recordDelete(stats.n); } + currentOp->debug().ndeleted += stats.n; } - void WriteBatchExecutor::incWriteStats( const BatchItemRef& currWrite, - const WriteOpStats& stats, - const WriteErrorDetail* error, - CurOp* currentOp ) { - - if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) { - _stats->numInserted += stats.n; - currentOp->debug().ninserted += stats.n; - if (!error) { - _le->recordInsert(stats.n); - } - } - else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) { - if ( stats.upsertedID.isEmpty() ) { - _stats->numMatched += stats.n; - _stats->numModified += stats.nModified; - } - else { - ++_stats->numUpserted; - } - - if (!error) { - _le->recordUpdate( stats.upsertedID.isEmpty() && stats.n > 0, - stats.n, - stats.upsertedID ); - } - } - else { - dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete ); - _stats->numDeleted += stats.n; - if ( !error ) { - _le->recordDelete( stats.n ); - } - currentOp->debug().ndeleted += stats.n; - } - - if (error) { - _le->setLastError(error->getErrCode(), error->getErrMessage().c_str()); - } + if (error) { + _le->setLastError(error->getErrCode(), error->getErrMessage().c_str()); } - - static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) { - - CurOp* currentOp = CurOp::get(txn); - currentOp->done(); - int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis(); - recordCurOpMetrics(txn); - Top::get(txn->getClient()->getServiceContext()).record( - currentOp->getNS(), +} + +static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) { + CurOp* currentOp = CurOp::get(txn); + currentOp->done(); + int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis(); + recordCurOpMetrics(txn); + Top::get(txn->getClient()->getServiceContext()) + .record(currentOp->getNS(), currentOp->getOp(), - 1, // "write locked" + 1, // "write locked" currentOp->totalTimeMicros(), currentOp->isCommand()); - if ( opError ) { - currentOp->debug().exceptionInfo = ExceptionInfo( opError->getErrMessage(), - opError->getErrCode() ); + if (opError) { + currentOp->debug().exceptionInfo = + ExceptionInfo(opError->getErrMessage(), opError->getErrCode()); - LOG(3) << " Caught Assertion in " << opToString( currentOp->getOp() ) - << ", continuing " << causedBy( opError->getErrMessage() ) << endl; - } + LOG(3) << " Caught Assertion in " << opToString(currentOp->getOp()) << ", continuing " + << causedBy(opError->getErrMessage()) << endl; + } - bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite, - logger::LogSeverity::Debug(1)); - bool logSlow = executionTime - > ( serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs() ); + bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite, + logger::LogSeverity::Debug(1)); + bool logSlow = executionTime > (serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs()); - if ( logAll || logSlow ) { - Locker::LockerInfo lockerInfo; - txn->lockState()->getLockerInfo(&lockerInfo); + if (logAll || logSlow) { + Locker::LockerInfo lockerInfo; + txn->lockState()->getLockerInfo(&lockerInfo); - LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats); - } + LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats); + } - if (currentOp->shouldDBProfile(executionTime)) { - profile(txn, CurOp::get(txn)->getOp()); - } + if (currentOp->shouldDBProfile(executionTime)) { + profile(txn, CurOp::get(txn)->getOp()); } +} - // END HELPERS +// END HELPERS - // - // CORE WRITE OPERATIONS (declaration) - // These functions write to the database and return stats and zero or one of: - // - page fault - // - error - // +// +// CORE WRITE OPERATIONS (declaration) +// These functions write to the database and return stats and zero or one of: +// - page fault +// - error +// - static void singleInsert( OperationContext* txn, - const BSONObj& docToInsert, - Collection* collection, - WriteOpResult* result ); +static void singleInsert(OperationContext* txn, + const BSONObj& docToInsert, + Collection* collection, + WriteOpResult* result); - static void singleCreateIndex( OperationContext* txn, - const BSONObj& indexDesc, - WriteOpResult* result ); +static void singleCreateIndex(OperationContext* txn, + const BSONObj& indexDesc, + WriteOpResult* result); - static void multiUpdate( OperationContext* txn, - const BatchItemRef& updateItem, - WriteOpResult* result ); +static void multiUpdate(OperationContext* txn, + const BatchItemRef& updateItem, + WriteOpResult* result); - static void multiRemove( OperationContext* txn, - const BatchItemRef& removeItem, - WriteOpResult* result ); +static void multiRemove(OperationContext* txn, + const BatchItemRef& removeItem, + WriteOpResult* result); - // - // WRITE EXECUTION - // In general, the exec* operations manage db lock state and stats before dispatching to the - // core write operations, which are *only* responsible for performing a write and reporting - // success or failure. - // +// +// WRITE EXECUTION +// In general, the exec* operations manage db lock state and stats before dispatching to the +// core write operations, which are *only* responsible for performing a write and reporting +// success or failure. +// + +/** + * Representation of the execution state of execInserts. Used by a single + * execution of execInserts in a single thread. + */ +class WriteBatchExecutor::ExecInsertsState { + MONGO_DISALLOW_COPYING(ExecInsertsState); +public: /** - * Representation of the execution state of execInserts. Used by a single - * execution of execInserts in a single thread. + * Constructs a new instance, for performing inserts described in "aRequest". */ - class WriteBatchExecutor::ExecInsertsState { - MONGO_DISALLOW_COPYING(ExecInsertsState); - public: - /** - * Constructs a new instance, for performing inserts described in "aRequest". - */ - explicit ExecInsertsState(OperationContext* txn, - const BatchedCommandRequest* aRequest); - - /** - * Acquires the write lock and client context needed to perform the current write operation. - * Returns true on success, after which it is safe to use the "context" and "collection" - * members. It is safe to call this function if this instance already holds the write lock. - * - * On failure, writeLock, context and collection will be NULL/clear. - */ - bool lockAndCheck(WriteOpResult* result); - - /** - * Releases the client context and write lock acquired by lockAndCheck. Safe to call - * regardless of whether or not this state object currently owns the lock. - */ - void unlock(); - - /** - * Returns true if this executor has the lock on the target database. - */ - bool hasLock() { return _dbLock.get(); } - - /** - * Gets the target collection for the batch operation. Value is undefined - * unless hasLock() is true. - */ - Collection* getCollection() { return _collection; } - - OperationContext* txn; - - // Request object describing the inserts. - const BatchedCommandRequest* request; - - // Index of the current insert operation to perform. - size_t currIndex = 0; - - // Translation of insert documents in "request" into insert-ready forms. This vector has a - // correspondence with elements of the "request", and "currIndex" is used to - // index both. - std::vector<StatusWith<BSONObj> > normalizedInserts; - - private: - bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock); - - ScopedTransaction _transaction; - // Guard object for the write lock on the target database. - std::unique_ptr<Lock::DBLock> _dbLock; - std::unique_ptr<Lock::CollectionLock> _collLock; - - Database* _database = nullptr; - Collection* _collection = nullptr; - }; - - void WriteBatchExecutor::bulkExecute( const BatchedCommandRequest& request, - std::vector<BatchedUpsertDetail*>* upsertedIds, - std::vector<WriteErrorDetail*>* errors ) { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (request.shouldBypassValidation()) { - maybeDisableValidation.emplace(_txn); - } + explicit ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest); - if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert ) { - execInserts( request, errors ); - } - else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update ) { - for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) { + /** + * Acquires the write lock and client context needed to perform the current write operation. + * Returns true on success, after which it is safe to use the "context" and "collection" + * members. It is safe to call this function if this instance already holds the write lock. + * + * On failure, writeLock, context and collection will be NULL/clear. + */ + bool lockAndCheck(WriteOpResult* result); - if ( i + 1 == request.sizeWriteOps() ) { - setupSynchronousCommit( _txn ); - } + /** + * Releases the client context and write lock acquired by lockAndCheck. Safe to call + * regardless of whether or not this state object currently owns the lock. + */ + void unlock(); - WriteErrorDetail* error = NULL; - BSONObj upsertedId; - execUpdate( BatchItemRef( &request, i ), &upsertedId, &error ); + /** + * Returns true if this executor has the lock on the target database. + */ + bool hasLock() { + return _dbLock.get(); + } - if ( !upsertedId.isEmpty() ) { - BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail; - batchUpsertedId->setIndex( i ); - batchUpsertedId->setUpsertedID( upsertedId ); - upsertedIds->push_back( batchUpsertedId ); - } + /** + * Gets the target collection for the batch operation. Value is undefined + * unless hasLock() is true. + */ + Collection* getCollection() { + return _collection; + } - if ( error ) { - errors->push_back( error ); - if ( request.getOrdered() ) - break; - } - } - } - else { - dassert( request.getBatchType() == BatchedCommandRequest::BatchType_Delete ); - for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) { + OperationContext* txn; - if ( i + 1 == request.sizeWriteOps() ) { - setupSynchronousCommit( _txn ); - } + // Request object describing the inserts. + const BatchedCommandRequest* request; - WriteErrorDetail* error = NULL; - execRemove( BatchItemRef( &request, i ), &error ); + // Index of the current insert operation to perform. + size_t currIndex = 0; - if ( error ) { - errors->push_back( error ); - if ( request.getOrdered() ) - break; - } - } - } - - // Fill in stale version errors for unordered batches (update/delete can't do this on own) - if ( !errors->empty() && !request.getOrdered() ) { + // Translation of insert documents in "request" into insert-ready forms. This vector has a + // correspondence with elements of the "request", and "currIndex" is used to + // index both. + std::vector<StatusWith<BSONObj>> normalizedInserts; - const WriteErrorDetail* finalError = errors->back(); +private: + bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock); - if ( finalError->getErrCode() == ErrorCodes::StaleShardVersion ) { - for ( size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++ ) { - WriteErrorDetail* dupStaleError = new WriteErrorDetail; - finalError->cloneTo( dupStaleError ); - errors->push_back( dupStaleError ); - } - } - } - } + ScopedTransaction _transaction; + // Guard object for the write lock on the target database. + std::unique_ptr<Lock::DBLock> _dbLock; + std::unique_ptr<Lock::CollectionLock> _collLock; - // Goes over the request and preprocesses normalized versions of all the inserts in the request - static void normalizeInserts( const BatchedCommandRequest& request, - vector<StatusWith<BSONObj> >* normalizedInserts ) { + Database* _database = nullptr; + Collection* _collection = nullptr; +}; - normalizedInserts->reserve(request.sizeWriteOps()); - for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) { - BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt( i ); - StatusWith<BSONObj> normalInsert = fixDocumentForInsert( insertDoc ); - normalizedInserts->push_back( normalInsert ); - if ( request.getOrdered() && !normalInsert.isOK() ) - break; - } +void WriteBatchExecutor::bulkExecute(const BatchedCommandRequest& request, + std::vector<BatchedUpsertDetail*>* upsertedIds, + std::vector<WriteErrorDetail*>* errors) { + boost::optional<DisableDocumentValidation> maybeDisableValidation; + if (request.shouldBypassValidation()) { + maybeDisableValidation.emplace(_txn); } - void WriteBatchExecutor::execInserts( const BatchedCommandRequest& request, - std::vector<WriteErrorDetail*>* errors ) { - - // Theory of operation: - // - // Instantiates an ExecInsertsState, which represents all of the state involved in the batch - // insert execution algorithm. Most importantly, encapsulates the lock state. - // - // Every iteration of the loop in execInserts() processes one document insertion, by calling - // insertOne() exactly once for a given value of state.currIndex. - // - // If the ExecInsertsState indicates that the requisite write locks are not held, insertOne - // acquires them and performs lock-acquisition-time checks. However, on non-error - // execution, it does not release the locks. Therefore, the yielding logic in the while - // loop in execInserts() is solely responsible for lock release in the non-error case. - // - // Internally, insertOne loops performing the single insert until it completes without a - // PageFaultException, or until it fails with some kind of error. Errors are mostly - // propagated via the request->error field, but DBExceptions or std::exceptions may escape, - // particularly on operation interruption. These kinds of errors necessarily prevent - // further insertOne calls, and stop the batch. As a result, the only expected source of - // such exceptions are interruptions. - ExecInsertsState state(_txn, &request); - normalizeInserts(request, &state.normalizedInserts); - - ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); - if (info) { - if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) { - info->setVersion(request.getTargetingNS(), - request.getMetadata()->getShardVersion()); - } - else { - info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED()); - } - } - - // Yield frequency is based on the same constants used by PlanYieldPolicy. - ElapsedTracker elapsedTracker(internalQueryExecYieldIterations, - internalQueryExecYieldPeriodMS); - - for (state.currIndex = 0; - state.currIndex < state.request->sizeWriteOps(); - ++state.currIndex) { - - if (state.currIndex + 1 == state.request->sizeWriteOps()) { + if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) { + execInserts(request, errors); + } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) { + for (size_t i = 0; i < request.sizeWriteOps(); i++) { + if (i + 1 == request.sizeWriteOps()) { setupSynchronousCommit(_txn); } - if (elapsedTracker.intervalHasElapsed()) { - // Yield between inserts. - if (state.hasLock()) { - // Release our locks. They get reacquired when insertOne() calls - // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO - // queues waiting on locks, there is no need to explicitly sleep or give up - // control of the processor here. - state.unlock(); - - // This releases any storage engine held locks/snapshots. - _txn->recoveryUnit()->abandonSnapshot(); - } + WriteErrorDetail* error = NULL; + BSONObj upsertedId; + execUpdate(BatchItemRef(&request, i), &upsertedId, &error); + + if (!upsertedId.isEmpty()) { + BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail; + batchUpsertedId->setIndex(i); + batchUpsertedId->setUpsertedID(upsertedId); + upsertedIds->push_back(batchUpsertedId); + } - _txn->checkForInterrupt(); - elapsedTracker.resetLastTime(); + if (error) { + errors->push_back(error); + if (request.getOrdered()) + break; + } + } + } else { + dassert(request.getBatchType() == BatchedCommandRequest::BatchType_Delete); + for (size_t i = 0; i < request.sizeWriteOps(); i++) { + if (i + 1 == request.sizeWriteOps()) { + setupSynchronousCommit(_txn); } WriteErrorDetail* error = NULL; - execOneInsert(&state, &error); + execRemove(BatchItemRef(&request, i), &error); + if (error) { errors->push_back(error); - error->setIndex(state.currIndex); if (request.getOrdered()) - return; + break; } } } - void WriteBatchExecutor::execUpdate( const BatchItemRef& updateItem, - BSONObj* upsertedId, - WriteErrorDetail** error ) { - - // BEGIN CURRENT OP - CurOp currentOp(_txn); - beginCurrentOp(_txn, updateItem); - incOpStats( updateItem ); - - ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); - if (info) { - auto rootRequest = updateItem.getRequest(); - if (!updateItem.getUpdate()->getMulti() && - rootRequest->isMetadataSet() && - rootRequest->getMetadata()->isShardVersionSet()) { - info->setVersion(rootRequest->getTargetingNS(), - rootRequest->getMetadata()->getShardVersion()); - } - else { - info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); + // Fill in stale version errors for unordered batches (update/delete can't do this on own) + if (!errors->empty() && !request.getOrdered()) { + const WriteErrorDetail* finalError = errors->back(); + + if (finalError->getErrCode() == ErrorCodes::StaleShardVersion) { + for (size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++) { + WriteErrorDetail* dupStaleError = new WriteErrorDetail; + finalError->cloneTo(dupStaleError); + errors->push_back(dupStaleError); } } + } +} + +// Goes over the request and preprocesses normalized versions of all the inserts in the request +static void normalizeInserts(const BatchedCommandRequest& request, + vector<StatusWith<BSONObj>>* normalizedInserts) { + normalizedInserts->reserve(request.sizeWriteOps()); + for (size_t i = 0; i < request.sizeWriteOps(); ++i) { + BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt(i); + StatusWith<BSONObj> normalInsert = fixDocumentForInsert(insertDoc); + normalizedInserts->push_back(normalInsert); + if (request.getOrdered() && !normalInsert.isOK()) + break; + } +} - WriteOpResult result; - - multiUpdate( _txn, updateItem, &result ); - - if ( !result.getStats().upsertedID.isEmpty() ) { - *upsertedId = result.getStats().upsertedID; +void WriteBatchExecutor::execInserts(const BatchedCommandRequest& request, + std::vector<WriteErrorDetail*>* errors) { + // Theory of operation: + // + // Instantiates an ExecInsertsState, which represents all of the state involved in the batch + // insert execution algorithm. Most importantly, encapsulates the lock state. + // + // Every iteration of the loop in execInserts() processes one document insertion, by calling + // insertOne() exactly once for a given value of state.currIndex. + // + // If the ExecInsertsState indicates that the requisite write locks are not held, insertOne + // acquires them and performs lock-acquisition-time checks. However, on non-error + // execution, it does not release the locks. Therefore, the yielding logic in the while + // loop in execInserts() is solely responsible for lock release in the non-error case. + // + // Internally, insertOne loops performing the single insert until it completes without a + // PageFaultException, or until it fails with some kind of error. Errors are mostly + // propagated via the request->error field, but DBExceptions or std::exceptions may escape, + // particularly on operation interruption. These kinds of errors necessarily prevent + // further insertOne calls, and stop the batch. As a result, the only expected source of + // such exceptions are interruptions. + ExecInsertsState state(_txn, &request); + normalizeInserts(request, &state.normalizedInserts); + + ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); + if (info) { + if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) { + info->setVersion(request.getTargetingNS(), request.getMetadata()->getShardVersion()); + } else { + info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED()); } - // END CURRENT OP - incWriteStats( updateItem, result.getStats(), result.getError(), ¤tOp ); - finishCurrentOp(_txn, result.getError()); + } - // End current transaction and release snapshot. - _txn->recoveryUnit()->abandonSnapshot(); + // Yield frequency is based on the same constants used by PlanYieldPolicy. + ElapsedTracker elapsedTracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS); - if ( result.getError() ) { - result.getError()->setIndex( updateItem.getItemIndex() ); - *error = result.releaseError(); + for (state.currIndex = 0; state.currIndex < state.request->sizeWriteOps(); ++state.currIndex) { + if (state.currIndex + 1 == state.request->sizeWriteOps()) { + setupSynchronousCommit(_txn); } - } - - void WriteBatchExecutor::execRemove( const BatchItemRef& removeItem, - WriteErrorDetail** error ) { - // Removes are similar to updates, but page faults are handled externally + if (elapsedTracker.intervalHasElapsed()) { + // Yield between inserts. + if (state.hasLock()) { + // Release our locks. They get reacquired when insertOne() calls + // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO + // queues waiting on locks, there is no need to explicitly sleep or give up + // control of the processor here. + state.unlock(); + + // This releases any storage engine held locks/snapshots. + _txn->recoveryUnit()->abandonSnapshot(); + } - // BEGIN CURRENT OP - CurOp currentOp(_txn); - beginCurrentOp(_txn, removeItem); - incOpStats( removeItem ); + _txn->checkForInterrupt(); + elapsedTracker.resetLastTime(); + } - ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); - if (info) { - auto rootRequest = removeItem.getRequest(); - if (removeItem.getDelete()->getLimit() == 1 && - rootRequest->isMetadataSet() && - rootRequest->getMetadata()->isShardVersionSet()) { - info->setVersion(rootRequest->getTargetingNS(), - rootRequest->getMetadata()->getShardVersion()); - } - else { - info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); - } + WriteErrorDetail* error = NULL; + execOneInsert(&state, &error); + if (error) { + errors->push_back(error); + error->setIndex(state.currIndex); + if (request.getOrdered()) + return; + } + } +} + +void WriteBatchExecutor::execUpdate(const BatchItemRef& updateItem, + BSONObj* upsertedId, + WriteErrorDetail** error) { + // BEGIN CURRENT OP + CurOp currentOp(_txn); + beginCurrentOp(_txn, updateItem); + incOpStats(updateItem); + + ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); + if (info) { + auto rootRequest = updateItem.getRequest(); + if (!updateItem.getUpdate()->getMulti() && rootRequest->isMetadataSet() && + rootRequest->getMetadata()->isShardVersionSet()) { + info->setVersion(rootRequest->getTargetingNS(), + rootRequest->getMetadata()->getShardVersion()); + } else { + info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); } + } - WriteOpResult result; + WriteOpResult result; - multiRemove( _txn, removeItem, &result ); + multiUpdate(_txn, updateItem, &result); - // END CURRENT OP - incWriteStats( removeItem, result.getStats(), result.getError(), ¤tOp ); - finishCurrentOp(_txn, result.getError()); + if (!result.getStats().upsertedID.isEmpty()) { + *upsertedId = result.getStats().upsertedID; + } + // END CURRENT OP + incWriteStats(updateItem, result.getStats(), result.getError(), ¤tOp); + finishCurrentOp(_txn, result.getError()); - // End current transaction and release snapshot. - _txn->recoveryUnit()->abandonSnapshot(); + // End current transaction and release snapshot. + _txn->recoveryUnit()->abandonSnapshot(); - if ( result.getError() ) { - result.getError()->setIndex( removeItem.getItemIndex() ); - *error = result.releaseError(); + if (result.getError()) { + result.getError()->setIndex(updateItem.getItemIndex()); + *error = result.releaseError(); + } +} + +void WriteBatchExecutor::execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error) { + // Removes are similar to updates, but page faults are handled externally + + // BEGIN CURRENT OP + CurOp currentOp(_txn); + beginCurrentOp(_txn, removeItem); + incOpStats(removeItem); + + ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false); + if (info) { + auto rootRequest = removeItem.getRequest(); + if (removeItem.getDelete()->getLimit() == 1 && rootRequest->isMetadataSet() && + rootRequest->getMetadata()->isShardVersionSet()) { + info->setVersion(rootRequest->getTargetingNS(), + rootRequest->getMetadata()->getShardVersion()); + } else { + info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED()); } } - // - // IN-DB-LOCK CORE OPERATIONS - // + WriteOpResult result; + + multiRemove(_txn, removeItem, &result); + + // END CURRENT OP + incWriteStats(removeItem, result.getStats(), result.getError(), ¤tOp); + finishCurrentOp(_txn, result.getError()); + + // End current transaction and release snapshot. + _txn->recoveryUnit()->abandonSnapshot(); - WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn, - const BatchedCommandRequest* aRequest) : - txn(txn), - request(aRequest), - _transaction(txn, MODE_IX) { + if (result.getError()) { + result.getError()->setIndex(removeItem.getItemIndex()); + *error = result.releaseError(); } +} - bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result, - bool intentLock) { - if (hasLock()) { - CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); - return true; - } +// +// IN-DB-LOCK CORE OPERATIONS +// - if (request->isInsertIndexRequest()) - intentLock = false; // can't build indexes in intent mode - - const NamespaceString& nss = request->getNSS(); - invariant(!_collLock); - invariant(!_dbLock); - _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), - nss.db(), - intentLock ? MODE_IX : MODE_X); - _database = dbHolder().get(txn, nss.ns()); - if (intentLock && !_database) { - // Ensure exclusive lock in case the database doesn't yet exist - _dbLock.reset(); - _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X); - intentLock = false; - } - _collLock = stdx::make_unique<Lock::CollectionLock>(txn->lockState(), - nss.ns(), - intentLock ? MODE_IX : MODE_X); - if (!checkIsMasterForDatabase(nss, result)) { - return false; - } - if (!checkShardVersion(txn, &shardingState, *request, result)) { - return false; - } - if (!checkIndexConstraints(txn, &shardingState, *request, result)) { - return false; - } +WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn, + const BatchedCommandRequest* aRequest) + : txn(txn), request(aRequest), _transaction(txn, MODE_IX) {} - if (!_database) { - invariant(!intentLock); - _database = dbHolder().openDb(txn, nss.ns()); - } +bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result, + bool intentLock) { + if (hasLock()) { CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); - _collection = _database->getCollection(request->getTargetingNS()); - if (!_collection) { - if (intentLock) { - // try again with full X lock. - unlock(); - return _lockAndCheckImpl(result, false); - } - - WriteUnitOfWork wunit (txn); - // Implicitly create if it doesn't exist - _collection = _database->createCollection(txn, request->getTargetingNS()); - if (!_collection) { - result->setError( - toWriteError(Status(ErrorCodes::InternalError, - "could not create collection " + - request->getTargetingNS()))); - return false; - } - wunit.commit(); - } return true; } - bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) { - if (_lockAndCheckImpl(result, true)) - return true; - unlock(); + if (request->isInsertIndexRequest()) + intentLock = false; // can't build indexes in intent mode + + const NamespaceString& nss = request->getNSS(); + invariant(!_collLock); + invariant(!_dbLock); + _dbLock = + stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), intentLock ? MODE_IX : MODE_X); + _database = dbHolder().get(txn, nss.ns()); + if (intentLock && !_database) { + // Ensure exclusive lock in case the database doesn't yet exist + _dbLock.reset(); + _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X); + intentLock = false; + } + _collLock = stdx::make_unique<Lock::CollectionLock>( + txn->lockState(), nss.ns(), intentLock ? MODE_IX : MODE_X); + if (!checkIsMasterForDatabase(nss, result)) { return false; } - - void WriteBatchExecutor::ExecInsertsState::unlock() { - _collection = nullptr; - _database = nullptr; - _collLock.reset(); - _dbLock.reset(); + if (!checkShardVersion(txn, &shardingState, *request, result)) { + return false; + } + if (!checkIndexConstraints(txn, &shardingState, *request, result)) { + return false; } - static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) { - // we have to be top level so we can retry - invariant(!state->txn->lockState()->inAWriteUnitOfWork() ); - invariant(state->currIndex < state->normalizedInserts.size()); - - const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]); + if (!_database) { + invariant(!intentLock); + _database = dbHolder().openDb(txn, nss.ns()); + } + CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel()); + _collection = _database->getCollection(request->getTargetingNS()); + if (!_collection) { + if (intentLock) { + // try again with full X lock. + unlock(); + return _lockAndCheckImpl(result, false); + } - if (!normalizedInsert.isOK()) { - result->setError(toWriteError(normalizedInsert.getStatus())); - return; + WriteUnitOfWork wunit(txn); + // Implicitly create if it doesn't exist + _collection = _database->createCollection(txn, request->getTargetingNS()); + if (!_collection) { + result->setError( + toWriteError(Status(ErrorCodes::InternalError, + "could not create collection " + request->getTargetingNS()))); + return false; } + wunit.commit(); + } + return true; +} - const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() ? - state->request->getInsertRequest()->getDocumentsAt( state->currIndex ) : - normalizedInsert.getValue(); +bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) { + if (_lockAndCheckImpl(result, true)) + return true; + unlock(); + return false; +} + +void WriteBatchExecutor::ExecInsertsState::unlock() { + _collection = nullptr; + _database = nullptr; + _collLock.reset(); + _dbLock.reset(); +} + +static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) { + // we have to be top level so we can retry + invariant(!state->txn->lockState()->inAWriteUnitOfWork()); + invariant(state->currIndex < state->normalizedInserts.size()); + + const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]); + + if (!normalizedInsert.isOK()) { + result->setError(toWriteError(normalizedInsert.getStatus())); + return; + } - int attempt = 0; - while (true) { - try { - if (!state->request->isInsertIndexRequest()) { - if (state->lockAndCheck(result)) { - singleInsert(state->txn, insertDoc, state->getCollection(), result); - } - } - else { - singleCreateIndex(state->txn, insertDoc, result); + const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() + ? state->request->getInsertRequest()->getDocumentsAt(state->currIndex) + : normalizedInsert.getValue(); + + int attempt = 0; + while (true) { + try { + if (!state->request->isInsertIndexRequest()) { + if (state->lockAndCheck(result)) { + singleInsert(state->txn, insertDoc, state->getCollection(), result); } - break; - } - catch ( const WriteConflictException& wce ) { - state->unlock(); - CurOp::get(state->txn)->debug().writeConflicts++; - state->txn->recoveryUnit()->abandonSnapshot(); - WriteConflictException::logAndBackoff( attempt++, - "insert", - state->getCollection() ? - state->getCollection()->ns().ns() : - "index" ); - } - catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError(staleExcep.getVersionReceived(), - staleExcep.getVersionWanted(), - result->getError()); - break; + } else { + singleCreateIndex(state->txn, insertDoc, result); } - catch (const DBException& ex) { - Status status(ex.toStatus()); - if (ErrorCodes::isInterruption(status.code())) - throw; - result->setError(toWriteError(status)); - break; - } - } - - // Errors release the write lock, as a matter of policy. - if (result->getError()) { - state->txn->recoveryUnit()->abandonSnapshot(); + break; + } catch (const WriteConflictException& wce) { state->unlock(); + CurOp::get(state->txn)->debug().writeConflicts++; + state->txn->recoveryUnit()->abandonSnapshot(); + WriteConflictException::logAndBackoff( + attempt++, + "insert", + state->getCollection() ? state->getCollection()->ns().ns() : "index"); + } catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError( + staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + break; + } catch (const DBException& ex) { + Status status(ex.toStatus()); + if (ErrorCodes::isInterruption(status.code())) + throw; + result->setError(toWriteError(status)); + break; } } - void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) { - BatchItemRef currInsertItem(state->request, state->currIndex); - CurOp currentOp(_txn); - beginCurrentOp(_txn, currInsertItem); - incOpStats(currInsertItem); + // Errors release the write lock, as a matter of policy. + if (result->getError()) { + state->txn->recoveryUnit()->abandonSnapshot(); + state->unlock(); + } +} - WriteOpResult result; - insertOne(state, &result); +void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) { + BatchItemRef currInsertItem(state->request, state->currIndex); + CurOp currentOp(_txn); + beginCurrentOp(_txn, currInsertItem); + incOpStats(currInsertItem); - incWriteStats(currInsertItem, - result.getStats(), - result.getError(), - ¤tOp); - finishCurrentOp(_txn, result.getError()); + WriteOpResult result; + insertOne(state, &result); - if (result.getError()) { - *error = result.releaseError(); - } - } + incWriteStats(currInsertItem, result.getStats(), result.getError(), ¤tOp); + finishCurrentOp(_txn, result.getError()); - /** - * Perform a single insert into a collection. Requires the insert be preprocessed and the - * collection already has been created. - * - * Might fault or error, otherwise populates the result. - */ - static void singleInsert( OperationContext* txn, - const BSONObj& docToInsert, - Collection* collection, - WriteOpResult* result ) { - - const string& insertNS = collection->ns().ns(); - invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX)); + if (result.getError()) { + *error = result.releaseError(); + } +} - WriteUnitOfWork wunit(txn); - StatusWith<RecordId> status = collection->insertDocument( txn, docToInsert, true ); +/** + * Perform a single insert into a collection. Requires the insert be preprocessed and the + * collection already has been created. + * + * Might fault or error, otherwise populates the result. + */ +static void singleInsert(OperationContext* txn, + const BSONObj& docToInsert, + Collection* collection, + WriteOpResult* result) { + const string& insertNS = collection->ns().ns(); + invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX)); + + WriteUnitOfWork wunit(txn); + StatusWith<RecordId> status = collection->insertDocument(txn, docToInsert, true); + + if (!status.isOK()) { + result->setError(toWriteError(status.getStatus())); + } else { + result->getStats().n = 1; + wunit.commit(); + } +} - if ( !status.isOK() ) { - result->setError(toWriteError(status.getStatus())); - } - else { - result->getStats().n = 1; - wunit.commit(); +/** + * Perform a single index creation on a collection. Requires the index descriptor be + * preprocessed. + * + * Might fault or error, otherwise populates the result. + */ +static void singleCreateIndex(OperationContext* txn, + const BSONObj& indexDesc, + WriteOpResult* result) { + BSONElement nsElement = indexDesc["ns"]; + uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo()); + uassert(ErrorCodes::TypeMismatch, + str::stream() << "Expected \"ns\" field of index description to be a " + "string, " + "but found a " << typeName(nsElement.type()), + nsElement.type() == String); + const NamespaceString ns(nsElement.valueStringData()); + BSONObjBuilder cmdBuilder; + cmdBuilder << "createIndexes" << ns.coll(); + cmdBuilder << "indexes" << BSON_ARRAY(indexDesc); + BSONObj cmd = cmdBuilder.done(); + Command* createIndexesCmd = Command::findCommand("createIndexes"); + invariant(createIndexesCmd); + std::string errmsg; + BSONObjBuilder resultBuilder; + const bool success = + createIndexesCmd->run(txn, ns.db().toString(), cmd, 0, errmsg, resultBuilder); + Command::appendCommandStatus(resultBuilder, success, errmsg); + BSONObj cmdResult = resultBuilder.done(); + uassertStatusOK(Command::getStatusFromCommandResult(cmdResult)); + result->getStats().n = + cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt(); +} + +static void multiUpdate(OperationContext* txn, + const BatchItemRef& updateItem, + WriteOpResult* result) { + const NamespaceString nsString(updateItem.getRequest()->getNS()); + const bool isMulti = updateItem.getUpdate()->getMulti(); + UpdateRequest request(nsString); + request.setQuery(updateItem.getUpdate()->getQuery()); + request.setUpdates(updateItem.getUpdate()->getUpdateExpr()); + request.setMulti(isMulti); + request.setUpsert(updateItem.getUpdate()->getUpsert()); + UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString()); + request.setLifecycle(&updateLifecycle); + + // Updates from the write commands path can yield. + request.setYieldPolicy(PlanExecutor::YIELD_AUTO); + + int attempt = 0; + bool createCollection = false; + for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) { + ParsedUpdate parsedUpdate(txn, &request); + Status status = parsedUpdate.parseRequest(); + if (!status.isOK()) { + result->setError(toWriteError(status)); + return; } - } - /** - * Perform a single index creation on a collection. Requires the index descriptor be - * preprocessed. - * - * Might fault or error, otherwise populates the result. - */ - static void singleCreateIndex(OperationContext* txn, - const BSONObj& indexDesc, - WriteOpResult* result) { + if (createCollection) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X}; - BSONElement nsElement = indexDesc["ns"]; - uassert(ErrorCodes::NoSuchKey, - "Missing \"ns\" field in index description", - !nsElement.eoo()); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "Expected \"ns\" field of index description to be a " "string, " - "but found a " << typeName(nsElement.type()), - nsElement.type() == String); - const NamespaceString ns(nsElement.valueStringData()); - BSONObjBuilder cmdBuilder; - cmdBuilder << "createIndexes" << ns.coll(); - cmdBuilder << "indexes" << BSON_ARRAY(indexDesc); - BSONObj cmd = cmdBuilder.done(); - Command* createIndexesCmd = Command::findCommand("createIndexes"); - invariant(createIndexesCmd); - std::string errmsg; - BSONObjBuilder resultBuilder; - const bool success = createIndexesCmd->run( - txn, - ns.db().toString(), - cmd, - 0, - errmsg, - resultBuilder); - Command::appendCommandStatus(resultBuilder, success, errmsg); - BSONObj cmdResult = resultBuilder.done(); - uassertStatusOK(Command::getStatusFromCommandResult(cmdResult)); - result->getStats().n = - cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt(); - } + if (!checkIsMasterForDatabase(nsString, result)) { + return; + } - static void multiUpdate( OperationContext* txn, - const BatchItemRef& updateItem, - WriteOpResult* result ) { - - const NamespaceString nsString(updateItem.getRequest()->getNS()); - const bool isMulti = updateItem.getUpdate()->getMulti(); - UpdateRequest request(nsString); - request.setQuery(updateItem.getUpdate()->getQuery()); - request.setUpdates(updateItem.getUpdate()->getUpdateExpr()); - request.setMulti(isMulti); - request.setUpsert(updateItem.getUpdate()->getUpsert()); - UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString()); - request.setLifecycle(&updateLifecycle); - - // Updates from the write commands path can yield. - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 0; - bool createCollection = false; - for ( int fakeLoop = 0; fakeLoop < 1; fakeLoop++ ) { - - ParsedUpdate parsedUpdate(txn, &request); - Status status = parsedUpdate.parseRequest(); - if (!status.isOK()) { - result->setError(toWriteError(status)); - return; + Database* const db = adb.getDb(); + if (db->getCollection(nsString.ns())) { + // someone else beat us to it + } else { + WriteUnitOfWork wuow(txn); + uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); + wuow.commit(); + } } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); + } - if ( createCollection ) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X}; + /////////////////////////////////////////// + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); + Lock::CollectionLock colLock( + txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX); + /////////////////////////////////////////// - if (!checkIsMasterForDatabase(nsString, result)) { - return; - } + if (!checkIsMasterForDatabase(nsString, result)) { + return; + } - Database* const db = adb.getDb(); - if ( db->getCollection( nsString.ns() ) ) { - // someone else beat us to it - } - else { - WriteUnitOfWork wuow(txn); - uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj())); - wuow.commit(); - } - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns()); - } + if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result)) + return; - /////////////////////////////////////////// - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX); - Lock::CollectionLock colLock(txn->lockState(), - nsString.ns(), - parsedUpdate.isIsolated() ? MODE_X : MODE_IX); - /////////////////////////////////////////// + Database* const db = dbHolder().get(txn, nsString.db()); - if (!checkIsMasterForDatabase(nsString, result)) { + if (db == NULL) { + if (createCollection) { + // we raced with some, accept defeat + result->getStats().nModified = 0; + result->getStats().n = 0; return; } - if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result)) + // Database not yet created + if (!request.isUpsert()) { + // not an upsert, no database, nothing to do + result->getStats().nModified = 0; + result->getStats().n = 0; return; - - Database* const db = dbHolder().get(txn, nsString.db()); - - if (db == NULL) { - if (createCollection) { - // we raced with some, accept defeat - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - // Database not yet created - if (!request.isUpsert()) { - // not an upsert, no database, nothing to do - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } - - // upsert, don't try to get a context as no MODE_X lock is held - fakeLoop = -1; - createCollection = true; - continue; } - CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel()); - Collection* collection = db->getCollection(nsString.ns()); - - if ( collection == NULL ) { - if ( createCollection ) { - // we raced with some, accept defeat - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } + // upsert, don't try to get a context as no MODE_X lock is held + fakeLoop = -1; + createCollection = true; + continue; + } - if ( !request.isUpsert() ) { - // not an upsert, no collection, nothing to do - result->getStats().nModified = 0; - result->getStats().n = 0; - return; - } + CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel()); + Collection* collection = db->getCollection(nsString.ns()); - // upsert, mark that we should create collection - fakeLoop = -1; - createCollection = true; - continue; + if (collection == NULL) { + if (createCollection) { + // we raced with some, accept defeat + result->getStats().nModified = 0; + result->getStats().n = 0; + return; } - OpDebug* debug = &CurOp::get(txn)->debug(); + if (!request.isUpsert()) { + // not an upsert, no collection, nothing to do + result->getStats().nModified = 0; + result->getStats().n = 0; + return; + } - try { - invariant(collection); - PlanExecutor* rawExec; - uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec)); - std::unique_ptr<PlanExecutor> exec(rawExec); + // upsert, mark that we should create collection + fakeLoop = -1; + createCollection = true; + continue; + } - uassertStatusOK(exec->executePlan()); - UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug); + OpDebug* debug = &CurOp::get(txn)->debug(); - const long long numDocsModified = res.numDocsModified; - const long long numMatched = res.numMatched; - const BSONObj resUpsertedID = res.upserted; + try { + invariant(collection); + PlanExecutor* rawExec; + uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec)); + std::unique_ptr<PlanExecutor> exec(rawExec); - // We have an _id from an insert - const bool didInsert = !resUpsertedID.isEmpty(); + uassertStatusOK(exec->executePlan()); + UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug); - result->getStats().nModified = didInsert ? 0 : numDocsModified; - result->getStats().n = didInsert ? 1 : numMatched; - result->getStats().upsertedID = resUpsertedID; - } - catch ( const WriteConflictException& dle ) { - debug->writeConflicts++; - if ( isMulti ) { - log() << "Had WriteConflict during multi update, aborting"; - throw; - } + const long long numDocsModified = res.numDocsModified; + const long long numMatched = res.numMatched; + const BSONObj resUpsertedID = res.upserted; - createCollection = false; - // RESTART LOOP - fakeLoop = -1; - txn->recoveryUnit()->abandonSnapshot(); + // We have an _id from an insert + const bool didInsert = !resUpsertedID.isEmpty(); - WriteConflictException::logAndBackoff( attempt++, "update", nsString.ns() ); - } - catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError(staleExcep.getVersionReceived(), - staleExcep.getVersionWanted(), - result->getError()); + result->getStats().nModified = didInsert ? 0 : numDocsModified; + result->getStats().n = didInsert ? 1 : numMatched; + result->getStats().upsertedID = resUpsertedID; + } catch (const WriteConflictException& dle) { + debug->writeConflicts++; + if (isMulti) { + log() << "Had WriteConflict during multi update, aborting"; + throw; } - catch (const DBException& ex) { - Status status = ex.toStatus(); - if (ErrorCodes::isInterruption(status.code())) { - throw; - } - result->setError(toWriteError(status)); + + createCollection = false; + // RESTART LOOP + fakeLoop = -1; + txn->recoveryUnit()->abandonSnapshot(); + + WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns()); + } catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError( + staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + } catch (const DBException& ex) { + Status status = ex.toStatus(); + if (ErrorCodes::isInterruption(status.code())) { + throw; } + result->setError(toWriteError(status)); } } +} - /** - * Perform a remove operation, which might remove multiple documents. Dispatches to remove code - * currently to do most of this. - * - * Might fault or error, otherwise populates the result. - */ - static void multiRemove( OperationContext* txn, - const BatchItemRef& removeItem, - WriteOpResult* result ) { - - const NamespaceString& nss = removeItem.getRequest()->getNSS(); - DeleteRequest request(nss); - request.setQuery( removeItem.getDelete()->getQuery() ); - request.setMulti( removeItem.getDelete()->getLimit() != 1 ); - request.setGod( false ); - - // Deletes running through the write commands path can yield. - request.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - int attempt = 1; - while ( 1 ) { - try { - - ParsedDelete parsedDelete(txn, &request); - Status status = parsedDelete.parseRequest(); - if (!status.isOK()) { - result->setError(toWriteError(status)); - return; - } - - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, nss.db(), MODE_IX); - if (!autoDb.getDb()) { - break; - } - - CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel()); - Lock::CollectionLock collLock(txn->lockState(), - nss.ns(), - parsedDelete.isIsolated() ? MODE_X : MODE_IX); - - // getExecutorDelete() also checks if writes are allowed. - if (!checkIsMasterForDatabase(nss, result)) { - return; - } - // Check version once we're locked - - if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) { - // Version error - return; - } - - PlanExecutor* rawExec; - uassertStatusOK(getExecutorDelete(txn, - autoDb.getDb()->getCollection(nss), - &parsedDelete, - &rawExec)); - std::unique_ptr<PlanExecutor> exec(rawExec); - - // Execute the delete and retrieve the number deleted. - uassertStatusOK(exec->executePlan()); - result->getStats().n = DeleteStage::getNumDeleted(exec.get()); +/** + * Perform a remove operation, which might remove multiple documents. Dispatches to remove code + * currently to do most of this. + * + * Might fault or error, otherwise populates the result. + */ +static void multiRemove(OperationContext* txn, + const BatchItemRef& removeItem, + WriteOpResult* result) { + const NamespaceString& nss = removeItem.getRequest()->getNSS(); + DeleteRequest request(nss); + request.setQuery(removeItem.getDelete()->getQuery()); + request.setMulti(removeItem.getDelete()->getLimit() != 1); + request.setGod(false); + + // Deletes running through the write commands path can yield. + request.setYieldPolicy(PlanExecutor::YIELD_AUTO); + + int attempt = 1; + while (1) { + try { + ParsedDelete parsedDelete(txn, &request); + Status status = parsedDelete.parseRequest(); + if (!status.isOK()) { + result->setError(toWriteError(status)); + return; + } + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetDb autoDb(txn, nss.db(), MODE_IX); + if (!autoDb.getDb()) { break; } - catch ( const WriteConflictException& dle ) { - CurOp::get(txn)->debug().writeConflicts++; - WriteConflictException::logAndBackoff( attempt++, "delete", nss.ns() ); - } - catch (const StaleConfigException& staleExcep) { - result->setError(new WriteErrorDetail); - result->getError()->setErrCode(ErrorCodes::StaleShardVersion); - buildStaleError(staleExcep.getVersionReceived(), - staleExcep.getVersionWanted(), - result->getError()); + + CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel()); + Lock::CollectionLock collLock( + txn->lockState(), nss.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX); + + // getExecutorDelete() also checks if writes are allowed. + if (!checkIsMasterForDatabase(nss, result)) { return; } - catch ( const DBException& ex ) { - Status status = ex.toStatus(); - if (ErrorCodes::isInterruption(status.code())) { - throw; - } - result->setError(toWriteError(status)); + // Check version once we're locked + + if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) { + // Version error return; } + + PlanExecutor* rawExec; + uassertStatusOK(getExecutorDelete( + txn, autoDb.getDb()->getCollection(nss), &parsedDelete, &rawExec)); + std::unique_ptr<PlanExecutor> exec(rawExec); + + // Execute the delete and retrieve the number deleted. + uassertStatusOK(exec->executePlan()); + result->getStats().n = DeleteStage::getNumDeleted(exec.get()); + + break; + } catch (const WriteConflictException& dle) { + CurOp::get(txn)->debug().writeConflicts++; + WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns()); + } catch (const StaleConfigException& staleExcep) { + result->setError(new WriteErrorDetail); + result->getError()->setErrCode(ErrorCodes::StaleShardVersion); + buildStaleError( + staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError()); + return; + } catch (const DBException& ex) { + Status status = ex.toStatus(); + if (ErrorCodes::isInterruption(status.code())) { + throw; + } + result->setError(toWriteError(status)); + return; } } +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/batch_executor.h b/src/mongo/db/commands/write_commands/batch_executor.h index 0bab41d3ff8..0dd1d71848a 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.h +++ b/src/mongo/db/commands/write_commands/batch_executor.h @@ -40,158 +40,148 @@ namespace mongo { - class BSONObjBuilder; - class CurOp; - class LastError; - class OpCounters; - class OperationContext; - class WriteBatchStats; - struct WriteOpStats; +class BSONObjBuilder; +class CurOp; +class LastError; +class OpCounters; +class OperationContext; +class WriteBatchStats; +struct WriteOpStats; + +/** + * An instance of WriteBatchExecutor is an object capable of issuing a write batch. + */ +class WriteBatchExecutor { + MONGO_DISALLOW_COPYING(WriteBatchExecutor); + +public: + // State object used by private execInserts. TODO: Do not expose this type. + class ExecInsertsState; + + WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le); /** - * An instance of WriteBatchExecutor is an object capable of issuing a write batch. + * Issues writes with requested write concern. Fills response with errors if problems + * occur. */ - class WriteBatchExecutor { - MONGO_DISALLOW_COPYING(WriteBatchExecutor); - public: - - // State object used by private execInserts. TODO: Do not expose this type. - class ExecInsertsState; - - WriteBatchExecutor( OperationContext* txn, - OpCounters* opCounters, - LastError* le ); - - /** - * Issues writes with requested write concern. Fills response with errors if problems - * occur. - */ - void executeBatch( const BatchedCommandRequest& request, BatchedCommandResponse* response ); - - const WriteBatchStats& getStats() const; - - /** - * Does basic validation of the batch request. Returns a non-OK status if - * any problems with the batch are found. - */ - static Status validateBatch( const BatchedCommandRequest& request ); - - private: - /** - * Executes the writes in the batch and returns upserted _ids and write errors. - * Dispatches to one of the three functions below for DBLock, CurOp, and stats management. - */ - void bulkExecute( const BatchedCommandRequest& request, - std::vector<BatchedUpsertDetail*>* upsertedIds, - std::vector<WriteErrorDetail*>* errors ); - - /** - * Executes the inserts of an insert batch and returns the write errors. - * - * Internally uses the DBLock of the request namespace. - * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple - * times. - */ - void execInserts( const BatchedCommandRequest& request, - std::vector<WriteErrorDetail*>* errors ); - - /** - * Executes a single insert from a batch, described in the opaque "state" object. - */ - void execOneInsert( ExecInsertsState* state, WriteErrorDetail** error ); - - /** - * Executes an update item (which may update many documents or upsert), and returns the - * upserted _id on upsert or error on failure. - * - * Internally uses the DBLock of the update namespace. - * May take the DBLock multiple times. - */ - void execUpdate( const BatchItemRef& updateItem, - BSONObj* upsertedId, - WriteErrorDetail** error ); - - /** - * Executes a delete item (which may remove many documents) and returns an error on failure. - * - * Internally uses the DBLock of the delete namespace. - * May take the DBLock multiple times. - */ - void execRemove( const BatchItemRef& removeItem, WriteErrorDetail** error ); - - /** - * Helper for incrementing stats on the next CurOp. - * - * No lock requirements. - */ - void incOpStats( const BatchItemRef& currWrite ); - - /** - * Helper for incrementing stats after each individual write op. - * - * No lock requirements (though usually done inside write lock to make stats update look - * atomic). - */ - void incWriteStats( const BatchItemRef& currWrite, - const WriteOpStats& stats, - const WriteErrorDetail* error, - CurOp* currentOp ); - - OperationContext* _txn; - - // OpCounters object to update - needed for stats reporting - // Not owned here. - OpCounters* _opCounters; - - // LastError object to use for preparing write results - needed for stats reporting - // Not owned here. - LastError* _le; - - // Stats - std::unique_ptr<WriteBatchStats> _stats; - }; + void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response); + + const WriteBatchStats& getStats() const; /** - * Holds information about the result of a single write operation. + * Does basic validation of the batch request. Returns a non-OK status if + * any problems with the batch are found. */ - struct WriteOpStats { + static Status validateBatch(const BatchedCommandRequest& request); - WriteOpStats() : - n( 0 ), nModified( 0 ) { - } +private: + /** + * Executes the writes in the batch and returns upserted _ids and write errors. + * Dispatches to one of the three functions below for DBLock, CurOp, and stats management. + */ + void bulkExecute(const BatchedCommandRequest& request, + std::vector<BatchedUpsertDetail*>* upsertedIds, + std::vector<WriteErrorDetail*>* errors); - void reset() { - n = 0; - nModified = 0; - upsertedID = BSONObj(); - } + /** + * Executes the inserts of an insert batch and returns the write errors. + * + * Internally uses the DBLock of the request namespace. + * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple + * times. + */ + void execInserts(const BatchedCommandRequest& request, std::vector<WriteErrorDetail*>* errors); - // Num docs logically affected by this operation. - int n; + /** + * Executes a single insert from a batch, described in the opaque "state" object. + */ + void execOneInsert(ExecInsertsState* state, WriteErrorDetail** error); + + /** + * Executes an update item (which may update many documents or upsert), and returns the + * upserted _id on upsert or error on failure. + * + * Internally uses the DBLock of the update namespace. + * May take the DBLock multiple times. + */ + void execUpdate(const BatchItemRef& updateItem, BSONObj* upsertedId, WriteErrorDetail** error); - // Num docs actually modified by this operation, if applicable (update) - int nModified; + /** + * Executes a delete item (which may remove many documents) and returns an error on failure. + * + * Internally uses the DBLock of the delete namespace. + * May take the DBLock multiple times. + */ + void execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error); - // _id of newly upserted document, if applicable (update) - BSONObj upsertedID; - }; + /** + * Helper for incrementing stats on the next CurOp. + * + * No lock requirements. + */ + void incOpStats(const BatchItemRef& currWrite); /** - * Full stats accumulated by a write batch execution. Note that these stats do not directly - * correspond to the stats accumulated in opCounters and LastError. + * Helper for incrementing stats after each individual write op. + * + * No lock requirements (though usually done inside write lock to make stats update look + * atomic). */ - class WriteBatchStats { - public: + void incWriteStats(const BatchItemRef& currWrite, + const WriteOpStats& stats, + const WriteErrorDetail* error, + CurOp* currentOp); - WriteBatchStats() : - numInserted( 0 ), numUpserted( 0 ), numMatched( 0 ), numModified( 0 ), numDeleted( 0 ) { - } + OperationContext* _txn; - int numInserted; - int numUpserted; - int numMatched; - int numModified; - int numDeleted; - }; + // OpCounters object to update - needed for stats reporting + // Not owned here. + OpCounters* _opCounters; -} // namespace mongo + // LastError object to use for preparing write results - needed for stats reporting + // Not owned here. + LastError* _le; + + // Stats + std::unique_ptr<WriteBatchStats> _stats; +}; + +/** + * Holds information about the result of a single write operation. + */ +struct WriteOpStats { + WriteOpStats() : n(0), nModified(0) {} + + void reset() { + n = 0; + nModified = 0; + upsertedID = BSONObj(); + } + + // Num docs logically affected by this operation. + int n; + + // Num docs actually modified by this operation, if applicable (update) + int nModified; + + // _id of newly upserted document, if applicable (update) + BSONObj upsertedID; +}; + +/** + * Full stats accumulated by a write batch execution. Note that these stats do not directly + * correspond to the stats accumulated in opCounters and LastError. + */ +class WriteBatchStats { +public: + WriteBatchStats() + : numInserted(0), numUpserted(0), numMatched(0), numModified(0), numDeleted(0) {} + + int numInserted; + int numUpserted; + int numMatched; + int numModified; + int numDeleted; +}; + +} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 4bf374778fb..fe6bb9a2ff9 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -53,270 +53,265 @@ namespace mongo { - using std::string; - using std::stringstream; +using std::string; +using std::stringstream; - namespace { +namespace { - MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) { - // Leaked intentionally: a Command registers itself when constructed. - new CmdInsert(); - new CmdUpdate(); - new CmdDelete(); - return Status::OK(); - } - - } // namespace +MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) { + // Leaked intentionally: a Command registers itself when constructed. + new CmdInsert(); + new CmdUpdate(); + new CmdDelete(); + return Status::OK(); +} - WriteCmd::WriteCmd( StringData name, BatchedCommandRequest::BatchType writeType ) : - Command( name ), _writeType( writeType ) { - } +} // namespace - void WriteCmd::redactTooLongLog( mutablebson::Document* cmdObj, StringData fieldName ) { - namespace mmb = mutablebson; - mmb::Element root = cmdObj->root(); - mmb::Element field = root.findFirstChildNamed( fieldName ); +WriteCmd::WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType) + : Command(name), _writeType(writeType) {} - // If the cmdObj is too large, it will be a "too big" message given by CachedBSONObj.get() - if ( !field.ok() ) { - return; - } +void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) { + namespace mmb = mutablebson; + mmb::Element root = cmdObj->root(); + mmb::Element field = root.findFirstChildNamed(fieldName); - // Redact the log if there are more than one documents or operations. - if ( field.countChildren() > 1 ) { - field.setValueInt( field.countChildren() ); - } + // If the cmdObj is too large, it will be a "too big" message given by CachedBSONObj.get() + if (!field.ok()) { + return; } - // Slaves can't perform writes. - bool WriteCmd::slaveOk() const { return false; } + // Redact the log if there are more than one documents or operations. + if (field.countChildren() > 1) { + field.setValueInt(field.countChildren()); + } +} + +// Slaves can't perform writes. +bool WriteCmd::slaveOk() const { + return false; +} + +bool WriteCmd::isWriteCommandForConfigServer() const { + return false; +} + +Status WriteCmd::checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) { + Status status(auth::checkAuthForWriteCommand(AuthorizationSession::get(client), + _writeType, + NamespaceString(parseNs(dbname, cmdObj)), + cmdObj)); + + // TODO: Remove this when we standardize GLE reporting from commands + if (!status.isOK()) { + LastError::get(client).setLastError(status.code(), status.reason()); + } - bool WriteCmd::isWriteCommandForConfigServer() const { return false; } + return status; +} + +// Write commands are counted towards their corresponding opcounters, not command opcounters. +bool WriteCmd::shouldAffectCommandCounter() const { + return false; +} + +bool WriteCmd::run(OperationContext* txn, + const string& dbName, + BSONObj& cmdObj, + int options, + string& errMsg, + BSONObjBuilder& result) { + // Can't be run on secondaries. + dassert(txn->writesAreReplicated()); + BatchedCommandRequest request(_writeType); + BatchedCommandResponse response; + + if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) { + return appendCommandStatus(result, Status(ErrorCodes::FailedToParse, errMsg)); + } - Status WriteCmd::checkAuthForCommand( ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj ) { + // Note that this is a runCommmand, and therefore, the database and the collection name + // are in different parts of the grammar for the command. But it's more convenient to + // work with a NamespaceString. We built it here and replace it in the parsed command. + // Internally, everything work with the namespace string as opposed to just the + // collection name. + NamespaceString nss(dbName, request.getNS()); + request.setNSS(nss); - Status status( auth::checkAuthForWriteCommand( AuthorizationSession::get(client), - _writeType, - NamespaceString( parseNs( dbname, cmdObj ) ), - cmdObj )); + StatusWith<WriteConcernOptions> wcStatus = extractWriteConcern(cmdObj); - // TODO: Remove this when we standardize GLE reporting from commands - if ( !status.isOK() ) { - LastError::get(client).setLastError(status.code(), status.reason()); - } + if (!wcStatus.isOK()) { + return appendCommandStatus(result, wcStatus.getStatus()); + } + txn->setWriteConcern(wcStatus.getValue()); + + WriteBatchExecutor writeBatchExecutor( + txn, &globalOpCounters, &LastError::get(txn->getClient())); + + writeBatchExecutor.executeBatch(request, &response); + + result.appendElements(response.toBSON()); + return response.getOk(); +} + +Status WriteCmd::explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* out) const { + // For now we only explain update and delete write commands. + if (BatchedCommandRequest::BatchType_Update != _writeType && + BatchedCommandRequest::BatchType_Delete != _writeType) { + return Status(ErrorCodes::IllegalOperation, + "Only update and delete write ops can be explained"); + } - return status; + // Parse the batch request. + BatchedCommandRequest request(_writeType); + std::string errMsg; + if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) { + return Status(ErrorCodes::FailedToParse, errMsg); } - // Write commands are counted towards their corresponding opcounters, not command opcounters. - bool WriteCmd::shouldAffectCommandCounter() const { return false; } - - bool WriteCmd::run(OperationContext* txn, - const string& dbName, - BSONObj& cmdObj, - int options, - string& errMsg, - BSONObjBuilder& result) { - // Can't be run on secondaries. - dassert(txn->writesAreReplicated()); - BatchedCommandRequest request( _writeType ); - BatchedCommandResponse response; - - if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) { - return appendCommandStatus( result, Status( ErrorCodes::FailedToParse, errMsg ) ); - } + // Note that this is a runCommmand, and therefore, the database and the collection name + // are in different parts of the grammar for the command. But it's more convenient to + // work with a NamespaceString. We built it here and replace it in the parsed command. + // Internally, everything work with the namespace string as opposed to just the + // collection name. + NamespaceString nsString(dbname, request.getNS()); + request.setNSS(nsString); + + // Do the validation of the batch that is shared with non-explained write batches. + Status isValid = WriteBatchExecutor::validateBatch(request); + if (!isValid.isOK()) { + return isValid; + } - // Note that this is a runCommmand, and therefore, the database and the collection name - // are in different parts of the grammar for the command. But it's more convenient to - // work with a NamespaceString. We built it here and replace it in the parsed command. - // Internally, everything work with the namespace string as opposed to just the - // collection name. - NamespaceString nss(dbName, request.getNS()); - request.setNSS(nss); + // Explain must do one additional piece of validation: For now we only explain + // singleton batches. + if (request.sizeWriteOps() != 1u) { + return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1"); + } - StatusWith<WriteConcernOptions> wcStatus = extractWriteConcern(cmdObj); + ScopedTransaction scopedXact(txn, MODE_IX); - if (!wcStatus.isOK()) { - return appendCommandStatus(result, wcStatus.getStatus()); - } - txn->setWriteConcern(wcStatus.getValue()); + // Get a reference to the singleton batch item (it's the 0th item in the batch). + BatchItemRef batchItem(&request, 0); - WriteBatchExecutor writeBatchExecutor(txn, - &globalOpCounters, - &LastError::get(txn->getClient())); + if (BatchedCommandRequest::BatchType_Update == _writeType) { + // Create the update request. + UpdateRequest updateRequest(nsString); + updateRequest.setQuery(batchItem.getUpdate()->getQuery()); + updateRequest.setUpdates(batchItem.getUpdate()->getUpdateExpr()); + updateRequest.setMulti(batchItem.getUpdate()->getMulti()); + updateRequest.setUpsert(batchItem.getUpdate()->getUpsert()); + UpdateLifecycleImpl updateLifecycle(true, updateRequest.getNamespaceString()); + updateRequest.setLifecycle(&updateLifecycle); + updateRequest.setExplain(); - writeBatchExecutor.executeBatch( request, &response ); + // Explained updates can yield. + updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); - result.appendElements( response.toBSON() ); - return response.getOk(); - } + OpDebug* debug = &CurOp::get(txn)->debug(); - Status WriteCmd::explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - BSONObjBuilder* out) const { - // For now we only explain update and delete write commands. - if ( BatchedCommandRequest::BatchType_Update != _writeType && - BatchedCommandRequest::BatchType_Delete != _writeType ) { - return Status( ErrorCodes::IllegalOperation, - "Only update and delete write ops can be explained" ); + ParsedUpdate parsedUpdate(txn, &updateRequest); + Status parseStatus = parsedUpdate.parseRequest(); + if (!parseStatus.isOK()) { + return parseStatus; } - // Parse the batch request. - BatchedCommandRequest request( _writeType ); - std::string errMsg; - if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) { - return Status( ErrorCodes::FailedToParse, errMsg ); - } + // Explains of write commands are read-only, but we take write locks so + // that timing info is more accurate. + AutoGetDb autoDb(txn, nsString.db(), MODE_IX); + Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX); - // Note that this is a runCommmand, and therefore, the database and the collection name - // are in different parts of the grammar for the command. But it's more convenient to - // work with a NamespaceString. We built it here and replace it in the parsed command. - // Internally, everything work with the namespace string as opposed to just the - // collection name. - NamespaceString nsString(dbname, request.getNS()); - request.setNSS(nsString); - - // Do the validation of the batch that is shared with non-explained write batches. - Status isValid = WriteBatchExecutor::validateBatch( request ); - if (!isValid.isOK()) { - return isValid; - } + ensureShardVersionOKOrThrow(txn->getClient(), nsString.ns()); - // Explain must do one additional piece of validation: For now we only explain - // singleton batches. - if ( request.sizeWriteOps() != 1u ) { - return Status( ErrorCodes::InvalidLength, - "explained write batches must be of size 1" ); + // Get a pointer to the (possibly NULL) collection. + Collection* collection = NULL; + if (autoDb.getDb()) { + collection = autoDb.getDb()->getCollection(nsString.ns()); } - ScopedTransaction scopedXact(txn, MODE_IX); - - // Get a reference to the singleton batch item (it's the 0th item in the batch). - BatchItemRef batchItem( &request, 0 ); - - if ( BatchedCommandRequest::BatchType_Update == _writeType ) { - // Create the update request. - UpdateRequest updateRequest( nsString ); - updateRequest.setQuery( batchItem.getUpdate()->getQuery() ); - updateRequest.setUpdates( batchItem.getUpdate()->getUpdateExpr() ); - updateRequest.setMulti( batchItem.getUpdate()->getMulti() ); - updateRequest.setUpsert( batchItem.getUpdate()->getUpsert() ); - UpdateLifecycleImpl updateLifecycle( true, updateRequest.getNamespaceString() ); - updateRequest.setLifecycle( &updateLifecycle ); - updateRequest.setExplain(); - - // Explained updates can yield. - updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - OpDebug* debug = &CurOp::get(txn)->debug(); - - ParsedUpdate parsedUpdate( txn, &updateRequest ); - Status parseStatus = parsedUpdate.parseRequest(); - if ( !parseStatus.isOK() ) { - return parseStatus; - } - - // Explains of write commands are read-only, but we take write locks so - // that timing info is more accurate. - AutoGetDb autoDb( txn, nsString.db(), MODE_IX ); - Lock::CollectionLock colLock( txn->lockState(), nsString.ns(), MODE_IX ); - - ensureShardVersionOKOrThrow( txn->getClient(), nsString.ns() ); - - // Get a pointer to the (possibly NULL) collection. - Collection* collection = NULL; - if ( autoDb.getDb() ) { - collection = autoDb.getDb()->getCollection( nsString.ns() ); - } - - PlanExecutor* rawExec; - uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec)); - std::unique_ptr<PlanExecutor> exec(rawExec); - - // Explain the plan tree. - Explain::explainStages( exec.get(), verbosity, out ); - return Status::OK(); + PlanExecutor* rawExec; + uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec)); + std::unique_ptr<PlanExecutor> exec(rawExec); + + // Explain the plan tree. + Explain::explainStages(exec.get(), verbosity, out); + return Status::OK(); + } else { + invariant(BatchedCommandRequest::BatchType_Delete == _writeType); + + // Create the delete request. + DeleteRequest deleteRequest(nsString); + deleteRequest.setQuery(batchItem.getDelete()->getQuery()); + deleteRequest.setMulti(batchItem.getDelete()->getLimit() != 1); + deleteRequest.setGod(false); + deleteRequest.setExplain(); + + // Explained deletes can yield. + deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); + + ParsedDelete parsedDelete(txn, &deleteRequest); + Status parseStatus = parsedDelete.parseRequest(); + if (!parseStatus.isOK()) { + return parseStatus; } - else { - invariant( BatchedCommandRequest::BatchType_Delete == _writeType ); - - // Create the delete request. - DeleteRequest deleteRequest( nsString ); - deleteRequest.setQuery( batchItem.getDelete()->getQuery() ); - deleteRequest.setMulti( batchItem.getDelete()->getLimit() != 1 ); - deleteRequest.setGod( false ); - deleteRequest.setExplain(); - - // Explained deletes can yield. - deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO); - - ParsedDelete parsedDelete(txn, &deleteRequest); - Status parseStatus = parsedDelete.parseRequest(); - if (!parseStatus.isOK()) { - return parseStatus; - } - - // Explains of write commands are read-only, but we take write locks so that timing - // info is more accurate. - AutoGetDb autoDb(txn, nsString.db(), MODE_IX); - Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow( txn->getClient(), nsString.ns() ); - - // Get a pointer to the (possibly NULL) collection. - Collection* collection = NULL; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } - - PlanExecutor* rawExec; - uassertStatusOK(getExecutorDelete(txn, collection, &parsedDelete, &rawExec)); - std::unique_ptr<PlanExecutor> exec(rawExec); - - // Explain the plan tree. - Explain::explainStages(exec.get(), verbosity, out); - return Status::OK(); + + // Explains of write commands are read-only, but we take write locks so that timing + // info is more accurate. + AutoGetDb autoDb(txn, nsString.db(), MODE_IX); + Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX); + + ensureShardVersionOKOrThrow(txn->getClient(), nsString.ns()); + + // Get a pointer to the (possibly NULL) collection. + Collection* collection = NULL; + if (autoDb.getDb()) { + collection = autoDb.getDb()->getCollection(nsString.ns()); } - } - CmdInsert::CmdInsert() : - WriteCmd( "insert", BatchedCommandRequest::BatchType_Insert ) { - } + PlanExecutor* rawExec; + uassertStatusOK(getExecutorDelete(txn, collection, &parsedDelete, &rawExec)); + std::unique_ptr<PlanExecutor> exec(rawExec); - void CmdInsert::redactForLogging( mutablebson::Document* cmdObj ) { - redactTooLongLog( cmdObj, StringData( "documents", StringData::LiteralTag() ) ); + // Explain the plan tree. + Explain::explainStages(exec.get(), verbosity, out); + return Status::OK(); } +} - void CmdInsert::help( stringstream& help ) const { - help << "insert documents"; - } +CmdInsert::CmdInsert() : WriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {} - CmdUpdate::CmdUpdate() : - WriteCmd( "update", BatchedCommandRequest::BatchType_Update ) { - } +void CmdInsert::redactForLogging(mutablebson::Document* cmdObj) { + redactTooLongLog(cmdObj, StringData("documents", StringData::LiteralTag())); +} - void CmdUpdate::redactForLogging( mutablebson::Document* cmdObj ) { - redactTooLongLog( cmdObj, StringData( "updates", StringData::LiteralTag() ) ); - } +void CmdInsert::help(stringstream& help) const { + help << "insert documents"; +} - void CmdUpdate::help( stringstream& help ) const { - help << "update documents"; - } +CmdUpdate::CmdUpdate() : WriteCmd("update", BatchedCommandRequest::BatchType_Update) {} - CmdDelete::CmdDelete() : - WriteCmd( "delete", BatchedCommandRequest::BatchType_Delete ) { - } +void CmdUpdate::redactForLogging(mutablebson::Document* cmdObj) { + redactTooLongLog(cmdObj, StringData("updates", StringData::LiteralTag())); +} - void CmdDelete::redactForLogging( mutablebson::Document* cmdObj ) { - redactTooLongLog( cmdObj, StringData( "deletes", StringData::LiteralTag() ) ); - } +void CmdUpdate::help(stringstream& help) const { + help << "update documents"; +} - void CmdDelete::help( stringstream& help ) const { - help << "delete documents"; - } +CmdDelete::CmdDelete() : WriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {} + +void CmdDelete::redactForLogging(mutablebson::Document* cmdObj) { + redactTooLongLog(cmdObj, StringData("deletes", StringData::LiteralTag())); +} + +void CmdDelete::help(stringstream& help) const { + help << "delete documents"; +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/write_commands.h b/src/mongo/db/commands/write_commands/write_commands.h index cbb2db6cac6..fcdda1b56fd 100644 --- a/src/mongo/db/commands/write_commands/write_commands.h +++ b/src/mongo/db/commands/write_commands/write_commands.h @@ -36,89 +36,91 @@ namespace mongo { +/** + * Base class for write commands. Write commands support batch writes and write concern, + * and return per-item error information. All write commands use the (non-virtual) entry + * point WriteCmd::run(). + * + * Command parsing is performed by the WriteBatch class (command syntax documented there), + * and command execution is performed by the WriteBatchExecutor class. + */ +class WriteCmd : public Command { + MONGO_DISALLOW_COPYING(WriteCmd); + +public: + virtual ~WriteCmd() {} + +protected: /** - * Base class for write commands. Write commands support batch writes and write concern, - * and return per-item error information. All write commands use the (non-virtual) entry - * point WriteCmd::run(). - * - * Command parsing is performed by the WriteBatch class (command syntax documented there), - * and command execution is performed by the WriteBatchExecutor class. + * Instantiates a command that can be invoked by "name", which will be capable of issuing + * write batches of type "writeType", and will require privilege "action" to run. */ - class WriteCmd : public Command { - MONGO_DISALLOW_COPYING(WriteCmd); - public: - virtual ~WriteCmd() {} - - protected: - - /** - * Instantiates a command that can be invoked by "name", which will be capable of issuing - * write batches of type "writeType", and will require privilege "action" to run. - */ - WriteCmd( StringData name, BatchedCommandRequest::BatchType writeType ); - - // Full log of write command can be quite large. - static void redactTooLongLog( mutablebson::Document* cmdObj, StringData fieldName ); - - private: - virtual bool slaveOk() const; - - virtual bool isWriteCommandForConfigServer() const; - - virtual Status checkAuthForCommand( ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj ); - - virtual bool shouldAffectCommandCounter() const; - - // Write command entry point. - virtual bool run( - OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result); - - // Write commands can be explained. - virtual Status explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - BSONObjBuilder* out) const; - - // Type of batch (e.g. insert). - BatchedCommandRequest::BatchType _writeType; - }; - - class CmdInsert : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdInsert); - public: - CmdInsert(); - void redactForLogging(mutablebson::Document* cmdObj); - - private: - virtual void help(std::stringstream& help) const; - }; - - class CmdUpdate : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdUpdate); - public: - CmdUpdate(); - void redactForLogging(mutablebson::Document* cmdObj); - - private: - virtual void help(std::stringstream& help) const; - }; - - class CmdDelete : public WriteCmd { - MONGO_DISALLOW_COPYING(CmdDelete); - public: - CmdDelete(); - void redactForLogging(mutablebson::Document* cmdObj); - - private: - virtual void help(std::stringstream& help) const; - }; - -} // namespace mongo + WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType); + + // Full log of write command can be quite large. + static void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName); + +private: + virtual bool slaveOk() const; + + virtual bool isWriteCommandForConfigServer() const; + + virtual Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj); + + virtual bool shouldAffectCommandCounter() const; + + // Write command entry point. + virtual bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result); + + // Write commands can be explained. + virtual Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* out) const; + + // Type of batch (e.g. insert). + BatchedCommandRequest::BatchType _writeType; +}; + +class CmdInsert : public WriteCmd { + MONGO_DISALLOW_COPYING(CmdInsert); + +public: + CmdInsert(); + void redactForLogging(mutablebson::Document* cmdObj); + +private: + virtual void help(std::stringstream& help) const; +}; + +class CmdUpdate : public WriteCmd { + MONGO_DISALLOW_COPYING(CmdUpdate); + +public: + CmdUpdate(); + void redactForLogging(mutablebson::Document* cmdObj); + +private: + virtual void help(std::stringstream& help) const; +}; + +class CmdDelete : public WriteCmd { + MONGO_DISALLOW_COPYING(CmdDelete); + +public: + CmdDelete(); + void redactForLogging(mutablebson::Document* cmdObj); + +private: + virtual void help(std::stringstream& help) const; +}; + +} // namespace mongo diff --git a/src/mongo/db/commands/write_commands/write_commands_common.cpp b/src/mongo/db/commands/write_commands/write_commands_common.cpp index 69ca1014140..82f3ab4db67 100644 --- a/src/mongo/db/commands/write_commands/write_commands_common.cpp +++ b/src/mongo/db/commands/write_commands/write_commands_common.cpp @@ -42,62 +42,55 @@ namespace mongo { namespace auth { - using std::string; - using std::vector; - - Status checkAuthForWriteCommand( AuthorizationSession* authzSession, - BatchedCommandRequest::BatchType cmdType, - const NamespaceString& cmdNSS, - const BSONObj& cmdObj ) { - - vector<Privilege> privileges; - ActionSet actionsOnCommandNSS; - - if (shouldBypassDocumentValidationForCommand(cmdObj)) { - actionsOnCommandNSS.addAction(ActionType::bypassDocumentValidation); - } - - if ( cmdType == BatchedCommandRequest::BatchType_Insert ) { +using std::string; +using std::vector; + +Status checkAuthForWriteCommand(AuthorizationSession* authzSession, + BatchedCommandRequest::BatchType cmdType, + const NamespaceString& cmdNSS, + const BSONObj& cmdObj) { + vector<Privilege> privileges; + ActionSet actionsOnCommandNSS; + + if (shouldBypassDocumentValidationForCommand(cmdObj)) { + actionsOnCommandNSS.addAction(ActionType::bypassDocumentValidation); + } - if ( !cmdNSS.isSystemDotIndexes() ) { - actionsOnCommandNSS.addAction(ActionType::insert); + if (cmdType == BatchedCommandRequest::BatchType_Insert) { + if (!cmdNSS.isSystemDotIndexes()) { + actionsOnCommandNSS.addAction(ActionType::insert); + } else { + // Special-case indexes until we have a command + string nsToIndex, errMsg; + if (!BatchedCommandRequest::getIndexedNS(cmdObj, &nsToIndex, &errMsg)) { + return Status(ErrorCodes::FailedToParse, errMsg); } - else { - // Special-case indexes until we have a command - string nsToIndex, errMsg; - if ( !BatchedCommandRequest::getIndexedNS( cmdObj, &nsToIndex, &errMsg ) ) { - return Status( ErrorCodes::FailedToParse, errMsg ); - } - NamespaceString nssToIndex( nsToIndex ); - privileges.push_back( Privilege( ResourcePattern::forExactNamespace( nssToIndex ), - ActionType::createIndex ) ); - } + NamespaceString nssToIndex(nsToIndex); + privileges.push_back( + Privilege(ResourcePattern::forExactNamespace(nssToIndex), ActionType::createIndex)); } - else if ( cmdType == BatchedCommandRequest::BatchType_Update ) { - actionsOnCommandNSS.addAction(ActionType::update); + } else if (cmdType == BatchedCommandRequest::BatchType_Update) { + actionsOnCommandNSS.addAction(ActionType::update); - // Upsert also requires insert privs - if ( BatchedCommandRequest::containsUpserts( cmdObj ) ) { - actionsOnCommandNSS.addAction(ActionType::insert); - } - } - else { - fassert( 17251, cmdType == BatchedCommandRequest::BatchType_Delete ); - actionsOnCommandNSS.addAction(ActionType::remove); - } - - - if (!actionsOnCommandNSS.empty()) { - privileges.emplace_back(ResourcePattern::forExactNamespace(cmdNSS), - actionsOnCommandNSS); + // Upsert also requires insert privs + if (BatchedCommandRequest::containsUpserts(cmdObj)) { + actionsOnCommandNSS.addAction(ActionType::insert); } + } else { + fassert(17251, cmdType == BatchedCommandRequest::BatchType_Delete); + actionsOnCommandNSS.addAction(ActionType::remove); + } - if ( authzSession->isAuthorizedForPrivileges( privileges ) ) - return Status::OK(); - return Status( ErrorCodes::Unauthorized, "unauthorized" ); + if (!actionsOnCommandNSS.empty()) { + privileges.emplace_back(ResourcePattern::forExactNamespace(cmdNSS), actionsOnCommandNSS); } + if (authzSession->isAuthorizedForPrivileges(privileges)) + return Status::OK(); + + return Status(ErrorCodes::Unauthorized, "unauthorized"); +} } } diff --git a/src/mongo/db/commands/write_commands/write_commands_common.h b/src/mongo/db/commands/write_commands/write_commands_common.h index a1fe6bc9772..cf47bdc02b1 100644 --- a/src/mongo/db/commands/write_commands/write_commands_common.h +++ b/src/mongo/db/commands/write_commands/write_commands_common.h @@ -40,10 +40,9 @@ namespace mongo { namespace auth { - Status checkAuthForWriteCommand( AuthorizationSession* authzSession, - BatchedCommandRequest::BatchType cmdType, - const NamespaceString& cmdNSS, - const BSONObj& cmdObj ); - +Status checkAuthForWriteCommand(AuthorizationSession* authzSession, + BatchedCommandRequest::BatchType cmdType, + const NamespaceString& cmdNSS, + const BSONObj& cmdObj); } } |