path: root/src/mongo/s/write_ops/batch_write_exec.cpp
Diffstat (limited to 'src/mongo/s/write_ops/batch_write_exec.cpp')
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp  547
1 file changed, 265 insertions, 282 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 307c39321ee..1fa89c034bc 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -36,7 +36,7 @@
#include "mongo/base/owned_pointer_map.h"
#include "mongo/base/status.h"
#include "mongo/bson/util/builder.h"
-#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only)
+#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only)
#include "mongo/s/client/multi_command_dispatch.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/write_error_detail.h"
@@ -44,354 +44,337 @@
namespace mongo {
- using std::endl;
- using std::make_pair;
- using std::stringstream;
- using std::vector;
-
- BatchWriteExec::BatchWriteExec( NSTargeter* targeter,
- ShardResolver* resolver,
- MultiCommandDispatch* dispatcher ) :
- _targeter( targeter ),
- _resolver( resolver ),
- _dispatcher( dispatcher ),
- _stats( new BatchWriteExecStats ) {
- }
+using std::endl;
+using std::make_pair;
+using std::stringstream;
+using std::vector;
- namespace {
+BatchWriteExec::BatchWriteExec(NSTargeter* targeter,
+ ShardResolver* resolver,
+ MultiCommandDispatch* dispatcher)
+ : _targeter(targeter),
+ _resolver(resolver),
+ _dispatcher(dispatcher),
+ _stats(new BatchWriteExecStats) {}
- //
- // Map which allows associating ConnectionString hosts with TargetedWriteBatches
- // This is needed since the dispatcher only returns hosts with responses.
- //
+namespace {
- // TODO: Unordered map?
- typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap;
- }
+//
+// Map which allows associating ConnectionString hosts with TargetedWriteBatches
+// This is needed since the dispatcher only returns hosts with responses.
+//
- static void buildErrorFrom( const Status& status, WriteErrorDetail* error ) {
- error->setErrCode( status.code() );
- error->setErrMessage( status.reason() );
- }
+// TODO: Unordered map?
+typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap;
+}
- // Helper to note several stale errors from a response
- static void noteStaleResponses( const vector<ShardError*>& staleErrors, NSTargeter* targeter ) {
- for ( vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end();
- ++it ) {
- const ShardError* error = *it;
- targeter->noteStaleResponse( error->endpoint,
- error->error.isErrInfoSet() ? error->error.getErrInfo() :
- BSONObj() );
- }
- }
+static void buildErrorFrom(const Status& status, WriteErrorDetail* error) {
+ error->setErrCode(status.code());
+ error->setErrMessage(status.reason());
+}
- static bool isShardMetadataChanging( const vector<ShardError*>& staleErrors ) {
- if ( !staleErrors.empty() && staleErrors.back()->error.isErrInfoSet() )
- return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
- return false;
+// Helper to note several stale errors from a response
+static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargeter* targeter) {
+ for (vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end();
+ ++it) {
+ const ShardError* error = *it;
+ targeter->noteStaleResponse(
+ error->endpoint, error->error.isErrInfoSet() ? error->error.getErrInfo() : BSONObj());
}
+}
- // The number of times we'll try to continue a batch op if no progress is being made
- // This only applies when no writes are occurring and metadata is not changing on reload
- static const int kMaxRoundsWithoutProgress( 5 );
+static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) {
+ if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet())
+ return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
+ return false;
+}
- void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest,
- BatchedCommandResponse* clientResponse ) {
+// The number of times we'll try to continue a batch op if no progress is being made
+// This only applies when no writes are occurring and metadata is not changing on reload
+static const int kMaxRoundsWithoutProgress(5);
- LOG( 4 ) << "starting execution of write batch of size "
- << static_cast<int>( clientRequest.sizeWriteOps() )
- << " for " << clientRequest.getNS() << endl;
+void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest,
+ BatchedCommandResponse* clientResponse) {
+ LOG(4) << "starting execution of write batch of size "
+ << static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS()
+ << endl;
- BatchWriteOp batchOp;
- batchOp.initClientRequest( &clientRequest );
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest(&clientRequest);
- // Current batch status
- bool refreshedTargeter = false;
- int rounds = 0;
- int numCompletedOps = 0;
- int numRoundsWithoutProgress = 0;
+ // Current batch status
+ bool refreshedTargeter = false;
+ int rounds = 0;
+ int numCompletedOps = 0;
+ int numRoundsWithoutProgress = 0;
- while ( !batchOp.isFinished() ) {
+ while (!batchOp.isFinished()) {
+ //
+ // Get child batches to send using the targeter
+ //
+ // Targeting errors can be caused by remote metadata changing (the collection could have
+ // been dropped and recreated, for example with a new shard key). If a remote metadata
+ // change occurs *before* a client sends us a batch, we need to make sure that we don't
+ // error out just because we're staler than the client - otherwise mongos will have
+ // unpredictable behavior.
+ //
+ // (If a metadata change happens *during* or *after* a client sends us a batch, however,
+ // we make no guarantees about delivery.)
+ //
+ // For this reason, we don't record targeting errors until we've refreshed our targeting
+ // metadata at least once *after* receiving the client batch - at that point, we know:
+ //
+ // 1) our new metadata is the same as the metadata when the client sent a batch, and so
+ // targeting errors are real.
+ // OR
+ // 2) our new metadata is a newer version than when the client sent a batch, and so
+ // the metadata must have changed after the client batch was sent. We don't need to
+ // deliver in this case, since for all the client knows we may have gotten the batch
+ // exactly when the metadata changed.
+ //
- //
- // Get child batches to send using the targeter
- //
- // Targeting errors can be caused by remote metadata changing (the collection could have
- // been dropped and recreated, for example with a new shard key). If a remote metadata
- // change occurs *before* a client sends us a batch, we need to make sure that we don't
- // error out just because we're staler than the client - otherwise mongos will have
- // unpredictable behavior.
- //
- // (If a metadata change happens *during* or *after* a client sends us a batch, however,
- // we make no guarantees about delivery.)
- //
- // For this reason, we don't record targeting errors until we've refreshed our targeting
- // metadata at least once *after* receiving the client batch - at that point, we know:
- //
- // 1) our new metadata is the same as the metadata when the client sent a batch, and so
- // targeting errors are real.
- // OR
- // 2) our new metadata is a newer version than when the client sent a batch, and so
- // the metadata must have changed after the client batch was sent. We don't need to
- // deliver in this case, since for all the client knows we may have gotten the batch
- // exactly when the metadata changed.
- //
+ OwnedPointerVector<TargetedWriteBatch> childBatchesOwned;
+ vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector();
+
+ // If we've already had a targeting error, we've refreshed the metadata once and can
+ // record target errors definitively.
+ bool recordTargetErrors = refreshedTargeter;
+ Status targetStatus = batchOp.targetBatch(*_targeter, recordTargetErrors, &childBatches);
+ if (!targetStatus.isOK()) {
+ // Don't do anything until a targeter refresh
+ _targeter->noteCouldNotTarget();
+ refreshedTargeter = true;
+ ++_stats->numTargetErrors;
+ dassert(childBatches.size() == 0u);
+ }
- OwnedPointerVector<TargetedWriteBatch> childBatchesOwned;
- vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector();
-
- // If we've already had a targeting error, we've refreshed the metadata once and can
- // record target errors definitively.
- bool recordTargetErrors = refreshedTargeter;
- Status targetStatus = batchOp.targetBatch( *_targeter,
- recordTargetErrors,
- &childBatches );
- if ( !targetStatus.isOK() ) {
- // Don't do anything until a targeter refresh
- _targeter->noteCouldNotTarget();
- refreshedTargeter = true;
- ++_stats->numTargetErrors;
- dassert( childBatches.size() == 0u );
- }
+ //
+ // Send all child batches
+ //
+
+ size_t numSent = 0;
+ size_t numToSend = childBatches.size();
+ bool remoteMetadataChanging = false;
+ while (numSent != numToSend) {
+ // Collect batches out on the network, mapped by endpoint
+ OwnedHostBatchMap ownedPendingBatches;
+ OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap();
//
- // Send all child batches
+ // Send side
//
- size_t numSent = 0;
- size_t numToSend = childBatches.size();
- bool remoteMetadataChanging = false;
- while ( numSent != numToSend ) {
-
- // Collect batches out on the network, mapped by endpoint
- OwnedHostBatchMap ownedPendingBatches;
- OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap();
-
+ // Get as many batches as we can at once
+ for (vector<TargetedWriteBatch*>::iterator it = childBatches.begin();
+ it != childBatches.end();
+ ++it) {
//
- // Send side
+ // Collect the info needed to dispatch our targeted batch
//
- // Get as many batches as we can at once
- for ( vector<TargetedWriteBatch*>::iterator it = childBatches.begin();
- it != childBatches.end(); ++it ) {
-
- //
- // Collect the info needed to dispatch our targeted batch
- //
-
- TargetedWriteBatch* nextBatch = *it;
- // If the batch is NULL, we sent it previously, so skip
- if ( nextBatch == NULL ) continue;
-
- // Figure out what host we need to dispatch our targeted batch
- ConnectionString shardHost;
- Status resolveStatus = _resolver->chooseWriteHost( nextBatch->getEndpoint()
- .shardName,
- &shardHost );
- if ( !resolveStatus.isOK() ) {
-
- ++_stats->numResolveErrors;
-
- // Record a resolve failure
- // TODO: It may be necessary to refresh the cache if stale, or maybe just
- // cancel and retarget the batch
- WriteErrorDetail error;
- buildErrorFrom( resolveStatus, &error );
-
- LOG( 4 ) << "unable to send write batch to " << shardHost.toString()
- << causedBy( resolveStatus.toString() ) << endl;
-
- batchOp.noteBatchError( *nextBatch, error );
-
- // We're done with this batch
- // Clean up when we can't resolve a host
- delete *it;
- *it = NULL;
- --numToSend;
- continue;
- }
-
- // If we already have a batch for this host, wait until the next time
- OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find( shardHost );
- if ( pendingIt != pendingBatches.end() ) continue;
-
- //
- // We now have all the info needed to dispatch the batch
- //
+ TargetedWriteBatch* nextBatch = *it;
+ // If the batch is NULL, we sent it previously, so skip
+ if (nextBatch == NULL)
+ continue;
- BatchedCommandRequest request( clientRequest.getBatchType() );
- batchOp.buildBatchRequest( *nextBatch, &request );
+ // Figure out what host we need to dispatch our targeted batch
+ ConnectionString shardHost;
+ Status resolveStatus =
+ _resolver->chooseWriteHost(nextBatch->getEndpoint().shardName, &shardHost);
+ if (!resolveStatus.isOK()) {
+ ++_stats->numResolveErrors;
- // Internally we use full namespaces for request/response, but we send the
- // command to a database with the collection name in the request.
- NamespaceString nss( request.getNS() );
- request.setNS( nss.coll() );
+ // Record a resolve failure
+ // TODO: It may be necessary to refresh the cache if stale, or maybe just
+ // cancel and retarget the batch
+ WriteErrorDetail error;
+ buildErrorFrom(resolveStatus, &error);
- LOG( 4 ) << "sending write batch to " << shardHost.toString() << ": "
- << request.toString() << endl;
+ LOG(4) << "unable to send write batch to " << shardHost.toString()
+ << causedBy(resolveStatus.toString()) << endl;
- _dispatcher->addCommand( shardHost, nss.db(), request );
+ batchOp.noteBatchError(*nextBatch, error);
- // Indicate we're done by setting the batch to NULL
- // We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast
- // endpoints for the same host, so this should be pretty efficient without
- // moving stuff around.
+ // We're done with this batch
+ // Clean up when we can't resolve a host
+ delete *it;
*it = NULL;
-
- // Recv-side is responsible for cleaning up the nextBatch when used
- pendingBatches.insert( make_pair( shardHost, nextBatch ) );
+ --numToSend;
+ continue;
}
- // Send them all out
- _dispatcher->sendAll();
- numSent += pendingBatches.size();
+ // If we already have a batch for this host, wait until the next time
+ OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost);
+ if (pendingIt != pendingBatches.end())
+ continue;
//
- // Recv side
+ // We now have all the info needed to dispatch the batch
//
- while ( _dispatcher->numPending() > 0 ) {
+ BatchedCommandRequest request(clientRequest.getBatchType());
+ batchOp.buildBatchRequest(*nextBatch, &request);
- // Get the response
- ConnectionString shardHost;
- BatchedCommandResponse response;
- Status dispatchStatus = _dispatcher->recvAny( &shardHost, &response );
+ // Internally we use full namespaces for request/response, but we send the
+ // command to a database with the collection name in the request.
+ NamespaceString nss(request.getNS());
+ request.setNS(nss.coll());
- // Get the TargetedWriteBatch to find where to put the response
- dassert( pendingBatches.find( shardHost ) != pendingBatches.end() );
- TargetedWriteBatch* batch = pendingBatches.find( shardHost )->second;
+ LOG(4) << "sending write batch to " << shardHost.toString() << ": "
+ << request.toString() << endl;
- if ( dispatchStatus.isOK() ) {
+ _dispatcher->addCommand(shardHost, nss.db(), request);
- TrackedErrors trackedErrors;
- trackedErrors.startTracking( ErrorCodes::StaleShardVersion );
+ // Indicate we're done by setting the batch to NULL
+ // We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast
+ // endpoints for the same host, so this should be pretty efficient without
+ // moving stuff around.
+ *it = NULL;
- LOG( 4 ) << "write results received from " << shardHost.toString() << ": "
- << response.toString() << endl;
-
- // Dispatch was ok, note response
- batchOp.noteBatchResponse( *batch, response, &trackedErrors );
+ // Recv-side is responsible for cleaning up the nextBatch when used
+ pendingBatches.insert(make_pair(shardHost, nextBatch));
+ }
- // Note if anything was stale
- const vector<ShardError*>& staleErrors =
- trackedErrors.getErrors( ErrorCodes::StaleShardVersion );
+ // Send them all out
+ _dispatcher->sendAll();
+ numSent += pendingBatches.size();
- if ( staleErrors.size() > 0 ) {
- noteStaleResponses( staleErrors, _targeter );
- ++_stats->numStaleBatches;
- }
+ //
+ // Recv side
+ //
- // Remember if the shard is actively changing metadata right now
- if ( isShardMetadataChanging( staleErrors ) ) {
- remoteMetadataChanging = true;
- }
+ while (_dispatcher->numPending() > 0) {
+ // Get the response
+ ConnectionString shardHost;
+ BatchedCommandResponse response;
+ Status dispatchStatus = _dispatcher->recvAny(&shardHost, &response);
- // Remember that we successfully wrote to this shard
- // NOTE: This will record lastOps for shards where we actually didn't update
- // or delete any documents, which preserves old behavior but is conservative
- _stats->noteWriteAt( shardHost,
- response.isLastOpSet() ?
- response.getLastOp() : Timestamp(),
- response.isElectionIdSet() ?
- response.getElectionId() : OID());
- }
- else {
+ // Get the TargetedWriteBatch to find where to put the response
+ dassert(pendingBatches.find(shardHost) != pendingBatches.end());
+ TargetedWriteBatch* batch = pendingBatches.find(shardHost)->second;
- // Error occurred dispatching, note it
+ if (dispatchStatus.isOK()) {
+ TrackedErrors trackedErrors;
+ trackedErrors.startTracking(ErrorCodes::StaleShardVersion);
- stringstream msg;
- msg << "write results unavailable from " << shardHost.toString()
- << causedBy( dispatchStatus.toString() );
+ LOG(4) << "write results received from " << shardHost.toString() << ": "
+ << response.toString() << endl;
- WriteErrorDetail error;
- buildErrorFrom( Status( ErrorCodes::RemoteResultsUnavailable, msg.str() ),
- &error );
+ // Dispatch was ok, note response
+ batchOp.noteBatchResponse(*batch, response, &trackedErrors);
- LOG( 4 ) << "unable to receive write results from " << shardHost.toString()
- << causedBy( dispatchStatus.toString() ) << endl;
+ // Note if anything was stale
+ const vector<ShardError*>& staleErrors =
+ trackedErrors.getErrors(ErrorCodes::StaleShardVersion);
- batchOp.noteBatchError( *batch, error );
+ if (staleErrors.size() > 0) {
+ noteStaleResponses(staleErrors, _targeter);
+ ++_stats->numStaleBatches;
}
- }
- }
- ++rounds;
- ++_stats->numRounds;
+ // Remember if the shard is actively changing metadata right now
+ if (isShardMetadataChanging(staleErrors)) {
+ remoteMetadataChanging = true;
+ }
- // If we're done, get out
- if ( batchOp.isFinished() )
- break;
+ // Remember that we successfully wrote to this shard
+ // NOTE: This will record lastOps for shards where we actually didn't update
+ // or delete any documents, which preserves old behavior but is conservative
+ _stats->noteWriteAt(shardHost,
+ response.isLastOpSet() ? response.getLastOp() : Timestamp(),
+ response.isElectionIdSet() ? response.getElectionId()
+ : OID());
+ } else {
+ // Error occurred dispatching, note it
- // MORE WORK TO DO
+ stringstream msg;
+ msg << "write results unavailable from " << shardHost.toString()
+ << causedBy(dispatchStatus.toString());
- //
- // Refresh the targeter if we need to (no-op if nothing stale)
- //
+ WriteErrorDetail error;
+ buildErrorFrom(Status(ErrorCodes::RemoteResultsUnavailable, msg.str()), &error);
- bool targeterChanged = false;
- Status refreshStatus = _targeter->refreshIfNeeded( &targeterChanged );
+ LOG(4) << "unable to receive write results from " << shardHost.toString()
+ << causedBy(dispatchStatus.toString()) << endl;
- if ( !refreshStatus.isOK() ) {
-
- // It's okay if we can't refresh, we'll just record errors for the ops if
- // needed.
- warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() )
- << endl;
+ batchOp.noteBatchError(*batch, error);
+ }
}
+ }
- //
- // Ensure progress is being made toward completing the batch op
- //
+ ++rounds;
+ ++_stats->numRounds;
- int currCompletedOps = batchOp.numWriteOpsIn( WriteOpState_Completed );
- if ( currCompletedOps == numCompletedOps && !targeterChanged
- && !remoteMetadataChanging ) {
- ++numRoundsWithoutProgress;
- }
- else {
- numRoundsWithoutProgress = 0;
- }
- numCompletedOps = currCompletedOps;
+ // If we're done, get out
+ if (batchOp.isFinished())
+ break;
- if ( numRoundsWithoutProgress > kMaxRoundsWithoutProgress ) {
+ // MORE WORK TO DO
- stringstream msg;
- msg << "no progress was made executing batch write op in " << clientRequest.getNS()
- << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps
- << " ops completed in " << rounds << " rounds total)";
+ //
+ // Refresh the targeter if we need to (no-op if nothing stale)
+ //
- WriteErrorDetail error;
- buildErrorFrom( Status( ErrorCodes::NoProgressMade, msg.str() ), &error );
- batchOp.abortBatch( error );
- break;
- }
+ bool targeterChanged = false;
+ Status refreshStatus = _targeter->refreshIfNeeded(&targeterChanged);
+
+ if (!refreshStatus.isOK()) {
+ // It's okay if we can't refresh, we'll just record errors for the ops if
+ // needed.
+ warning() << "could not refresh targeter" << causedBy(refreshStatus.reason()) << endl;
}
- batchOp.buildClientResponse( clientResponse );
+ //
+ // Ensure progress is being made toward completing the batch op
+ //
- LOG( 4 ) << "finished execution of write batch"
- << ( clientResponse->isErrDetailsSet() ? " with write errors" : "")
- << ( clientResponse->isErrDetailsSet() &&
- clientResponse->isWriteConcernErrorSet() ? " and" : "" )
- << ( clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "" )
- << " for " << clientRequest.getNS() << endl;
+ int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed);
+ if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) {
+ ++numRoundsWithoutProgress;
+ } else {
+ numRoundsWithoutProgress = 0;
+ }
+ numCompletedOps = currCompletedOps;
+
+ if (numRoundsWithoutProgress > kMaxRoundsWithoutProgress) {
+ stringstream msg;
+ msg << "no progress was made executing batch write op in " << clientRequest.getNS()
+ << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps
+ << " ops completed in " << rounds << " rounds total)";
+
+ WriteErrorDetail error;
+ buildErrorFrom(Status(ErrorCodes::NoProgressMade, msg.str()), &error);
+ batchOp.abortBatch(error);
+ break;
+ }
}
- const BatchWriteExecStats& BatchWriteExec::getStats() {
- return *_stats;
- }
+ batchOp.buildClientResponse(clientResponse);
- BatchWriteExecStats* BatchWriteExec::releaseStats() {
- return _stats.release();
- }
+ LOG(4) << "finished execution of write batch"
+ << (clientResponse->isErrDetailsSet() ? " with write errors" : "")
+ << (clientResponse->isErrDetailsSet() && clientResponse->isWriteConcernErrorSet()
+ ? " and"
+ : "")
+ << (clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "")
+ << " for " << clientRequest.getNS() << endl;
+}
- void BatchWriteExecStats::noteWriteAt(const ConnectionString& host,
- Timestamp opTime,
- const OID& electionId) {
- _writeOpTimes[host] = HostOpTime(opTime, electionId);
- }
+const BatchWriteExecStats& BatchWriteExec::getStats() {
+ return *_stats;
+}
- const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
- return _writeOpTimes;
- }
+BatchWriteExecStats* BatchWriteExec::releaseStats() {
+ return _stats.release();
+}
+
+void BatchWriteExecStats::noteWriteAt(const ConnectionString& host,
+ Timestamp opTime,
+ const OID& electionId) {
+ _writeOpTimes[host] = HostOpTime(opTime, electionId);
+}
+
+const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const {
+ return _writeOpTimes;
+}
}
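
Note: the executeBatch() loop above aborts the whole batch once it has gone more than kMaxRoundsWithoutProgress (5) consecutive rounds in which no new write ops completed, the targeter did not change, and no remote metadata change was reported. The following is a minimal standalone sketch of that guard, using a hypothetical FakeBatchOp in place of the real BatchWriteOp; it is an illustration of the retry/abort pattern, not the mongos implementation.

#include <iostream>

// Hypothetical stand-in for BatchWriteOp state; not the real mongos type.
struct FakeBatchOp {
    int completed = 0;
    int total = 10;
    bool isFinished() const { return completed >= total; }
    // Simulate one round that may or may not complete any ops.
    void runRound(bool makesProgress) { if (makesProgress) ++completed; }
};

int main() {
    const int kMaxRoundsWithoutProgress = 5;  // same constant as in the diff
    FakeBatchOp batchOp;
    int rounds = 0;
    int numCompletedOps = 0;
    int numRoundsWithoutProgress = 0;

    while (!batchOp.isFinished()) {
        // Pretend every third round stalls (no writes, no metadata change).
        batchOp.runRound(rounds % 3 != 2);
        ++rounds;

        int currCompletedOps = batchOp.completed;
        if (currCompletedOps == numCompletedOps) {
            ++numRoundsWithoutProgress;    // stalled round
        } else {
            numRoundsWithoutProgress = 0;  // any progress resets the counter
        }
        numCompletedOps = currCompletedOps;

        if (numRoundsWithoutProgress > kMaxRoundsWithoutProgress) {
            std::cout << "aborting: no progress after " << rounds << " rounds, "
                      << numCompletedOps << " ops completed\n";
            return 1;
        }
    }
    std::cout << "finished in " << rounds << " rounds\n";
    return 0;
}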