diff options
Diffstat (limited to 'src/mongo/s/write_ops/batch_write_exec.cpp')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 547 |
1 files changed, 265 insertions, 282 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 307c39321ee..1fa89c034bc 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -36,7 +36,7 @@ #include "mongo/base/owned_pointer_map.h" #include "mongo/base/status.h" #include "mongo/bson/util/builder.h" -#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only) +#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only) #include "mongo/s/client/multi_command_dispatch.h" #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/write_error_detail.h" @@ -44,354 +44,337 @@ namespace mongo { - using std::endl; - using std::make_pair; - using std::stringstream; - using std::vector; - - BatchWriteExec::BatchWriteExec( NSTargeter* targeter, - ShardResolver* resolver, - MultiCommandDispatch* dispatcher ) : - _targeter( targeter ), - _resolver( resolver ), - _dispatcher( dispatcher ), - _stats( new BatchWriteExecStats ) { - } +using std::endl; +using std::make_pair; +using std::stringstream; +using std::vector; - namespace { +BatchWriteExec::BatchWriteExec(NSTargeter* targeter, + ShardResolver* resolver, + MultiCommandDispatch* dispatcher) + : _targeter(targeter), + _resolver(resolver), + _dispatcher(dispatcher), + _stats(new BatchWriteExecStats) {} - // - // Map which allows associating ConnectionString hosts with TargetedWriteBatches - // This is needed since the dispatcher only returns hosts with responses. - // +namespace { - // TODO: Unordered map? - typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap; - } +// +// Map which allows associating ConnectionString hosts with TargetedWriteBatches +// This is needed since the dispatcher only returns hosts with responses. +// - static void buildErrorFrom( const Status& status, WriteErrorDetail* error ) { - error->setErrCode( status.code() ); - error->setErrMessage( status.reason() ); - } +// TODO: Unordered map? +typedef OwnedPointerMap<ConnectionString, TargetedWriteBatch> OwnedHostBatchMap; +} - // Helper to note several stale errors from a response - static void noteStaleResponses( const vector<ShardError*>& staleErrors, NSTargeter* targeter ) { - for ( vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end(); - ++it ) { - const ShardError* error = *it; - targeter->noteStaleResponse( error->endpoint, - error->error.isErrInfoSet() ? error->error.getErrInfo() : - BSONObj() ); - } - } +static void buildErrorFrom(const Status& status, WriteErrorDetail* error) { + error->setErrCode(status.code()); + error->setErrMessage(status.reason()); +} - static bool isShardMetadataChanging( const vector<ShardError*>& staleErrors ) { - if ( !staleErrors.empty() && staleErrors.back()->error.isErrInfoSet() ) - return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue(); - return false; +// Helper to note several stale errors from a response +static void noteStaleResponses(const vector<ShardError*>& staleErrors, NSTargeter* targeter) { + for (vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end(); + ++it) { + const ShardError* error = *it; + targeter->noteStaleResponse( + error->endpoint, error->error.isErrInfoSet() ? error->error.getErrInfo() : BSONObj()); } +} - // The number of times we'll try to continue a batch op if no progress is being made - // This only applies when no writes are occurring and metadata is not changing on reload - static const int kMaxRoundsWithoutProgress( 5 ); +static bool isShardMetadataChanging(const vector<ShardError*>& staleErrors) { + if (!staleErrors.empty() && staleErrors.back()->error.isErrInfoSet()) + return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue(); + return false; +} - void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest, - BatchedCommandResponse* clientResponse ) { +// The number of times we'll try to continue a batch op if no progress is being made +// This only applies when no writes are occurring and metadata is not changing on reload +static const int kMaxRoundsWithoutProgress(5); - LOG( 4 ) << "starting execution of write batch of size " - << static_cast<int>( clientRequest.sizeWriteOps() ) - << " for " << clientRequest.getNS() << endl; +void BatchWriteExec::executeBatch(const BatchedCommandRequest& clientRequest, + BatchedCommandResponse* clientResponse) { + LOG(4) << "starting execution of write batch of size " + << static_cast<int>(clientRequest.sizeWriteOps()) << " for " << clientRequest.getNS() + << endl; - BatchWriteOp batchOp; - batchOp.initClientRequest( &clientRequest ); + BatchWriteOp batchOp; + batchOp.initClientRequest(&clientRequest); - // Current batch status - bool refreshedTargeter = false; - int rounds = 0; - int numCompletedOps = 0; - int numRoundsWithoutProgress = 0; + // Current batch status + bool refreshedTargeter = false; + int rounds = 0; + int numCompletedOps = 0; + int numRoundsWithoutProgress = 0; - while ( !batchOp.isFinished() ) { + while (!batchOp.isFinished()) { + // + // Get child batches to send using the targeter + // + // Targeting errors can be caused by remote metadata changing (the collection could have + // been dropped and recreated, for example with a new shard key). If a remote metadata + // change occurs *before* a client sends us a batch, we need to make sure that we don't + // error out just because we're staler than the client - otherwise mongos will be have + // unpredictable behavior. + // + // (If a metadata change happens *during* or *after* a client sends us a batch, however, + // we make no guarantees about delivery.) + // + // For this reason, we don't record targeting errors until we've refreshed our targeting + // metadata at least once *after* receiving the client batch - at that point, we know: + // + // 1) our new metadata is the same as the metadata when the client sent a batch, and so + // targeting errors are real. + // OR + // 2) our new metadata is a newer version than when the client sent a batch, and so + // the metadata must have changed after the client batch was sent. We don't need to + // deliver in this case, since for all the client knows we may have gotten the batch + // exactly when the metadata changed. + // - // - // Get child batches to send using the targeter - // - // Targeting errors can be caused by remote metadata changing (the collection could have - // been dropped and recreated, for example with a new shard key). If a remote metadata - // change occurs *before* a client sends us a batch, we need to make sure that we don't - // error out just because we're staler than the client - otherwise mongos will be have - // unpredictable behavior. - // - // (If a metadata change happens *during* or *after* a client sends us a batch, however, - // we make no guarantees about delivery.) - // - // For this reason, we don't record targeting errors until we've refreshed our targeting - // metadata at least once *after* receiving the client batch - at that point, we know: - // - // 1) our new metadata is the same as the metadata when the client sent a batch, and so - // targeting errors are real. - // OR - // 2) our new metadata is a newer version than when the client sent a batch, and so - // the metadata must have changed after the client batch was sent. We don't need to - // deliver in this case, since for all the client knows we may have gotten the batch - // exactly when the metadata changed. - // + OwnedPointerVector<TargetedWriteBatch> childBatchesOwned; + vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector(); + + // If we've already had a targeting error, we've refreshed the metadata once and can + // record target errors definitively. + bool recordTargetErrors = refreshedTargeter; + Status targetStatus = batchOp.targetBatch(*_targeter, recordTargetErrors, &childBatches); + if (!targetStatus.isOK()) { + // Don't do anything until a targeter refresh + _targeter->noteCouldNotTarget(); + refreshedTargeter = true; + ++_stats->numTargetErrors; + dassert(childBatches.size() == 0u); + } - OwnedPointerVector<TargetedWriteBatch> childBatchesOwned; - vector<TargetedWriteBatch*>& childBatches = childBatchesOwned.mutableVector(); - - // If we've already had a targeting error, we've refreshed the metadata once and can - // record target errors definitively. - bool recordTargetErrors = refreshedTargeter; - Status targetStatus = batchOp.targetBatch( *_targeter, - recordTargetErrors, - &childBatches ); - if ( !targetStatus.isOK() ) { - // Don't do anything until a targeter refresh - _targeter->noteCouldNotTarget(); - refreshedTargeter = true; - ++_stats->numTargetErrors; - dassert( childBatches.size() == 0u ); - } + // + // Send all child batches + // + + size_t numSent = 0; + size_t numToSend = childBatches.size(); + bool remoteMetadataChanging = false; + while (numSent != numToSend) { + // Collect batches out on the network, mapped by endpoint + OwnedHostBatchMap ownedPendingBatches; + OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap(); // - // Send all child batches + // Send side // - size_t numSent = 0; - size_t numToSend = childBatches.size(); - bool remoteMetadataChanging = false; - while ( numSent != numToSend ) { - - // Collect batches out on the network, mapped by endpoint - OwnedHostBatchMap ownedPendingBatches; - OwnedHostBatchMap::MapType& pendingBatches = ownedPendingBatches.mutableMap(); - + // Get as many batches as we can at once + for (vector<TargetedWriteBatch*>::iterator it = childBatches.begin(); + it != childBatches.end(); + ++it) { // - // Send side + // Collect the info needed to dispatch our targeted batch // - // Get as many batches as we can at once - for ( vector<TargetedWriteBatch*>::iterator it = childBatches.begin(); - it != childBatches.end(); ++it ) { - - // - // Collect the info needed to dispatch our targeted batch - // - - TargetedWriteBatch* nextBatch = *it; - // If the batch is NULL, we sent it previously, so skip - if ( nextBatch == NULL ) continue; - - // Figure out what host we need to dispatch our targeted batch - ConnectionString shardHost; - Status resolveStatus = _resolver->chooseWriteHost( nextBatch->getEndpoint() - .shardName, - &shardHost ); - if ( !resolveStatus.isOK() ) { - - ++_stats->numResolveErrors; - - // Record a resolve failure - // TODO: It may be necessary to refresh the cache if stale, or maybe just - // cancel and retarget the batch - WriteErrorDetail error; - buildErrorFrom( resolveStatus, &error ); - - LOG( 4 ) << "unable to send write batch to " << shardHost.toString() - << causedBy( resolveStatus.toString() ) << endl; - - batchOp.noteBatchError( *nextBatch, error ); - - // We're done with this batch - // Clean up when we can't resolve a host - delete *it; - *it = NULL; - --numToSend; - continue; - } - - // If we already have a batch for this host, wait until the next time - OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find( shardHost ); - if ( pendingIt != pendingBatches.end() ) continue; - - // - // We now have all the info needed to dispatch the batch - // + TargetedWriteBatch* nextBatch = *it; + // If the batch is NULL, we sent it previously, so skip + if (nextBatch == NULL) + continue; - BatchedCommandRequest request( clientRequest.getBatchType() ); - batchOp.buildBatchRequest( *nextBatch, &request ); + // Figure out what host we need to dispatch our targeted batch + ConnectionString shardHost; + Status resolveStatus = + _resolver->chooseWriteHost(nextBatch->getEndpoint().shardName, &shardHost); + if (!resolveStatus.isOK()) { + ++_stats->numResolveErrors; - // Internally we use full namespaces for request/response, but we send the - // command to a database with the collection name in the request. - NamespaceString nss( request.getNS() ); - request.setNS( nss.coll() ); + // Record a resolve failure + // TODO: It may be necessary to refresh the cache if stale, or maybe just + // cancel and retarget the batch + WriteErrorDetail error; + buildErrorFrom(resolveStatus, &error); - LOG( 4 ) << "sending write batch to " << shardHost.toString() << ": " - << request.toString() << endl; + LOG(4) << "unable to send write batch to " << shardHost.toString() + << causedBy(resolveStatus.toString()) << endl; - _dispatcher->addCommand( shardHost, nss.db(), request ); + batchOp.noteBatchError(*nextBatch, error); - // Indicate we're done by setting the batch to NULL - // We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast - // endpoints for the same host, so this should be pretty efficient without - // moving stuff around. + // We're done with this batch + // Clean up when we can't resolve a host + delete *it; *it = NULL; - - // Recv-side is responsible for cleaning up the nextBatch when used - pendingBatches.insert( make_pair( shardHost, nextBatch ) ); + --numToSend; + continue; } - // Send them all out - _dispatcher->sendAll(); - numSent += pendingBatches.size(); + // If we already have a batch for this host, wait until the next time + OwnedHostBatchMap::MapType::iterator pendingIt = pendingBatches.find(shardHost); + if (pendingIt != pendingBatches.end()) + continue; // - // Recv side + // We now have all the info needed to dispatch the batch // - while ( _dispatcher->numPending() > 0 ) { + BatchedCommandRequest request(clientRequest.getBatchType()); + batchOp.buildBatchRequest(*nextBatch, &request); - // Get the response - ConnectionString shardHost; - BatchedCommandResponse response; - Status dispatchStatus = _dispatcher->recvAny( &shardHost, &response ); + // Internally we use full namespaces for request/response, but we send the + // command to a database with the collection name in the request. + NamespaceString nss(request.getNS()); + request.setNS(nss.coll()); - // Get the TargetedWriteBatch to find where to put the response - dassert( pendingBatches.find( shardHost ) != pendingBatches.end() ); - TargetedWriteBatch* batch = pendingBatches.find( shardHost )->second; + LOG(4) << "sending write batch to " << shardHost.toString() << ": " + << request.toString() << endl; - if ( dispatchStatus.isOK() ) { + _dispatcher->addCommand(shardHost, nss.db(), request); - TrackedErrors trackedErrors; - trackedErrors.startTracking( ErrorCodes::StaleShardVersion ); + // Indicate we're done by setting the batch to NULL + // We'll only get duplicate hostEndpoints if we have broadcast and non-broadcast + // endpoints for the same host, so this should be pretty efficient without + // moving stuff around. + *it = NULL; - LOG( 4 ) << "write results received from " << shardHost.toString() << ": " - << response.toString() << endl; - - // Dispatch was ok, note response - batchOp.noteBatchResponse( *batch, response, &trackedErrors ); + // Recv-side is responsible for cleaning up the nextBatch when used + pendingBatches.insert(make_pair(shardHost, nextBatch)); + } - // Note if anything was stale - const vector<ShardError*>& staleErrors = - trackedErrors.getErrors( ErrorCodes::StaleShardVersion ); + // Send them all out + _dispatcher->sendAll(); + numSent += pendingBatches.size(); - if ( staleErrors.size() > 0 ) { - noteStaleResponses( staleErrors, _targeter ); - ++_stats->numStaleBatches; - } + // + // Recv side + // - // Remember if the shard is actively changing metadata right now - if ( isShardMetadataChanging( staleErrors ) ) { - remoteMetadataChanging = true; - } + while (_dispatcher->numPending() > 0) { + // Get the response + ConnectionString shardHost; + BatchedCommandResponse response; + Status dispatchStatus = _dispatcher->recvAny(&shardHost, &response); - // Remember that we successfully wrote to this shard - // NOTE: This will record lastOps for shards where we actually didn't update - // or delete any documents, which preserves old behavior but is conservative - _stats->noteWriteAt( shardHost, - response.isLastOpSet() ? - response.getLastOp() : Timestamp(), - response.isElectionIdSet() ? - response.getElectionId() : OID()); - } - else { + // Get the TargetedWriteBatch to find where to put the response + dassert(pendingBatches.find(shardHost) != pendingBatches.end()); + TargetedWriteBatch* batch = pendingBatches.find(shardHost)->second; - // Error occurred dispatching, note it + if (dispatchStatus.isOK()) { + TrackedErrors trackedErrors; + trackedErrors.startTracking(ErrorCodes::StaleShardVersion); - stringstream msg; - msg << "write results unavailable from " << shardHost.toString() - << causedBy( dispatchStatus.toString() ); + LOG(4) << "write results received from " << shardHost.toString() << ": " + << response.toString() << endl; - WriteErrorDetail error; - buildErrorFrom( Status( ErrorCodes::RemoteResultsUnavailable, msg.str() ), - &error ); + // Dispatch was ok, note response + batchOp.noteBatchResponse(*batch, response, &trackedErrors); - LOG( 4 ) << "unable to receive write results from " << shardHost.toString() - << causedBy( dispatchStatus.toString() ) << endl; + // Note if anything was stale + const vector<ShardError*>& staleErrors = + trackedErrors.getErrors(ErrorCodes::StaleShardVersion); - batchOp.noteBatchError( *batch, error ); + if (staleErrors.size() > 0) { + noteStaleResponses(staleErrors, _targeter); + ++_stats->numStaleBatches; } - } - } - ++rounds; - ++_stats->numRounds; + // Remember if the shard is actively changing metadata right now + if (isShardMetadataChanging(staleErrors)) { + remoteMetadataChanging = true; + } - // If we're done, get out - if ( batchOp.isFinished() ) - break; + // Remember that we successfully wrote to this shard + // NOTE: This will record lastOps for shards where we actually didn't update + // or delete any documents, which preserves old behavior but is conservative + _stats->noteWriteAt(shardHost, + response.isLastOpSet() ? response.getLastOp() : Timestamp(), + response.isElectionIdSet() ? response.getElectionId() + : OID()); + } else { + // Error occurred dispatching, note it - // MORE WORK TO DO + stringstream msg; + msg << "write results unavailable from " << shardHost.toString() + << causedBy(dispatchStatus.toString()); - // - // Refresh the targeter if we need to (no-op if nothing stale) - // + WriteErrorDetail error; + buildErrorFrom(Status(ErrorCodes::RemoteResultsUnavailable, msg.str()), &error); - bool targeterChanged = false; - Status refreshStatus = _targeter->refreshIfNeeded( &targeterChanged ); + LOG(4) << "unable to receive write results from " << shardHost.toString() + << causedBy(dispatchStatus.toString()) << endl; - if ( !refreshStatus.isOK() ) { - - // It's okay if we can't refresh, we'll just record errors for the ops if - // needed. - warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() ) - << endl; + batchOp.noteBatchError(*batch, error); + } } + } - // - // Ensure progress is being made toward completing the batch op - // + ++rounds; + ++_stats->numRounds; - int currCompletedOps = batchOp.numWriteOpsIn( WriteOpState_Completed ); - if ( currCompletedOps == numCompletedOps && !targeterChanged - && !remoteMetadataChanging ) { - ++numRoundsWithoutProgress; - } - else { - numRoundsWithoutProgress = 0; - } - numCompletedOps = currCompletedOps; + // If we're done, get out + if (batchOp.isFinished()) + break; - if ( numRoundsWithoutProgress > kMaxRoundsWithoutProgress ) { + // MORE WORK TO DO - stringstream msg; - msg << "no progress was made executing batch write op in " << clientRequest.getNS() - << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps - << " ops completed in " << rounds << " rounds total)"; + // + // Refresh the targeter if we need to (no-op if nothing stale) + // - WriteErrorDetail error; - buildErrorFrom( Status( ErrorCodes::NoProgressMade, msg.str() ), &error ); - batchOp.abortBatch( error ); - break; - } + bool targeterChanged = false; + Status refreshStatus = _targeter->refreshIfNeeded(&targeterChanged); + + if (!refreshStatus.isOK()) { + // It's okay if we can't refresh, we'll just record errors for the ops if + // needed. + warning() << "could not refresh targeter" << causedBy(refreshStatus.reason()) << endl; } - batchOp.buildClientResponse( clientResponse ); + // + // Ensure progress is being made toward completing the batch op + // - LOG( 4 ) << "finished execution of write batch" - << ( clientResponse->isErrDetailsSet() ? " with write errors" : "") - << ( clientResponse->isErrDetailsSet() && - clientResponse->isWriteConcernErrorSet() ? " and" : "" ) - << ( clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "" ) - << " for " << clientRequest.getNS() << endl; + int currCompletedOps = batchOp.numWriteOpsIn(WriteOpState_Completed); + if (currCompletedOps == numCompletedOps && !targeterChanged && !remoteMetadataChanging) { + ++numRoundsWithoutProgress; + } else { + numRoundsWithoutProgress = 0; + } + numCompletedOps = currCompletedOps; + + if (numRoundsWithoutProgress > kMaxRoundsWithoutProgress) { + stringstream msg; + msg << "no progress was made executing batch write op in " << clientRequest.getNS() + << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps + << " ops completed in " << rounds << " rounds total)"; + + WriteErrorDetail error; + buildErrorFrom(Status(ErrorCodes::NoProgressMade, msg.str()), &error); + batchOp.abortBatch(error); + break; + } } - const BatchWriteExecStats& BatchWriteExec::getStats() { - return *_stats; - } + batchOp.buildClientResponse(clientResponse); - BatchWriteExecStats* BatchWriteExec::releaseStats() { - return _stats.release(); - } + LOG(4) << "finished execution of write batch" + << (clientResponse->isErrDetailsSet() ? " with write errors" : "") + << (clientResponse->isErrDetailsSet() && clientResponse->isWriteConcernErrorSet() + ? " and" + : "") + << (clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "") + << " for " << clientRequest.getNS() << endl; +} - void BatchWriteExecStats::noteWriteAt(const ConnectionString& host, - Timestamp opTime, - const OID& electionId) { - _writeOpTimes[host] = HostOpTime(opTime, electionId); - } +const BatchWriteExecStats& BatchWriteExec::getStats() { + return *_stats; +} - const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const { - return _writeOpTimes; - } +BatchWriteExecStats* BatchWriteExec::releaseStats() { + return _stats.release(); +} + +void BatchWriteExecStats::noteWriteAt(const ConnectionString& host, + Timestamp opTime, + const OID& electionId) { + _writeOpTimes[host] = HostOpTime(opTime, electionId); +} + +const HostOpTimeMap& BatchWriteExecStats::getWriteOpTimes() const { + return _writeOpTimes; +} } |