diff options
author | Greg Studer <greg@10gen.com> | 2014-01-16 12:40:27 -0500 |
---|---|---|
committer | gregs <greg@10gen.com> | 2014-01-23 11:29:56 -0500 |
commit | cf99842a3a495cb86efca4d702fb8a45e5768072 (patch) | |
tree | e9c25e724305877746ce5d4b514310905602f330 /src | |
parent | a6c4e86b0aba95fd34ef4b912909e9dd89d8425e (diff) | |
download | mongo-cf99842a3a495cb86efca4d702fb8a45e5768072.tar.gz |
SERVER-12367 detect halted progress in write command execution, report error
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 58 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.cpp | 38 | ||||
-rw-r--r-- | src/mongo/s/chunk_manager_targeter.h | 4 | ||||
-rw-r--r-- | src/mongo/s/chunk_version.h | 25 | ||||
-rw-r--r-- | src/mongo/s/chunk_version_test.cpp | 29 | ||||
-rw-r--r-- | src/mongo/s/mock_multi_write_command.h | 48 | ||||
-rw-r--r-- | src/mongo/s/mock_ns_targeter.h | 4 | ||||
-rw-r--r-- | src/mongo/s/ns_targeter.h | 11 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.cpp | 121 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec.h | 19 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 251 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 41 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 21 |
14 files changed, 517 insertions, 154 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index f613db103ea..36e1fd0f6dc 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -81,6 +81,7 @@ error_code("CommandResultSchemaViolation", 78) error_code("UnknownReplWriteConcern", 79) error_code("RoleDataInconsistent", 80) error_code("NoClientContext", 81) +error_code("NoProgressMade", 82) # Non-sequential error codes (for compatibility only) error_code("DuplicateKey", 11000) diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 1e99bbb4d58..7a0db71f651 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pagefault.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" #include "mongo/db/write_concern.h" #include "mongo/s/collection_metadata.h" @@ -53,6 +54,9 @@ namespace mongo { + // TODO: Determine queueing behavior we want here + MONGO_EXPORT_SERVER_PARAMETER( queueForMigrationCommit, bool, true ); + using mongoutils::str::stream; WriteBatchExecutor::WriteBatchExecutor( const BSONObj& wc, @@ -90,6 +94,14 @@ namespace mongo { return error; } + static void noteInCriticalSection( WriteErrorDetail* staleError ) { + BSONObjBuilder builder; + if ( staleError->isErrInfoSet() ) + builder.appendElements( staleError->getErrInfo() ); + builder.append( "inCriticalSection", true ); + staleError->setErrInfo( builder.obj() ); + } + void WriteBatchExecutor::executeBatch( const BatchedCommandRequest& request, BatchedCommandResponse* response ) { @@ -162,8 +174,6 @@ namespace mongo { bool staleBatch = !writeErrors.empty() && writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion; - // TODO: Audit where we want to queue here - the shardingState calls may block for remote - // data if ( staleBatch ) { const BatchedRequestMetadata* requestMetadata = request.getMetadata(); @@ -171,11 +181,53 @@ namespace mongo { // Make sure our shard name is set or is the same as what was set previously if ( shardingState.setShardName( requestMetadata->getShardName() ) ) { - // Refresh our shard version + + // + // First, we refresh metadata if we need to based on the requested version. + // + ChunkVersion latestShardVersion; shardingState.refreshMetadataIfNeeded( request.getTargetingNS(), requestMetadata->getShardVersion(), &latestShardVersion ); + + // Report if we're still changing our metadata + // TODO: Better reporting per-collection + if ( shardingState.inCriticalMigrateSection() ) { + noteInCriticalSection( writeErrors.back() ); + } + + if ( queueForMigrationCommit ) { + + // + // Queue up for migration to end - this allows us to be sure that clients will + // not repeatedly try to refresh metadata that is not yet written to the config + // server. Not necessary for correctness. + // Exposed as optional parameter to allow testing of queuing behavior with + // different network timings. + // + + const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion(); + + // + // Only wait if we're an older version (in the current collection epoch) and + // we're not write compatible, implying that the current migration is affecting + // writes. + // + + if ( requestShardVersion.isOlderThan( latestShardVersion ) && + !requestShardVersion.isWriteCompatibleWith( latestShardVersion ) ) { + + while ( shardingState.inCriticalMigrateSection() ) { + + log() << "write request to old shard version " + << requestMetadata->getShardVersion().toString() + << " waiting for migration commit" << endl; + + shardingState.waitTillNotInCriticalSection( 10 /* secs */); + } + } + } } else { // If our shard name is stale, our version must have been stale as well diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index b992e7b5fdf..80c01ad0f34 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -444,20 +444,40 @@ namespace mongo { /** * Whether or not the manager/primary pair is different from the other manager/primary pair */ + bool isMetadataDifferent( const ChunkManagerPtr& managerA, + const ShardPtr& primaryA, + const ChunkManagerPtr& managerB, + const ShardPtr& primaryB ) { + + if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB ) + || ( !primaryA && primaryB ) ) return true; + + if ( managerA ) { + return !managerA->getVersion().isStrictlyEqualTo( managerB->getVersion() ); + } + + dassert( NULL != primaryA.get() ); + return primaryA->getName() != primaryB->getName(); + } + + /** + * Whether or not the manager/primary pair was changed or refreshed from a previous version + * of the metadata. + */ bool wasMetadataRefreshed( const ChunkManagerPtr& managerA, const ShardPtr& primaryA, const ChunkManagerPtr& managerB, const ShardPtr& primaryB ) { - if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB ) - || ( !primaryA && primaryB ) ) return true; + if ( isMetadataDifferent( managerA, primaryA, managerB, primaryB ) ) + return true; if ( managerA ) { + dassert( managerB.get() ); // otherwise metadata would be different return managerA->getSequenceNumber() != managerB->getSequenceNumber(); } - dassert( NULL != primaryA.get() ); - return primaryA->getName() != primaryB->getName(); + return false; } } @@ -491,7 +511,13 @@ namespace mongo { return _stats.get(); } - Status ChunkManagerTargeter::refreshIfNeeded() { + Status ChunkManagerTargeter::refreshIfNeeded( bool *wasChanged ) { + + bool dummy; + if ( !wasChanged ) + wasChanged = &dummy; + + *wasChanged = false; // // Did we have any stale config or targeting errors at all? @@ -544,6 +570,7 @@ namespace mongo { return refreshNow( RefreshType_RefreshChunkManager ); } + *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary ); return Status::OK(); } else if ( !_remoteShardVersions.empty() ) { @@ -566,6 +593,7 @@ namespace mongo { return refreshNow( RefreshType_RefreshChunkManager ); } + *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary ); return Status::OK(); } diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 2d57eddafad..bb1f63e283b 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -84,9 +84,11 @@ namespace mongo { * information is stale WRT the noted stale responses or a remote refresh is needed due * to a targeting failure, will contact the config servers to reload the metadata. * + * Reports wasChanged = true if the metadata is different after this reload. + * * Also see NSTargeter::refreshIfNeeded(). */ - Status refreshIfNeeded(); + Status refreshIfNeeded( bool* wasChanged ); /** * Returns the stats. Note that the returned stats object is still owned by this targeter. diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h index 3aac6eb66af..b8955906780 100644 --- a/src/mongo/s/chunk_version.h +++ b/src/mongo/s/chunk_version.h @@ -188,6 +188,31 @@ namespace mongo { return otherVersion._combined == _combined; } + /** + * Returns true if the otherVersion is the same as this version and enforces strict epoch + * checking (empty epochs are not wildcards). + */ + bool isStrictlyEqualTo( const ChunkVersion& otherVersion ) const { + if ( otherVersion._epoch != _epoch ) + return false; + return otherVersion._combined == _combined; + } + + /** + * Returns true if this version is (strictly) in the same epoch as the other version and + * this version is older. Returns false if we're not sure because the epochs are different + * or if this version is newer. + */ + bool isOlderThan( const ChunkVersion& otherVersion ) const { + if ( otherVersion._epoch != _epoch ) + return false; + + if ( _major != otherVersion._major ) + return _major < otherVersion._major; + + return _minor < otherVersion._minor; + } + // Is this in the same epoch? bool hasCompatibleEpoch( const ChunkVersion& otherVersion ) const { return hasCompatibleEpoch( otherVersion._epoch ); diff --git a/src/mongo/s/chunk_version_test.cpp b/src/mongo/s/chunk_version_test.cpp index c5136ac34dc..52183487e21 100644 --- a/src/mongo/s/chunk_version_test.cpp +++ b/src/mongo/s/chunk_version_test.cpp @@ -96,5 +96,34 @@ namespace { ASSERT( parsed.epoch().isSet() ); } + TEST(Comparison, StrictEqual) { + + OID epoch = OID::gen(); + + ASSERT( ChunkVersion( 3, 1, epoch ).isStrictlyEqualTo( ChunkVersion( 3, 1, epoch ) ) ); + ASSERT( !ChunkVersion( 3, 1, epoch ).isStrictlyEqualTo( ChunkVersion( 3, 1, OID() ) ) ); + ASSERT( !ChunkVersion( 3, 1, OID() ).isStrictlyEqualTo( ChunkVersion( 3, 1, epoch ) ) ); + ASSERT( ChunkVersion( 3, 1, OID() ).isStrictlyEqualTo( ChunkVersion( 3, 1, OID() ) ) ); + ASSERT( !ChunkVersion( 4, 2, epoch ).isStrictlyEqualTo( ChunkVersion( 4, 1, epoch ) ) ); + } + + TEST(Comparison, OlderThan) { + + OID epoch = OID::gen(); + + ASSERT( ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 4, 1, epoch ) ) ); + ASSERT( !ChunkVersion( 4, 1, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) ); + + ASSERT( ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 3, 2, epoch ) ) ); + ASSERT( !ChunkVersion( 3, 2, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) ); + + ASSERT( !ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 4, 1, OID() ) ) ); + ASSERT( !ChunkVersion( 4, 1, OID() ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) ); + + ASSERT( ChunkVersion( 3, 2, epoch ).isOlderThan( ChunkVersion( 4, 1, epoch ) ) ); + + ASSERT( !ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) ); + } + } // unnamed namespace } // namespace mongo diff --git a/src/mongo/s/mock_multi_write_command.h b/src/mongo/s/mock_multi_write_command.h index bb8301e4817..08c79af2505 100644 --- a/src/mongo/s/mock_multi_write_command.h +++ b/src/mongo/s/mock_multi_write_command.h @@ -41,15 +41,25 @@ namespace mongo { * A ConnectionString endpoint registered with some kind of error, to simulate returning when * the endpoint is used. */ - struct MockEndpoint { + struct MockWriteResult { + + MockWriteResult( const ConnectionString& endpoint, const WriteErrorDetail& error ) : + endpoint( endpoint ) { + WriteErrorDetail* errorCopy = new WriteErrorDetail; + error.cloneTo( errorCopy ); + errorCopy->setIndex( 0 ); + response.setOk(true); + response.setN(0); + response.addToErrDetails( errorCopy ); + } - MockEndpoint( const ConnectionString& endpoint, const WriteErrorDetail& error ) : - endpoint( endpoint ) { - error.cloneTo( &this->error ); + MockWriteResult( const ConnectionString& endpoint, const BatchedCommandResponse& response ) : + endpoint( endpoint ) { + response.cloneTo( &this->response ); } const ConnectionString endpoint; - WriteErrorDetail error; + BatchedCommandResponse response; }; /** @@ -67,7 +77,7 @@ namespace mongo { class MockMultiWriteCommand : public MultiCommandDispatch { public: - void init( const std::vector<MockEndpoint*> mockEndpoints ) { + void init( const std::vector<MockWriteResult*> mockEndpoints ) { ASSERT( !mockEndpoints.empty() ); _mockEndpoints.mutableVector().insert( _mockEndpoints.mutableVector().end(), mockEndpoints.begin(), @@ -98,39 +108,35 @@ namespace mongo { static_cast<BatchedCommandResponse*>( response ); *endpoint = _pending.front(); - MockEndpoint* mockEndpoint = releaseByHost( _pending.front() ); + MockWriteResult* mockResponse = releaseByHost( _pending.front() ); _pending.pop_front(); - if ( NULL == mockEndpoint ) { + if ( NULL == mockResponse ) { batchResponse->setOk( true ); batchResponse->setN( 0 ); // TODO: Make this accurate } else { - batchResponse->setOk( false ); - batchResponse->setN( 0 ); - batchResponse->setErrCode( mockEndpoint->error.getErrCode() ); - batchResponse->setErrMessage( mockEndpoint->error.getErrMessage() ); - delete mockEndpoint; + mockResponse->response.cloneTo( batchResponse ); + delete mockResponse; } - string errMsg; - ASSERT( batchResponse->isValid( &errMsg ) ); + ASSERT( batchResponse->isValid( NULL ) ); return Status::OK(); } - const std::vector<MockEndpoint*>& getEndpoints() const { + const std::vector<MockWriteResult*>& getEndpoints() const { return _mockEndpoints.vector(); } private: // Find a MockEndpoint* by host, and release it so we don't see it again - MockEndpoint* releaseByHost( const ConnectionString& endpoint ) { - std::vector<MockEndpoint*>& endpoints = _mockEndpoints.mutableVector(); + MockWriteResult* releaseByHost( const ConnectionString& endpoint ) { + std::vector<MockWriteResult*>& endpoints = _mockEndpoints.mutableVector(); - for ( std::vector<MockEndpoint*>::iterator it = endpoints.begin(); + for ( std::vector<MockWriteResult*>::iterator it = endpoints.begin(); it != endpoints.end(); ++it ) { - MockEndpoint* storedEndpoint = *it; + MockWriteResult* storedEndpoint = *it; if ( storedEndpoint->endpoint.toString().compare( endpoint.toString() ) == 0 ) { endpoints.erase( it ); return storedEndpoint; @@ -141,7 +147,7 @@ namespace mongo { } // Manually-stored ranges - OwnedPointerVector<MockEndpoint> _mockEndpoints; + OwnedPointerVector<MockWriteResult> _mockEndpoints; std::deque<ConnectionString> _pending; }; diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index c648f59f640..144702faea3 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -140,8 +140,10 @@ namespace mongo { // No-op } - Status refreshIfNeeded() { + Status refreshIfNeeded( bool* wasChanged ) { // No-op + if ( wasChanged ) + *wasChanged = false; return Status::OK(); } diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 093f56db72f..4baae5bd6cf 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -57,9 +57,9 @@ namespace mongo { * * 3. Goto 0. * - * The refreshIfNeeded() operation must make progress against noted targeting or stale config - * failures, see comments below. No functions may block for shared resources or network calls - * except refreshIfNeeded(). + * The refreshIfNeeded() operation must try to make progress against noted targeting or stale + * config failures, see comments below. No functions may block for shared resources or network + * calls except refreshIfNeeded(). * * Implementers are free to define more specific targeting error codes to allow more complex * error handling. @@ -131,12 +131,13 @@ namespace mongo { * * After this function is called, the targeter should be in a state such that the noted * stale responses are not seen again and if a targeting failure occurred it reloaded - - * it should make progress. + * it should try to make progress. If provided, wasChanged is set to true if the targeting + * information used here was changed. * * NOTE: This function may block for shared resources or network calls. * Returns !OK with message if could not refresh */ - virtual Status refreshIfNeeded() = 0; + virtual Status refreshIfNeeded( bool* wasChanged ) = 0; }; diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 8ce53fbd8d6..81e05624400 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -30,6 +30,7 @@ #include "mongo/base/error_codes.h" #include "mongo/base/status.h" +#include "mongo/bson/util/builder.h" #include "mongo/client/dbclientinterface.h" // ConnectionString (header-only) #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/write_error_detail.h" @@ -66,42 +67,38 @@ namespace mongo { for ( vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end(); ++it ) { const ShardError* error = *it; - targeter->noteStaleResponse( error->endpoint, error->error.getErrInfo() ); + targeter->noteStaleResponse( error->endpoint, + error->error.isErrInfoSet() ? error->error.getErrInfo() : + BSONObj() ); } } + static bool isShardMetadataChanging( const vector<ShardError*>& staleErrors ) { + if ( !staleErrors.empty() && staleErrors.back()->error.isErrInfoSet() ) + return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue(); + return false; + } + + // The number of times we'll try to continue a batch op if no progress is being made + // This only applies when no writes are occurring and metadata is not changing on reload + static const int kMaxRoundsWithoutProgress( 5 ); + void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse ) { BatchWriteOp batchOp; batchOp.initClientRequest( &clientRequest ); - int numTargetErrors = 0; - int numStaleBatches = 0; - int numResolveFailures = 0; - - for ( int rounds = 0; !batchOp.isFinished(); rounds++ ) { + // Current batch status + bool refreshedTargeter = false; + int rounds = 0; + int numCompletedOps = 0; + int numRoundsWithoutProgress = 0; - // - // Refresh the targeter if we need to (no-op if nothing stale) - // - - Status refreshStatus = _targeter->refreshIfNeeded(); - - if ( !refreshStatus.isOK() ) { - - // It's okay if we can't refresh, we'll just record errors for the ops if - // needed. - warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() ) - << endl; - } + while ( !batchOp.isFinished() ) { // - // Get child batches to send - // - - vector<TargetedWriteBatch*> childBatches; - + // Get child batches to send using the targeter // // Targeting errors can be caused by remote metadata changing (the collection could have // been dropped and recreated, for example with a new shard key). If a remote metadata @@ -123,16 +120,21 @@ namespace mongo { // deliver in this case, since for all the client knows we may have gotten the batch // exactly when the metadata changed. // - // If we've had a targeting error or stale error, we've refreshed the metadata once and - // can record target errors. - bool recordTargetErrors = numTargetErrors > 0 || numStaleBatches > 0; + + vector<TargetedWriteBatch*> childBatches; + + // If we've already had a targeting error, we've refreshed the metadata once and can + // record target errors definitively. + bool recordTargetErrors = refreshedTargeter; Status targetStatus = batchOp.targetBatch( *_targeter, recordTargetErrors, &childBatches ); if ( !targetStatus.isOK() ) { + // Don't do anything until a targeter refresh _targeter->noteCouldNotTarget(); - ++numTargetErrors; - continue; + refreshedTargeter = true; + ++_stats->numTargetErrors; + dassert( childBatches.size() == 0u ); } // @@ -141,6 +143,7 @@ namespace mongo { size_t numSent = 0; size_t numToSend = childBatches.size(); + bool remoteMetadataChanging = false; while ( numSent != numToSend ) { // Collect batches out on the network, mapped by endpoint @@ -169,7 +172,7 @@ namespace mongo { &shardHost ); if ( !resolveStatus.isOK() ) { - ++numResolveFailures; + ++_stats->numResolveErrors; // Record a resolve failure // TODO: It may be necessary to refresh the cache if stale, or maybe just @@ -246,7 +249,12 @@ namespace mongo { if ( staleErrors.size() > 0 ) { noteStaleResponses( staleErrors, _targeter ); - ++numStaleBatches; + ++_stats->numStaleBatches; + } + + // Remember if the shard is actively changing metadata right now + if ( isShardMetadataChanging( staleErrors ) ) { + remoteMetadataChanging = true; } // Remember that we successfully wrote to this shard @@ -265,6 +273,57 @@ namespace mongo { } } } + + ++rounds; + ++_stats->numRounds; + + // If we're done, get out + if ( batchOp.isFinished() ) + break; + + // MORE WORK TO DO + + // + // Refresh the targeter if we need to (no-op if nothing stale) + // + + bool targeterChanged = false; + Status refreshStatus = _targeter->refreshIfNeeded( &targeterChanged ); + + if ( !refreshStatus.isOK() ) { + + // It's okay if we can't refresh, we'll just record errors for the ops if + // needed. + warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() ) + << endl; + } + + // + // Ensure progress is being made toward completing the batch op + // + + int currCompletedOps = batchOp.numWriteOpsIn( WriteOpState_Completed ); + if ( currCompletedOps == numCompletedOps && !targeterChanged + && !remoteMetadataChanging ) { + ++numRoundsWithoutProgress; + } + else { + numRoundsWithoutProgress = 0; + } + numCompletedOps = currCompletedOps; + + if ( numRoundsWithoutProgress > kMaxRoundsWithoutProgress ) { + + stringstream msg; + msg << "no progress was made executing batch write op in " << clientRequest.getNS() + << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps + << " ops completed in " << rounds << " rounds total)"; + + WriteErrorDetail error; + buildErrorFrom( Status( ErrorCodes::NoProgressMade, msg.str() ), &error ); + batchOp.setBatchError( error ); + break; + } } batchOp.buildClientResponse( clientResponse ); diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h index d9324f2f8ae..5967d25a6b7 100644 --- a/src/mongo/s/write_ops/batch_write_exec.h +++ b/src/mongo/s/write_ops/batch_write_exec.h @@ -72,11 +72,7 @@ namespace mongo { * Executes a client batch write request by sending child batches to several shard * endpoints, and returns a client batch write response. * - * Several network round-trips are generally required to execute a write batch. - * * This function does not throw, any errors are reported via the clientResponse. - * - * TODO: Stats? */ void executeBatch( const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse ); @@ -113,12 +109,25 @@ namespace mongo { class BatchWriteExecStats { public: - // TODO: Other stats can go here + BatchWriteExecStats() : + numRounds( 0 ), numTargetErrors( 0 ), numResolveErrors( 0 ), numStaleBatches( 0 ) { + } void noteWriteAt( const ConnectionString& host, OpTime opTime ); const HostOpTimeMap& getWriteOpTimes() const; + // Expose via helpers if this gets more complex + + // Number of round trips required for the batch + int numRounds; + // Number of times targeting failed + int numTargetErrors; + // Number of times host resolution failed + int numResolveErrors; + // Number of stale batches + int numStaleBatches; + private: HostOpTimeMap _writeOpTimes; diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index acf62ab2027..b990a3d64de 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -40,6 +40,44 @@ namespace { using namespace mongo; + /** + * Mimics a single shard backend for a particular collection which can be initialized with a + * set of write command results to return. + */ + class MockSingleShardBackend { + public: + + MockSingleShardBackend( const NamespaceString& nss ) { + + // Initialize targeting to a mock shard + ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() ); + vector<MockRange*> mockRanges; + mockRanges.push_back( new MockRange( endpoint, + nss, + BSON( "x" << MINKEY ), + BSON( "x" << MAXKEY ) ) ); + targeter.init( mockRanges ); + + // Get the connection string for the mock shard + resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost ); + + // Executor using the mock backend + exec.reset( new BatchWriteExec( &targeter, &resolver, &dispatcher ) ); + } + + void setMockResults( const vector<MockWriteResult*>& results ) { + dispatcher.init( results ); + } + + ConnectionString shardHost; + + MockNSTargeter targeter; + MockShardResolver resolver; + MockMultiWriteCommand dispatcher; + + scoped_ptr<BatchWriteExec> exec; + }; + // // Tests for the BatchWriteExec // @@ -52,36 +90,21 @@ namespace { NamespaceString nss( "foo.bar" ); - ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() ); - - vector<MockRange*> mockRanges; - mockRanges.push_back( new MockRange( endpoint, - nss, - BSON( "x" << MINKEY ), - BSON( "x" << MAXKEY ) ) ); - - MockShardResolver resolver; + MockSingleShardBackend backend( nss ); BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); request.setNS( nss.ns() ); request.setOrdered( false ); request.setWriteConcern( BSONObj() ); - // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); - MockNSTargeter targeter; - targeter.init( mockRanges ); - - MockMultiWriteCommand dispatcher; - - BatchWriteExec exec( &targeter, &resolver, &dispatcher ); - BatchedCommandResponse response; - exec.executeBatch( request, &response ); - + backend.exec->executeBatch( request, &response ); ASSERT( response.getOk() ); + + const BatchWriteExecStats& stats = backend.exec->getStats(); + ASSERT_EQUALS( stats.numRounds, 1 ); } TEST(BatchWriteExecTests, SingleOpError) { @@ -92,56 +115,39 @@ namespace { NamespaceString nss( "foo.bar" ); - ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() ); + MockSingleShardBackend backend( nss ); - vector<MockRange*> mockRanges; - mockRanges.push_back( new MockRange( endpoint, - nss, - BSON( "x" << MINKEY ), - BSON( "x" << MAXKEY ) ) ); + vector<MockWriteResult*> mockResults; + BatchedCommandResponse errResponse; + errResponse.setOk( false ); + errResponse.setErrCode( ErrorCodes::UnknownError ); + errResponse.setErrMessage( "mock error" ); + mockResults.push_back( new MockWriteResult( backend.shardHost, errResponse ) ); - MockShardResolver resolver; - ConnectionString shardHost; - resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost ); - - vector<MockEndpoint*> mockEndpoints; - WriteErrorDetail error; - error.setErrCode( ErrorCodes::UnknownError ); - error.setErrMessage( "mock error" ); - mockEndpoints.push_back( new MockEndpoint( shardHost, error ) ); + backend.setMockResults( mockResults ); BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); request.setNS( nss.ns() ); request.setOrdered( false ); request.setWriteConcern( BSONObj() ); - // Do single-target, single doc batch write op - request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); - MockNSTargeter targeter; - targeter.init( mockRanges ); - - MockMultiWriteCommand dispatcher; - dispatcher.init( mockEndpoints ); - - BatchWriteExec exec( &targeter, &resolver, &dispatcher ); - BatchedCommandResponse response; - exec.executeBatch( request, &response ); - + backend.exec->executeBatch( request, &response ); ASSERT( !response.getOk() ); - ASSERT_EQUALS( response.getErrCode(), error.getErrCode() ); - ASSERT( response.getErrMessage().find( error.getErrMessage() ) != string::npos ); + ASSERT_EQUALS( response.getErrCode(), errResponse.getErrCode() ); + ASSERT( response.getErrMessage().find( errResponse.getErrMessage() ) != string::npos ); + + const BatchWriteExecStats& stats = backend.exec->getStats(); + ASSERT_EQUALS( stats.numRounds, 1 ); } // // Test retryable errors // -#if 0 - // XXX TODO: rewrite - StaleShardVersion will never be a top level error anymore. - TEST(BatchWriteExecTests, RetryOpError) { + TEST(BatchWriteExecTests, StaleOp) { // // Retry op in exec b/c of stale config @@ -149,45 +155,146 @@ namespace { NamespaceString nss( "foo.bar" ); - ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() ); + // Insert request + BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); + request.setNS( nss.ns() ); + request.setOrdered( false ); + request.setWriteConcern( BSONObj() ); + // Do single-target, single doc batch write op + request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); - vector<MockRange*> mockRanges; - mockRanges.push_back( new MockRange( endpoint, - nss, - BSON( "x" << MINKEY ), - BSON( "x" << MAXKEY ) ) ); + MockSingleShardBackend backend( nss ); - MockShardResolver resolver; - ConnectionString shardHost; - resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost ); - - vector<MockEndpoint*> mockEndpoints; + vector<MockWriteResult*> mockResults; WriteErrorDetail error; error.setErrCode( ErrorCodes::StaleShardVersion ); - error.setErrInfo( BSONObj() ); // Needed for correct handling error.setErrMessage( "mock stale error" ); - mockEndpoints.push_back( new MockEndpoint( shardHost, error ) ); + mockResults.push_back( new MockWriteResult( backend.shardHost, error ) ); + + backend.setMockResults( mockResults ); + + // Execute request + BatchedCommandResponse response; + backend.exec->executeBatch( request, &response ); + ASSERT( response.getOk() ); + + const BatchWriteExecStats& stats = backend.exec->getStats(); + ASSERT_EQUALS( stats.numStaleBatches, 1 ); + } + + TEST(BatchWriteExecTests, MultiStaleOp) { + + // + // Retry op in exec multiple times b/c of stale config + // + + NamespaceString nss( "foo.bar" ); + // Insert request BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); request.setNS( nss.ns() ); request.setOrdered( false ); request.setWriteConcern( BSONObj() ); + // Do single-target, single doc batch write op + request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); + + MockSingleShardBackend backend( nss ); + + vector<MockWriteResult*> mockResults; + WriteErrorDetail error; + error.setErrCode( ErrorCodes::StaleShardVersion ); + error.setErrMessage( "mock stale error" ); + for ( int i = 0; i < 3; i++ ) { + mockResults.push_back( new MockWriteResult( backend.shardHost, error ) ); + } + + backend.setMockResults( mockResults ); + + // Execute request + BatchedCommandResponse response; + backend.exec->executeBatch( request, &response ); + ASSERT( response.getOk() ); + + const BatchWriteExecStats& stats = backend.exec->getStats(); + ASSERT_EQUALS( stats.numStaleBatches, 3 ); + } + + TEST(BatchWriteExecTests, TooManyStaleOp) { + // + // Retry op in exec too many times (without refresh) b/c of stale config + // (The mock targeter doesn't report progress on refresh) + // + + NamespaceString nss( "foo.bar" ); + + // Insert request + BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); + request.setNS( nss.ns() ); + request.setOrdered( false ); + request.setWriteConcern( BSONObj() ); // Do single-target, single doc batch write op + request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); + MockSingleShardBackend backend( nss ); + + vector<MockWriteResult*> mockResults; + WriteErrorDetail error; + error.setErrCode( ErrorCodes::StaleShardVersion ); + error.setErrMessage( "mock stale error" ); + for ( int i = 0; i < 10; i++ ) { + mockResults.push_back( new MockWriteResult( backend.shardHost, error ) ); + } + + backend.setMockResults( mockResults ); + + // Execute request + BatchedCommandResponse response; + backend.exec->executeBatch( request, &response ); + ASSERT( !response.getOk() ); + ASSERT_EQUALS( response.getErrCode(), ErrorCodes::NoProgressMade ); + } + + TEST(BatchWriteExecTests, ManyStaleOpWithMigration) { + + // + // Retry op in exec many times b/c of stale config, but simulate remote migrations occurring + // + + NamespaceString nss( "foo.bar" ); + + // Insert request + BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert ); + request.setNS( nss.ns() ); + request.setOrdered( false ); + request.setWriteConcern( BSONObj() ); + // Do single-target, single doc batch write op request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) ); - MockNSTargeter targeter; - targeter.init( mockRanges ); + MockSingleShardBackend backend( nss ); - MockMultiWriteCommand dispatcher; - dispatcher.init( mockEndpoints ); + vector<MockWriteResult*> mockResults; + WriteErrorDetail error; + error.setErrCode( ErrorCodes::StaleShardVersion ); + error.setErrMessage( "mock stale error" ); + for ( int i = 0; i < 10; i++ ) { + if ( i % 2 == 0 ) + error.setErrInfo( BSONObj() ); + else + error.setErrInfo( BSON( "inCriticalSection" << true ) ); - BatchWriteExec exec( &targeter, &resolver, &dispatcher ); + mockResults.push_back( new MockWriteResult( backend.shardHost, error ) ); + } + backend.setMockResults( mockResults ); + + // Execute request BatchedCommandResponse response; - exec.executeBatch( request, &response ); + backend.exec->executeBatch( request, &response ); ASSERT( response.getOk() ); + + const BatchWriteExecStats& stats = backend.exec->getStats(); + ASSERT_EQUALS( stats.numStaleBatches, 10 ); } -#endif + } // unnamed namespace diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index b7e04ffe1bc..67a3676aa73 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -397,7 +397,7 @@ namespace mongo { if ( !response.getOk() ) { WriteErrorDetail batchError; cloneBatchErrorTo( response, &batchError ); - _batchCommandError.reset( new ShardError( targetedBatch.getEndpoint(), + _batchError.reset( new ShardError( targetedBatch.getEndpoint(), batchError )); cancelBatch( targetedBatch, _writeOps, batchError ); return; @@ -495,8 +495,12 @@ namespace mongo { noteBatchResponse( targetedBatch, response, NULL ); } + void BatchWriteOp::setBatchError( const WriteErrorDetail& error ) { + _batchError.reset( new ShardError( ShardEndpoint(), error ) ); + } + bool BatchWriteOp::isFinished() { - if ( _batchCommandError.get() ) return true; + if ( _batchError.get() ) return true; size_t numWriteOps = _clientRequest->sizeWriteOps(); bool orderedOps = _clientRequest->getOrdered(); @@ -517,15 +521,18 @@ namespace mongo { dassert( isFinished() ); - if ( _batchCommandError.get() ) { + if ( _batchError.get() ) { + batchResp->setOk( false ); + batchResp->setErrCode( _batchError->error.getErrCode() ); - const WriteErrorDetail& error = _batchCommandError->error; - batchResp->setErrCode( error.getErrCode() ); + stringstream msg; + msg << _batchError->error.getErrMessage(); + if ( !_batchError->endpoint.shardName.empty() ) { + msg << " at " << _batchError->endpoint.shardName; + } - string errMsg( str::stream() << error.getErrMessage() << " at " - << _batchCommandError->endpoint.shardName ); - batchResp->setErrMessage( errMsg ); + batchResp->setErrMessage( msg.str() ); dassert( batchResp->isValid( NULL ) ); return; } @@ -634,6 +641,24 @@ namespace mongo { } } + int BatchWriteOp::numWriteOps() const { + return static_cast<int>( _clientRequest->sizeWriteOps() ); + } + + int BatchWriteOp::numWriteOpsIn( WriteOpState opState ) const { + + // TODO: This could be faster, if we tracked this info explicitly + size_t numWriteOps = _clientRequest->sizeWriteOps(); + int count = 0; + for ( size_t i = 0; i < numWriteOps; ++i ) { + WriteOp& writeOp = _writeOps[i]; + if ( writeOp.getWriteState() == opState ) + ++count; + } + + return count; + } + void TrackedErrors::startTracking( int errCode ) { dassert( !isTracking( errCode ) ); _errorMap.insert( make_pair( errCode, vector<ShardError*>() ) ); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 8c3bc1c0b43..f79dfd0cde3 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -135,6 +135,13 @@ namespace mongo { const WriteErrorDetail& error ); /** + * Sets a command error for this batch op directly. + * + * Should only be used when there are no outstanding batches to return. + */ + void setBatchError( const WriteErrorDetail& error ); + + /** * Returns false if the batch write op needs more processing. */ bool isFinished(); @@ -144,6 +151,14 @@ namespace mongo { */ void buildClientResponse( BatchedCommandResponse* batchResp ); + // + // Accessors + // + + int numWriteOps() const; + + int numWriteOpsIn( WriteOpState state ) const; + private: // Incoming client request, not owned here @@ -162,8 +177,10 @@ namespace mongo { // Upserted ids for the whole write batch OwnedPointerVector<BatchedUpsertDetail> _upsertedIds; - // Use to store the error from the last shard that returned { ok: 0 }. - scoped_ptr<ShardError> _batchCommandError; + // Use to store a top-level error indicating that the batch aborted unexpectedly and we + // can't report on any of the writes sent. May also include a ShardEndpoint indicating + // where the root problem was. + scoped_ptr<ShardError> _batchError; // Stats for the entire batch op scoped_ptr<BatchWriteStats> _stats; |