author    Mark Benvenuto <mark.benvenuto@mongodb.com>  2015-06-20 00:22:50 -0400
committer Mark Benvenuto <mark.benvenuto@mongodb.com>  2015-06-20 10:56:02 -0400
commit    9c2ed42daa8fbbef4a919c21ec564e2db55e8d60
tree      3814f79c10d7b490948d8cb7b112ac1dd41ceff1  /src/mongo/db/commands/write_commands
parent    01965cf52bce6976637ecb8f4a622aeb05ab256a
download  mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/commands/write_commands')
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.cpp        2197
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.h            266
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands.cpp          447
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands.h            170
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands_common.cpp    87
-rw-r--r--  src/mongo/db/commands/write_commands/write_commands_common.h       9
6 files changed, 1532 insertions, 1644 deletions
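Note: the changes below are mechanical ClangFormat output, per the commit message. As an illustrative sketch only (the repository's actual .clang-format file is not part of this diff), a configuration along the following lines matches the visible style of the new code (4-space indent, roughly 100-column lines, attached braces), applied in place with clang-format:

    # Illustrative .clang-format -- an assumption, not MongoDB's actual configuration
    BasedOnStyle: Google
    IndentWidth: 4
    ColumnLimit: 100

    # Reformat a file in place using the nearest .clang-format file:
    #   clang-format -i src/mongo/db/commands/write_commands/batch_executor.cpp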
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 0b277ddfa56..2087b5bd2f4 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -83,303 +83,290 @@
namespace mongo {
- using std::endl;
- using std::string;
- using std::unique_ptr;
- using std::vector;
-
- namespace {
-
- /**
- * Data structure to safely hold and clean up results of single write operations.
- */
- class WriteOpResult {
- MONGO_DISALLOW_COPYING(WriteOpResult);
- public:
- WriteOpResult() {}
-
- WriteOpStats& getStats() { return _stats; }
-
- WriteErrorDetail* getError() { return _error.get(); }
- WriteErrorDetail* releaseError() { return _error.release(); }
- void setError(WriteErrorDetail* error) { _error.reset(error); }
-
- private:
- WriteOpStats _stats;
- std::unique_ptr<WriteErrorDetail> _error;
- };
-
- } // namespace
-
- // TODO: Determine queueing behavior we want here
- MONGO_EXPORT_SERVER_PARAMETER( queueForMigrationCommit, bool, true );
-
- using mongoutils::str::stream;
-
- WriteBatchExecutor::WriteBatchExecutor( OperationContext* txn,
- OpCounters* opCounters,
- LastError* le ) :
- _txn(txn),
- _opCounters( opCounters ),
- _le( le ),
- _stats( new WriteBatchStats ) {
- }
+using std::endl;
+using std::string;
+using std::unique_ptr;
+using std::vector;
- static WCErrorDetail* toWriteConcernError( const Status& wcStatus,
- const WriteConcernResult& wcResult ) {
+namespace {
- WCErrorDetail* wcError = new WCErrorDetail;
+/**
+ * Data structure to safely hold and clean up results of single write operations.
+ */
+class WriteOpResult {
+ MONGO_DISALLOW_COPYING(WriteOpResult);
- wcError->setErrCode( wcStatus.code() );
- wcError->setErrMessage( wcStatus.reason() );
- if ( wcResult.wTimedOut )
- wcError->setErrInfo( BSON( "wtimeout" << true ) );
+public:
+ WriteOpResult() {}
- return wcError;
+ WriteOpStats& getStats() {
+ return _stats;
}
- static WriteErrorDetail* toWriteError( const Status& status ) {
-
- WriteErrorDetail* error = new WriteErrorDetail;
-
- // TODO: Complex transform here?
- error->setErrCode( status.code() );
- error->setErrMessage( status.reason() );
-
- return error;
+ WriteErrorDetail* getError() {
+ return _error.get();
}
-
- static void toBatchError( const Status& status, BatchedCommandResponse* response ) {
- response->clear();
- response->setErrCode( status.code() );
- response->setErrMessage( status.reason() );
- response->setOk( false );
- dassert( response->isValid(NULL) );
+ WriteErrorDetail* releaseError() {
+ return _error.release();
}
-
- static void noteInCriticalSection( WriteErrorDetail* staleError ) {
- BSONObjBuilder builder;
- if ( staleError->isErrInfoSet() )
- builder.appendElements( staleError->getErrInfo() );
- builder.append( "inCriticalSection", true );
- staleError->setErrInfo( builder.obj() );
+ void setError(WriteErrorDetail* error) {
+ _error.reset(error);
}
- // static
- Status WriteBatchExecutor::validateBatch( const BatchedCommandRequest& request ) {
-
- // Validate namespace
- const NamespaceString& nss = request.getNSS();
- if ( !nss.isValid() ) {
- return Status( ErrorCodes::InvalidNamespace,
- nss.ns() + " is not a valid namespace" );
- }
-
- // Make sure we can write to the namespace
- Status allowedStatus = userAllowedWriteNS( nss );
- if ( !allowedStatus.isOK() ) {
- return allowedStatus;
- }
-
- // Validate insert index requests
- // TODO: Push insert index requests through createIndex once all upgrade paths support it
- string errMsg;
- if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) {
- return Status( ErrorCodes::InvalidOptions, errMsg );
- }
-
- return Status::OK();
+private:
+ WriteOpStats _stats;
+ std::unique_ptr<WriteErrorDetail> _error;
+};
+
+} // namespace
+
+// TODO: Determine queueing behavior we want here
+MONGO_EXPORT_SERVER_PARAMETER(queueForMigrationCommit, bool, true);
+
+using mongoutils::str::stream;
+
+WriteBatchExecutor::WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le)
+ : _txn(txn), _opCounters(opCounters), _le(le), _stats(new WriteBatchStats) {}
+
+static WCErrorDetail* toWriteConcernError(const Status& wcStatus,
+ const WriteConcernResult& wcResult) {
+ WCErrorDetail* wcError = new WCErrorDetail;
+
+ wcError->setErrCode(wcStatus.code());
+ wcError->setErrMessage(wcStatus.reason());
+ if (wcResult.wTimedOut)
+ wcError->setErrInfo(BSON("wtimeout" << true));
+
+ return wcError;
+}
+
+static WriteErrorDetail* toWriteError(const Status& status) {
+ WriteErrorDetail* error = new WriteErrorDetail;
+
+ // TODO: Complex transform here?
+ error->setErrCode(status.code());
+ error->setErrMessage(status.reason());
+
+ return error;
+}
+
+static void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+ dassert(response->isValid(NULL));
+}
+
+static void noteInCriticalSection(WriteErrorDetail* staleError) {
+ BSONObjBuilder builder;
+ if (staleError->isErrInfoSet())
+ builder.appendElements(staleError->getErrInfo());
+ builder.append("inCriticalSection", true);
+ staleError->setErrInfo(builder.obj());
+}
+
+// static
+Status WriteBatchExecutor::validateBatch(const BatchedCommandRequest& request) {
+ // Validate namespace
+ const NamespaceString& nss = request.getNSS();
+ if (!nss.isValid()) {
+ return Status(ErrorCodes::InvalidNamespace, nss.ns() + " is not a valid namespace");
}
- void WriteBatchExecutor::executeBatch( const BatchedCommandRequest& request,
- BatchedCommandResponse* response ) {
+ // Make sure we can write to the namespace
+ Status allowedStatus = userAllowedWriteNS(nss);
+ if (!allowedStatus.isOK()) {
+ return allowedStatus;
+ }
- // Validate namespace
- Status isValid = validateBatch(request);
- if (!isValid.isOK()) {
- toBatchError( isValid, response );
- return;
- }
+ // Validate insert index requests
+ // TODO: Push insert index requests through createIndex once all upgrade paths support it
+ string errMsg;
+ if (request.isInsertIndexRequest() && !request.isValidIndexRequest(&errMsg)) {
+ return Status(ErrorCodes::InvalidOptions, errMsg);
+ }
- if ( request.sizeWriteOps() == 0u ) {
- toBatchError( Status( ErrorCodes::InvalidLength,
- "no write ops were included in the batch" ),
- response );
- return;
- }
+ return Status::OK();
+}
- // Validate batch size
- if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) {
- toBatchError( Status( ErrorCodes::InvalidLength,
- stream() << "exceeded maximum write batch size of "
- << BatchedCommandRequest::kMaxWriteBatchSize ),
- response );
- return;
- }
+void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request,
+ BatchedCommandResponse* response) {
+ // Validate namespace
+ Status isValid = validateBatch(request);
+ if (!isValid.isOK()) {
+ toBatchError(isValid, response);
+ return;
+ }
- //
- // End validation
- //
+ if (request.sizeWriteOps() == 0u) {
+ toBatchError(Status(ErrorCodes::InvalidLength, "no write ops were included in the batch"),
+ response);
+ return;
+ }
- const WriteConcernOptions& writeConcern = _txn->getWriteConcern();
- bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0
- && writeConcern.syncMode == WriteConcernOptions::NONE;
+ // Validate batch size
+ if (request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize) {
+ toBatchError(Status(ErrorCodes::InvalidLength,
+ stream() << "exceeded maximum write batch size of "
+ << BatchedCommandRequest::kMaxWriteBatchSize),
+ response);
+ return;
+ }
- Timer commandTimer;
+ //
+ // End validation
+ //
- OwnedPointerVector<WriteErrorDetail> writeErrorsOwned;
- vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector();
+ const WriteConcernOptions& writeConcern = _txn->getWriteConcern();
+ bool silentWC = writeConcern.wMode.empty() && writeConcern.wNumNodes == 0 &&
+ writeConcern.syncMode == WriteConcernOptions::NONE;
- OwnedPointerVector<BatchedUpsertDetail> upsertedOwned;
- vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector();
+ Timer commandTimer;
- //
- // Apply each batch item, possibly bulking some items together in the write lock.
- // Stops on error if batch is ordered.
- //
+ OwnedPointerVector<WriteErrorDetail> writeErrorsOwned;
+ vector<WriteErrorDetail*>& writeErrors = writeErrorsOwned.mutableVector();
- bulkExecute( request, &upserted, &writeErrors );
+ OwnedPointerVector<BatchedUpsertDetail> upsertedOwned;
+ vector<BatchedUpsertDetail*>& upserted = upsertedOwned.mutableVector();
- //
- // Try to enforce the write concern if everything succeeded (unordered or ordered)
- // OR if something succeeded and we're unordered.
- //
+ //
+ // Apply each batch item, possibly bulking some items together in the write lock.
+ // Stops on error if batch is ordered.
+ //
- unique_ptr<WCErrorDetail> wcError;
- bool needToEnforceWC = writeErrors.empty()
- || ( !request.getOrdered()
- && writeErrors.size() < request.sizeWriteOps() );
+ bulkExecute(request, &upserted, &writeErrors);
- if ( needToEnforceWC ) {
- {
- stdx::lock_guard<Client> lk(*_txn->getClient());
- CurOp::get(_txn)->setMessage_inlock( "waiting for write concern" );
- }
+ //
+ // Try to enforce the write concern if everything succeeded (unordered or ordered)
+ // OR if something succeeded and we're unordered.
+ //
- WriteConcernResult res;
- Status status = waitForWriteConcern(
- _txn,
- repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp(),
- &res);
+ unique_ptr<WCErrorDetail> wcError;
+ bool needToEnforceWC = writeErrors.empty() ||
+ (!request.getOrdered() && writeErrors.size() < request.sizeWriteOps());
- if ( !status.isOK() ) {
- wcError.reset( toWriteConcernError( status, res ) );
- }
+ if (needToEnforceWC) {
+ {
+ stdx::lock_guard<Client> lk(*_txn->getClient());
+ CurOp::get(_txn)->setMessage_inlock("waiting for write concern");
}
- //
- // Refresh metadata if needed
- //
+ WriteConcernResult res;
+ Status status = waitForWriteConcern(
+ _txn, repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp(), &res);
- bool staleBatch = !writeErrors.empty()
- && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
-
- if ( staleBatch ) {
+ if (!status.isOK()) {
+ wcError.reset(toWriteConcernError(status, res));
+ }
+ }
- const BatchedRequestMetadata* requestMetadata = request.getMetadata();
- dassert( requestMetadata );
+ //
+ // Refresh metadata if needed
+ //
- // Make sure our shard name is set or is the same as what was set previously
- if ( shardingState.setShardName( requestMetadata->getShardName() ) ) {
+ bool staleBatch =
+ !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
+
+ if (staleBatch) {
+ const BatchedRequestMetadata* requestMetadata = request.getMetadata();
+ dassert(requestMetadata);
+
+ // Make sure our shard name is set or is the same as what was set previously
+ if (shardingState.setShardName(requestMetadata->getShardName())) {
+ //
+ // First, we refresh metadata if we need to based on the requested version.
+ //
+
+ ChunkVersion latestShardVersion;
+ shardingState.refreshMetadataIfNeeded(_txn,
+ request.getTargetingNS(),
+ requestMetadata->getShardVersion(),
+ &latestShardVersion);
+
+ // Report if we're still changing our metadata
+ // TODO: Better reporting per-collection
+ if (shardingState.inCriticalMigrateSection()) {
+ noteInCriticalSection(writeErrors.back());
+ }
+ if (queueForMigrationCommit) {
//
- // First, we refresh metadata if we need to based on the requested version.
+ // Queue up for migration to end - this allows us to be sure that clients will
+ // not repeatedly try to refresh metadata that is not yet written to the config
+ // server. Not necessary for correctness.
+ // Exposed as optional parameter to allow testing of queuing behavior with
+ // different network timings.
//
- ChunkVersion latestShardVersion;
- shardingState.refreshMetadataIfNeeded( _txn,
- request.getTargetingNS(),
- requestMetadata->getShardVersion(),
- &latestShardVersion );
-
- // Report if we're still changing our metadata
- // TODO: Better reporting per-collection
- if ( shardingState.inCriticalMigrateSection() ) {
- noteInCriticalSection( writeErrors.back() );
- }
-
- if ( queueForMigrationCommit ) {
-
- //
- // Queue up for migration to end - this allows us to be sure that clients will
- // not repeatedly try to refresh metadata that is not yet written to the config
- // server. Not necessary for correctness.
- // Exposed as optional parameter to allow testing of queuing behavior with
- // different network timings.
- //
-
- const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion();
+ const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion();
- //
- // Only wait if we're an older version (in the current collection epoch) and
- // we're not write compatible, implying that the current migration is affecting
- // writes.
- //
-
- if ( requestShardVersion.isOlderThan( latestShardVersion ) &&
- !requestShardVersion.isWriteCompatibleWith( latestShardVersion ) ) {
-
- while ( shardingState.inCriticalMigrateSection() ) {
+ //
+ // Only wait if we're an older version (in the current collection epoch) and
+ // we're not write compatible, implying that the current migration is affecting
+ // writes.
+ //
- log() << "write request to old shard version "
- << requestMetadata->getShardVersion().toString()
- << " waiting for migration commit" << endl;
+ if (requestShardVersion.isOlderThan(latestShardVersion) &&
+ !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) {
+ while (shardingState.inCriticalMigrateSection()) {
+ log() << "write request to old shard version "
+ << requestMetadata->getShardVersion().toString()
+ << " waiting for migration commit" << endl;
- shardingState.waitTillNotInCriticalSection( 10 /* secs */);
- }
+ shardingState.waitTillNotInCriticalSection(10 /* secs */);
}
}
}
- else {
- // If our shard name is stale, our version must have been stale as well
- dassert( writeErrors.size() == request.sizeWriteOps() );
- }
+ } else {
+ // If our shard name is stale, our version must have been stale as well
+ dassert(writeErrors.size() == request.sizeWriteOps());
}
+ }
- //
- // Construct response
- //
-
- response->setOk( true );
+ //
+ // Construct response
+ //
- if ( !silentWC ) {
+ response->setOk(true);
- if ( upserted.size() ) {
- response->setUpsertDetails( upserted );
- }
+ if (!silentWC) {
+ if (upserted.size()) {
+ response->setUpsertDetails(upserted);
+ }
- if ( writeErrors.size() ) {
- response->setErrDetails( writeErrors );
- }
+ if (writeErrors.size()) {
+ response->setErrDetails(writeErrors);
+ }
- if ( wcError.get() ) {
- response->setWriteConcernError( wcError.release() );
- }
+ if (wcError.get()) {
+ response->setWriteConcernError(wcError.release());
+ }
- repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
- const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
- if (replMode != repl::ReplicationCoordinator::modeNone) {
- response->setLastOp(repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp()
- .getTimestamp());
- if (replMode == repl::ReplicationCoordinator::modeReplSet) {
- response->setElectionId(replCoord->getElectionId());
- }
+ repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
+ const repl::ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
+ if (replMode != repl::ReplicationCoordinator::modeNone) {
+ response->setLastOp(
+ repl::ReplClientInfo::forClient(_txn->getClient()).getLastOp().getTimestamp());
+ if (replMode == repl::ReplicationCoordinator::modeReplSet) {
+ response->setElectionId(replCoord->getElectionId());
}
-
- // Set the stats for the response
- response->setN( _stats->numInserted + _stats->numUpserted + _stats->numMatched
- + _stats->numDeleted );
- if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update )
- response->setNModified( _stats->numModified );
}
- dassert( response->isValid( NULL ) );
+ // Set the stats for the response
+ response->setN(_stats->numInserted + _stats->numUpserted + _stats->numMatched +
+ _stats->numDeleted);
+ if (request.getBatchType() == BatchedCommandRequest::BatchType_Update)
+ response->setNModified(_stats->numModified);
}
- // Translates write item type to wire protocol op code.
- // Helper for WriteBatchExecutor::applyWriteItem().
- static int getOpCode(const BatchItemRef& currWrite) {
- switch (currWrite.getRequest()->getBatchType()) {
+ dassert(response->isValid(NULL));
+}
+
+// Translates write item type to wire protocol op code.
+// Helper for WriteBatchExecutor::applyWriteItem().
+static int getOpCode(const BatchItemRef& currWrite) {
+ switch (currWrite.getRequest()->getBatchType()) {
case BatchedCommandRequest::BatchType_Insert:
return dbInsert;
case BatchedCommandRequest::BatchType_Update:
@@ -388,1068 +375,990 @@ namespace mongo {
return dbDelete;
default:
MONGO_UNREACHABLE;
+ }
+}
+
+static void buildStaleError(const ChunkVersion& shardVersionRecvd,
+ const ChunkVersion& shardVersionWanted,
+ WriteErrorDetail* error) {
+ // Write stale error to results
+ error->setErrCode(ErrorCodes::StaleShardVersion);
+
+ BSONObjBuilder infoB;
+ shardVersionWanted.addToBSON(infoB, "vWanted");
+ error->setErrInfo(infoB.obj());
+
+ string errMsg = stream() << "stale shard version detected before write, received "
+ << shardVersionRecvd.toString() << " but local version is "
+ << shardVersionWanted.toString();
+ error->setErrMessage(errMsg);
+}
+
+static bool checkShardVersion(OperationContext* txn,
+ ShardingState* shardingState,
+ const BatchedCommandRequest& request,
+ WriteOpResult* result) {
+ const NamespaceString& nss = request.getTargetingNSS();
+ dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
+
+ ChunkVersion requestShardVersion =
+ request.isMetadataSet() && request.getMetadata()->isShardVersionSet()
+ ? request.getMetadata()->getShardVersion()
+ : ChunkVersion::IGNORED();
+
+ if (shardingState->enabled()) {
+ CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns());
+
+ if (!ChunkVersion::isIgnoredVersion(requestShardVersion)) {
+ ChunkVersion shardVersion =
+ metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
+
+ if (!requestShardVersion.isWriteCompatibleWith(shardVersion)) {
+ result->setError(new WriteErrorDetail);
+ buildStaleError(requestShardVersion, shardVersion, result->getError());
+ return false;
+ }
}
}
- static void buildStaleError( const ChunkVersion& shardVersionRecvd,
- const ChunkVersion& shardVersionWanted,
- WriteErrorDetail* error ) {
-
- // Write stale error to results
- error->setErrCode( ErrorCodes::StaleShardVersion );
+ return true;
+}
- BSONObjBuilder infoB;
- shardVersionWanted.addToBSON( infoB, "vWanted" );
- error->setErrInfo( infoB.obj() );
-
- string errMsg = stream() << "stale shard version detected before write, received "
- << shardVersionRecvd.toString() << " but local version is "
- << shardVersionWanted.toString();
- error->setErrMessage( errMsg );
+static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) {
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) {
+ WriteErrorDetail* errorDetail = new WriteErrorDetail;
+ result->setError(errorDetail);
+ errorDetail->setErrCode(ErrorCodes::NotMaster);
+ errorDetail->setErrMessage("Not primary while writing to " + ns.toString());
+ return false;
}
-
- static bool checkShardVersion(OperationContext* txn,
+ return true;
+}
+
+static void buildUniqueIndexError(const BSONObj& keyPattern,
+ const BSONObj& indexPattern,
+ WriteErrorDetail* error) {
+ error->setErrCode(ErrorCodes::CannotCreateIndex);
+ string errMsg = stream() << "cannot create unique index over " << indexPattern
+ << " with shard key pattern " << keyPattern;
+ error->setErrMessage(errMsg);
+}
+
+static bool checkIndexConstraints(OperationContext* txn,
ShardingState* shardingState,
const BatchedCommandRequest& request,
WriteOpResult* result) {
+ const NamespaceString& nss = request.getTargetingNSS();
+ dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- ChunkVersion requestShardVersion =
- request.isMetadataSet() && request.getMetadata()->isShardVersionSet() ?
- request.getMetadata()->getShardVersion() : ChunkVersion::IGNORED();
-
- if ( shardingState->enabled() ) {
-
- CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() );
+ if (!request.isUniqueIndexRequest())
+ return true;
- if ( !ChunkVersion::isIgnoredVersion( requestShardVersion ) ) {
+ if (shardingState->enabled()) {
+ CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns());
- ChunkVersion shardVersion =
- metadata ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
+ if (metadata) {
+ ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
+ if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) {
+ result->setError(new WriteErrorDetail);
+ buildUniqueIndexError(
+ metadata->getKeyPattern(), request.getIndexKeyPattern(), result->getError());
- if ( !requestShardVersion.isWriteCompatibleWith( shardVersion ) ) {
- result->setError(new WriteErrorDetail);
- buildStaleError(requestShardVersion, shardVersion, result->getError());
- return false;
- }
+ return false;
}
}
-
- return true;
- }
-
- static bool checkIsMasterForDatabase(const NamespaceString& ns, WriteOpResult* result) {
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)) {
- WriteErrorDetail* errorDetail = new WriteErrorDetail;
- result->setError(errorDetail);
- errorDetail->setErrCode(ErrorCodes::NotMaster);
- errorDetail->setErrMessage("Not primary while writing to " + ns.toString());
- return false;
- }
- return true;
}
- static void buildUniqueIndexError( const BSONObj& keyPattern,
- const BSONObj& indexPattern,
- WriteErrorDetail* error ) {
- error->setErrCode( ErrorCodes::CannotCreateIndex );
- string errMsg = stream() << "cannot create unique index over " << indexPattern
- << " with shard key pattern " << keyPattern;
- error->setErrMessage( errMsg );
+ return true;
+}
+
+//
+// HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS
+//
+
+static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) {
+ stdx::lock_guard<Client> lk(*txn->getClient());
+ CurOp* const currentOp = CurOp::get(txn);
+ currentOp->setOp_inlock(getOpCode(currWrite));
+ currentOp->ensureStarted();
+ currentOp->setNS_inlock(currWrite.getRequest()->getNS());
+
+ currentOp->debug().ns = currentOp->getNS();
+ currentOp->debug().op = currentOp->getOp();
+
+ if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ currentOp->setQuery_inlock(currWrite.getDocument());
+ currentOp->debug().query = currWrite.getDocument();
+ currentOp->debug().ninserted = 0;
+ } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ currentOp->setQuery_inlock(currWrite.getUpdate()->getQuery());
+ currentOp->debug().query = currWrite.getUpdate()->getQuery();
+ currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr();
+ // Note: debug().nMatched, nModified and nmoved are set internally in update
+ } else {
+ dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete);
+ currentOp->setQuery_inlock(currWrite.getDelete()->getQuery());
+ currentOp->debug().query = currWrite.getDelete()->getQuery();
+ currentOp->debug().ndeleted = 0;
}
-
- static bool checkIndexConstraints(OperationContext* txn,
- ShardingState* shardingState,
- const BatchedCommandRequest& request,
- WriteOpResult* result) {
-
- const NamespaceString& nss = request.getTargetingNSS();
- dassert(txn->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IX));
-
- if ( !request.isUniqueIndexRequest() )
- return true;
-
- if ( shardingState->enabled() ) {
-
- CollectionMetadataPtr metadata = shardingState->getCollectionMetadata( nss.ns() );
-
- if ( metadata ) {
- ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
- if (!shardKeyPattern.isUniqueIndexCompatible(request.getIndexKeyPattern())) {
-
- result->setError(new WriteErrorDetail);
- buildUniqueIndexError(metadata->getKeyPattern(),
- request.getIndexKeyPattern(),
- result->getError());
-
- return false;
- }
- }
- }
-
- return true;
+}
+
+void WriteBatchExecutor::incOpStats(const BatchItemRef& currWrite) {
+ if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ _opCounters->gotInsert();
+ } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ _opCounters->gotUpdate();
+ } else {
+ dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete);
+ _opCounters->gotDelete();
}
-
- //
- // HELPERS FOR CUROP MANAGEMENT AND GLOBAL STATS
- //
-
- static void beginCurrentOp(OperationContext* txn, const BatchItemRef& currWrite) {
-
- stdx::lock_guard<Client> lk(*txn->getClient());
- CurOp* const currentOp = CurOp::get(txn);
- currentOp->setOp_inlock(getOpCode(currWrite));
- currentOp->ensureStarted();
- currentOp->setNS_inlock( currWrite.getRequest()->getNS() );
-
- currentOp->debug().ns = currentOp->getNS();
- currentOp->debug().op = currentOp->getOp();
-
- if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
- currentOp->setQuery_inlock( currWrite.getDocument() );
- currentOp->debug().query = currWrite.getDocument();
- currentOp->debug().ninserted = 0;
+}
+
+void WriteBatchExecutor::incWriteStats(const BatchItemRef& currWrite,
+ const WriteOpStats& stats,
+ const WriteErrorDetail* error,
+ CurOp* currentOp) {
+ if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert) {
+ _stats->numInserted += stats.n;
+ currentOp->debug().ninserted += stats.n;
+ if (!error) {
+ _le->recordInsert(stats.n);
}
- else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
- currentOp->setQuery_inlock( currWrite.getUpdate()->getQuery() );
- currentOp->debug().query = currWrite.getUpdate()->getQuery();
- currentOp->debug().updateobj = currWrite.getUpdate()->getUpdateExpr();
- // Note: debug().nMatched, nModified and nmoved are set internally in update
+ } else if (currWrite.getOpType() == BatchedCommandRequest::BatchType_Update) {
+ if (stats.upsertedID.isEmpty()) {
+ _stats->numMatched += stats.n;
+ _stats->numModified += stats.nModified;
+ } else {
+ ++_stats->numUpserted;
}
- else {
- dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
- currentOp->setQuery_inlock( currWrite.getDelete()->getQuery() );
- currentOp->debug().query = currWrite.getDelete()->getQuery();
- currentOp->debug().ndeleted = 0;
- }
-
- }
- void WriteBatchExecutor::incOpStats( const BatchItemRef& currWrite ) {
-
- if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
- _opCounters->gotInsert();
- }
- else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
- _opCounters->gotUpdate();
+ if (!error) {
+ _le->recordUpdate(stats.upsertedID.isEmpty() && stats.n > 0, stats.n, stats.upsertedID);
}
- else {
- dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
- _opCounters->gotDelete();
+ } else {
+ dassert(currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete);
+ _stats->numDeleted += stats.n;
+ if (!error) {
+ _le->recordDelete(stats.n);
}
+ currentOp->debug().ndeleted += stats.n;
}
- void WriteBatchExecutor::incWriteStats( const BatchItemRef& currWrite,
- const WriteOpStats& stats,
- const WriteErrorDetail* error,
- CurOp* currentOp ) {
-
- if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Insert ) {
- _stats->numInserted += stats.n;
- currentOp->debug().ninserted += stats.n;
- if (!error) {
- _le->recordInsert(stats.n);
- }
- }
- else if ( currWrite.getOpType() == BatchedCommandRequest::BatchType_Update ) {
- if ( stats.upsertedID.isEmpty() ) {
- _stats->numMatched += stats.n;
- _stats->numModified += stats.nModified;
- }
- else {
- ++_stats->numUpserted;
- }
-
- if (!error) {
- _le->recordUpdate( stats.upsertedID.isEmpty() && stats.n > 0,
- stats.n,
- stats.upsertedID );
- }
- }
- else {
- dassert( currWrite.getOpType() == BatchedCommandRequest::BatchType_Delete );
- _stats->numDeleted += stats.n;
- if ( !error ) {
- _le->recordDelete( stats.n );
- }
- currentOp->debug().ndeleted += stats.n;
- }
-
- if (error) {
- _le->setLastError(error->getErrCode(), error->getErrMessage().c_str());
- }
+ if (error) {
+ _le->setLastError(error->getErrCode(), error->getErrMessage().c_str());
}
-
- static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) {
-
- CurOp* currentOp = CurOp::get(txn);
- currentOp->done();
- int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis();
- recordCurOpMetrics(txn);
- Top::get(txn->getClient()->getServiceContext()).record(
- currentOp->getNS(),
+}
+
+static void finishCurrentOp(OperationContext* txn, WriteErrorDetail* opError) {
+ CurOp* currentOp = CurOp::get(txn);
+ currentOp->done();
+ int executionTime = currentOp->debug().executionTime = currentOp->totalTimeMillis();
+ recordCurOpMetrics(txn);
+ Top::get(txn->getClient()->getServiceContext())
+ .record(currentOp->getNS(),
currentOp->getOp(),
- 1, // "write locked"
+ 1, // "write locked"
currentOp->totalTimeMicros(),
currentOp->isCommand());
- if ( opError ) {
- currentOp->debug().exceptionInfo = ExceptionInfo( opError->getErrMessage(),
- opError->getErrCode() );
+ if (opError) {
+ currentOp->debug().exceptionInfo =
+ ExceptionInfo(opError->getErrMessage(), opError->getErrCode());
- LOG(3) << " Caught Assertion in " << opToString( currentOp->getOp() )
- << ", continuing " << causedBy( opError->getErrMessage() ) << endl;
- }
+ LOG(3) << " Caught Assertion in " << opToString(currentOp->getOp()) << ", continuing "
+ << causedBy(opError->getErrMessage()) << endl;
+ }
- bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite,
- logger::LogSeverity::Debug(1));
- bool logSlow = executionTime
- > ( serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs() );
+ bool logAll = logger::globalLogDomain()->shouldLog(logger::LogComponent::kWrite,
+ logger::LogSeverity::Debug(1));
+ bool logSlow = executionTime > (serverGlobalParams.slowMS + currentOp->getExpectedLatencyMs());
- if ( logAll || logSlow ) {
- Locker::LockerInfo lockerInfo;
- txn->lockState()->getLockerInfo(&lockerInfo);
+ if (logAll || logSlow) {
+ Locker::LockerInfo lockerInfo;
+ txn->lockState()->getLockerInfo(&lockerInfo);
- LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats);
- }
+ LOG(0) << currentOp->debug().report(*currentOp, lockerInfo.stats);
+ }
- if (currentOp->shouldDBProfile(executionTime)) {
- profile(txn, CurOp::get(txn)->getOp());
- }
+ if (currentOp->shouldDBProfile(executionTime)) {
+ profile(txn, CurOp::get(txn)->getOp());
}
+}
- // END HELPERS
+// END HELPERS
- //
- // CORE WRITE OPERATIONS (declaration)
- // These functions write to the database and return stats and zero or one of:
- // - page fault
- // - error
- //
+//
+// CORE WRITE OPERATIONS (declaration)
+// These functions write to the database and return stats and zero or one of:
+// - page fault
+// - error
+//
- static void singleInsert( OperationContext* txn,
- const BSONObj& docToInsert,
- Collection* collection,
- WriteOpResult* result );
+static void singleInsert(OperationContext* txn,
+ const BSONObj& docToInsert,
+ Collection* collection,
+ WriteOpResult* result);
- static void singleCreateIndex( OperationContext* txn,
- const BSONObj& indexDesc,
- WriteOpResult* result );
+static void singleCreateIndex(OperationContext* txn,
+ const BSONObj& indexDesc,
+ WriteOpResult* result);
- static void multiUpdate( OperationContext* txn,
- const BatchItemRef& updateItem,
- WriteOpResult* result );
+static void multiUpdate(OperationContext* txn,
+ const BatchItemRef& updateItem,
+ WriteOpResult* result);
- static void multiRemove( OperationContext* txn,
- const BatchItemRef& removeItem,
- WriteOpResult* result );
+static void multiRemove(OperationContext* txn,
+ const BatchItemRef& removeItem,
+ WriteOpResult* result);
- //
- // WRITE EXECUTION
- // In general, the exec* operations manage db lock state and stats before dispatching to the
- // core write operations, which are *only* responsible for performing a write and reporting
- // success or failure.
- //
+//
+// WRITE EXECUTION
+// In general, the exec* operations manage db lock state and stats before dispatching to the
+// core write operations, which are *only* responsible for performing a write and reporting
+// success or failure.
+//
+
+/**
+ * Representation of the execution state of execInserts. Used by a single
+ * execution of execInserts in a single thread.
+ */
+class WriteBatchExecutor::ExecInsertsState {
+ MONGO_DISALLOW_COPYING(ExecInsertsState);
+public:
/**
- * Representation of the execution state of execInserts. Used by a single
- * execution of execInserts in a single thread.
+ * Constructs a new instance, for performing inserts described in "aRequest".
*/
- class WriteBatchExecutor::ExecInsertsState {
- MONGO_DISALLOW_COPYING(ExecInsertsState);
- public:
- /**
- * Constructs a new instance, for performing inserts described in "aRequest".
- */
- explicit ExecInsertsState(OperationContext* txn,
- const BatchedCommandRequest* aRequest);
-
- /**
- * Acquires the write lock and client context needed to perform the current write operation.
- * Returns true on success, after which it is safe to use the "context" and "collection"
- * members. It is safe to call this function if this instance already holds the write lock.
- *
- * On failure, writeLock, context and collection will be NULL/clear.
- */
- bool lockAndCheck(WriteOpResult* result);
-
- /**
- * Releases the client context and write lock acquired by lockAndCheck. Safe to call
- * regardless of whether or not this state object currently owns the lock.
- */
- void unlock();
-
- /**
- * Returns true if this executor has the lock on the target database.
- */
- bool hasLock() { return _dbLock.get(); }
-
- /**
- * Gets the target collection for the batch operation. Value is undefined
- * unless hasLock() is true.
- */
- Collection* getCollection() { return _collection; }
-
- OperationContext* txn;
-
- // Request object describing the inserts.
- const BatchedCommandRequest* request;
-
- // Index of the current insert operation to perform.
- size_t currIndex = 0;
-
- // Translation of insert documents in "request" into insert-ready forms. This vector has a
- // correspondence with elements of the "request", and "currIndex" is used to
- // index both.
- std::vector<StatusWith<BSONObj> > normalizedInserts;
-
- private:
- bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock);
-
- ScopedTransaction _transaction;
- // Guard object for the write lock on the target database.
- std::unique_ptr<Lock::DBLock> _dbLock;
- std::unique_ptr<Lock::CollectionLock> _collLock;
-
- Database* _database = nullptr;
- Collection* _collection = nullptr;
- };
-
- void WriteBatchExecutor::bulkExecute( const BatchedCommandRequest& request,
- std::vector<BatchedUpsertDetail*>* upsertedIds,
- std::vector<WriteErrorDetail*>* errors ) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (request.shouldBypassValidation()) {
- maybeDisableValidation.emplace(_txn);
- }
+ explicit ExecInsertsState(OperationContext* txn, const BatchedCommandRequest* aRequest);
- if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert ) {
- execInserts( request, errors );
- }
- else if ( request.getBatchType() == BatchedCommandRequest::BatchType_Update ) {
- for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {
+ /**
+ * Acquires the write lock and client context needed to perform the current write operation.
+ * Returns true on success, after which it is safe to use the "context" and "collection"
+ * members. It is safe to call this function if this instance already holds the write lock.
+ *
+ * On failure, writeLock, context and collection will be NULL/clear.
+ */
+ bool lockAndCheck(WriteOpResult* result);
- if ( i + 1 == request.sizeWriteOps() ) {
- setupSynchronousCommit( _txn );
- }
+ /**
+ * Releases the client context and write lock acquired by lockAndCheck. Safe to call
+ * regardless of whether or not this state object currently owns the lock.
+ */
+ void unlock();
- WriteErrorDetail* error = NULL;
- BSONObj upsertedId;
- execUpdate( BatchItemRef( &request, i ), &upsertedId, &error );
+ /**
+ * Returns true if this executor has the lock on the target database.
+ */
+ bool hasLock() {
+ return _dbLock.get();
+ }
- if ( !upsertedId.isEmpty() ) {
- BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail;
- batchUpsertedId->setIndex( i );
- batchUpsertedId->setUpsertedID( upsertedId );
- upsertedIds->push_back( batchUpsertedId );
- }
+ /**
+ * Gets the target collection for the batch operation. Value is undefined
+ * unless hasLock() is true.
+ */
+ Collection* getCollection() {
+ return _collection;
+ }
- if ( error ) {
- errors->push_back( error );
- if ( request.getOrdered() )
- break;
- }
- }
- }
- else {
- dassert( request.getBatchType() == BatchedCommandRequest::BatchType_Delete );
- for ( size_t i = 0; i < request.sizeWriteOps(); i++ ) {
+ OperationContext* txn;
- if ( i + 1 == request.sizeWriteOps() ) {
- setupSynchronousCommit( _txn );
- }
+ // Request object describing the inserts.
+ const BatchedCommandRequest* request;
- WriteErrorDetail* error = NULL;
- execRemove( BatchItemRef( &request, i ), &error );
+ // Index of the current insert operation to perform.
+ size_t currIndex = 0;
- if ( error ) {
- errors->push_back( error );
- if ( request.getOrdered() )
- break;
- }
- }
- }
-
- // Fill in stale version errors for unordered batches (update/delete can't do this on own)
- if ( !errors->empty() && !request.getOrdered() ) {
+ // Translation of insert documents in "request" into insert-ready forms. This vector has a
+ // correspondence with elements of the "request", and "currIndex" is used to
+ // index both.
+ std::vector<StatusWith<BSONObj>> normalizedInserts;
- const WriteErrorDetail* finalError = errors->back();
+private:
+ bool _lockAndCheckImpl(WriteOpResult* result, bool intentLock);
- if ( finalError->getErrCode() == ErrorCodes::StaleShardVersion ) {
- for ( size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++ ) {
- WriteErrorDetail* dupStaleError = new WriteErrorDetail;
- finalError->cloneTo( dupStaleError );
- errors->push_back( dupStaleError );
- }
- }
- }
- }
+ ScopedTransaction _transaction;
+ // Guard object for the write lock on the target database.
+ std::unique_ptr<Lock::DBLock> _dbLock;
+ std::unique_ptr<Lock::CollectionLock> _collLock;
- // Goes over the request and preprocesses normalized versions of all the inserts in the request
- static void normalizeInserts( const BatchedCommandRequest& request,
- vector<StatusWith<BSONObj> >* normalizedInserts ) {
+ Database* _database = nullptr;
+ Collection* _collection = nullptr;
+};
- normalizedInserts->reserve(request.sizeWriteOps());
- for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) {
- BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt( i );
- StatusWith<BSONObj> normalInsert = fixDocumentForInsert( insertDoc );
- normalizedInserts->push_back( normalInsert );
- if ( request.getOrdered() && !normalInsert.isOK() )
- break;
- }
+void WriteBatchExecutor::bulkExecute(const BatchedCommandRequest& request,
+ std::vector<BatchedUpsertDetail*>* upsertedIds,
+ std::vector<WriteErrorDetail*>* errors) {
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (request.shouldBypassValidation()) {
+ maybeDisableValidation.emplace(_txn);
}
- void WriteBatchExecutor::execInserts( const BatchedCommandRequest& request,
- std::vector<WriteErrorDetail*>* errors ) {
-
- // Theory of operation:
- //
- // Instantiates an ExecInsertsState, which represents all of the state involved in the batch
- // insert execution algorithm. Most importantly, encapsulates the lock state.
- //
- // Every iteration of the loop in execInserts() processes one document insertion, by calling
- // insertOne() exactly once for a given value of state.currIndex.
- //
- // If the ExecInsertsState indicates that the requisite write locks are not held, insertOne
- // acquires them and performs lock-acquisition-time checks. However, on non-error
- // execution, it does not release the locks. Therefore, the yielding logic in the while
- // loop in execInserts() is solely responsible for lock release in the non-error case.
- //
- // Internally, insertOne loops performing the single insert until it completes without a
- // PageFaultException, or until it fails with some kind of error. Errors are mostly
- // propagated via the request->error field, but DBExceptions or std::exceptions may escape,
- // particularly on operation interruption. These kinds of errors necessarily prevent
- // further insertOne calls, and stop the batch. As a result, the only expected source of
- // such exceptions are interruptions.
- ExecInsertsState state(_txn, &request);
- normalizeInserts(request, &state.normalizedInserts);
-
- ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
- if (info) {
- if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) {
- info->setVersion(request.getTargetingNS(),
- request.getMetadata()->getShardVersion());
- }
- else {
- info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED());
- }
- }
-
- // Yield frequency is based on the same constants used by PlanYieldPolicy.
- ElapsedTracker elapsedTracker(internalQueryExecYieldIterations,
- internalQueryExecYieldPeriodMS);
-
- for (state.currIndex = 0;
- state.currIndex < state.request->sizeWriteOps();
- ++state.currIndex) {
-
- if (state.currIndex + 1 == state.request->sizeWriteOps()) {
+ if (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) {
+ execInserts(request, errors);
+ } else if (request.getBatchType() == BatchedCommandRequest::BatchType_Update) {
+ for (size_t i = 0; i < request.sizeWriteOps(); i++) {
+ if (i + 1 == request.sizeWriteOps()) {
setupSynchronousCommit(_txn);
}
- if (elapsedTracker.intervalHasElapsed()) {
- // Yield between inserts.
- if (state.hasLock()) {
- // Release our locks. They get reacquired when insertOne() calls
- // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO
- // queues waiting on locks, there is no need to explicitly sleep or give up
- // control of the processor here.
- state.unlock();
-
- // This releases any storage engine held locks/snapshots.
- _txn->recoveryUnit()->abandonSnapshot();
- }
+ WriteErrorDetail* error = NULL;
+ BSONObj upsertedId;
+ execUpdate(BatchItemRef(&request, i), &upsertedId, &error);
+
+ if (!upsertedId.isEmpty()) {
+ BatchedUpsertDetail* batchUpsertedId = new BatchedUpsertDetail;
+ batchUpsertedId->setIndex(i);
+ batchUpsertedId->setUpsertedID(upsertedId);
+ upsertedIds->push_back(batchUpsertedId);
+ }
- _txn->checkForInterrupt();
- elapsedTracker.resetLastTime();
+ if (error) {
+ errors->push_back(error);
+ if (request.getOrdered())
+ break;
+ }
+ }
+ } else {
+ dassert(request.getBatchType() == BatchedCommandRequest::BatchType_Delete);
+ for (size_t i = 0; i < request.sizeWriteOps(); i++) {
+ if (i + 1 == request.sizeWriteOps()) {
+ setupSynchronousCommit(_txn);
}
WriteErrorDetail* error = NULL;
- execOneInsert(&state, &error);
+ execRemove(BatchItemRef(&request, i), &error);
+
if (error) {
errors->push_back(error);
- error->setIndex(state.currIndex);
if (request.getOrdered())
- return;
+ break;
}
}
}
- void WriteBatchExecutor::execUpdate( const BatchItemRef& updateItem,
- BSONObj* upsertedId,
- WriteErrorDetail** error ) {
-
- // BEGIN CURRENT OP
- CurOp currentOp(_txn);
- beginCurrentOp(_txn, updateItem);
- incOpStats( updateItem );
-
- ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
- if (info) {
- auto rootRequest = updateItem.getRequest();
- if (!updateItem.getUpdate()->getMulti() &&
- rootRequest->isMetadataSet() &&
- rootRequest->getMetadata()->isShardVersionSet()) {
- info->setVersion(rootRequest->getTargetingNS(),
- rootRequest->getMetadata()->getShardVersion());
- }
- else {
- info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
+ // Fill in stale version errors for unordered batches (update/delete can't do this on own)
+ if (!errors->empty() && !request.getOrdered()) {
+ const WriteErrorDetail* finalError = errors->back();
+
+ if (finalError->getErrCode() == ErrorCodes::StaleShardVersion) {
+ for (size_t i = finalError->getIndex() + 1; i < request.sizeWriteOps(); i++) {
+ WriteErrorDetail* dupStaleError = new WriteErrorDetail;
+ finalError->cloneTo(dupStaleError);
+ errors->push_back(dupStaleError);
}
}
+ }
+}
+
+// Goes over the request and preprocesses normalized versions of all the inserts in the request
+static void normalizeInserts(const BatchedCommandRequest& request,
+ vector<StatusWith<BSONObj>>* normalizedInserts) {
+ normalizedInserts->reserve(request.sizeWriteOps());
+ for (size_t i = 0; i < request.sizeWriteOps(); ++i) {
+ BSONObj insertDoc = request.getInsertRequest()->getDocumentsAt(i);
+ StatusWith<BSONObj> normalInsert = fixDocumentForInsert(insertDoc);
+ normalizedInserts->push_back(normalInsert);
+ if (request.getOrdered() && !normalInsert.isOK())
+ break;
+ }
+}
- WriteOpResult result;
-
- multiUpdate( _txn, updateItem, &result );
-
- if ( !result.getStats().upsertedID.isEmpty() ) {
- *upsertedId = result.getStats().upsertedID;
+void WriteBatchExecutor::execInserts(const BatchedCommandRequest& request,
+ std::vector<WriteErrorDetail*>* errors) {
+ // Theory of operation:
+ //
+ // Instantiates an ExecInsertsState, which represents all of the state involved in the batch
+ // insert execution algorithm. Most importantly, encapsulates the lock state.
+ //
+ // Every iteration of the loop in execInserts() processes one document insertion, by calling
+ // insertOne() exactly once for a given value of state.currIndex.
+ //
+ // If the ExecInsertsState indicates that the requisite write locks are not held, insertOne
+ // acquires them and performs lock-acquisition-time checks. However, on non-error
+ // execution, it does not release the locks. Therefore, the yielding logic in the while
+ // loop in execInserts() is solely responsible for lock release in the non-error case.
+ //
+ // Internally, insertOne loops performing the single insert until it completes without a
+ // PageFaultException, or until it fails with some kind of error. Errors are mostly
+ // propagated via the request->error field, but DBExceptions or std::exceptions may escape,
+ // particularly on operation interruption. These kinds of errors necessarily prevent
+ // further insertOne calls, and stop the batch. As a result, the only expected source of
+ // such exceptions are interruptions.
+ ExecInsertsState state(_txn, &request);
+ normalizeInserts(request, &state.normalizedInserts);
+
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
+ if (info) {
+ if (request.isMetadataSet() && request.getMetadata()->isShardVersionSet()) {
+ info->setVersion(request.getTargetingNS(), request.getMetadata()->getShardVersion());
+ } else {
+ info->setVersion(request.getTargetingNS(), ChunkVersion::IGNORED());
}
- // END CURRENT OP
- incWriteStats( updateItem, result.getStats(), result.getError(), &currentOp );
- finishCurrentOp(_txn, result.getError());
+ }
- // End current transaction and release snapshot.
- _txn->recoveryUnit()->abandonSnapshot();
+ // Yield frequency is based on the same constants used by PlanYieldPolicy.
+ ElapsedTracker elapsedTracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);
- if ( result.getError() ) {
- result.getError()->setIndex( updateItem.getItemIndex() );
- *error = result.releaseError();
+ for (state.currIndex = 0; state.currIndex < state.request->sizeWriteOps(); ++state.currIndex) {
+ if (state.currIndex + 1 == state.request->sizeWriteOps()) {
+ setupSynchronousCommit(_txn);
}
- }
-
- void WriteBatchExecutor::execRemove( const BatchItemRef& removeItem,
- WriteErrorDetail** error ) {
- // Removes are similar to updates, but page faults are handled externally
+ if (elapsedTracker.intervalHasElapsed()) {
+ // Yield between inserts.
+ if (state.hasLock()) {
+ // Release our locks. They get reacquired when insertOne() calls
+ // ExecInsertsState::lockAndCheck(). Since the lock manager guarantees FIFO
+ // queues waiting on locks, there is no need to explicitly sleep or give up
+ // control of the processor here.
+ state.unlock();
+
+ // This releases any storage engine held locks/snapshots.
+ _txn->recoveryUnit()->abandonSnapshot();
+ }
- // BEGIN CURRENT OP
- CurOp currentOp(_txn);
- beginCurrentOp(_txn, removeItem);
- incOpStats( removeItem );
+ _txn->checkForInterrupt();
+ elapsedTracker.resetLastTime();
+ }
- ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
- if (info) {
- auto rootRequest = removeItem.getRequest();
- if (removeItem.getDelete()->getLimit() == 1 &&
- rootRequest->isMetadataSet() &&
- rootRequest->getMetadata()->isShardVersionSet()) {
- info->setVersion(rootRequest->getTargetingNS(),
- rootRequest->getMetadata()->getShardVersion());
- }
- else {
- info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
- }
+ WriteErrorDetail* error = NULL;
+ execOneInsert(&state, &error);
+ if (error) {
+ errors->push_back(error);
+ error->setIndex(state.currIndex);
+ if (request.getOrdered())
+ return;
+ }
+ }
+}
+
+void WriteBatchExecutor::execUpdate(const BatchItemRef& updateItem,
+ BSONObj* upsertedId,
+ WriteErrorDetail** error) {
+ // BEGIN CURRENT OP
+ CurOp currentOp(_txn);
+ beginCurrentOp(_txn, updateItem);
+ incOpStats(updateItem);
+
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
+ if (info) {
+ auto rootRequest = updateItem.getRequest();
+ if (!updateItem.getUpdate()->getMulti() && rootRequest->isMetadataSet() &&
+ rootRequest->getMetadata()->isShardVersionSet()) {
+ info->setVersion(rootRequest->getTargetingNS(),
+ rootRequest->getMetadata()->getShardVersion());
+ } else {
+ info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
}
+ }
- WriteOpResult result;
+ WriteOpResult result;
- multiRemove( _txn, removeItem, &result );
+ multiUpdate(_txn, updateItem, &result);
- // END CURRENT OP
- incWriteStats( removeItem, result.getStats(), result.getError(), &currentOp );
- finishCurrentOp(_txn, result.getError());
+ if (!result.getStats().upsertedID.isEmpty()) {
+ *upsertedId = result.getStats().upsertedID;
+ }
+ // END CURRENT OP
+ incWriteStats(updateItem, result.getStats(), result.getError(), &currentOp);
+ finishCurrentOp(_txn, result.getError());
- // End current transaction and release snapshot.
- _txn->recoveryUnit()->abandonSnapshot();
+ // End current transaction and release snapshot.
+ _txn->recoveryUnit()->abandonSnapshot();
- if ( result.getError() ) {
- result.getError()->setIndex( removeItem.getItemIndex() );
- *error = result.releaseError();
+ if (result.getError()) {
+ result.getError()->setIndex(updateItem.getItemIndex());
+ *error = result.releaseError();
+ }
+}
+
+void WriteBatchExecutor::execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error) {
+ // Removes are similar to updates, but page faults are handled externally
+
+ // BEGIN CURRENT OP
+ CurOp currentOp(_txn);
+ beginCurrentOp(_txn, removeItem);
+ incOpStats(removeItem);
+
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(_txn->getClient(), false);
+ if (info) {
+ auto rootRequest = removeItem.getRequest();
+ if (removeItem.getDelete()->getLimit() == 1 && rootRequest->isMetadataSet() &&
+ rootRequest->getMetadata()->isShardVersionSet()) {
+ info->setVersion(rootRequest->getTargetingNS(),
+ rootRequest->getMetadata()->getShardVersion());
+ } else {
+ info->setVersion(rootRequest->getTargetingNS(), ChunkVersion::IGNORED());
}
}
- //
- // IN-DB-LOCK CORE OPERATIONS
- //
+ WriteOpResult result;
+
+ multiRemove(_txn, removeItem, &result);
+
+ // END CURRENT OP
+ incWriteStats(removeItem, result.getStats(), result.getError(), &currentOp);
+ finishCurrentOp(_txn, result.getError());
+
+ // End current transaction and release snapshot.
+ _txn->recoveryUnit()->abandonSnapshot();
- WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn,
- const BatchedCommandRequest* aRequest) :
- txn(txn),
- request(aRequest),
- _transaction(txn, MODE_IX) {
+ if (result.getError()) {
+ result.getError()->setIndex(removeItem.getItemIndex());
+ *error = result.releaseError();
}
+}
- bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result,
- bool intentLock) {
- if (hasLock()) {
- CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel());
- return true;
- }
+//
+// IN-DB-LOCK CORE OPERATIONS
+//
- if (request->isInsertIndexRequest())
- intentLock = false; // can't build indexes in intent mode
-
- const NamespaceString& nss = request->getNSS();
- invariant(!_collLock);
- invariant(!_dbLock);
- _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(),
- nss.db(),
- intentLock ? MODE_IX : MODE_X);
- _database = dbHolder().get(txn, nss.ns());
- if (intentLock && !_database) {
- // Ensure exclusive lock in case the database doesn't yet exist
- _dbLock.reset();
- _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X);
- intentLock = false;
- }
- _collLock = stdx::make_unique<Lock::CollectionLock>(txn->lockState(),
- nss.ns(),
- intentLock ? MODE_IX : MODE_X);
- if (!checkIsMasterForDatabase(nss, result)) {
- return false;
- }
- if (!checkShardVersion(txn, &shardingState, *request, result)) {
- return false;
- }
- if (!checkIndexConstraints(txn, &shardingState, *request, result)) {
- return false;
- }
+WriteBatchExecutor::ExecInsertsState::ExecInsertsState(OperationContext* txn,
+ const BatchedCommandRequest* aRequest)
+ : txn(txn), request(aRequest), _transaction(txn, MODE_IX) {}
- if (!_database) {
- invariant(!intentLock);
- _database = dbHolder().openDb(txn, nss.ns());
- }
+bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* result,
+ bool intentLock) {
+ if (hasLock()) {
CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel());
- _collection = _database->getCollection(request->getTargetingNS());
- if (!_collection) {
- if (intentLock) {
- // try again with full X lock.
- unlock();
- return _lockAndCheckImpl(result, false);
- }
-
- WriteUnitOfWork wunit (txn);
- // Implicitly create if it doesn't exist
- _collection = _database->createCollection(txn, request->getTargetingNS());
- if (!_collection) {
- result->setError(
- toWriteError(Status(ErrorCodes::InternalError,
- "could not create collection " +
- request->getTargetingNS())));
- return false;
- }
- wunit.commit();
- }
return true;
}
- bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) {
- if (_lockAndCheckImpl(result, true))
- return true;
- unlock();
+ if (request->isInsertIndexRequest())
+ intentLock = false; // can't build indexes in intent mode
+
+ const NamespaceString& nss = request->getNSS();
+ invariant(!_collLock);
+ invariant(!_dbLock);
+ _dbLock =
+ stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), intentLock ? MODE_IX : MODE_X);
+ _database = dbHolder().get(txn, nss.ns());
+ if (intentLock && !_database) {
+ // Ensure exclusive lock in case the database doesn't yet exist
+ _dbLock.reset();
+ _dbLock = stdx::make_unique<Lock::DBLock>(txn->lockState(), nss.db(), MODE_X);
+ intentLock = false;
+ }
+ _collLock = stdx::make_unique<Lock::CollectionLock>(
+ txn->lockState(), nss.ns(), intentLock ? MODE_IX : MODE_X);
+ if (!checkIsMasterForDatabase(nss, result)) {
return false;
}
-
- void WriteBatchExecutor::ExecInsertsState::unlock() {
- _collection = nullptr;
- _database = nullptr;
- _collLock.reset();
- _dbLock.reset();
+ if (!checkShardVersion(txn, &shardingState, *request, result)) {
+ return false;
+ }
+ if (!checkIndexConstraints(txn, &shardingState, *request, result)) {
+ return false;
}
- static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) {
- // we have to be top level so we can retry
- invariant(!state->txn->lockState()->inAWriteUnitOfWork() );
- invariant(state->currIndex < state->normalizedInserts.size());
-
- const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]);
+ if (!_database) {
+ invariant(!intentLock);
+ _database = dbHolder().openDb(txn, nss.ns());
+ }
+ CurOp::get(txn)->raiseDbProfileLevel(_database->getProfilingLevel());
+ _collection = _database->getCollection(request->getTargetingNS());
+ if (!_collection) {
+ if (intentLock) {
+ // try again with full X lock.
+ unlock();
+ return _lockAndCheckImpl(result, false);
+ }
- if (!normalizedInsert.isOK()) {
- result->setError(toWriteError(normalizedInsert.getStatus()));
- return;
+ WriteUnitOfWork wunit(txn);
+ // Implicitly create if it doesn't exist
+ _collection = _database->createCollection(txn, request->getTargetingNS());
+ if (!_collection) {
+ result->setError(
+ toWriteError(Status(ErrorCodes::InternalError,
+ "could not create collection " + request->getTargetingNS())));
+ return false;
}
+ wunit.commit();
+ }
+ return true;
+}
- const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty() ?
- state->request->getInsertRequest()->getDocumentsAt( state->currIndex ) :
- normalizedInsert.getValue();
+bool WriteBatchExecutor::ExecInsertsState::lockAndCheck(WriteOpResult* result) {
+ if (_lockAndCheckImpl(result, true))
+ return true;
+ unlock();
+ return false;
+}
+
+void WriteBatchExecutor::ExecInsertsState::unlock() {
+ _collection = nullptr;
+ _database = nullptr;
+ _collLock.reset();
+ _dbLock.reset();
+}
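
The lock dance in _lockAndCheckImpl above is an optimistic pattern: take the database and collection locks in intent mode (MODE_IX) first, and escalate to exclusive (MODE_X) only when the database or collection turns out not to exist and must be created. A minimal, self-contained sketch of the same try-cheap-then-escalate shape, using hypothetical names and standard C++ only:

    #include <iostream>

    bool collectionExists = false;  // stands in for _database->getCollection()

    bool lockAndCheck(bool intentLock) {
        std::cout << "locked in " << (intentLock ? "IX" : "X") << '\n';
        if (!collectionExists) {
            if (intentLock) {
                // The intent lock is too weak to create the collection:
                // drop it and restart the whole sequence in exclusive mode.
                return lockAndCheck(false);
            }
            collectionExists = true;  // implicit creation under the X lock
        }
        return true;
    }

    int main() {
        lockAndCheck(true);  // prints "locked in IX", then "locked in X"
    }
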
+
+static void insertOne(WriteBatchExecutor::ExecInsertsState* state, WriteOpResult* result) {
+ // We have to be at the top level (not in a WriteUnitOfWork) so the insert can be retried
+ invariant(!state->txn->lockState()->inAWriteUnitOfWork());
+ invariant(state->currIndex < state->normalizedInserts.size());
+
+ const StatusWith<BSONObj>& normalizedInsert(state->normalizedInserts[state->currIndex]);
+
+ if (!normalizedInsert.isOK()) {
+ result->setError(toWriteError(normalizedInsert.getStatus()));
+ return;
+ }
- int attempt = 0;
- while (true) {
- try {
- if (!state->request->isInsertIndexRequest()) {
- if (state->lockAndCheck(result)) {
- singleInsert(state->txn, insertDoc, state->getCollection(), result);
- }
- }
- else {
- singleCreateIndex(state->txn, insertDoc, result);
+ const BSONObj& insertDoc = normalizedInsert.getValue().isEmpty()
+ ? state->request->getInsertRequest()->getDocumentsAt(state->currIndex)
+ : normalizedInsert.getValue();
+
+ int attempt = 0;
+ while (true) {
+ try {
+ if (!state->request->isInsertIndexRequest()) {
+ if (state->lockAndCheck(result)) {
+ singleInsert(state->txn, insertDoc, state->getCollection(), result);
}
- break;
- }
- catch ( const WriteConflictException& wce ) {
- state->unlock();
- CurOp::get(state->txn)->debug().writeConflicts++;
- state->txn->recoveryUnit()->abandonSnapshot();
- WriteConflictException::logAndBackoff( attempt++,
- "insert",
- state->getCollection() ?
- state->getCollection()->ns().ns() :
- "index" );
- }
- catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(staleExcep.getVersionReceived(),
- staleExcep.getVersionWanted(),
- result->getError());
- break;
+ } else {
+ singleCreateIndex(state->txn, insertDoc, result);
}
- catch (const DBException& ex) {
- Status status(ex.toStatus());
- if (ErrorCodes::isInterruption(status.code()))
- throw;
- result->setError(toWriteError(status));
- break;
- }
- }
-
- // Errors release the write lock, as a matter of policy.
- if (result->getError()) {
- state->txn->recoveryUnit()->abandonSnapshot();
+ break;
+ } catch (const WriteConflictException& wce) {
state->unlock();
+ CurOp::get(state->txn)->debug().writeConflicts++;
+ state->txn->recoveryUnit()->abandonSnapshot();
+ WriteConflictException::logAndBackoff(
+ attempt++,
+ "insert",
+ state->getCollection() ? state->getCollection()->ns().ns() : "index");
+ } catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(
+ staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ break;
+ } catch (const DBException& ex) {
+ Status status(ex.toStatus());
+ if (ErrorCodes::isInterruption(status.code()))
+ throw;
+ result->setError(toWriteError(status));
+ break;
}
}
- void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) {
- BatchItemRef currInsertItem(state->request, state->currIndex);
- CurOp currentOp(_txn);
- beginCurrentOp(_txn, currInsertItem);
- incOpStats(currInsertItem);
+ // Errors release the write lock, as a matter of policy.
+ if (result->getError()) {
+ state->txn->recoveryUnit()->abandonSnapshot();
+ state->unlock();
+ }
+}
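
The loop above is the standard write-conflict retry pattern: catch WriteConflictException, release locks and abandon the snapshot, back off, and start again from the top (hence the invariant that we are not inside a WriteUnitOfWork). A self-contained sketch of just the loop shape, with a hypothetical exception type standing in for WriteConflictException:

    #include <chrono>
    #include <iostream>
    #include <stdexcept>
    #include <thread>

    struct WriteConflict : std::runtime_error {
        WriteConflict() : std::runtime_error("write conflict") {}
    };

    int main() {
        int attempt = 0;
        while (true) {
            try {
                if (attempt < 2)
                    throw WriteConflict();  // simulate two conflicting writers
                std::cout << "insert committed after " << attempt << " retries\n";
                break;  // success: leave the loop
            } catch (const WriteConflict&) {
                // Sleep a little longer on each attempt, as logAndBackoff() does.
                std::this_thread::sleep_for(std::chrono::milliseconds(1 << attempt));
                ++attempt;
            }
        }
    }
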
- WriteOpResult result;
- insertOne(state, &result);
+void WriteBatchExecutor::execOneInsert(ExecInsertsState* state, WriteErrorDetail** error) {
+ BatchItemRef currInsertItem(state->request, state->currIndex);
+ CurOp currentOp(_txn);
+ beginCurrentOp(_txn, currInsertItem);
+ incOpStats(currInsertItem);
- incWriteStats(currInsertItem,
- result.getStats(),
- result.getError(),
- &currentOp);
- finishCurrentOp(_txn, result.getError());
+ WriteOpResult result;
+ insertOne(state, &result);
- if (result.getError()) {
- *error = result.releaseError();
- }
- }
+ incWriteStats(currInsertItem, result.getStats(), result.getError(), &currentOp);
+ finishCurrentOp(_txn, result.getError());
- /**
- * Perform a single insert into a collection. Requires the insert be preprocessed and the
- * collection already has been created.
- *
- * Might fault or error, otherwise populates the result.
- */
- static void singleInsert( OperationContext* txn,
- const BSONObj& docToInsert,
- Collection* collection,
- WriteOpResult* result ) {
-
- const string& insertNS = collection->ns().ns();
- invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX));
+ if (result.getError()) {
+ *error = result.releaseError();
+ }
+}
- WriteUnitOfWork wunit(txn);
- StatusWith<RecordId> status = collection->insertDocument( txn, docToInsert, true );
+/**
+ * Perform a single insert into a collection. Requires the insert be preprocessed and the
+ * collection already has been created.
+ *
+ * Might fault or error, otherwise populates the result.
+ */
+static void singleInsert(OperationContext* txn,
+ const BSONObj& docToInsert,
+ Collection* collection,
+ WriteOpResult* result) {
+ const string& insertNS = collection->ns().ns();
+ invariant(txn->lockState()->isCollectionLockedForMode(insertNS, MODE_IX));
+
+ WriteUnitOfWork wunit(txn);
+ StatusWith<RecordId> status = collection->insertDocument(txn, docToInsert, true);
+
+ if (!status.isOK()) {
+ result->setError(toWriteError(status.getStatus()));
+ } else {
+ result->getStats().n = 1;
+ wunit.commit();
+ }
+}
- if ( !status.isOK() ) {
- result->setError(toWriteError(status.getStatus()));
- }
- else {
- result->getStats().n = 1;
- wunit.commit();
+/**
+ * Perform a single index creation on a collection. Requires the index descriptor be
+ * preprocessed.
+ *
+ * Might fault or error, otherwise populates the result.
+ */
+static void singleCreateIndex(OperationContext* txn,
+ const BSONObj& indexDesc,
+ WriteOpResult* result) {
+ BSONElement nsElement = indexDesc["ns"];
+ uassert(ErrorCodes::NoSuchKey, "Missing \"ns\" field in index description", !nsElement.eoo());
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected \"ns\" field of index description to be a "
+ "string, "
+ "but found a " << typeName(nsElement.type()),
+ nsElement.type() == String);
+ const NamespaceString ns(nsElement.valueStringData());
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder << "createIndexes" << ns.coll();
+ cmdBuilder << "indexes" << BSON_ARRAY(indexDesc);
+ BSONObj cmd = cmdBuilder.done();
+ Command* createIndexesCmd = Command::findCommand("createIndexes");
+ invariant(createIndexesCmd);
+ std::string errmsg;
+ BSONObjBuilder resultBuilder;
+ const bool success =
+ createIndexesCmd->run(txn, ns.db().toString(), cmd, 0, errmsg, resultBuilder);
+ Command::appendCommandStatus(resultBuilder, success, errmsg);
+ BSONObj cmdResult = resultBuilder.done();
+ uassertStatusOK(Command::getStatusFromCommandResult(cmdResult));
+ result->getStats().n =
+ cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt();
+}
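
singleCreateIndex bridges the legacy insert-into-system.indexes path onto the createIndexes command. Assuming a hypothetical descriptor for an ascending index on field a, the command it synthesizes looks like this (same BSON/BSON_ARRAY macros used above):

    // Hypothetical input arriving as a system.indexes insert:
    BSONObj indexDesc =
        BSON("ns" << "test.users" << "key" << BSON("a" << 1) << "name" << "a_1");

    BSONObjBuilder cmdBuilder;
    cmdBuilder << "createIndexes" << "users";          // ns.coll(): collection name only
    cmdBuilder << "indexes" << BSON_ARRAY(indexDesc);  // full descriptor, "ns" field and all
    // cmdBuilder.done() == { createIndexes: "users", indexes: [ { ns: "test.users", ... } ] }
    // and n is reported as numIndexesAfter - numIndexesBefore (0 if it already existed).
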
+
+static void multiUpdate(OperationContext* txn,
+ const BatchItemRef& updateItem,
+ WriteOpResult* result) {
+ const NamespaceString nsString(updateItem.getRequest()->getNS());
+ const bool isMulti = updateItem.getUpdate()->getMulti();
+ UpdateRequest request(nsString);
+ request.setQuery(updateItem.getUpdate()->getQuery());
+ request.setUpdates(updateItem.getUpdate()->getUpdateExpr());
+ request.setMulti(isMulti);
+ request.setUpsert(updateItem.getUpdate()->getUpsert());
+ UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString());
+ request.setLifecycle(&updateLifecycle);
+
+ // Updates from the write commands path can yield.
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ int attempt = 0;
+ bool createCollection = false;
+ for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) {
+ ParsedUpdate parsedUpdate(txn, &request);
+ Status status = parsedUpdate.parseRequest();
+ if (!status.isOK()) {
+ result->setError(toWriteError(status));
+ return;
}
- }
- /**
- * Perform a single index creation on a collection. Requires the index descriptor be
- * preprocessed.
- *
- * Might fault or error, otherwise populates the result.
- */
- static void singleCreateIndex(OperationContext* txn,
- const BSONObj& indexDesc,
- WriteOpResult* result) {
+ if (createCollection) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X};
- BSONElement nsElement = indexDesc["ns"];
- uassert(ErrorCodes::NoSuchKey,
- "Missing \"ns\" field in index description",
- !nsElement.eoo());
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "Expected \"ns\" field of index description to be a " "string, "
- "but found a " << typeName(nsElement.type()),
- nsElement.type() == String);
- const NamespaceString ns(nsElement.valueStringData());
- BSONObjBuilder cmdBuilder;
- cmdBuilder << "createIndexes" << ns.coll();
- cmdBuilder << "indexes" << BSON_ARRAY(indexDesc);
- BSONObj cmd = cmdBuilder.done();
- Command* createIndexesCmd = Command::findCommand("createIndexes");
- invariant(createIndexesCmd);
- std::string errmsg;
- BSONObjBuilder resultBuilder;
- const bool success = createIndexesCmd->run(
- txn,
- ns.db().toString(),
- cmd,
- 0,
- errmsg,
- resultBuilder);
- Command::appendCommandStatus(resultBuilder, success, errmsg);
- BSONObj cmdResult = resultBuilder.done();
- uassertStatusOK(Command::getStatusFromCommandResult(cmdResult));
- result->getStats().n =
- cmdResult["numIndexesAfter"].numberInt() - cmdResult["numIndexesBefore"].numberInt();
- }
+ if (!checkIsMasterForDatabase(nsString, result)) {
+ return;
+ }
- static void multiUpdate( OperationContext* txn,
- const BatchItemRef& updateItem,
- WriteOpResult* result ) {
-
- const NamespaceString nsString(updateItem.getRequest()->getNS());
- const bool isMulti = updateItem.getUpdate()->getMulti();
- UpdateRequest request(nsString);
- request.setQuery(updateItem.getUpdate()->getQuery());
- request.setUpdates(updateItem.getUpdate()->getUpdateExpr());
- request.setMulti(isMulti);
- request.setUpsert(updateItem.getUpdate()->getUpsert());
- UpdateLifecycleImpl updateLifecycle(true, request.getNamespaceString());
- request.setLifecycle(&updateLifecycle);
-
- // Updates from the write commands path can yield.
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- int attempt = 0;
- bool createCollection = false;
- for ( int fakeLoop = 0; fakeLoop < 1; fakeLoop++ ) {
-
- ParsedUpdate parsedUpdate(txn, &request);
- Status status = parsedUpdate.parseRequest();
- if (!status.isOK()) {
- result->setError(toWriteError(status));
- return;
+ Database* const db = adb.getDb();
+ if (db->getCollection(nsString.ns())) {
+ // someone else beat us to it
+ } else {
+ WriteUnitOfWork wuow(txn);
+ uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
+ wuow.commit();
+ }
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
+ }
- if ( createCollection ) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- const AutoGetOrCreateDb adb{txn, nsString.db(), MODE_X};
+ ///////////////////////////////////////////
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
+ Lock::CollectionLock colLock(
+ txn->lockState(), nsString.ns(), parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
+ ///////////////////////////////////////////
- if (!checkIsMasterForDatabase(nsString, result)) {
- return;
- }
+ if (!checkIsMasterForDatabase(nsString, result)) {
+ return;
+ }
- Database* const db = adb.getDb();
- if ( db->getCollection( nsString.ns() ) ) {
- // someone else beat us to it
- }
- else {
- WriteUnitOfWork wuow(txn);
- uassertStatusOK(userCreateNS(txn, db, nsString.ns(), BSONObj()));
- wuow.commit();
- }
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "update", nsString.ns());
- }
+ if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result))
+ return;
- ///////////////////////////////////////////
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsString.db(), MODE_IX);
- Lock::CollectionLock colLock(txn->lockState(),
- nsString.ns(),
- parsedUpdate.isIsolated() ? MODE_X : MODE_IX);
- ///////////////////////////////////////////
+ Database* const db = dbHolder().get(txn, nsString.db());
- if (!checkIsMasterForDatabase(nsString, result)) {
+ if (db == NULL) {
+ if (createCollection) {
+ // we raced with someone, accept defeat
+ result->getStats().nModified = 0;
+ result->getStats().n = 0;
return;
}
- if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result))
+ // Database not yet created
+ if (!request.isUpsert()) {
+ // not an upsert, no database, nothing to do
+ result->getStats().nModified = 0;
+ result->getStats().n = 0;
return;
-
- Database* const db = dbHolder().get(txn, nsString.db());
-
- if (db == NULL) {
- if (createCollection) {
- // we raced with some, accept defeat
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- // Database not yet created
- if (!request.isUpsert()) {
- // not an upsert, no database, nothing to do
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
-
- // upsert, don't try to get a context as no MODE_X lock is held
- fakeLoop = -1;
- createCollection = true;
- continue;
}
- CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel());
- Collection* collection = db->getCollection(nsString.ns());
-
- if ( collection == NULL ) {
- if ( createCollection ) {
- // we raced with some, accept defeat
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
+ // upsert, don't try to get a context as no MODE_X lock is held
+ fakeLoop = -1;
+ createCollection = true;
+ continue;
+ }
- if ( !request.isUpsert() ) {
- // not an upsert, no collection, nothing to do
- result->getStats().nModified = 0;
- result->getStats().n = 0;
- return;
- }
+ CurOp::get(txn)->raiseDbProfileLevel(db->getProfilingLevel());
+ Collection* collection = db->getCollection(nsString.ns());
- // upsert, mark that we should create collection
- fakeLoop = -1;
- createCollection = true;
- continue;
+ if (collection == NULL) {
+ if (createCollection) {
+ // we raced with someone, accept defeat
+ result->getStats().nModified = 0;
+ result->getStats().n = 0;
+ return;
}
- OpDebug* debug = &CurOp::get(txn)->debug();
+ if (!request.isUpsert()) {
+ // not an upsert, no collection, nothing to do
+ result->getStats().nModified = 0;
+ result->getStats().n = 0;
+ return;
+ }
- try {
- invariant(collection);
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
+ // upsert, mark that we should create collection
+ fakeLoop = -1;
+ createCollection = true;
+ continue;
+ }
- uassertStatusOK(exec->executePlan());
- UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug);
+ OpDebug* debug = &CurOp::get(txn)->debug();
- const long long numDocsModified = res.numDocsModified;
- const long long numMatched = res.numMatched;
- const BSONObj resUpsertedID = res.upserted;
+ try {
+ invariant(collection);
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
- // We have an _id from an insert
- const bool didInsert = !resUpsertedID.isEmpty();
+ uassertStatusOK(exec->executePlan());
+ UpdateResult res = UpdateStage::makeUpdateResult(exec.get(), debug);
- result->getStats().nModified = didInsert ? 0 : numDocsModified;
- result->getStats().n = didInsert ? 1 : numMatched;
- result->getStats().upsertedID = resUpsertedID;
- }
- catch ( const WriteConflictException& dle ) {
- debug->writeConflicts++;
- if ( isMulti ) {
- log() << "Had WriteConflict during multi update, aborting";
- throw;
- }
+ const long long numDocsModified = res.numDocsModified;
+ const long long numMatched = res.numMatched;
+ const BSONObj resUpsertedID = res.upserted;
- createCollection = false;
- // RESTART LOOP
- fakeLoop = -1;
- txn->recoveryUnit()->abandonSnapshot();
+ // We have an _id from an insert
+ const bool didInsert = !resUpsertedID.isEmpty();
- WriteConflictException::logAndBackoff( attempt++, "update", nsString.ns() );
- }
- catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(staleExcep.getVersionReceived(),
- staleExcep.getVersionWanted(),
- result->getError());
+ result->getStats().nModified = didInsert ? 0 : numDocsModified;
+ result->getStats().n = didInsert ? 1 : numMatched;
+ result->getStats().upsertedID = resUpsertedID;
+ } catch (const WriteConflictException& dle) {
+ debug->writeConflicts++;
+ if (isMulti) {
+ log() << "Had WriteConflict during multi update, aborting";
+ throw;
}
- catch (const DBException& ex) {
- Status status = ex.toStatus();
- if (ErrorCodes::isInterruption(status.code())) {
- throw;
- }
- result->setError(toWriteError(status));
+
+ createCollection = false;
+ // RESTART LOOP
+ fakeLoop = -1;
+ txn->recoveryUnit()->abandonSnapshot();
+
+ WriteConflictException::logAndBackoff(attempt++, "update", nsString.ns());
+ } catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(
+ staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ } catch (const DBException& ex) {
+ Status status = ex.toStatus();
+ if (ErrorCodes::isInterruption(status.code())) {
+ throw;
}
+ result->setError(toWriteError(status));
}
}
+}
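
The fakeLoop construct above deserves a note: it is a single-iteration for loop whose body restarts itself by setting the induction variable to -1 and continuing, so the increment lands back on 0. The same idiom, self-contained:

    #include <iostream>

    int main() {
        bool createCollection = false;
        for (int fakeLoop = 0; fakeLoop < 1; fakeLoop++) {
            if (!createCollection) {
                // First pass discovers the collection is missing...
                createCollection = true;
                fakeLoop = -1;  // ...so rewind: fakeLoop++ brings us back to 0.
                continue;
            }
            std::cout << "second pass runs with the collection in place\n";
        }
    }
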
- /**
- * Perform a remove operation, which might remove multiple documents. Dispatches to remove code
- * currently to do most of this.
- *
- * Might fault or error, otherwise populates the result.
- */
- static void multiRemove( OperationContext* txn,
- const BatchItemRef& removeItem,
- WriteOpResult* result ) {
-
- const NamespaceString& nss = removeItem.getRequest()->getNSS();
- DeleteRequest request(nss);
- request.setQuery( removeItem.getDelete()->getQuery() );
- request.setMulti( removeItem.getDelete()->getLimit() != 1 );
- request.setGod( false );
-
- // Deletes running through the write commands path can yield.
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- int attempt = 1;
- while ( 1 ) {
- try {
-
- ParsedDelete parsedDelete(txn, &request);
- Status status = parsedDelete.parseRequest();
- if (!status.isOK()) {
- result->setError(toWriteError(status));
- return;
- }
-
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, nss.db(), MODE_IX);
- if (!autoDb.getDb()) {
- break;
- }
-
- CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel());
- Lock::CollectionLock collLock(txn->lockState(),
- nss.ns(),
- parsedDelete.isIsolated() ? MODE_X : MODE_IX);
-
- // getExecutorDelete() also checks if writes are allowed.
- if (!checkIsMasterForDatabase(nss, result)) {
- return;
- }
- // Check version once we're locked
-
- if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) {
- // Version error
- return;
- }
-
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorDelete(txn,
- autoDb.getDb()->getCollection(nss),
- &parsedDelete,
- &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
-
- // Execute the delete and retrieve the number deleted.
- uassertStatusOK(exec->executePlan());
- result->getStats().n = DeleteStage::getNumDeleted(exec.get());
+/**
+ * Perform a remove operation, which might remove multiple documents. Currently dispatches
+ * to the remove code to do most of the work.
+ *
+ * Might fault or error, otherwise populates the result.
+ */
+static void multiRemove(OperationContext* txn,
+ const BatchItemRef& removeItem,
+ WriteOpResult* result) {
+ const NamespaceString& nss = removeItem.getRequest()->getNSS();
+ DeleteRequest request(nss);
+ request.setQuery(removeItem.getDelete()->getQuery());
+ request.setMulti(removeItem.getDelete()->getLimit() != 1);
+ request.setGod(false);
+
+ // Deletes running through the write commands path can yield.
+ request.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ int attempt = 1;
+ while (1) {
+ try {
+ ParsedDelete parsedDelete(txn, &request);
+ Status status = parsedDelete.parseRequest();
+ if (!status.isOK()) {
+ result->setError(toWriteError(status));
+ return;
+ }
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetDb autoDb(txn, nss.db(), MODE_IX);
+ if (!autoDb.getDb()) {
break;
}
- catch ( const WriteConflictException& dle ) {
- CurOp::get(txn)->debug().writeConflicts++;
- WriteConflictException::logAndBackoff( attempt++, "delete", nss.ns() );
- }
- catch (const StaleConfigException& staleExcep) {
- result->setError(new WriteErrorDetail);
- result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
- buildStaleError(staleExcep.getVersionReceived(),
- staleExcep.getVersionWanted(),
- result->getError());
+
+ CurOp::get(txn)->raiseDbProfileLevel(autoDb.getDb()->getProfilingLevel());
+ Lock::CollectionLock collLock(
+ txn->lockState(), nss.ns(), parsedDelete.isIsolated() ? MODE_X : MODE_IX);
+
+ // getExecutorDelete() also checks if writes are allowed.
+ if (!checkIsMasterForDatabase(nss, result)) {
return;
}
- catch ( const DBException& ex ) {
- Status status = ex.toStatus();
- if (ErrorCodes::isInterruption(status.code())) {
- throw;
- }
- result->setError(toWriteError(status));
+ // Check version once we're locked
+
+ if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) {
+ // Version error
return;
}
+
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorDelete(
+ txn, autoDb.getDb()->getCollection(nss), &parsedDelete, &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
+
+ // Execute the delete and retrieve the number deleted.
+ uassertStatusOK(exec->executePlan());
+ result->getStats().n = DeleteStage::getNumDeleted(exec.get());
+
+ break;
+ } catch (const WriteConflictException& dle) {
+ CurOp::get(txn)->debug().writeConflicts++;
+ WriteConflictException::logAndBackoff(attempt++, "delete", nss.ns());
+ } catch (const StaleConfigException& staleExcep) {
+ result->setError(new WriteErrorDetail);
+ result->getError()->setErrCode(ErrorCodes::StaleShardVersion);
+ buildStaleError(
+ staleExcep.getVersionReceived(), staleExcep.getVersionWanted(), result->getError());
+ return;
+ } catch (const DBException& ex) {
+ Status status = ex.toStatus();
+ if (ErrorCodes::isInterruption(status.code())) {
+ throw;
+ }
+ result->setError(toWriteError(status));
+ return;
}
}
+}
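
Note how multi is derived: the wire format carries a limit field rather than a multi flag, and multiRemove maps it through setMulti(getLimit() != 1). Illustrative wire forms in shell syntax (hypothetical collection; the batch grammar accepts only 0 or 1 for limit):

    // { delete: "users", deletes: [ { q: { status: "stale" }, limit: 0 } ] }
    //   -> request.setMulti(true):  remove every matching document
    // { delete: "users", deletes: [ { q: { _id: 42 }, limit: 1 } ] }
    //   -> request.setMulti(false): remove at most one matching document
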
-} // namespace mongo
+} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/batch_executor.h b/src/mongo/db/commands/write_commands/batch_executor.h
index 0bab41d3ff8..0dd1d71848a 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.h
+++ b/src/mongo/db/commands/write_commands/batch_executor.h
@@ -40,158 +40,148 @@
namespace mongo {
- class BSONObjBuilder;
- class CurOp;
- class LastError;
- class OpCounters;
- class OperationContext;
- class WriteBatchStats;
- struct WriteOpStats;
+class BSONObjBuilder;
+class CurOp;
+class LastError;
+class OpCounters;
+class OperationContext;
+class WriteBatchStats;
+struct WriteOpStats;
+
+/**
+ * An instance of WriteBatchExecutor is an object capable of issuing a write batch.
+ */
+class WriteBatchExecutor {
+ MONGO_DISALLOW_COPYING(WriteBatchExecutor);
+
+public:
+ // State object used by private execInserts. TODO: Do not expose this type.
+ class ExecInsertsState;
+
+ WriteBatchExecutor(OperationContext* txn, OpCounters* opCounters, LastError* le);
/**
- * An instance of WriteBatchExecutor is an object capable of issuing a write batch.
+ * Issues writes with requested write concern. Fills response with errors if problems
+ * occur.
*/
- class WriteBatchExecutor {
- MONGO_DISALLOW_COPYING(WriteBatchExecutor);
- public:
-
- // State object used by private execInserts. TODO: Do not expose this type.
- class ExecInsertsState;
-
- WriteBatchExecutor( OperationContext* txn,
- OpCounters* opCounters,
- LastError* le );
-
- /**
- * Issues writes with requested write concern. Fills response with errors if problems
- * occur.
- */
- void executeBatch( const BatchedCommandRequest& request, BatchedCommandResponse* response );
-
- const WriteBatchStats& getStats() const;
-
- /**
- * Does basic validation of the batch request. Returns a non-OK status if
- * any problems with the batch are found.
- */
- static Status validateBatch( const BatchedCommandRequest& request );
-
- private:
- /**
- * Executes the writes in the batch and returns upserted _ids and write errors.
- * Dispatches to one of the three functions below for DBLock, CurOp, and stats management.
- */
- void bulkExecute( const BatchedCommandRequest& request,
- std::vector<BatchedUpsertDetail*>* upsertedIds,
- std::vector<WriteErrorDetail*>* errors );
-
- /**
- * Executes the inserts of an insert batch and returns the write errors.
- *
- * Internally uses the DBLock of the request namespace.
- * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple
- * times.
- */
- void execInserts( const BatchedCommandRequest& request,
- std::vector<WriteErrorDetail*>* errors );
-
- /**
- * Executes a single insert from a batch, described in the opaque "state" object.
- */
- void execOneInsert( ExecInsertsState* state, WriteErrorDetail** error );
-
- /**
- * Executes an update item (which may update many documents or upsert), and returns the
- * upserted _id on upsert or error on failure.
- *
- * Internally uses the DBLock of the update namespace.
- * May take the DBLock multiple times.
- */
- void execUpdate( const BatchItemRef& updateItem,
- BSONObj* upsertedId,
- WriteErrorDetail** error );
-
- /**
- * Executes a delete item (which may remove many documents) and returns an error on failure.
- *
- * Internally uses the DBLock of the delete namespace.
- * May take the DBLock multiple times.
- */
- void execRemove( const BatchItemRef& removeItem, WriteErrorDetail** error );
-
- /**
- * Helper for incrementing stats on the next CurOp.
- *
- * No lock requirements.
- */
- void incOpStats( const BatchItemRef& currWrite );
-
- /**
- * Helper for incrementing stats after each individual write op.
- *
- * No lock requirements (though usually done inside write lock to make stats update look
- * atomic).
- */
- void incWriteStats( const BatchItemRef& currWrite,
- const WriteOpStats& stats,
- const WriteErrorDetail* error,
- CurOp* currentOp );
-
- OperationContext* _txn;
-
- // OpCounters object to update - needed for stats reporting
- // Not owned here.
- OpCounters* _opCounters;
-
- // LastError object to use for preparing write results - needed for stats reporting
- // Not owned here.
- LastError* _le;
-
- // Stats
- std::unique_ptr<WriteBatchStats> _stats;
- };
+ void executeBatch(const BatchedCommandRequest& request, BatchedCommandResponse* response);
+
+ const WriteBatchStats& getStats() const;
/**
- * Holds information about the result of a single write operation.
+ * Does basic validation of the batch request. Returns a non-OK status if
+ * any problems with the batch are found.
*/
- struct WriteOpStats {
+ static Status validateBatch(const BatchedCommandRequest& request);
- WriteOpStats() :
- n( 0 ), nModified( 0 ) {
- }
+private:
+ /**
+ * Executes the writes in the batch and returns upserted _ids and write errors.
+ * Dispatches to one of the three functions below for DBLock, CurOp, and stats management.
+ */
+ void bulkExecute(const BatchedCommandRequest& request,
+ std::vector<BatchedUpsertDetail*>* upsertedIds,
+ std::vector<WriteErrorDetail*>* errors);
- void reset() {
- n = 0;
- nModified = 0;
- upsertedID = BSONObj();
- }
+ /**
+ * Executes the inserts of an insert batch and returns the write errors.
+ *
+ * Internally uses the DBLock of the request namespace.
+ * May execute multiple inserts inside the same DBLock, and/or take the DBLock multiple
+ * times.
+ */
+ void execInserts(const BatchedCommandRequest& request, std::vector<WriteErrorDetail*>* errors);
- // Num docs logically affected by this operation.
- int n;
+ /**
+ * Executes a single insert from a batch, described in the opaque "state" object.
+ */
+ void execOneInsert(ExecInsertsState* state, WriteErrorDetail** error);
+
+ /**
+ * Executes an update item (which may update many documents or upsert), and returns the
+ * upserted _id on upsert or error on failure.
+ *
+ * Internally uses the DBLock of the update namespace.
+ * May take the DBLock multiple times.
+ */
+ void execUpdate(const BatchItemRef& updateItem, BSONObj* upsertedId, WriteErrorDetail** error);
- // Num docs actually modified by this operation, if applicable (update)
- int nModified;
+ /**
+ * Executes a delete item (which may remove many documents) and returns an error on failure.
+ *
+ * Internally uses the DBLock of the delete namespace.
+ * May take the DBLock multiple times.
+ */
+ void execRemove(const BatchItemRef& removeItem, WriteErrorDetail** error);
- // _id of newly upserted document, if applicable (update)
- BSONObj upsertedID;
- };
+ /**
+ * Helper for incrementing stats on the next CurOp.
+ *
+ * No lock requirements.
+ */
+ void incOpStats(const BatchItemRef& currWrite);
/**
- * Full stats accumulated by a write batch execution. Note that these stats do not directly
- * correspond to the stats accumulated in opCounters and LastError.
+ * Helper for incrementing stats after each individual write op.
+ *
+ * No lock requirements (though usually done inside write lock to make stats update look
+ * atomic).
*/
- class WriteBatchStats {
- public:
+ void incWriteStats(const BatchItemRef& currWrite,
+ const WriteOpStats& stats,
+ const WriteErrorDetail* error,
+ CurOp* currentOp);
- WriteBatchStats() :
- numInserted( 0 ), numUpserted( 0 ), numMatched( 0 ), numModified( 0 ), numDeleted( 0 ) {
- }
+ OperationContext* _txn;
- int numInserted;
- int numUpserted;
- int numMatched;
- int numModified;
- int numDeleted;
- };
+ // OpCounters object to update - needed for stats reporting
+ // Not owned here.
+ OpCounters* _opCounters;
-} // namespace mongo
+ // LastError object to use for preparing write results - needed for stats reporting
+ // Not owned here.
+ LastError* _le;
+
+ // Stats
+ std::unique_ptr<WriteBatchStats> _stats;
+};
+
+/**
+ * Holds information about the result of a single write operation.
+ */
+struct WriteOpStats {
+ WriteOpStats() : n(0), nModified(0) {}
+
+ void reset() {
+ n = 0;
+ nModified = 0;
+ upsertedID = BSONObj();
+ }
+
+ // Num docs logically affected by this operation.
+ int n;
+
+ // Num docs actually modified by this operation, if applicable (update)
+ int nModified;
+
+ // _id of newly upserted document, if applicable (update)
+ BSONObj upsertedID;
+};
+
+/**
+ * Full stats accumulated by a write batch execution. Note that these stats do not directly
+ * correspond to the stats accumulated in opCounters and LastError.
+ */
+class WriteBatchStats {
+public:
+ WriteBatchStats()
+ : numInserted(0), numUpserted(0), numMatched(0), numModified(0), numDeleted(0) {}
+
+ int numInserted;
+ int numUpserted;
+ int numMatched;
+ int numModified;
+ int numDeleted;
+};
+
+} // namespace mongo
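
A condensed sketch of how this executor is driven from WriteCmd::run() in write_commands.cpp below; it assumes an OperationContext* txn and a command object that has already been parsed and validated:

    BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
    BatchedCommandResponse response;
    // ... request.parseBSON(cmdObj, &errMsg) and request.setNSS(nss), as in run() ...

    WriteBatchExecutor executor(txn, &globalOpCounters, &LastError::get(txn->getClient()));
    executor.executeBatch(request, &response);
    // response now carries ok, n, per-item writeErrors, and upserted _ids.
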
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 4bf374778fb..fe6bb9a2ff9 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -53,270 +53,265 @@
namespace mongo {
- using std::string;
- using std::stringstream;
+using std::string;
+using std::stringstream;
- namespace {
+namespace {
- MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) {
- // Leaked intentionally: a Command registers itself when constructed.
- new CmdInsert();
- new CmdUpdate();
- new CmdDelete();
- return Status::OK();
- }
-
- } // namespace
+MONGO_INITIALIZER(RegisterWriteCommands)(InitializerContext* context) {
+ // Leaked intentionally: a Command registers itself when constructed.
+ new CmdInsert();
+ new CmdUpdate();
+ new CmdDelete();
+ return Status::OK();
+}
- WriteCmd::WriteCmd( StringData name, BatchedCommandRequest::BatchType writeType ) :
- Command( name ), _writeType( writeType ) {
- }
+} // namespace
- void WriteCmd::redactTooLongLog( mutablebson::Document* cmdObj, StringData fieldName ) {
- namespace mmb = mutablebson;
- mmb::Element root = cmdObj->root();
- mmb::Element field = root.findFirstChildNamed( fieldName );
+WriteCmd::WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType)
+ : Command(name), _writeType(writeType) {}
- // If the cmdObj is too large, it will be a "too big" message given by CachedBSONObj.get()
- if ( !field.ok() ) {
- return;
- }
+void WriteCmd::redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName) {
+ namespace mmb = mutablebson;
+ mmb::Element root = cmdObj->root();
+ mmb::Element field = root.findFirstChildNamed(fieldName);
- // Redact the log if there are more than one documents or operations.
- if ( field.countChildren() > 1 ) {
- field.setValueInt( field.countChildren() );
- }
+ // If the cmdObj is too large, it will have been replaced by a "too big" message from CachedBSONObj.get()
+ if (!field.ok()) {
+ return;
}
- // Slaves can't perform writes.
- bool WriteCmd::slaveOk() const { return false; }
+ // Redact the log if there is more than one document or operation.
+ if (field.countChildren() > 1) {
+ field.setValueInt(field.countChildren());
+ }
+}
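
The effect of redactTooLongLog on a large batch, in shell syntax with a hypothetical collection and count — the named array field is replaced by its element count, while a single-element batch is left untouched since countChildren() > 1 does not hold:

    // before: { insert: "users", documents: [ { _id: 1 }, { _id: 2 }, /* ... */ ] }
    // after:  { insert: "users", documents: 1000 }
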
+
+// Slaves can't perform writes.
+bool WriteCmd::slaveOk() const {
+ return false;
+}
+
+bool WriteCmd::isWriteCommandForConfigServer() const {
+ return false;
+}
+
+Status WriteCmd::checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) {
+ Status status(auth::checkAuthForWriteCommand(AuthorizationSession::get(client),
+ _writeType,
+ NamespaceString(parseNs(dbname, cmdObj)),
+ cmdObj));
+
+ // TODO: Remove this when we standardize GLE reporting from commands
+ if (!status.isOK()) {
+ LastError::get(client).setLastError(status.code(), status.reason());
+ }
- bool WriteCmd::isWriteCommandForConfigServer() const { return false; }
+ return status;
+}
+
+// Write commands are counted towards their corresponding opcounters, not command opcounters.
+bool WriteCmd::shouldAffectCommandCounter() const {
+ return false;
+}
+
+bool WriteCmd::run(OperationContext* txn,
+ const string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ string& errMsg,
+ BSONObjBuilder& result) {
+ // Can't be run on secondaries.
+ dassert(txn->writesAreReplicated());
+ BatchedCommandRequest request(_writeType);
+ BatchedCommandResponse response;
+
+ if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) {
+ return appendCommandStatus(result, Status(ErrorCodes::FailedToParse, errMsg));
+ }
- Status WriteCmd::checkAuthForCommand( ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj ) {
+ // Note that this is a runCommand, and therefore, the database and the collection name
+ // are in different parts of the grammar for the command. But it's more convenient to
+ // work with a NamespaceString. We build it here and replace it in the parsed command.
+ // Internally, everything works with the namespace string as opposed to just the
+ // collection name.
+ NamespaceString nss(dbName, request.getNS());
+ request.setNSS(nss);
- Status status( auth::checkAuthForWriteCommand( AuthorizationSession::get(client),
- _writeType,
- NamespaceString( parseNs( dbname, cmdObj ) ),
- cmdObj ));
+ StatusWith<WriteConcernOptions> wcStatus = extractWriteConcern(cmdObj);
- // TODO: Remove this when we standardize GLE reporting from commands
- if ( !status.isOK() ) {
- LastError::get(client).setLastError(status.code(), status.reason());
- }
+ if (!wcStatus.isOK()) {
+ return appendCommandStatus(result, wcStatus.getStatus());
+ }
+ txn->setWriteConcern(wcStatus.getValue());
+
+ WriteBatchExecutor writeBatchExecutor(
+ txn, &globalOpCounters, &LastError::get(txn->getClient()));
+
+ writeBatchExecutor.executeBatch(request, &response);
+
+ result.appendElements(response.toBSON());
+ return response.getOk();
+}
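
For reference, the three batch shapes this entry point parses, in shell syntax with hypothetical collection names; an optional writeConcern field is consumed by extractWriteConcern() above:

    // { insert: "users", documents: [ { _id: 1, name: "a" } ], ordered: true }
    // { update: "users", updates: [ { q: { _id: 1 }, u: { $set: { name: "b" } } } ] }
    // { delete: "users", deletes: [ { q: { _id: 1 }, limit: 1 } ] }
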
+
+Status WriteCmd::explain(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ BSONObjBuilder* out) const {
+ // For now we only explain update and delete write commands.
+ if (BatchedCommandRequest::BatchType_Update != _writeType &&
+ BatchedCommandRequest::BatchType_Delete != _writeType) {
+ return Status(ErrorCodes::IllegalOperation,
+ "Only update and delete write ops can be explained");
+ }
- return status;
+ // Parse the batch request.
+ BatchedCommandRequest request(_writeType);
+ std::string errMsg;
+ if (!request.parseBSON(cmdObj, &errMsg) || !request.isValid(&errMsg)) {
+ return Status(ErrorCodes::FailedToParse, errMsg);
}
- // Write commands are counted towards their corresponding opcounters, not command opcounters.
- bool WriteCmd::shouldAffectCommandCounter() const { return false; }
-
- bool WriteCmd::run(OperationContext* txn,
- const string& dbName,
- BSONObj& cmdObj,
- int options,
- string& errMsg,
- BSONObjBuilder& result) {
- // Can't be run on secondaries.
- dassert(txn->writesAreReplicated());
- BatchedCommandRequest request( _writeType );
- BatchedCommandResponse response;
-
- if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) {
- return appendCommandStatus( result, Status( ErrorCodes::FailedToParse, errMsg ) );
- }
+ // Note that this is a runCommand, and therefore, the database and the collection name
+ // are in different parts of the grammar for the command. But it's more convenient to
+ // work with a NamespaceString. We build it here and replace it in the parsed command.
+ // Internally, everything works with the namespace string as opposed to just the
+ // collection name.
+ NamespaceString nsString(dbname, request.getNS());
+ request.setNSS(nsString);
+
+ // Do the validation of the batch that is shared with non-explained write batches.
+ Status isValid = WriteBatchExecutor::validateBatch(request);
+ if (!isValid.isOK()) {
+ return isValid;
+ }
- // Note that this is a runCommmand, and therefore, the database and the collection name
- // are in different parts of the grammar for the command. But it's more convenient to
- // work with a NamespaceString. We built it here and replace it in the parsed command.
- // Internally, everything work with the namespace string as opposed to just the
- // collection name.
- NamespaceString nss(dbName, request.getNS());
- request.setNSS(nss);
+ // Explain must do one additional piece of validation: For now we only explain
+ // singleton batches.
+ if (request.sizeWriteOps() != 1u) {
+ return Status(ErrorCodes::InvalidLength, "explained write batches must be of size 1");
+ }
- StatusWith<WriteConcernOptions> wcStatus = extractWriteConcern(cmdObj);
+ ScopedTransaction scopedXact(txn, MODE_IX);
- if (!wcStatus.isOK()) {
- return appendCommandStatus(result, wcStatus.getStatus());
- }
- txn->setWriteConcern(wcStatus.getValue());
+ // Get a reference to the singleton batch item (it's the 0th item in the batch).
+ BatchItemRef batchItem(&request, 0);
- WriteBatchExecutor writeBatchExecutor(txn,
- &globalOpCounters,
- &LastError::get(txn->getClient()));
+ if (BatchedCommandRequest::BatchType_Update == _writeType) {
+ // Create the update request.
+ UpdateRequest updateRequest(nsString);
+ updateRequest.setQuery(batchItem.getUpdate()->getQuery());
+ updateRequest.setUpdates(batchItem.getUpdate()->getUpdateExpr());
+ updateRequest.setMulti(batchItem.getUpdate()->getMulti());
+ updateRequest.setUpsert(batchItem.getUpdate()->getUpsert());
+ UpdateLifecycleImpl updateLifecycle(true, updateRequest.getNamespaceString());
+ updateRequest.setLifecycle(&updateLifecycle);
+ updateRequest.setExplain();
- writeBatchExecutor.executeBatch( request, &response );
+ // Explained updates can yield.
+ updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
- result.appendElements( response.toBSON() );
- return response.getOk();
- }
+ OpDebug* debug = &CurOp::get(txn)->debug();
- Status WriteCmd::explain(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- BSONObjBuilder* out) const {
- // For now we only explain update and delete write commands.
- if ( BatchedCommandRequest::BatchType_Update != _writeType &&
- BatchedCommandRequest::BatchType_Delete != _writeType ) {
- return Status( ErrorCodes::IllegalOperation,
- "Only update and delete write ops can be explained" );
+ ParsedUpdate parsedUpdate(txn, &updateRequest);
+ Status parseStatus = parsedUpdate.parseRequest();
+ if (!parseStatus.isOK()) {
+ return parseStatus;
}
- // Parse the batch request.
- BatchedCommandRequest request( _writeType );
- std::string errMsg;
- if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) {
- return Status( ErrorCodes::FailedToParse, errMsg );
- }
+ // Explains of write commands are read-only, but we take write locks so
+ // that timing info is more accurate.
+ AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
+ Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX);
- // Note that this is a runCommmand, and therefore, the database and the collection name
- // are in different parts of the grammar for the command. But it's more convenient to
- // work with a NamespaceString. We built it here and replace it in the parsed command.
- // Internally, everything work with the namespace string as opposed to just the
- // collection name.
- NamespaceString nsString(dbname, request.getNS());
- request.setNSS(nsString);
-
- // Do the validation of the batch that is shared with non-explained write batches.
- Status isValid = WriteBatchExecutor::validateBatch( request );
- if (!isValid.isOK()) {
- return isValid;
- }
+ ensureShardVersionOKOrThrow(txn->getClient(), nsString.ns());
- // Explain must do one additional piece of validation: For now we only explain
- // singleton batches.
- if ( request.sizeWriteOps() != 1u ) {
- return Status( ErrorCodes::InvalidLength,
- "explained write batches must be of size 1" );
+ // Get a pointer to the (possibly NULL) collection.
+ Collection* collection = NULL;
+ if (autoDb.getDb()) {
+ collection = autoDb.getDb()->getCollection(nsString.ns());
}
- ScopedTransaction scopedXact(txn, MODE_IX);
-
- // Get a reference to the singleton batch item (it's the 0th item in the batch).
- BatchItemRef batchItem( &request, 0 );
-
- if ( BatchedCommandRequest::BatchType_Update == _writeType ) {
- // Create the update request.
- UpdateRequest updateRequest( nsString );
- updateRequest.setQuery( batchItem.getUpdate()->getQuery() );
- updateRequest.setUpdates( batchItem.getUpdate()->getUpdateExpr() );
- updateRequest.setMulti( batchItem.getUpdate()->getMulti() );
- updateRequest.setUpsert( batchItem.getUpdate()->getUpsert() );
- UpdateLifecycleImpl updateLifecycle( true, updateRequest.getNamespaceString() );
- updateRequest.setLifecycle( &updateLifecycle );
- updateRequest.setExplain();
-
- // Explained updates can yield.
- updateRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- OpDebug* debug = &CurOp::get(txn)->debug();
-
- ParsedUpdate parsedUpdate( txn, &updateRequest );
- Status parseStatus = parsedUpdate.parseRequest();
- if ( !parseStatus.isOK() ) {
- return parseStatus;
- }
-
- // Explains of write commands are read-only, but we take write locks so
- // that timing info is more accurate.
- AutoGetDb autoDb( txn, nsString.db(), MODE_IX );
- Lock::CollectionLock colLock( txn->lockState(), nsString.ns(), MODE_IX );
-
- ensureShardVersionOKOrThrow( txn->getClient(), nsString.ns() );
-
- // Get a pointer to the (possibly NULL) collection.
- Collection* collection = NULL;
- if ( autoDb.getDb() ) {
- collection = autoDb.getDb()->getCollection( nsString.ns() );
- }
-
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
-
- // Explain the plan tree.
- Explain::explainStages( exec.get(), verbosity, out );
- return Status::OK();
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorUpdate(txn, collection, &parsedUpdate, debug, &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
+
+ // Explain the plan tree.
+ Explain::explainStages(exec.get(), verbosity, out);
+ return Status::OK();
+ } else {
+ invariant(BatchedCommandRequest::BatchType_Delete == _writeType);
+
+ // Create the delete request.
+ DeleteRequest deleteRequest(nsString);
+ deleteRequest.setQuery(batchItem.getDelete()->getQuery());
+ deleteRequest.setMulti(batchItem.getDelete()->getLimit() != 1);
+ deleteRequest.setGod(false);
+ deleteRequest.setExplain();
+
+ // Explained deletes can yield.
+ deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ ParsedDelete parsedDelete(txn, &deleteRequest);
+ Status parseStatus = parsedDelete.parseRequest();
+ if (!parseStatus.isOK()) {
+ return parseStatus;
}
- else {
- invariant( BatchedCommandRequest::BatchType_Delete == _writeType );
-
- // Create the delete request.
- DeleteRequest deleteRequest( nsString );
- deleteRequest.setQuery( batchItem.getDelete()->getQuery() );
- deleteRequest.setMulti( batchItem.getDelete()->getLimit() != 1 );
- deleteRequest.setGod( false );
- deleteRequest.setExplain();
-
- // Explained deletes can yield.
- deleteRequest.setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- ParsedDelete parsedDelete(txn, &deleteRequest);
- Status parseStatus = parsedDelete.parseRequest();
- if (!parseStatus.isOK()) {
- return parseStatus;
- }
-
- // Explains of write commands are read-only, but we take write locks so that timing
- // info is more accurate.
- AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
- Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- ensureShardVersionOKOrThrow( txn->getClient(), nsString.ns() );
-
- // Get a pointer to the (possibly NULL) collection.
- Collection* collection = NULL;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(nsString.ns());
- }
-
- PlanExecutor* rawExec;
- uassertStatusOK(getExecutorDelete(txn, collection, &parsedDelete, &rawExec));
- std::unique_ptr<PlanExecutor> exec(rawExec);
-
- // Explain the plan tree.
- Explain::explainStages(exec.get(), verbosity, out);
- return Status::OK();
+
+ // Explains of write commands are read-only, but we take write locks so that timing
+ // info is more accurate.
+ AutoGetDb autoDb(txn, nsString.db(), MODE_IX);
+ Lock::CollectionLock colLock(txn->lockState(), nsString.ns(), MODE_IX);
+
+ ensureShardVersionOKOrThrow(txn->getClient(), nsString.ns());
+
+ // Get a pointer to the (possibly NULL) collection.
+ Collection* collection = NULL;
+ if (autoDb.getDb()) {
+ collection = autoDb.getDb()->getCollection(nsString.ns());
}
- }
- CmdInsert::CmdInsert() :
- WriteCmd( "insert", BatchedCommandRequest::BatchType_Insert ) {
- }
+ PlanExecutor* rawExec;
+ uassertStatusOK(getExecutorDelete(txn, collection, &parsedDelete, &rawExec));
+ std::unique_ptr<PlanExecutor> exec(rawExec);
- void CmdInsert::redactForLogging( mutablebson::Document* cmdObj ) {
- redactTooLongLog( cmdObj, StringData( "documents", StringData::LiteralTag() ) );
+ // Explain the plan tree.
+ Explain::explainStages(exec.get(), verbosity, out);
+ return Status::OK();
}
+}
- void CmdInsert::help( stringstream& help ) const {
- help << "insert documents";
- }
+CmdInsert::CmdInsert() : WriteCmd("insert", BatchedCommandRequest::BatchType_Insert) {}
- CmdUpdate::CmdUpdate() :
- WriteCmd( "update", BatchedCommandRequest::BatchType_Update ) {
- }
+void CmdInsert::redactForLogging(mutablebson::Document* cmdObj) {
+ redactTooLongLog(cmdObj, StringData("documents", StringData::LiteralTag()));
+}
- void CmdUpdate::redactForLogging( mutablebson::Document* cmdObj ) {
- redactTooLongLog( cmdObj, StringData( "updates", StringData::LiteralTag() ) );
- }
+void CmdInsert::help(stringstream& help) const {
+ help << "insert documents";
+}
- void CmdUpdate::help( stringstream& help ) const {
- help << "update documents";
- }
+CmdUpdate::CmdUpdate() : WriteCmd("update", BatchedCommandRequest::BatchType_Update) {}
- CmdDelete::CmdDelete() :
- WriteCmd( "delete", BatchedCommandRequest::BatchType_Delete ) {
- }
+void CmdUpdate::redactForLogging(mutablebson::Document* cmdObj) {
+ redactTooLongLog(cmdObj, StringData("updates", StringData::LiteralTag()));
+}
- void CmdDelete::redactForLogging( mutablebson::Document* cmdObj ) {
- redactTooLongLog( cmdObj, StringData( "deletes", StringData::LiteralTag() ) );
- }
+void CmdUpdate::help(stringstream& help) const {
+ help << "update documents";
+}
- void CmdDelete::help( stringstream& help ) const {
- help << "delete documents";
- }
+CmdDelete::CmdDelete() : WriteCmd("delete", BatchedCommandRequest::BatchType_Delete) {}
+
+void CmdDelete::redactForLogging(mutablebson::Document* cmdObj) {
+ redactTooLongLog(cmdObj, StringData("deletes", StringData::LiteralTag()));
+}
+
+void CmdDelete::help(stringstream& help) const {
+ help << "delete documents";
+}
-} // namespace mongo
+} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/write_commands.h b/src/mongo/db/commands/write_commands/write_commands.h
index cbb2db6cac6..fcdda1b56fd 100644
--- a/src/mongo/db/commands/write_commands/write_commands.h
+++ b/src/mongo/db/commands/write_commands/write_commands.h
@@ -36,89 +36,91 @@
namespace mongo {
+/**
+ * Base class for write commands. Write commands support batch writes and write concern,
+ * and return per-item error information. All write commands use the (non-virtual) entry
+ * point WriteCmd::run().
+ *
+ * Command parsing is performed by the WriteBatch class (command syntax documented there),
+ * and command execution is performed by the WriteBatchExecutor class.
+ */
+class WriteCmd : public Command {
+ MONGO_DISALLOW_COPYING(WriteCmd);
+
+public:
+ virtual ~WriteCmd() {}
+
+protected:
/**
- * Base class for write commands. Write commands support batch writes and write concern,
- * and return per-item error information. All write commands use the (non-virtual) entry
- * point WriteCmd::run().
- *
- * Command parsing is performed by the WriteBatch class (command syntax documented there),
- * and command execution is performed by the WriteBatchExecutor class.
+ * Instantiates a command that can be invoked by "name" and is capable of issuing
+ * write batches of type "writeType".
*/
- class WriteCmd : public Command {
- MONGO_DISALLOW_COPYING(WriteCmd);
- public:
- virtual ~WriteCmd() {}
-
- protected:
-
- /**
- * Instantiates a command that can be invoked by "name", which will be capable of issuing
- * write batches of type "writeType", and will require privilege "action" to run.
- */
- WriteCmd( StringData name, BatchedCommandRequest::BatchType writeType );
-
- // Full log of write command can be quite large.
- static void redactTooLongLog( mutablebson::Document* cmdObj, StringData fieldName );
-
- private:
- virtual bool slaveOk() const;
-
- virtual bool isWriteCommandForConfigServer() const;
-
- virtual Status checkAuthForCommand( ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj );
-
- virtual bool shouldAffectCommandCounter() const;
-
- // Write command entry point.
- virtual bool run(
- OperationContext* txn,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result);
-
- // Write commands can be explained.
- virtual Status explain(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- BSONObjBuilder* out) const;
-
- // Type of batch (e.g. insert).
- BatchedCommandRequest::BatchType _writeType;
- };
-
- class CmdInsert : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdInsert);
- public:
- CmdInsert();
- void redactForLogging(mutablebson::Document* cmdObj);
-
- private:
- virtual void help(std::stringstream& help) const;
- };
-
- class CmdUpdate : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdUpdate);
- public:
- CmdUpdate();
- void redactForLogging(mutablebson::Document* cmdObj);
-
- private:
- virtual void help(std::stringstream& help) const;
- };
-
- class CmdDelete : public WriteCmd {
- MONGO_DISALLOW_COPYING(CmdDelete);
- public:
- CmdDelete();
- void redactForLogging(mutablebson::Document* cmdObj);
-
- private:
- virtual void help(std::stringstream& help) const;
- };
-
-} // namespace mongo
+ WriteCmd(StringData name, BatchedCommandRequest::BatchType writeType);
+
+ // Full log of write command can be quite large.
+ static void redactTooLongLog(mutablebson::Document* cmdObj, StringData fieldName);
+
+private:
+ virtual bool slaveOk() const;
+
+ virtual bool isWriteCommandForConfigServer() const;
+
+ virtual Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj);
+
+ virtual bool shouldAffectCommandCounter() const;
+
+ // Write command entry point.
+ virtual bool run(OperationContext* txn,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result);
+
+ // Write commands can be explained.
+ virtual Status explain(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ BSONObjBuilder* out) const;
+
+ // Type of batch (e.g. insert).
+ BatchedCommandRequest::BatchType _writeType;
+};
+
+class CmdInsert : public WriteCmd {
+ MONGO_DISALLOW_COPYING(CmdInsert);
+
+public:
+ CmdInsert();
+ void redactForLogging(mutablebson::Document* cmdObj);
+
+private:
+ virtual void help(std::stringstream& help) const;
+};
+
+class CmdUpdate : public WriteCmd {
+ MONGO_DISALLOW_COPYING(CmdUpdate);
+
+public:
+ CmdUpdate();
+ void redactForLogging(mutablebson::Document* cmdObj);
+
+private:
+ virtual void help(std::stringstream& help) const;
+};
+
+class CmdDelete : public WriteCmd {
+ MONGO_DISALLOW_COPYING(CmdDelete);
+
+public:
+ CmdDelete();
+ void redactForLogging(mutablebson::Document* cmdObj);
+
+private:
+ virtual void help(std::stringstream& help) const;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/commands/write_commands/write_commands_common.cpp b/src/mongo/db/commands/write_commands/write_commands_common.cpp
index 69ca1014140..82f3ab4db67 100644
--- a/src/mongo/db/commands/write_commands/write_commands_common.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands_common.cpp
@@ -42,62 +42,55 @@
namespace mongo {
namespace auth {
- using std::string;
- using std::vector;
-
- Status checkAuthForWriteCommand( AuthorizationSession* authzSession,
- BatchedCommandRequest::BatchType cmdType,
- const NamespaceString& cmdNSS,
- const BSONObj& cmdObj ) {
-
- vector<Privilege> privileges;
- ActionSet actionsOnCommandNSS;
-
- if (shouldBypassDocumentValidationForCommand(cmdObj)) {
- actionsOnCommandNSS.addAction(ActionType::bypassDocumentValidation);
- }
-
- if ( cmdType == BatchedCommandRequest::BatchType_Insert ) {
+using std::string;
+using std::vector;
+
+Status checkAuthForWriteCommand(AuthorizationSession* authzSession,
+ BatchedCommandRequest::BatchType cmdType,
+ const NamespaceString& cmdNSS,
+ const BSONObj& cmdObj) {
+ vector<Privilege> privileges;
+ ActionSet actionsOnCommandNSS;
+
+ if (shouldBypassDocumentValidationForCommand(cmdObj)) {
+ actionsOnCommandNSS.addAction(ActionType::bypassDocumentValidation);
+ }
- if ( !cmdNSS.isSystemDotIndexes() ) {
- actionsOnCommandNSS.addAction(ActionType::insert);
+ if (cmdType == BatchedCommandRequest::BatchType_Insert) {
+ if (!cmdNSS.isSystemDotIndexes()) {
+ actionsOnCommandNSS.addAction(ActionType::insert);
+ } else {
+ // Special-case indexes until we have a command
+ string nsToIndex, errMsg;
+ if (!BatchedCommandRequest::getIndexedNS(cmdObj, &nsToIndex, &errMsg)) {
+ return Status(ErrorCodes::FailedToParse, errMsg);
}
- else {
- // Special-case indexes until we have a command
- string nsToIndex, errMsg;
- if ( !BatchedCommandRequest::getIndexedNS( cmdObj, &nsToIndex, &errMsg ) ) {
- return Status( ErrorCodes::FailedToParse, errMsg );
- }
- NamespaceString nssToIndex( nsToIndex );
- privileges.push_back( Privilege( ResourcePattern::forExactNamespace( nssToIndex ),
- ActionType::createIndex ) );
- }
+ NamespaceString nssToIndex(nsToIndex);
+ privileges.push_back(
+ Privilege(ResourcePattern::forExactNamespace(nssToIndex), ActionType::createIndex));
}
- else if ( cmdType == BatchedCommandRequest::BatchType_Update ) {
- actionsOnCommandNSS.addAction(ActionType::update);
+ } else if (cmdType == BatchedCommandRequest::BatchType_Update) {
+ actionsOnCommandNSS.addAction(ActionType::update);
- // Upsert also requires insert privs
- if ( BatchedCommandRequest::containsUpserts( cmdObj ) ) {
- actionsOnCommandNSS.addAction(ActionType::insert);
- }
- }
- else {
- fassert( 17251, cmdType == BatchedCommandRequest::BatchType_Delete );
- actionsOnCommandNSS.addAction(ActionType::remove);
- }
-
-
- if (!actionsOnCommandNSS.empty()) {
- privileges.emplace_back(ResourcePattern::forExactNamespace(cmdNSS),
- actionsOnCommandNSS);
+ // Upsert also requires insert privs
+ if (BatchedCommandRequest::containsUpserts(cmdObj)) {
+ actionsOnCommandNSS.addAction(ActionType::insert);
}
+ } else {
+ fassert(17251, cmdType == BatchedCommandRequest::BatchType_Delete);
+ actionsOnCommandNSS.addAction(ActionType::remove);
+ }
- if ( authzSession->isAuthorizedForPrivileges( privileges ) )
- return Status::OK();
- return Status( ErrorCodes::Unauthorized, "unauthorized" );
+ if (!actionsOnCommandNSS.empty()) {
+ privileges.emplace_back(ResourcePattern::forExactNamespace(cmdNSS), actionsOnCommandNSS);
}
+ if (authzSession->isAuthorizedForPrivileges(privileges))
+ return Status::OK();
+
+ return Status(ErrorCodes::Unauthorized, "unauthorized");
+}
}
}
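
[Editor's note] checkAuthForWriteCommand builds a single exact-namespace privilege whose ActionSet depends on the batch type: insert for plain inserts (createIndex on the target namespace for system.indexes inserts), update plus insert when the batch contains upserts, and remove for deletes. A minimal illustration of what the helper ends up demanding for an upserting update batch (the namespace "test.coll" is purely illustrative):

    // Illustrative only: the privilege required for an update batch against
    // "test.coll" that contains at least one upsert.
    ActionSet required;
    required.addAction(ActionType::update);
    required.addAction(ActionType::insert);  // upserts may insert new documents

    Privilege expected(ResourcePattern::forExactNamespace(NamespaceString("test.coll")),
                       required);
    // authzSession->isAuthorizedForPrivileges(...) must grant this privilege,
    // or the command fails with ErrorCodes::Unauthorized.
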
diff --git a/src/mongo/db/commands/write_commands/write_commands_common.h b/src/mongo/db/commands/write_commands/write_commands_common.h
index a1fe6bc9772..cf47bdc02b1 100644
--- a/src/mongo/db/commands/write_commands/write_commands_common.h
+++ b/src/mongo/db/commands/write_commands/write_commands_common.h
@@ -40,10 +40,9 @@
namespace mongo {
namespace auth {
- Status checkAuthForWriteCommand( AuthorizationSession* authzSession,
- BatchedCommandRequest::BatchType cmdType,
- const NamespaceString& cmdNSS,
- const BSONObj& cmdObj );
-
+Status checkAuthForWriteCommand(AuthorizationSession* authzSession,
+ BatchedCommandRequest::BatchType cmdType,
+ const NamespaceString& cmdNSS,
+ const BSONObj& cmdObj);
}
}
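
[Editor's note] Callers reach this helper through WriteCmd::checkAuthForCommand, declared in the write_commands.h hunk above. A hedged sketch of that call site follows; the body is not part of this diff, and the accessor names (AuthorizationSession::get, LastError::get, parseNs) are assumptions based on the MongoDB code of this era:

    // Sketch of the call site in write_commands.cpp (not shown in this commit).
    Status WriteCmd::checkAuthForCommand(ClientBasic* client,
                                         const std::string& dbname,
                                         const BSONObj& cmdObj) {
        Status status(auth::checkAuthForWriteCommand(AuthorizationSession::get(client),
                                                     _writeType,
                                                     NamespaceString(parseNs(dbname, cmdObj)),
                                                     cmdObj));
        // Record authorization failures for get-last-error style reporting.
        if (!status.isOK()) {
            LastError::get(client).setLastError(status.code(), status.reason());
        }
        return status;
    }
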