author     Greg Studer <greg@10gen.com>  2014-02-27 17:10:13 -0500
committer  Greg Studer <greg@10gen.com>  2014-03-03 12:52:27 -0500
commit     24f85cdfdba1db685f4da499d8fcb77385e57da7 (patch)
tree       393539553c2903b5f2851b2787676d863f98e6ed /src/mongo
parent     f880299ae5eb78c3cb1789e3439135d5d33cb64f (diff)
download   mongo-24f85cdfdba1db685f4da499d8fcb77385e57da7.tar.gz
SERVER-12950 break ordered batches at first multi-shard op
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp       | 124
-rw-r--r--  src/mongo/s/write_ops/batch_write_op_test.cpp  | 252
2 files changed, 354 insertions(+), 22 deletions(-)
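
The change below enforces the rule named in the commit message for ordered batches: consecutive write ops that target the same single shard endpoint are grouped into one targeted batch, while any op that targets more than one shard is dispatched on its own before targeting continues. As a rough illustration only (a standalone sketch, not MongoDB source; the Op struct, splitOrdered helper, and shard names are made up), the splitting behaves roughly like this:

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    struct Op {
        std::string doc;                      // illustrative description of the write
        std::set<std::string> targetShards;   // endpoints this op must reach
    };

    // Split an ordered batch: consecutive ops targeting the same single shard are
    // grouped; any op targeting more than one shard closes the current group and
    // goes out by itself (assumes every op targets at least one shard).
    static std::vector<std::vector<Op>> splitOrdered(const std::vector<Op>& ops) {
        std::vector<std::vector<Op>> groups;
        std::string currentShard;
        bool groupOpen = false;
        for (const Op& op : ops) {
            if (op.targetShards.size() > 1) {
                groups.push_back({op});                       // multi-shard op dispatched alone
                groupOpen = false;
            } else if (groupOpen && *op.targetShards.begin() == currentShard) {
                groups.back().push_back(op);                  // same endpoint: extend current group
            } else {
                groups.push_back({op});                       // start a new single-shard group
                currentShard = *op.targetShards.begin();
                groupOpen = true;
            }
        }
        return groups;
    }

    int main() {
        // Shard ranges as in the in-code comment: ShardA owns skey a..k, ShardB owns k..z.
        std::vector<Op> batch = {
            {"{ skey : a } { $push }", {"shardA"}},
            {"{ skey : b } { $push }", {"shardA"}},
            {"{ skey : [c, x] } { $push }", {"shardA", "shardB"}},   // spans both shards
            {"{ skey : y } { $push }", {"shardB"}},
            {"{ skey : z } { $push }", {"shardB"}},
        };

        std::vector<std::vector<Op>> groups = splitOrdered(batch);
        for (size_t round = 0; round < groups.size(); ++round) {
            std::cout << "dispatch " << (round + 1) << ":";
            for (const Op& op : groups[round]) std::cout << " " << op.doc;
            std::cout << "\n";
        }
        return 0;
    }

Run against the ordered update example from the in-code comment, this prints three dispatch rounds: the two ops for shardA, then the multi-shard op alone, then the two ops for shardB.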
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 7b9eb656f9f..93031ef3cce 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -77,11 +77,6 @@ namespace mongo {
_clientRequest = clientRequest;
}
- static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
- details->setErrCode( errStatus.code() );
- details->setErrMessage( errStatus.reason() );
- }
-
// Arbitrary endpoint ordering, needed for grouping by endpoint
static int compareEndpoints( const ShardEndpoint* endpointA, const ShardEndpoint* endpointB ) {
@@ -112,6 +107,27 @@ namespace mongo {
typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
}
+ static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
+ details->setErrCode( errStatus.code() );
+ details->setErrMessage( errStatus.reason() );
+ }
+
+ // Helper to determine whether any of the targeted writes requires a new targeted batch
+ static bool isNewBatchRequired( const vector<TargetedWrite*>& writes,
+ const TargetedBatchMap& batchMap ) {
+
+ for ( vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end();
+ ++it ) {
+
+ TargetedWrite* write = *it;
+ if ( batchMap.find( &write->endpoint ) == batchMap.end() ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
// Helper function to cancel all the write ops of targeted batch.
static void cancelBatch( const TargetedWriteBatch& targetedBatch,
WriteOp* writeOps,
@@ -163,16 +179,47 @@ namespace mongo {
bool recordTargetErrors,
vector<TargetedWriteBatch*>* targetedBatches ) {
+ //
+ // Targeting of unordered batches is fairly simple - each remaining write op is targeted,
+ // and each of those targeted writes are grouped into a batch for a particular shard
+ // endpoint.
+ //
+ // Targeting of ordered batches is a bit more complex - to respect the ordering of the
+ // batch, we can only send:
+ // A) a single targeted batch to one shard endpoint
+ // B) multiple targeted batches, but only containing targeted writes for a single write op
+ //
+ // This means that any multi-shard write operation must be targeted and sent one-by-one.
+ // Subsequent single-shard write operations can be batched together if they go to the same
+ // place.
+ //
+ // Ex: ShardA : { skey : a->k }, ShardB : { skey : k->z }
+ //
+ // Ordered insert batch of: [{ skey : a }, { skey : b }, { skey : x }]
+ // broken into:
+ // [{ skey : a }, { skey : b }],
+ // [{ skey : x }]
+ //
+ // Ordered update batch of:
+ // [{ skey : a }{ $push },
+ // { skey : b }{ $push },
+ // { skey : [c, x] }{ $push },
+ // { skey : y }{ $push },
+ // { skey : z }{ $push }]
+ // broken into:
+ // [{ skey : a }, { skey : b }],
+ // [{ skey : [c,x] }],
+ // [{ skey : y }, { skey : z }]
+ //
+
+ const bool ordered = _clientRequest->getOrdered();
+
TargetedBatchMap batchMap;
int numTargetErrors = 0;
size_t numWriteOps = _clientRequest->sizeWriteOps();
for ( size_t i = 0; i < numWriteOps; ++i ) {
- // Only do one-at-a-time ops if COE is false (and break at first target error)
- if ( _clientRequest->getOrdered() && ( !batchMap.empty() || numTargetErrors != 0 ) )
- break;
-
WriteOp& writeOp = _writeOps[i];
// Only target _Ready ops
@@ -190,24 +237,52 @@ namespace mongo {
if ( !targetStatus.isOK() ) {
- //
- // We're not sure how to target here, so either record the error or cancel the
- // current batches.
- //
-
WriteErrorDetail targetError;
buildTargetError( targetStatus, &targetError );
- if ( recordTargetErrors ) {
+ if ( !recordTargetErrors ) {
+
+ // Cancel current batch state with an error
+
+ cancelBatches( targetError, _writeOps, &batchMap );
+ dassert( batchMap.empty() );
+ return targetStatus;
+ }
+ else if ( !ordered || batchMap.empty() ) {
+
+ // Record an error for this batch
+
writeOp.setOpError( targetError );
++numTargetErrors;
+
+ if ( ordered )
+ return Status::OK();
+
continue;
}
else {
- // Cancel current batch state with an error
- cancelBatches( targetError, _writeOps, &batchMap );
- dassert( batchMap.empty() );
- return targetStatus;
+ dassert( ordered && !batchMap.empty() );
+
+ // Send out what we have, but don't record an error yet, since there may be an
+ // error in the writes before this point.
+
+ writeOp.cancelWrites( &targetError );
+ break;
+ }
+ }
+
+ //
+ // If ordered and we have a previous endpoint, make sure we don't need to send these
+ // targeted writes to any other endpoints.
+ //
+
+ if ( ordered && !batchMap.empty() ) {
+
+ dassert( batchMap.size() == 1u );
+ if ( isNewBatchRequired( writes, batchMap ) ) {
+
+ writeOp.cancelWrites( NULL );
+ break;
}
}
@@ -232,6 +307,14 @@ namespace mongo {
// Relinquish ownership of TargetedWrites, now the TargetedBatches own them
writesOwned.mutableVector().clear();
+
+ //
+ // Break if we're ordered and we have more than one endpoint - later writes cannot be
+ // enforced as ordered across multiple shard endpoints.
+ //
+
+ if ( ordered && batchMap.size() > 1u )
+ break;
}
//
@@ -242,6 +325,9 @@ namespace mongo {
TargetedWriteBatch* batch = it->second;
+ if ( batch->getWrites().empty() )
+ continue;
+
// Remember targeted batch for reporting
_targeted.insert( batch );
// Send the handle back to caller
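
For contrast with the ordered splitting above, unordered targeting (as the in-code comment describes) simply targets every remaining write op in one pass and groups the resulting writes by shard endpoint. Again a standalone illustration with made-up types, not the mongos code:

    #include <iostream>
    #include <map>
    #include <set>
    #include <string>
    #include <vector>

    struct Op {
        std::string doc;                      // illustrative description of the write
        std::set<std::string> targetShards;   // endpoints this op must reach
    };

    // Unordered targeting sketch: every op is targeted up front and its writes are
    // grouped per shard endpoint, so a single pass yields one batch per shard.
    static std::map<std::string, std::vector<std::string>>
    groupUnordered(const std::vector<Op>& ops) {
        std::map<std::string, std::vector<std::string>> perShard;
        for (const Op& op : ops) {
            for (const std::string& shard : op.targetShards) {
                perShard[shard].push_back(op.doc);
            }
        }
        return perShard;
    }

    int main() {
        std::vector<Op> batch = {
            {"{ skey : a }", {"shardA"}},
            {"{ skey : [c, x] }", {"shardA", "shardB"}},
            {"{ skey : z }", {"shardB"}},
        };
        for (const auto& entry : groupUnordered(batch)) {
            std::cout << entry.first << ":";
            for (const std::string& doc : entry.second) std::cout << " " << doc;
            std::cout << "\n";
        }
        return 0;
    }

Here every shard receives its writes in one round; ordering across shards is not preserved, which is exactly why the ordered path above has to break at the first multi-shard op.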
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 06d410ce18e..63d1eda036c 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -169,7 +169,63 @@ namespace {
ASSERT( clientResponse.getErrMessage().find( error->getErrMessage()) != string::npos );
}
- TEST(WriteOpTests, TargetMultiOpSameShard) {
+ TEST(WriteOpTests, TargetMultiOpSameShardOrdered) {
+
+ //
+ // Multi-op (ordered) targeting test
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
+
+ vector<MockRange*> mockRanges;
+ mockRanges.push_back( new MockRange( endpoint,
+ nss,
+ BSON( "x" << MINKEY ),
+ BSON( "x" << MAXKEY ) ) );
+
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
+ request.setNS( nss.ns() );
+ request.setOrdered( true );
+ request.setWriteConcern( BSONObj() );
+
+ // Do single-target, multi-doc batch write op
+
+ request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
+ request.getInsertRequest()->addToDocuments( BSON( "x" << 2 ) );
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest( &request );
+ ASSERT( !batchOp.isFinished() );
+
+ MockNSTargeter targeter;
+ targeter.init( mockRanges );
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 1u );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 2u );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpoint );
+
+ BatchedCommandResponse response;
+ response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( batchOp.isFinished() );
+
+ BatchedCommandResponse clientResponse;
+ batchOp.buildClientResponse( &clientResponse );
+ ASSERT( clientResponse.getOk() );
+ }
+
+ TEST(WriteOpTests, TargetMultiOpSameShardUnordered) {
//
// Multi-op targeting test
@@ -313,10 +369,96 @@ namespace {
return deleteDoc;
}
- TEST(WriteOpTests, TargetMultiOpTwoShardsEach) {
+ TEST(WriteOpTests, TargetMultiOpTwoShardsEachOrdered) {
+
+ //
+ // Multi-op (ordered) targeting test where each op goes to both shards
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() );
+ ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() );
+
+ vector<MockRange*> mockRanges;
+ mockRanges.push_back( new MockRange( endpointA,
+ nss,
+ BSON( "x" << MINKEY ),
+ BSON( "x" << 0 ) ) );
+ mockRanges.push_back( new MockRange( endpointB,
+ nss,
+ BSON( "x" << 0 ),
+ BSON( "x" << MAXKEY ) ) );
+
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
+ request.setNS( nss.ns() );
+ request.setOrdered( true );
+ request.setWriteConcern( BSONObj() );
+
+ // Each op goes to both shards
+
+ BSONObj queryA = BSON( "x" << GTE << -1 << LT << 2 );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryA ) ) );
+ BSONObj queryB = BSON( "x" << GTE << -2 << LT << 1 );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryB ) ) );
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest( &request );
+ ASSERT( !batchOp.isFinished() );
+
+ MockNSTargeter targeter;
+ targeter.init( mockRanges );
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 2u );
+ sortByEndpoint( &targeted );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+ assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+ ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+ BatchedCommandResponse response;
+ response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+ batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+
+ // Second round of targeting is needed
+ targetedOwned.clear();
+ status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 2u );
+ sortByEndpoint( &targeted );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+ assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+ ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+ batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+ ASSERT( batchOp.isFinished() );
+
+ BatchedCommandResponse clientResponse;
+ batchOp.buildClientResponse( &clientResponse );
+ ASSERT( clientResponse.getOk() );
+ }
+
+ TEST(WriteOpTests, TargetMultiOpTwoShardsEachUnordered) {
//
- // Multi-op targeting test where each op goes to both shards
+ // Multi-op (unordered) targeting test where each op goes to both shards
//
NamespaceString nss( "foo.bar" );
@@ -381,6 +523,110 @@ namespace {
ASSERT( clientResponse.getOk() );
}
+ TEST(WriteOpTests, TargetMultiOpOneOrTwoShardsOrdered) {
+
+ //
+ // Multi-op (ordered) targeting test where first two ops go to one shard, second two ops
+ // go to two shards.
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() );
+ ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() );
+
+ vector<MockRange*> mockRanges;
+ mockRanges.push_back( new MockRange( endpointA,
+ nss,
+ BSON( "x" << MINKEY ),
+ BSON( "x" << 0 ) ) );
+ mockRanges.push_back( new MockRange( endpointB,
+ nss,
+ BSON( "x" << 0 ),
+ BSON( "x" << MAXKEY ) ) );
+
+ MockNSTargeter targeter;
+ targeter.init( mockRanges );
+
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
+ request.setNS( nss.ns() );
+ request.setOrdered( true );
+ request.setWriteConcern( BSONObj() );
+
+ // Each op goes to one shard
+ BSONObj queryOne = BSON( "x" << GTE << -2 << LT << -1 );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryOne ) ) );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryOne ) ) );
+
+ // Each op goes to both shards
+ BSONObj queryBoth = BSON( "x" << GTE << -1 << LT << 2 );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryBoth ) ) );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryBoth ) ) );
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest( &request );
+ ASSERT( !batchOp.isFinished() );
+
+ // First round of targeting
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 1u );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 2u );
+
+ BatchedCommandResponse response;
+ response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+
+ // Second round of targeting
+ targetedOwned.clear();
+ status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 2u );
+ sortByEndpoint( &targeted );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+ assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+ ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+ batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+
+ // Third round of targeting
+ targetedOwned.clear();
+ status = batchOp.targetBatch( targeter, false, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT( !batchOp.isFinished() );
+ ASSERT_EQUALS( targeted.size(), 2u );
+ sortByEndpoint( &targeted );
+ assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+ assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+ ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+ ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+ batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+ ASSERT( !batchOp.isFinished() );
+ batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+ ASSERT( batchOp.isFinished() );
+
+ BatchedCommandResponse clientResponse;
+ batchOp.buildClientResponse( &clientResponse );
+ ASSERT( clientResponse.getOk() );
+ }
+
TEST(WriteOpTests, TargetMultiOpTwoShardsEachError) {
//