author     Greg Studer <greg@10gen.com>  2014-01-16 12:40:27 -0500
committer  gregs <greg@10gen.com>  2014-01-23 11:29:56 -0500
commit     cf99842a3a495cb86efca4d702fb8a45e5768072 (patch)
tree       e9c25e724305877746ce5d4b514310905602f330 /src
parent     a6c4e86b0aba95fd34ef4b912909e9dd89d8425e (diff)
download   mongo-cf99842a3a495cb86efca4d702fb8a45e5768072.tar.gz
SERVER-12367 detect halted progress in write command execution, report error
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/base/error_codes.err | 1
-rw-r--r--  src/mongo/db/commands/write_commands/batch_executor.cpp | 58
-rw-r--r--  src/mongo/s/chunk_manager_targeter.cpp | 38
-rw-r--r--  src/mongo/s/chunk_manager_targeter.h | 4
-rw-r--r--  src/mongo/s/chunk_version.h | 25
-rw-r--r--  src/mongo/s/chunk_version_test.cpp | 29
-rw-r--r--  src/mongo/s/mock_multi_write_command.h | 48
-rw-r--r--  src/mongo/s/mock_ns_targeter.h | 4
-rw-r--r--  src/mongo/s/ns_targeter.h | 11
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp | 121
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.h | 19
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp | 251
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp | 41
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h | 21
14 files changed, 517 insertions, 154 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index f613db103ea..36e1fd0f6dc 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -81,6 +81,7 @@ error_code("CommandResultSchemaViolation", 78)
error_code("UnknownReplWriteConcern", 79)
error_code("RoleDataInconsistent", 80)
error_code("NoClientContext", 81)
+error_code("NoProgressMade", 82)
# Non-sequential error codes (for compatibility only)
error_code("DuplicateKey", 11000)
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 1e99bbb4d58..7a0db71f651 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pagefault.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_server_status.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/collection_metadata.h"
@@ -53,6 +54,9 @@
namespace mongo {
+ // TODO: Determine queueing behavior we want here
+ MONGO_EXPORT_SERVER_PARAMETER( queueForMigrationCommit, bool, true );
+
using mongoutils::str::stream;
WriteBatchExecutor::WriteBatchExecutor( const BSONObj& wc,
@@ -90,6 +94,14 @@ namespace mongo {
return error;
}
+ static void noteInCriticalSection( WriteErrorDetail* staleError ) {
+ BSONObjBuilder builder;
+ if ( staleError->isErrInfoSet() )
+ builder.appendElements( staleError->getErrInfo() );
+ builder.append( "inCriticalSection", true );
+ staleError->setErrInfo( builder.obj() );
+ }
+
void WriteBatchExecutor::executeBatch( const BatchedCommandRequest& request,
BatchedCommandResponse* response ) {
@@ -162,8 +174,6 @@ namespace mongo {
bool staleBatch = !writeErrors.empty()
&& writeErrors.back()->getErrCode() == ErrorCodes::StaleShardVersion;
- // TODO: Audit where we want to queue here - the shardingState calls may block for remote
- // data
if ( staleBatch ) {
const BatchedRequestMetadata* requestMetadata = request.getMetadata();
@@ -171,11 +181,53 @@ namespace mongo {
// Make sure our shard name is set or is the same as what was set previously
if ( shardingState.setShardName( requestMetadata->getShardName() ) ) {
- // Refresh our shard version
+
+ //
+ // First, we refresh metadata if we need to based on the requested version.
+ //
+
ChunkVersion latestShardVersion;
shardingState.refreshMetadataIfNeeded( request.getTargetingNS(),
requestMetadata->getShardVersion(),
&latestShardVersion );
+
+ // Report if we're still changing our metadata
+ // TODO: Better reporting per-collection
+ if ( shardingState.inCriticalMigrateSection() ) {
+ noteInCriticalSection( writeErrors.back() );
+ }
+
+ if ( queueForMigrationCommit ) {
+
+ //
+ // Queue up for migration to end - this allows us to be sure that clients will
+ // not repeatedly try to refresh metadata that is not yet written to the config
+ // server. Not necessary for correctness.
+ // Exposed as optional parameter to allow testing of queuing behavior with
+ // different network timings.
+ //
+
+ const ChunkVersion& requestShardVersion = requestMetadata->getShardVersion();
+
+ //
+ // Only wait if we're an older version (in the current collection epoch) and
+ // we're not write compatible, implying that the current migration is affecting
+ // writes.
+ //
+
+ if ( requestShardVersion.isOlderThan( latestShardVersion ) &&
+ !requestShardVersion.isWriteCompatibleWith( latestShardVersion ) ) {
+
+ while ( shardingState.inCriticalMigrateSection() ) {
+
+ log() << "write request to old shard version "
+ << requestMetadata->getShardVersion().toString()
+ << " waiting for migration commit" << endl;
+
+ shardingState.waitTillNotInCriticalSection( 10 /* secs */);
+ }
+ }
+ }
}
else {
// If our shard name is stale, our version must have been stale as well
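The inCriticalSection flag set by noteInCriticalSection() above is what mongos later inspects in batch_write_exec.cpp (see isShardMetadataChanging() further down) to decide whether the remote shard is still committing a migration. A minimal sketch of that round trip, using only the BSONObjBuilder/BSONObj calls already present in this patch; the helper names are hypothetical and the snippet is not part of the patch itself:

    #include "mongo/db/jsobj.h" // BSONObj, BSONObjBuilder, BSONElement

    // Shard side: build the errInfo attached to the stale write error
    // (mirrors noteInCriticalSection above).
    mongo::BSONObj makeCriticalSectionErrInfo() {
        mongo::BSONObjBuilder builder;
        builder.append( "inCriticalSection", true );
        return builder.obj(); // yields { inCriticalSection: true }
    }

    // mongos side: read the flag back from errInfo
    // (mirrors isShardMetadataChanging in batch_write_exec.cpp below).
    bool errInfoSaysMigrating( const mongo::BSONObj& errInfo ) {
        return errInfo["inCriticalSection"].trueValue();
    }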
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index b992e7b5fdf..80c01ad0f34 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -444,20 +444,40 @@ namespace mongo {
/**
* Whether or not the manager/primary pair is different from the other manager/primary pair
*/
+ bool isMetadataDifferent( const ChunkManagerPtr& managerA,
+ const ShardPtr& primaryA,
+ const ChunkManagerPtr& managerB,
+ const ShardPtr& primaryB ) {
+
+ if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB )
+ || ( !primaryA && primaryB ) ) return true;
+
+ if ( managerA ) {
+ return !managerA->getVersion().isStrictlyEqualTo( managerB->getVersion() );
+ }
+
+ dassert( NULL != primaryA.get() );
+ return primaryA->getName() != primaryB->getName();
+ }
+
+ /**
+ * Whether or not the manager/primary pair was changed or refreshed from a previous version
+ * of the metadata.
+ */
bool wasMetadataRefreshed( const ChunkManagerPtr& managerA,
const ShardPtr& primaryA,
const ChunkManagerPtr& managerB,
const ShardPtr& primaryB ) {
- if ( ( managerA && !managerB ) || ( !managerA && managerB ) || ( primaryA && !primaryB )
- || ( !primaryA && primaryB ) ) return true;
+ if ( isMetadataDifferent( managerA, primaryA, managerB, primaryB ) )
+ return true;
if ( managerA ) {
+ dassert( managerB.get() ); // otherwise metadata would be different
return managerA->getSequenceNumber() != managerB->getSequenceNumber();
}
- dassert( NULL != primaryA.get() );
- return primaryA->getName() != primaryB->getName();
+ return false;
}
}
@@ -491,7 +511,13 @@ namespace mongo {
return _stats.get();
}
- Status ChunkManagerTargeter::refreshIfNeeded() {
+ Status ChunkManagerTargeter::refreshIfNeeded( bool *wasChanged ) {
+
+ bool dummy;
+ if ( !wasChanged )
+ wasChanged = &dummy;
+
+ *wasChanged = false;
//
// Did we have any stale config or targeting errors at all?
@@ -544,6 +570,7 @@ namespace mongo {
return refreshNow( RefreshType_RefreshChunkManager );
}
+ *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
return Status::OK();
}
else if ( !_remoteShardVersions.empty() ) {
@@ -566,6 +593,7 @@ namespace mongo {
return refreshNow( RefreshType_RefreshChunkManager );
}
+ *wasChanged = isMetadataDifferent( lastManager, lastPrimary, _manager, _primary );
return Status::OK();
}
diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h
index 2d57eddafad..bb1f63e283b 100644
--- a/src/mongo/s/chunk_manager_targeter.h
+++ b/src/mongo/s/chunk_manager_targeter.h
@@ -84,9 +84,11 @@ namespace mongo {
* information is stale WRT the noted stale responses or a remote refresh is needed due
* to a targeting failure, will contact the config servers to reload the metadata.
*
+ * Reports wasChanged = true if the metadata is different after this reload.
+ *
* Also see NSTargeter::refreshIfNeeded().
*/
- Status refreshIfNeeded();
+ Status refreshIfNeeded( bool* wasChanged );
/**
* Returns the stats. Note that the returned stats object is still owned by this targeter.
diff --git a/src/mongo/s/chunk_version.h b/src/mongo/s/chunk_version.h
index 3aac6eb66af..b8955906780 100644
--- a/src/mongo/s/chunk_version.h
+++ b/src/mongo/s/chunk_version.h
@@ -188,6 +188,31 @@ namespace mongo {
return otherVersion._combined == _combined;
}
+ /**
+ * Returns true if the otherVersion is the same as this version and enforces strict epoch
+ * checking (empty epochs are not wildcards).
+ */
+ bool isStrictlyEqualTo( const ChunkVersion& otherVersion ) const {
+ if ( otherVersion._epoch != _epoch )
+ return false;
+ return otherVersion._combined == _combined;
+ }
+
+ /**
+ * Returns true if this version is (strictly) in the same epoch as the other version and
+ * this version is older. Returns false if we're not sure because the epochs are different
+ * or if this version is newer.
+ */
+ bool isOlderThan( const ChunkVersion& otherVersion ) const {
+ if ( otherVersion._epoch != _epoch )
+ return false;
+
+ if ( _major != otherVersion._major )
+ return _major < otherVersion._major;
+
+ return _minor < otherVersion._minor;
+ }
+
// Is this in the same epoch?
bool hasCompatibleEpoch( const ChunkVersion& otherVersion ) const {
return hasCompatibleEpoch( otherVersion._epoch );
diff --git a/src/mongo/s/chunk_version_test.cpp b/src/mongo/s/chunk_version_test.cpp
index c5136ac34dc..52183487e21 100644
--- a/src/mongo/s/chunk_version_test.cpp
+++ b/src/mongo/s/chunk_version_test.cpp
@@ -96,5 +96,34 @@ namespace {
ASSERT( parsed.epoch().isSet() );
}
+ TEST(Comparison, StrictEqual) {
+
+ OID epoch = OID::gen();
+
+ ASSERT( ChunkVersion( 3, 1, epoch ).isStrictlyEqualTo( ChunkVersion( 3, 1, epoch ) ) );
+ ASSERT( !ChunkVersion( 3, 1, epoch ).isStrictlyEqualTo( ChunkVersion( 3, 1, OID() ) ) );
+ ASSERT( !ChunkVersion( 3, 1, OID() ).isStrictlyEqualTo( ChunkVersion( 3, 1, epoch ) ) );
+ ASSERT( ChunkVersion( 3, 1, OID() ).isStrictlyEqualTo( ChunkVersion( 3, 1, OID() ) ) );
+ ASSERT( !ChunkVersion( 4, 2, epoch ).isStrictlyEqualTo( ChunkVersion( 4, 1, epoch ) ) );
+ }
+
+ TEST(Comparison, OlderThan) {
+
+ OID epoch = OID::gen();
+
+ ASSERT( ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 4, 1, epoch ) ) );
+ ASSERT( !ChunkVersion( 4, 1, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) );
+
+ ASSERT( ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 3, 2, epoch ) ) );
+ ASSERT( !ChunkVersion( 3, 2, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) );
+
+ ASSERT( !ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 4, 1, OID() ) ) );
+ ASSERT( !ChunkVersion( 4, 1, OID() ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) );
+
+ ASSERT( ChunkVersion( 3, 2, epoch ).isOlderThan( ChunkVersion( 4, 1, epoch ) ) );
+
+ ASSERT( !ChunkVersion( 3, 1, epoch ).isOlderThan( ChunkVersion( 3, 1, epoch ) ) );
+ }
+
} // unnamed namespace
} // namespace mongo
diff --git a/src/mongo/s/mock_multi_write_command.h b/src/mongo/s/mock_multi_write_command.h
index bb8301e4817..08c79af2505 100644
--- a/src/mongo/s/mock_multi_write_command.h
+++ b/src/mongo/s/mock_multi_write_command.h
@@ -41,15 +41,25 @@ namespace mongo {
* A ConnectionString endpoint registered with some kind of error, to simulate returning when
* the endpoint is used.
*/
- struct MockEndpoint {
+ struct MockWriteResult {
+
+ MockWriteResult( const ConnectionString& endpoint, const WriteErrorDetail& error ) :
+ endpoint( endpoint ) {
+ WriteErrorDetail* errorCopy = new WriteErrorDetail;
+ error.cloneTo( errorCopy );
+ errorCopy->setIndex( 0 );
+ response.setOk(true);
+ response.setN(0);
+ response.addToErrDetails( errorCopy );
+ }
- MockEndpoint( const ConnectionString& endpoint, const WriteErrorDetail& error ) :
- endpoint( endpoint ) {
- error.cloneTo( &this->error );
+ MockWriteResult( const ConnectionString& endpoint, const BatchedCommandResponse& response ) :
+ endpoint( endpoint ) {
+ response.cloneTo( &this->response );
}
const ConnectionString endpoint;
- WriteErrorDetail error;
+ BatchedCommandResponse response;
};
/**
@@ -67,7 +77,7 @@ namespace mongo {
class MockMultiWriteCommand : public MultiCommandDispatch {
public:
- void init( const std::vector<MockEndpoint*> mockEndpoints ) {
+ void init( const std::vector<MockWriteResult*> mockEndpoints ) {
ASSERT( !mockEndpoints.empty() );
_mockEndpoints.mutableVector().insert( _mockEndpoints.mutableVector().end(),
mockEndpoints.begin(),
@@ -98,39 +108,35 @@ namespace mongo {
static_cast<BatchedCommandResponse*>( response );
*endpoint = _pending.front();
- MockEndpoint* mockEndpoint = releaseByHost( _pending.front() );
+ MockWriteResult* mockResponse = releaseByHost( _pending.front() );
_pending.pop_front();
- if ( NULL == mockEndpoint ) {
+ if ( NULL == mockResponse ) {
batchResponse->setOk( true );
batchResponse->setN( 0 ); // TODO: Make this accurate
}
else {
- batchResponse->setOk( false );
- batchResponse->setN( 0 );
- batchResponse->setErrCode( mockEndpoint->error.getErrCode() );
- batchResponse->setErrMessage( mockEndpoint->error.getErrMessage() );
- delete mockEndpoint;
+ mockResponse->response.cloneTo( batchResponse );
+ delete mockResponse;
}
- string errMsg;
- ASSERT( batchResponse->isValid( &errMsg ) );
+ ASSERT( batchResponse->isValid( NULL ) );
return Status::OK();
}
- const std::vector<MockEndpoint*>& getEndpoints() const {
+ const std::vector<MockWriteResult*>& getEndpoints() const {
return _mockEndpoints.vector();
}
private:
// Find a MockEndpoint* by host, and release it so we don't see it again
- MockEndpoint* releaseByHost( const ConnectionString& endpoint ) {
- std::vector<MockEndpoint*>& endpoints = _mockEndpoints.mutableVector();
+ MockWriteResult* releaseByHost( const ConnectionString& endpoint ) {
+ std::vector<MockWriteResult*>& endpoints = _mockEndpoints.mutableVector();
- for ( std::vector<MockEndpoint*>::iterator it = endpoints.begin();
+ for ( std::vector<MockWriteResult*>::iterator it = endpoints.begin();
it != endpoints.end(); ++it ) {
- MockEndpoint* storedEndpoint = *it;
+ MockWriteResult* storedEndpoint = *it;
if ( storedEndpoint->endpoint.toString().compare( endpoint.toString() ) == 0 ) {
endpoints.erase( it );
return storedEndpoint;
@@ -141,7 +147,7 @@ namespace mongo {
}
// Manually-stored ranges
- OwnedPointerVector<MockEndpoint> _mockEndpoints;
+ OwnedPointerVector<MockWriteResult> _mockEndpoints;
std::deque<ConnectionString> _pending;
};
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h
index c648f59f640..144702faea3 100644
--- a/src/mongo/s/mock_ns_targeter.h
+++ b/src/mongo/s/mock_ns_targeter.h
@@ -140,8 +140,10 @@ namespace mongo {
// No-op
}
- Status refreshIfNeeded() {
+ Status refreshIfNeeded( bool* wasChanged ) {
// No-op
+ if ( wasChanged )
+ *wasChanged = false;
return Status::OK();
}
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 093f56db72f..4baae5bd6cf 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -57,9 +57,9 @@ namespace mongo {
*
* 3. Goto 0.
*
- * The refreshIfNeeded() operation must make progress against noted targeting or stale config
- * failures, see comments below. No functions may block for shared resources or network calls
- * except refreshIfNeeded().
+ * The refreshIfNeeded() operation must try to make progress against noted targeting or stale
+ * config failures, see comments below. No functions may block for shared resources or network
+ * calls except refreshIfNeeded().
*
* Implementers are free to define more specific targeting error codes to allow more complex
* error handling.
@@ -131,12 +131,13 @@ namespace mongo {
*
* After this function is called, the targeter should be in a state such that the noted
* stale responses are not seen again and if a targeting failure occurred it reloaded -
- * it should make progress.
+ * it should try to make progress. If provided, wasChanged is set to true if the targeting
+ * information used here was changed.
*
* NOTE: This function may block for shared resources or network calls.
* Returns !OK with message if could not refresh
*/
- virtual Status refreshIfNeeded() = 0;
+ virtual Status refreshIfNeeded( bool* wasChanged ) = 0;
};
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 8ce53fbd8d6..81e05624400 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -30,6 +30,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/base/status.h"
+#include "mongo/bson/util/builder.h"
#include "mongo/client/dbclientinterface.h" // ConnectionString (header-only)
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/write_error_detail.h"
@@ -66,42 +67,38 @@ namespace mongo {
for ( vector<ShardError*>::const_iterator it = staleErrors.begin(); it != staleErrors.end();
++it ) {
const ShardError* error = *it;
- targeter->noteStaleResponse( error->endpoint, error->error.getErrInfo() );
+ targeter->noteStaleResponse( error->endpoint,
+ error->error.isErrInfoSet() ? error->error.getErrInfo() :
+ BSONObj() );
}
}
+ static bool isShardMetadataChanging( const vector<ShardError*>& staleErrors ) {
+ if ( !staleErrors.empty() && staleErrors.back()->error.isErrInfoSet() )
+ return staleErrors.back()->error.getErrInfo()["inCriticalSection"].trueValue();
+ return false;
+ }
+
+ // The number of times we'll try to continue a batch op if no progress is being made
+ // This only applies when no writes are occurring and metadata is not changing on reload
+ static const int kMaxRoundsWithoutProgress( 5 );
+
void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse ) {
BatchWriteOp batchOp;
batchOp.initClientRequest( &clientRequest );
- int numTargetErrors = 0;
- int numStaleBatches = 0;
- int numResolveFailures = 0;
-
- for ( int rounds = 0; !batchOp.isFinished(); rounds++ ) {
+ // Current batch status
+ bool refreshedTargeter = false;
+ int rounds = 0;
+ int numCompletedOps = 0;
+ int numRoundsWithoutProgress = 0;
- //
- // Refresh the targeter if we need to (no-op if nothing stale)
- //
-
- Status refreshStatus = _targeter->refreshIfNeeded();
-
- if ( !refreshStatus.isOK() ) {
-
- // It's okay if we can't refresh, we'll just record errors for the ops if
- // needed.
- warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() )
- << endl;
- }
+ while ( !batchOp.isFinished() ) {
//
- // Get child batches to send
- //
-
- vector<TargetedWriteBatch*> childBatches;
-
+ // Get child batches to send using the targeter
//
// Targeting errors can be caused by remote metadata changing (the collection could have
// been dropped and recreated, for example with a new shard key). If a remote metadata
@@ -123,16 +120,21 @@ namespace mongo {
// deliver in this case, since for all the client knows we may have gotten the batch
// exactly when the metadata changed.
//
- // If we've had a targeting error or stale error, we've refreshed the metadata once and
- // can record target errors.
- bool recordTargetErrors = numTargetErrors > 0 || numStaleBatches > 0;
+
+ vector<TargetedWriteBatch*> childBatches;
+
+ // If we've already had a targeting error, we've refreshed the metadata once and can
+ // record target errors definitively.
+ bool recordTargetErrors = refreshedTargeter;
Status targetStatus = batchOp.targetBatch( *_targeter,
recordTargetErrors,
&childBatches );
if ( !targetStatus.isOK() ) {
+ // Don't do anything until a targeter refresh
_targeter->noteCouldNotTarget();
- ++numTargetErrors;
- continue;
+ refreshedTargeter = true;
+ ++_stats->numTargetErrors;
+ dassert( childBatches.size() == 0u );
}
//
@@ -141,6 +143,7 @@ namespace mongo {
size_t numSent = 0;
size_t numToSend = childBatches.size();
+ bool remoteMetadataChanging = false;
while ( numSent != numToSend ) {
// Collect batches out on the network, mapped by endpoint
@@ -169,7 +172,7 @@ namespace mongo {
&shardHost );
if ( !resolveStatus.isOK() ) {
- ++numResolveFailures;
+ ++_stats->numResolveErrors;
// Record a resolve failure
// TODO: It may be necessary to refresh the cache if stale, or maybe just
@@ -246,7 +249,12 @@ namespace mongo {
if ( staleErrors.size() > 0 ) {
noteStaleResponses( staleErrors, _targeter );
- ++numStaleBatches;
+ ++_stats->numStaleBatches;
+ }
+
+ // Remember if the shard is actively changing metadata right now
+ if ( isShardMetadataChanging( staleErrors ) ) {
+ remoteMetadataChanging = true;
}
// Remember that we successfully wrote to this shard
@@ -265,6 +273,57 @@ namespace mongo {
}
}
}
+
+ ++rounds;
+ ++_stats->numRounds;
+
+ // If we're done, get out
+ if ( batchOp.isFinished() )
+ break;
+
+ // MORE WORK TO DO
+
+ //
+ // Refresh the targeter if we need to (no-op if nothing stale)
+ //
+
+ bool targeterChanged = false;
+ Status refreshStatus = _targeter->refreshIfNeeded( &targeterChanged );
+
+ if ( !refreshStatus.isOK() ) {
+
+ // It's okay if we can't refresh, we'll just record errors for the ops if
+ // needed.
+ warning() << "could not refresh targeter" << causedBy( refreshStatus.reason() )
+ << endl;
+ }
+
+ //
+ // Ensure progress is being made toward completing the batch op
+ //
+
+ int currCompletedOps = batchOp.numWriteOpsIn( WriteOpState_Completed );
+ if ( currCompletedOps == numCompletedOps && !targeterChanged
+ && !remoteMetadataChanging ) {
+ ++numRoundsWithoutProgress;
+ }
+ else {
+ numRoundsWithoutProgress = 0;
+ }
+ numCompletedOps = currCompletedOps;
+
+ if ( numRoundsWithoutProgress > kMaxRoundsWithoutProgress ) {
+
+ stringstream msg;
+ msg << "no progress was made executing batch write op in " << clientRequest.getNS()
+ << " after " << kMaxRoundsWithoutProgress << " rounds (" << numCompletedOps
+ << " ops completed in " << rounds << " rounds total)";
+
+ WriteErrorDetail error;
+ buildErrorFrom( Status( ErrorCodes::NoProgressMade, msg.str() ), &error );
+ batchOp.setBatchError( error );
+ break;
+ }
}
batchOp.buildClientResponse( clientResponse );
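The loop above guards against livelock with a rounds-without-progress counter: if the number of completed write ops stops advancing while neither the targeter nor the remote shard metadata is changing, the batch aborts with ErrorCodes::NoProgressMade after kMaxRoundsWithoutProgress further rounds. A distilled, self-contained sketch of just that guard (hypothetical struct, not part of the patch):

    // Condensed restatement of the progress check in executeBatch() above.
    struct ProgressWatchdog {
        ProgressWatchdog() : lastCompletedOps( 0 ), roundsWithoutProgress( 0 ) {}

        // Call once per round; returns true when the batch should abort with NoProgressMade.
        bool noteRound( int completedOps, bool targeterChanged, bool remoteMetadataChanging ) {
            if ( completedOps == lastCompletedOps && !targeterChanged && !remoteMetadataChanging )
                ++roundsWithoutProgress;
            else
                roundsWithoutProgress = 0;
            lastCompletedOps = completedOps;
            return roundsWithoutProgress > kMaxRoundsWithoutProgress;
        }

        static const int kMaxRoundsWithoutProgress = 5; // same threshold as the constant above
        int lastCompletedOps;
        int roundsWithoutProgress;
    };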
diff --git a/src/mongo/s/write_ops/batch_write_exec.h b/src/mongo/s/write_ops/batch_write_exec.h
index d9324f2f8ae..5967d25a6b7 100644
--- a/src/mongo/s/write_ops/batch_write_exec.h
+++ b/src/mongo/s/write_ops/batch_write_exec.h
@@ -72,11 +72,7 @@ namespace mongo {
* Executes a client batch write request by sending child batches to several shard
* endpoints, and returns a client batch write response.
*
- * Several network round-trips are generally required to execute a write batch.
- *
* This function does not throw, any errors are reported via the clientResponse.
- *
- * TODO: Stats?
*/
void executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse );
@@ -113,12 +109,25 @@ namespace mongo {
class BatchWriteExecStats {
public:
- // TODO: Other stats can go here
+ BatchWriteExecStats() :
+ numRounds( 0 ), numTargetErrors( 0 ), numResolveErrors( 0 ), numStaleBatches( 0 ) {
+ }
void noteWriteAt( const ConnectionString& host, OpTime opTime );
const HostOpTimeMap& getWriteOpTimes() const;
+ // Expose via helpers if this gets more complex
+
+ // Number of round trips required for the batch
+ int numRounds;
+ // Number of times targeting failed
+ int numTargetErrors;
+ // Number of times host resolution failed
+ int numResolveErrors;
+ // Number of stale batches
+ int numStaleBatches;
+
private:
HostOpTimeMap _writeOpTimes;
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index acf62ab2027..b990a3d64de 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -40,6 +40,44 @@ namespace {
using namespace mongo;
+ /**
+ * Mimics a single shard backend for a particular collection which can be initialized with a
+ * set of write command results to return.
+ */
+ class MockSingleShardBackend {
+ public:
+
+ MockSingleShardBackend( const NamespaceString& nss ) {
+
+ // Initialize targeting to a mock shard
+ ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
+ vector<MockRange*> mockRanges;
+ mockRanges.push_back( new MockRange( endpoint,
+ nss,
+ BSON( "x" << MINKEY ),
+ BSON( "x" << MAXKEY ) ) );
+ targeter.init( mockRanges );
+
+ // Get the connection string for the mock shard
+ resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost );
+
+ // Executor using the mock backend
+ exec.reset( new BatchWriteExec( &targeter, &resolver, &dispatcher ) );
+ }
+
+ void setMockResults( const vector<MockWriteResult*>& results ) {
+ dispatcher.init( results );
+ }
+
+ ConnectionString shardHost;
+
+ MockNSTargeter targeter;
+ MockShardResolver resolver;
+ MockMultiWriteCommand dispatcher;
+
+ scoped_ptr<BatchWriteExec> exec;
+ };
+
//
// Tests for the BatchWriteExec
//
@@ -52,36 +90,21 @@ namespace {
NamespaceString nss( "foo.bar" );
- ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
-
- vector<MockRange*> mockRanges;
- mockRanges.push_back( new MockRange( endpoint,
- nss,
- BSON( "x" << MINKEY ),
- BSON( "x" << MAXKEY ) ) );
-
- MockShardResolver resolver;
+ MockSingleShardBackend backend( nss );
BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
request.setNS( nss.ns() );
request.setOrdered( false );
request.setWriteConcern( BSONObj() );
-
// Do single-target, single doc batch write op
-
request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
- MockNSTargeter targeter;
- targeter.init( mockRanges );
-
- MockMultiWriteCommand dispatcher;
-
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
-
BatchedCommandResponse response;
- exec.executeBatch( request, &response );
-
+ backend.exec->executeBatch( request, &response );
ASSERT( response.getOk() );
+
+ const BatchWriteExecStats& stats = backend.exec->getStats();
+ ASSERT_EQUALS( stats.numRounds, 1 );
}
TEST(BatchWriteExecTests, SingleOpError) {
@@ -92,56 +115,39 @@ namespace {
NamespaceString nss( "foo.bar" );
- ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
+ MockSingleShardBackend backend( nss );
- vector<MockRange*> mockRanges;
- mockRanges.push_back( new MockRange( endpoint,
- nss,
- BSON( "x" << MINKEY ),
- BSON( "x" << MAXKEY ) ) );
+ vector<MockWriteResult*> mockResults;
+ BatchedCommandResponse errResponse;
+ errResponse.setOk( false );
+ errResponse.setErrCode( ErrorCodes::UnknownError );
+ errResponse.setErrMessage( "mock error" );
+ mockResults.push_back( new MockWriteResult( backend.shardHost, errResponse ) );
- MockShardResolver resolver;
- ConnectionString shardHost;
- resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost );
-
- vector<MockEndpoint*> mockEndpoints;
- WriteErrorDetail error;
- error.setErrCode( ErrorCodes::UnknownError );
- error.setErrMessage( "mock error" );
- mockEndpoints.push_back( new MockEndpoint( shardHost, error ) );
+ backend.setMockResults( mockResults );
BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
request.setNS( nss.ns() );
request.setOrdered( false );
request.setWriteConcern( BSONObj() );
-
// Do single-target, single doc batch write op
-
request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
- MockNSTargeter targeter;
- targeter.init( mockRanges );
-
- MockMultiWriteCommand dispatcher;
- dispatcher.init( mockEndpoints );
-
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
-
BatchedCommandResponse response;
- exec.executeBatch( request, &response );
-
+ backend.exec->executeBatch( request, &response );
ASSERT( !response.getOk() );
- ASSERT_EQUALS( response.getErrCode(), error.getErrCode() );
- ASSERT( response.getErrMessage().find( error.getErrMessage() ) != string::npos );
+ ASSERT_EQUALS( response.getErrCode(), errResponse.getErrCode() );
+ ASSERT( response.getErrMessage().find( errResponse.getErrMessage() ) != string::npos );
+
+ const BatchWriteExecStats& stats = backend.exec->getStats();
+ ASSERT_EQUALS( stats.numRounds, 1 );
}
//
// Test retryable errors
//
-#if 0
- // XXX TODO: rewrite - StaleShardVersion will never be a top level error anymore.
- TEST(BatchWriteExecTests, RetryOpError) {
+ TEST(BatchWriteExecTests, StaleOp) {
//
// Retry op in exec b/c of stale config
@@ -149,45 +155,146 @@ namespace {
NamespaceString nss( "foo.bar" );
- ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
+ // Insert request
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
+ request.setNS( nss.ns() );
+ request.setOrdered( false );
+ request.setWriteConcern( BSONObj() );
+ // Do single-target, single doc batch write op
+ request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
- vector<MockRange*> mockRanges;
- mockRanges.push_back( new MockRange( endpoint,
- nss,
- BSON( "x" << MINKEY ),
- BSON( "x" << MAXKEY ) ) );
+ MockSingleShardBackend backend( nss );
- MockShardResolver resolver;
- ConnectionString shardHost;
- resolver.chooseWriteHost( mockRanges.front()->endpoint.shardName, &shardHost );
-
- vector<MockEndpoint*> mockEndpoints;
+ vector<MockWriteResult*> mockResults;
WriteErrorDetail error;
error.setErrCode( ErrorCodes::StaleShardVersion );
- error.setErrInfo( BSONObj() ); // Needed for correct handling
error.setErrMessage( "mock stale error" );
- mockEndpoints.push_back( new MockEndpoint( shardHost, error ) );
+ mockResults.push_back( new MockWriteResult( backend.shardHost, error ) );
+
+ backend.setMockResults( mockResults );
+
+ // Execute request
+ BatchedCommandResponse response;
+ backend.exec->executeBatch( request, &response );
+ ASSERT( response.getOk() );
+
+ const BatchWriteExecStats& stats = backend.exec->getStats();
+ ASSERT_EQUALS( stats.numStaleBatches, 1 );
+ }
+
+ TEST(BatchWriteExecTests, MultiStaleOp) {
+
+ //
+ // Retry op in exec multiple times b/c of stale config
+ //
+
+ NamespaceString nss( "foo.bar" );
+ // Insert request
BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
request.setNS( nss.ns() );
request.setOrdered( false );
request.setWriteConcern( BSONObj() );
+ // Do single-target, single doc batch write op
+ request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
+
+ MockSingleShardBackend backend( nss );
+
+ vector<MockWriteResult*> mockResults;
+ WriteErrorDetail error;
+ error.setErrCode( ErrorCodes::StaleShardVersion );
+ error.setErrMessage( "mock stale error" );
+ for ( int i = 0; i < 3; i++ ) {
+ mockResults.push_back( new MockWriteResult( backend.shardHost, error ) );
+ }
+
+ backend.setMockResults( mockResults );
+
+ // Execute request
+ BatchedCommandResponse response;
+ backend.exec->executeBatch( request, &response );
+ ASSERT( response.getOk() );
+
+ const BatchWriteExecStats& stats = backend.exec->getStats();
+ ASSERT_EQUALS( stats.numStaleBatches, 3 );
+ }
+
+ TEST(BatchWriteExecTests, TooManyStaleOp) {
+ //
+ // Retry op in exec too many times (without refresh) b/c of stale config
+ // (The mock targeter doesn't report progress on refresh)
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ // Insert request
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
+ request.setNS( nss.ns() );
+ request.setOrdered( false );
+ request.setWriteConcern( BSONObj() );
// Do single-target, single doc batch write op
+ request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
+ MockSingleShardBackend backend( nss );
+
+ vector<MockWriteResult*> mockResults;
+ WriteErrorDetail error;
+ error.setErrCode( ErrorCodes::StaleShardVersion );
+ error.setErrMessage( "mock stale error" );
+ for ( int i = 0; i < 10; i++ ) {
+ mockResults.push_back( new MockWriteResult( backend.shardHost, error ) );
+ }
+
+ backend.setMockResults( mockResults );
+
+ // Execute request
+ BatchedCommandResponse response;
+ backend.exec->executeBatch( request, &response );
+ ASSERT( !response.getOk() );
+ ASSERT_EQUALS( response.getErrCode(), ErrorCodes::NoProgressMade );
+ }
+
+ TEST(BatchWriteExecTests, ManyStaleOpWithMigration) {
+
+ //
+ // Retry op in exec many times b/c of stale config, but simulate remote migrations occurring
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ // Insert request
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
+ request.setNS( nss.ns() );
+ request.setOrdered( false );
+ request.setWriteConcern( BSONObj() );
+ // Do single-target, single doc batch write op
request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
- MockNSTargeter targeter;
- targeter.init( mockRanges );
+ MockSingleShardBackend backend( nss );
- MockMultiWriteCommand dispatcher;
- dispatcher.init( mockEndpoints );
+ vector<MockWriteResult*> mockResults;
+ WriteErrorDetail error;
+ error.setErrCode( ErrorCodes::StaleShardVersion );
+ error.setErrMessage( "mock stale error" );
+ for ( int i = 0; i < 10; i++ ) {
+ if ( i % 2 == 0 )
+ error.setErrInfo( BSONObj() );
+ else
+ error.setErrInfo( BSON( "inCriticalSection" << true ) );
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
+ mockResults.push_back( new MockWriteResult( backend.shardHost, error ) );
+ }
+ backend.setMockResults( mockResults );
+
+ // Execute request
BatchedCommandResponse response;
- exec.executeBatch( request, &response );
+ backend.exec->executeBatch( request, &response );
ASSERT( response.getOk() );
+
+ const BatchWriteExecStats& stats = backend.exec->getStats();
+ ASSERT_EQUALS( stats.numStaleBatches, 10 );
}
-#endif
+
} // unnamed namespace
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index b7e04ffe1bc..67a3676aa73 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -397,7 +397,7 @@ namespace mongo {
if ( !response.getOk() ) {
WriteErrorDetail batchError;
cloneBatchErrorTo( response, &batchError );
- _batchCommandError.reset( new ShardError( targetedBatch.getEndpoint(),
+ _batchError.reset( new ShardError( targetedBatch.getEndpoint(),
batchError ));
cancelBatch( targetedBatch, _writeOps, batchError );
return;
@@ -495,8 +495,12 @@ namespace mongo {
noteBatchResponse( targetedBatch, response, NULL );
}
+ void BatchWriteOp::setBatchError( const WriteErrorDetail& error ) {
+ _batchError.reset( new ShardError( ShardEndpoint(), error ) );
+ }
+
bool BatchWriteOp::isFinished() {
- if ( _batchCommandError.get() ) return true;
+ if ( _batchError.get() ) return true;
size_t numWriteOps = _clientRequest->sizeWriteOps();
bool orderedOps = _clientRequest->getOrdered();
@@ -517,15 +521,18 @@ namespace mongo {
dassert( isFinished() );
- if ( _batchCommandError.get() ) {
+ if ( _batchError.get() ) {
+
batchResp->setOk( false );
+ batchResp->setErrCode( _batchError->error.getErrCode() );
- const WriteErrorDetail& error = _batchCommandError->error;
- batchResp->setErrCode( error.getErrCode() );
+ stringstream msg;
+ msg << _batchError->error.getErrMessage();
+ if ( !_batchError->endpoint.shardName.empty() ) {
+ msg << " at " << _batchError->endpoint.shardName;
+ }
- string errMsg( str::stream() << error.getErrMessage() << " at "
- << _batchCommandError->endpoint.shardName );
- batchResp->setErrMessage( errMsg );
+ batchResp->setErrMessage( msg.str() );
dassert( batchResp->isValid( NULL ) );
return;
}
@@ -634,6 +641,24 @@ namespace mongo {
}
}
+ int BatchWriteOp::numWriteOps() const {
+ return static_cast<int>( _clientRequest->sizeWriteOps() );
+ }
+
+ int BatchWriteOp::numWriteOpsIn( WriteOpState opState ) const {
+
+ // TODO: This could be faster, if we tracked this info explicitly
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ int count = 0;
+ for ( size_t i = 0; i < numWriteOps; ++i ) {
+ WriteOp& writeOp = _writeOps[i];
+ if ( writeOp.getWriteState() == opState )
+ ++count;
+ }
+
+ return count;
+ }
+
void TrackedErrors::startTracking( int errCode ) {
dassert( !isTracking( errCode ) );
_errorMap.insert( make_pair( errCode, vector<ShardError*>() ) );
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 8c3bc1c0b43..f79dfd0cde3 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -135,6 +135,13 @@ namespace mongo {
const WriteErrorDetail& error );
/**
+ * Sets a command error for this batch op directly.
+ *
+ * Should only be used when there are no outstanding batches to return.
+ */
+ void setBatchError( const WriteErrorDetail& error );
+
+ /**
* Returns false if the batch write op needs more processing.
*/
bool isFinished();
@@ -144,6 +151,14 @@ namespace mongo {
*/
void buildClientResponse( BatchedCommandResponse* batchResp );
+ //
+ // Accessors
+ //
+
+ int numWriteOps() const;
+
+ int numWriteOpsIn( WriteOpState state ) const;
+
private:
// Incoming client request, not owned here
@@ -162,8 +177,10 @@ namespace mongo {
// Upserted ids for the whole write batch
OwnedPointerVector<BatchedUpsertDetail> _upsertedIds;
- // Use to store the error from the last shard that returned { ok: 0 }.
- scoped_ptr<ShardError> _batchCommandError;
+ // Use to store a top-level error indicating that the batch aborted unexpectedly and we
+ // can't report on any of the writes sent. May also include a ShardEndpoint indicating
+ // where the root problem was.
+ scoped_ptr<ShardError> _batchError;
// Stats for the entire batch op
scoped_ptr<BatchWriteStats> _stats;