author | Greg Studer <greg@10gen.com> | 2014-02-27 17:10:13 -0500
committer | Greg Studer <greg@10gen.com> | 2014-03-03 12:52:27 -0500
commit | 24f85cdfdba1db685f4da499d8fcb77385e57da7 (patch)
tree | 393539553c2903b5f2851b2787676d863f98e6ed /src/mongo
parent | f880299ae5eb78c3cb1789e3439135d5d33cb64f (diff)
download | mongo-24f85cdfdba1db685f4da499d8fcb77385e57da7.tar.gz
SERVER-12950 break ordered batches at first multi-shard op
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 124
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op_test.cpp | 252
2 files changed, 354 insertions, 22 deletions
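The core rule this patch introduces (spelled out in the comment block added to `BatchWriteOp::targetBatch` in the diff below) is that an ordered batch may keep accumulating writes only while every targeted write lands on one and the same shard endpoint; the first write op that targets a different endpoint, or more than one endpoint, ends the current round of targeting and is dispatched on its own. The sketch below is a minimal standalone illustration of that splitting rule, not the mongos implementation; the `WriteTarget` struct and `splitOrderedWrites` helper are hypothetical names invented for this example.

```cpp
// Standalone sketch of the ordered-batch splitting rule described in the patch
// comment below. Names here (WriteTarget, splitOrderedWrites) are invented for
// illustration and do not exist in the MongoDB source tree.
#include <iostream>
#include <set>
#include <string>
#include <vector>

struct WriteTarget {
    // Shard endpoints this single write op targets (more than one => multi-shard op).
    std::set<std::string> shards;
};

// Split an ordered sequence of writes into rounds that can be dispatched while still
// preserving ordering: a round is either (a) consecutive writes that all target the
// same single shard, or (b) exactly one multi-shard write sent on its own.
std::vector<std::vector<size_t>> splitOrderedWrites(const std::vector<WriteTarget>& writes) {
    std::vector<std::vector<size_t>> rounds;
    std::vector<size_t> current;
    std::string currentShard;

    for (size_t i = 0; i < writes.size(); ++i) {
        const WriteTarget& w = writes[i];
        const bool multiShard = w.shards.size() > 1;
        const std::string shard = multiShard ? "" : *w.shards.begin();

        // Close the current round if this write is multi-shard or goes elsewhere.
        if (!current.empty() && (multiShard || shard != currentShard)) {
            rounds.push_back(current);
            current.clear();
        }

        current.push_back(i);
        currentShard = shard;

        // A multi-shard write is always a round of its own.
        if (multiShard) {
            rounds.push_back(current);
            current.clear();
        }
    }
    if (!current.empty())
        rounds.push_back(current);
    return rounds;
}

int main() {
    // ShardA owns skey a->k, ShardB owns skey k->z (the example from the patch comment).
    // Ordered writes: a, b (ShardA), one write spanning both shards, then y, z (ShardB).
    std::vector<WriteTarget> writes = {
        {{"ShardA"}}, {{"ShardA"}}, {{"ShardA", "ShardB"}}, {{"ShardB"}}, {{"ShardB"}}};

    const auto rounds = splitOrderedWrites(writes);
    for (size_t r = 0; r < rounds.size(); ++r) {
        std::cout << "round " << r << ":";
        for (size_t idx : rounds[r]) std::cout << " write#" << idx;
        std::cout << "\n";
    }
    // Expected: [0, 1], [2], [3, 4] -- the multi-shard op breaks the ordered batch.
    return 0;
}
```

Running this on the ordered update example from the patch comment (writes keyed on a, b, [c,x], y, z with ShardA owning a->k and ShardB owning k->z) yields the rounds [a, b], [[c,x]], [y, z], matching the breakdown described in the comment.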
```diff
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 7b9eb656f9f..93031ef3cce 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -77,11 +77,6 @@ namespace mongo {
         _clientRequest = clientRequest;
     }
 
-    static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
-        details->setErrCode( errStatus.code() );
-        details->setErrMessage( errStatus.reason() );
-    }
-
     // Arbitrary endpoint ordering, needed for grouping by endpoint
     static int compareEndpoints( const ShardEndpoint* endpointA,
                                  const ShardEndpoint* endpointB ) {
@@ -112,6 +107,27 @@ namespace mongo {
         typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
     }
 
+    static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
+        details->setErrCode( errStatus.code() );
+        details->setErrMessage( errStatus.reason() );
+    }
+
+    // Helper to determine whether a number of targeted writes require a new targeted batch
+    static bool isNewBatchRequired( const vector<TargetedWrite*>& writes,
+                                    const TargetedBatchMap& batchMap ) {
+
+        for ( vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end();
+            ++it ) {
+
+            TargetedWrite* write = *it;
+            if ( batchMap.find( &write->endpoint ) == batchMap.end() ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     // Helper function to cancel all the write ops of targeted batch.
     static void cancelBatch( const TargetedWriteBatch& targetedBatch,
                              WriteOp* writeOps,
@@ -163,16 +179,47 @@ namespace mongo {
                                         bool recordTargetErrors,
                                         vector<TargetedWriteBatch*>* targetedBatches ) {
 
+        //
+        // Targeting of unordered batches is fairly simple - each remaining write op is targeted,
+        // and each of those targeted writes are grouped into a batch for a particular shard
+        // endpoint.
+        //
+        // Targeting of ordered batches is a bit more complex - to respect the ordering of the
+        // batch, we can only send:
+        // A) a single targeted batch to one shard endpoint
+        // B) multiple targeted batches, but only containing targeted writes for a single write op
+        //
+        // This means that any multi-shard write operation must be targeted and sent one-by-one.
+        // Subsequent single-shard write operations can be batched together if they go to the same
+        // place.
+        //
+        // Ex: ShardA : { skey : a->k }, ShardB : { skey : k->z }
+        //
+        // Ordered insert batch of: [{ skey : a }, { skey : b }, { skey : x }]
+        // broken into:
+        //  [{ skey : a }, { skey : b }],
+        //  [{ skey : x }]
+        //
+        // Ordered update Batch of :
+        //  [{ skey : a }{ $push },
+        //   { skey : b }{ $push },
+        //   { skey : [c, x] }{ $push },
+        //   { skey : y }{ $push },
+        //   { skey : z }{ $push }]
+        // broken into:
+        //  [{ skey : a }, { skey : b }],
+        //  [{ skey : [c,x] }],
+        //  [{ skey : y }, { skey : z }]
+        //
+
+        const bool ordered = _clientRequest->getOrdered();
+
         TargetedBatchMap batchMap;
 
         int numTargetErrors = 0;
 
         size_t numWriteOps = _clientRequest->sizeWriteOps();
        for ( size_t i = 0; i < numWriteOps; ++i ) {
 
-            // Only do one-at-a-time ops if COE is false (and break at first target error)
-            if ( _clientRequest->getOrdered() && ( !batchMap.empty() || numTargetErrors != 0 ) )
-                break;
-
             WriteOp& writeOp = _writeOps[i];
 
             // Only target _Ready ops
@@ -190,24 +237,52 @@ namespace mongo {
 
             if ( !targetStatus.isOK() ) {
 
-                //
-                // We're not sure how to target here, so either record the error or cancel the
-                // current batches.
-                //
-
                 WriteErrorDetail targetError;
                 buildTargetError( targetStatus, &targetError );
 
-                if ( recordTargetErrors ) {
+                if ( !recordTargetErrors ) {
+
+                    // Cancel current batch state with an error
+
+                    cancelBatches( targetError, _writeOps, &batchMap );
+                    dassert( batchMap.empty() );
+                    return targetStatus;
+                }
+                else if ( !ordered || batchMap.empty() ) {
+
+                    // Record an error for this batch
+
                     writeOp.setOpError( targetError );
                     ++numTargetErrors;
+
+                    if ( ordered )
+                        return Status::OK();
+
                     continue;
                 }
                 else {
-                    // Cancel current batch state with an error
-                    cancelBatches( targetError, _writeOps, &batchMap );
-                    dassert( batchMap.empty() );
-                    return targetStatus;
+                    dassert( ordered && !batchMap.empty() );
+
+                    // Send out what we have, but don't record an error yet, since there may be an
+                    // error in the writes before this point.
+
+                    writeOp.cancelWrites( &targetError );
+                    break;
+                }
+            }
+
+            //
+            // If ordered and we have a previous endpoint, make sure we don't need to send these
+            // targeted writes to any other endpoints.
+            //
+
+            if ( ordered && !batchMap.empty() ) {
+
+                dassert( batchMap.size() == 1u );
+                if ( isNewBatchRequired( writes, batchMap ) ) {
+
+                    writeOp.cancelWrites( NULL );
+                    break;
                 }
             }
 
@@ -232,6 +307,14 @@ namespace mongo {
 
             // Relinquish ownership of TargetedWrites, now the TargetedBatches own them
             writesOwned.mutableVector().clear();
+
+            //
+            // Break if we're ordered and we have more than one endpoint - later writes cannot be
+            // enforced as ordered across multiple shard endpoints.
+            //
+
+            if ( ordered && batchMap.size() > 1u )
+                break;
         }
 
         //
@@ -242,6 +325,9 @@ namespace mongo {
 
            TargetedWriteBatch* batch = it->second;
 
+            if ( batch->getWrites().empty() )
+                continue;
+
            // Remember targeted batch for reporting
            _targeted.insert( batch );
            // Send the handle back to caller
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 06d410ce18e..63d1eda036c 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -169,7 +169,63 @@ namespace {
         ASSERT( clientResponse.getErrMessage().find( error->getErrMessage()) != string::npos );
     }
 
-    TEST(WriteOpTests, TargetMultiOpSameShard) {
+    TEST(WriteOpTests, TargetMultiOpSameShardOrdered) {
+
+        //
+        // Multi-op targeting test
+        //
+
+        NamespaceString nss( "foo.bar" );
+
+        ShardEndpoint endpoint( "shard", ChunkVersion::IGNORED() );
+
+        vector<MockRange*> mockRanges;
+        mockRanges.push_back( new MockRange( endpoint,
+                                             nss,
+                                             BSON( "x" << MINKEY ),
+                                             BSON( "x" << MAXKEY ) ) );
+
+        BatchedCommandRequest request( BatchedCommandRequest::BatchType_Insert );
+        request.setNS( nss.ns() );
+        request.setOrdered( true );
+        request.setWriteConcern( BSONObj() );
+
+        // Do single-target, multi-doc batch write op
+
+        request.getInsertRequest()->addToDocuments( BSON( "x" << 1 ) );
+        request.getInsertRequest()->addToDocuments( BSON( "x" << 2 ) );
+
+        BatchWriteOp batchOp;
+        batchOp.initClientRequest( &request );
+        ASSERT( !batchOp.isFinished() );
+
+        MockNSTargeter targeter;
+        targeter.init( mockRanges );
+
+        OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+        vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+        Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 1u );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 2u );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpoint );
+
+        BatchedCommandResponse response;
+        response.setOk( true );
+        response.setN( 0 );
+        ASSERT( response.isValid( NULL ) );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( batchOp.isFinished() );
+
+        BatchedCommandResponse clientResponse;
+        batchOp.buildClientResponse( &clientResponse );
+        ASSERT( clientResponse.getOk() );
+    }
+
+    TEST(WriteOpTests, TargetMultiOpSameShardUnordered) {
 
         //
         // Multi-op targeting test
         //
@@ -313,10 +369,96 @@ namespace {
         return deleteDoc;
     }
 
-    TEST(WriteOpTests, TargetMultiOpTwoShardsEach) {
+    TEST(WriteOpTests, TargetMultiOpTwoShardsEachOrdered) {
+
+        //
+        // Multi-op (ordered) targeting test where each op goes to both shards
+        //
+
+        NamespaceString nss( "foo.bar" );
+
+        ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() );
+        ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() );
+
+        vector<MockRange*> mockRanges;
+        mockRanges.push_back( new MockRange( endpointA,
+                                             nss,
+                                             BSON( "x" << MINKEY ),
+                                             BSON( "x" << 0 ) ) );
+        mockRanges.push_back( new MockRange( endpointB,
+                                             nss,
+                                             BSON( "x" << 0 ),
+                                             BSON( "x" << MAXKEY ) ) );
+
+        BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
+        request.setNS( nss.ns() );
+        request.setOrdered( true );
+        request.setWriteConcern( BSONObj() );
+
+        // Each op goes to both shards
+
+        BSONObj queryA = BSON( "x" << GTE << -1 << LT << 2 );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryA ) ) );
+        BSONObj queryB = BSON( "x" << GTE << -2 << LT << 1 );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryB ) ) );
+
+        BatchWriteOp batchOp;
+        batchOp.initClientRequest( &request );
+        ASSERT( !batchOp.isFinished() );
+
+        MockNSTargeter targeter;
+        targeter.init( mockRanges );
+
+        OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+        vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+        Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 2u );
+        sortByEndpoint( &targeted );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+        assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+        ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+        BatchedCommandResponse response;
+        response.setOk( true );
+        response.setN( 0 );
+        ASSERT( response.isValid( NULL ) );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+        batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+
+        // Second round of targeting is needed
+        targetedOwned.clear();
+        status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 2u );
+        sortByEndpoint( &targeted );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+        assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+        ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+        batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+        ASSERT( batchOp.isFinished() );
+
+        BatchedCommandResponse clientResponse;
+        batchOp.buildClientResponse( &clientResponse );
+        ASSERT( clientResponse.getOk() );
+    }
+
+    TEST(WriteOpTests, TargetMultiOpTwoShardsEachUnordered) {
 
         //
-        // Multi-op targeting test where each op goes to both shards
+        // Multi-op (unaordered) targeting test where each op goes to both shards
         //
 
         NamespaceString nss( "foo.bar" );
@@ -381,6 +523,110 @@ namespace {
         ASSERT( clientResponse.getOk() );
     }
 
+    TEST(WriteOpTests, TargetMultiOpOneOrTwoShardsOrdered) {
+
+        //
+        // Multi-op (ordered) targeting test where first two ops go to one shard, second two ops
+        // go to two shards.
+        //
+
+        NamespaceString nss( "foo.bar" );
+
+        ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() );
+        ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() );
+
+        vector<MockRange*> mockRanges;
+        mockRanges.push_back( new MockRange( endpointA,
+                                             nss,
+                                             BSON( "x" << MINKEY ),
+                                             BSON( "x" << 0 ) ) );
+        mockRanges.push_back( new MockRange( endpointB,
+                                             nss,
+                                             BSON( "x" << 0 ),
+                                             BSON( "x" << MAXKEY ) ) );
+
+        MockNSTargeter targeter;
+        targeter.init( mockRanges );
+
+        BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
+        request.setNS( nss.ns() );
+        request.setOrdered( true );
+        request.setWriteConcern( BSONObj() );
+
+        // Each op goes to one shard
+        BSONObj queryOne = BSON( "x" << GTE << -2 << LT << -1 );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryOne ) ) );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryOne ) ) );
+
+        // Each op goes to both shards
+        BSONObj queryBoth = BSON( "x" << GTE << -1 << LT << 2 );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryBoth ) ) );
+        request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << queryBoth ) ) );
+
+        BatchWriteOp batchOp;
+        batchOp.initClientRequest( &request );
+        ASSERT( !batchOp.isFinished() );
+
+        // First round of targeting
+        OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+        vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+        Status status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 1u );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 2u );
+
+        BatchedCommandResponse response;
+        response.setOk( true );
+        response.setN( 0 );
+        ASSERT( response.isValid( NULL ) );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+
+        // Second round of targeting
+        targetedOwned.clear();
+        status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 2u );
+        sortByEndpoint( &targeted );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+        assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+        ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+        batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+
+        // Third round of targeting
+        targetedOwned.clear();
+        status = batchOp.targetBatch( targeter, false, &targeted );
+
+        ASSERT( status.isOK() );
+        ASSERT( !batchOp.isFinished() );
+        ASSERT_EQUALS( targeted.size(), 2u );
+        sortByEndpoint( &targeted );
+        assertEndpointsEqual( targeted.front()->getEndpoint(), endpointA );
+        assertEndpointsEqual( targeted.back()->getEndpoint(), endpointB );
+        ASSERT_EQUALS( targeted.front()->getWrites().size(), 1u );
+        ASSERT_EQUALS( targeted.back()->getWrites().size(), 1u );
+
+        batchOp.noteBatchResponse( *targeted.front(), response, NULL );
+        ASSERT( !batchOp.isFinished() );
+        batchOp.noteBatchResponse( *targeted.back(), response, NULL );
+        ASSERT( batchOp.isFinished() );
+
+        BatchedCommandResponse clientResponse;
+        batchOp.buildClientResponse( &clientResponse );
+        ASSERT( clientResponse.getOk() );
+    }
+
     TEST(WriteOpTests, TargetMultiOpTwoShardsEachError) {
 
         //
```