From f93d14738c1abaa31b44a0af87700e72fd3979d3 Mon Sep 17 00:00:00 2001 From: Greg Studer Date: Tue, 1 Apr 2014 15:40:26 -0400 Subject: SERVER-13448 broadcast updates/deletes to all shards when more than one targeted (cherry picked from commit 9b17edd15ffb0ba3a3a9309fafd9bd4fa04a4d92) --- jstests/sharding/multi_write_target.js | 88 +++++++++++++++++++++++++ src/mongo/s/chunk_manager_targeter.cpp | 26 +++++++- src/mongo/s/chunk_manager_targeter.h | 4 +- src/mongo/s/mock_ns_targeter.h | 14 +++- src/mongo/s/ns_targeter.h | 9 ++- src/mongo/s/write_ops/batch_write_exec.cpp | 25 +++++++ src/mongo/s/write_ops/batched_command_request.h | 11 ++++ src/mongo/s/write_ops/write_op.cpp | 14 +++- src/mongo/s/write_ops/write_op_test.cpp | 82 ++++++++++++++++++++--- 9 files changed, 258 insertions(+), 15 deletions(-) create mode 100644 jstests/sharding/multi_write_target.js diff --git a/jstests/sharding/multi_write_target.js b/jstests/sharding/multi_write_target.js new file mode 100644 index 00000000000..52ce36a83e0 --- /dev/null +++ b/jstests/sharding/multi_write_target.js @@ -0,0 +1,88 @@ +// +// Tests that multi-writes (update/delete) target *all* shards and not just shards in the collection +// + +var options = { separateConfig : true }; + +var st = new ShardingTest({ shards : 3, mongos : 1, other : options }); +st.stopBalancer(); + +var mongos = st.s0; +var admin = mongos.getDB( "admin" ); +var shards = mongos.getCollection( "config.shards" ).find().toArray(); +var coll = mongos.getCollection( "foo.bar" ); + +assert( admin.runCommand({ enableSharding : coll.getDB() + "" }).ok ); +printjson( admin.runCommand({ movePrimary : coll.getDB() + "", to : shards[0]._id }) ); +assert( admin.runCommand({ shardCollection : coll + "", key : { skey : 1 } }).ok ); +assert( admin.runCommand({ split : coll + "", middle : { skey : 0 } }).ok ); +assert( admin.runCommand({ moveChunk : coll + "", + find : { skey : 0 }, + to : shards[1]._id }).ok ); + +st.printShardingStatus(); + +jsTest.log("Testing multi-update..."); + +// Put data on all shards +st.shard0.getCollection(coll.toString()).insert({ _id : 0, skey : -1, x : 1 }); +assert.gleOK(st.shard0.getCollection(coll.toString()).getDB().getLastErrorObj()); +st.shard1.getCollection(coll.toString()).insert({ _id : 1, skey : 1, x : 1 }); +assert.gleOK(st.shard1.getCollection(coll.toString()).getDB().getLastErrorObj()); +// Data not in chunks +st.shard2.getCollection(coll.toString()).insert({ _id : 0, x : 1 }); +assert.gleOK(st.shard2.getCollection(coll.toString()).getDB().getLastErrorObj()); + +// Non-multi-update doesn't work without shard key +coll.update({ x : 1 }, { $set : { updated : true } }, { multi : false }); +assert.gleError(coll.getDB().getLastErrorObj()); + +coll.update({ x : 1 }, { $set : { updated : true } }, { multi : true }); +assert.gleOK(coll.getDB().getLastErrorObj()); + +// Ensure update goes to *all* shards +assert.neq(null, st.shard0.getCollection(coll.toString()).findOne({ updated : true })); +assert.neq(null, st.shard1.getCollection(coll.toString()).findOne({ updated : true })); +assert.neq(null, st.shard2.getCollection(coll.toString()).findOne({ updated : true })); + +// _id update works, and goes to all shards +coll.update({ _id : 0 }, { $set : { updatedById : true } }, { multi : false }); +assert.gleOK(coll.getDB().getLastErrorObj()); + +// Ensure _id update goes to *all* shards +assert.neq(null, st.shard0.getCollection(coll.toString()).findOne({ updatedById : true })); +assert.neq(null, st.shard2.getCollection(coll.toString()).findOne({ 
updatedById : true })); + +jsTest.log("Testing multi-delete..."); + +// non-multi-delete doesn't work without shard key +coll.remove({ x : 1 }, { justOne : true }); +assert.gleError(coll.getDB().getLastErrorObj()); + +coll.remove({ x : 1 }, { justOne : false }); +assert.gleOK(coll.getDB().getLastErrorObj()); + +// Ensure delete goes to *all* shards +assert.eq(null, st.shard0.getCollection(coll.toString()).findOne({ x : 1 })); +assert.eq(null, st.shard1.getCollection(coll.toString()).findOne({ x : 1 })); +assert.eq(null, st.shard2.getCollection(coll.toString()).findOne({ x : 1 })); + +// Put more on all shards +st.shard0.getCollection(coll.toString()).insert({ _id : 0, skey : -1, x : 1 }); +assert.gleOK(st.shard0.getCollection(coll.toString()).getDB().getLastErrorObj()); +st.shard1.getCollection(coll.toString()).insert({ _id : 1, skey : 1, x : 1 }); +assert.gleOK(st.shard1.getCollection(coll.toString()).getDB().getLastErrorObj()); +// Data not in chunks +st.shard2.getCollection(coll.toString()).insert({ _id : 0, x : 1 }); +assert.gleOK(st.shard2.getCollection(coll.toString()).getDB().getLastErrorObj()); + +coll.remove({ _id : 0 }, { justOne : true }); +assert.gleOK(coll.getDB().getLastErrorObj()); + +// Ensure _id delete goes to *all* shards +assert.eq(null, st.shard0.getCollection(coll.toString()).findOne({ x : 1 })); +assert.eq(null, st.shard2.getCollection(coll.toString()).findOne({ x : 1 })); + +jsTest.log( "DONE!" ); + +st.stop(); diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index 3054d4ed700..ebd7a4eee68 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -307,7 +307,9 @@ namespace mongo { return Status::OK(); } - Status ChunkManagerTargeter::targetAll( vector* endpoints ) const { + + + Status ChunkManagerTargeter::targetCollection( vector* endpoints ) const { if ( !_primary && !_manager ) { return Status( ErrorCodes::NamespaceNotFound, @@ -334,6 +336,28 @@ namespace mongo { return Status::OK(); } + Status ChunkManagerTargeter::targetAllShards( vector* endpoints ) const { + + if ( !_primary && !_manager ) { + return Status( ErrorCodes::NamespaceNotFound, + str::stream() << "could not target every shard with versions for " + << getNS().ns() + << "; metadata not found" ); + } + + vector shards; + Shard::getAllShards( shards ); + + for ( vector::iterator it = shards.begin(); it != shards.end(); ++it ) { + endpoints->push_back( new ShardEndpoint( it->getName(), + _manager ? 
+ _manager->getVersion( *it ) : + ChunkVersion::UNSHARDED() ) ); + } + + return Status::OK(); + } + namespace { // diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index bb1f63e283b..bd9e46adb0a 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -73,7 +73,9 @@ namespace mongo { Status targetDelete( const BatchedDeleteDocument& deleteDoc, std::vector* endpoints ) const; - Status targetAll( std::vector* endpoints ) const; + Status targetCollection( std::vector* endpoints ) const; + + Status targetAllShards( std::vector* endpoints ) const; void noteStaleResponse( const ShardEndpoint& endpoint, const BSONObj& staleInfo ); diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index 248ccb445ac..d51ea5de3b6 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -119,12 +119,24 @@ namespace mongo { return targetQuery( deleteDoc.getQuery(), endpoints ); } - Status targetAll( std::vector* endpoints ) const { + Status targetCollection( std::vector* endpoints ) const { // TODO: XXX // No-op return Status::OK(); } + Status targetAllShards( std::vector* endpoints ) const { + const std::vector& ranges = getRanges(); + for ( std::vector::const_iterator it = ranges.begin(); it != ranges.end(); + ++it ) { + + const MockRange* range = *it; + endpoints->push_back( new ShardEndpoint( range->endpoint ) ); + } + + return Status::OK(); + } + void noteCouldNotTarget() { // No-op } diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index 4baae5bd6cf..1d5cc78f323 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -106,7 +106,14 @@ namespace mongo { * * Returns !OK with message if the full collection could not be targeted. */ - virtual Status targetAll( std::vector* endpoints ) const = 0; + virtual Status targetCollection( std::vector* endpoints ) const = 0; + + /** + * Returns a vector of ShardEndpoints for all shards. + * + * Returns !OK with message if all shards could not be targeted. 
+ */ + virtual Status targetAllShards( std::vector* endpoints ) const = 0; /** * Informs the targeter that a targeting failure occurred during one of the last targeting diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 3c2190add51..1ed043021c6 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -89,6 +89,10 @@ namespace mongo { void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest, BatchedCommandResponse* clientResponse ) { + LOG( 4 ) << "starting execution of write batch of size " + << static_cast( clientRequest.sizeWriteOps() ) + << " for " << clientRequest.getNS() << endl; + BatchWriteOp batchOp; batchOp.initClientRequest( &clientRequest ); @@ -184,6 +188,10 @@ namespace mongo { // cancel and retarget the batch WriteErrorDetail error; buildErrorFrom( resolveStatus, &error ); + + LOG( 4 ) << "unable to send write batch to " << shardHost.toString() + << causedBy( resolveStatus.toString() ) << endl; + batchOp.noteBatchError( *nextBatch, error ); // We're done with this batch @@ -210,6 +218,9 @@ namespace mongo { NamespaceString nss( request.getNS() ); request.setNS( nss.coll() ); + LOG( 4 ) << "sending write batch to " << shardHost.toString() << ": " + << request.toString() << endl; + _dispatcher->addCommand( shardHost, nss.db(), request ); // Indicate we're done by setting the batch to NULL @@ -246,6 +257,9 @@ namespace mongo { TrackedErrors trackedErrors; trackedErrors.startTracking( ErrorCodes::StaleShardVersion ); + LOG( 4 ) << "write results received from " << shardHost.toString() << ": " + << response.toString() << endl; + // Dispatch was ok, note response batchOp.noteBatchResponse( *batch, response, &trackedErrors ); @@ -283,6 +297,10 @@ namespace mongo { WriteErrorDetail error; buildErrorFrom( Status( ErrorCodes::RemoteResultsUnavailable, msg.str() ), &error ); + + LOG( 4 ) << "unable to receive write results from " << shardHost.toString() + << causedBy( dispatchStatus.toString() ) << endl; + batchOp.noteBatchError( *batch, error ); } } @@ -341,6 +359,13 @@ namespace mongo { } batchOp.buildClientResponse( clientResponse ); + + LOG( 4 ) << "finished execution of write batch" + << ( clientResponse->isErrDetailsSet() ? " with write errors" : "") + << ( clientResponse->isErrDetailsSet() && + clientResponse->isWriteConcernErrorSet() ? " and" : "" ) + << ( clientResponse->isWriteConcernErrorSet() ? 
" with write concern error" : "" ) + << " for " << clientRequest.getNS() << endl; } const BatchWriteExecStats& BatchWriteExec::getStats() { diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index 1afd9c02b96..7b59268484f 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -207,6 +207,17 @@ namespace mongo { return _request->getDeleteRequest()->getDeletesAt( _itemIndex ); } + BSONObj toBSON() const { + switch ( getOpType() ) { + case BatchedCommandRequest::BatchType_Insert: + return getDocument(); + case BatchedCommandRequest::BatchType_Update: + return getUpdate()->toBSON(); + default: + return getDelete()->toBSON(); + } + } + private: const BatchedCommandRequest* _request; diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 6245619567b..6eadc571449 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -83,7 +83,7 @@ namespace mongo { } else { // TODO: Retry index writes with stale version? - targetStatus = targeter.targetAll( &endpoints ); + targetStatus = targeter.targetCollection( &endpoints ); } if ( !targetStatus.isOK() ) { @@ -95,6 +95,16 @@ namespace mongo { if ( endpoint ) endpoints.push_back( endpoint ); } + // If we're targeting more than one endpoint with an update/delete, we have to target + // everywhere since we cannot currently retry partial results. + // NOTE: Index inserts are currently specially targeted only at the current collection to + // avoid creating collections everywhere. + if ( targetStatus.isOK() && endpoints.size() > 1u && !isIndexInsert ) { + endpointsOwned.clear(); + invariant( endpoints.empty() ); + targetStatus = targeter.targetAllShards( &endpoints ); + } + // If we had an error, stop here if ( !targetStatus.isOK() ) return targetStatus; @@ -107,7 +117,7 @@ namespace mongo { WriteOpRef ref( _itemRef.getItemIndex(), _childOps.size() - 1 ); - // For now, multiple endpoints imply no versioning + // For now, multiple endpoints imply no versioning - we can't retry half a multi-write if ( endpoints.size() == 1u ) { targetedWrites->push_back( new TargetedWrite( *endpoint, ref ) ); } diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index 9c0b92f41b7..c7712f5774b 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -137,16 +137,17 @@ namespace { std::sort( writes->begin(), writes->end(), EndpointComp() ); } - TEST(WriteOpTests, TargetMulti) { + TEST(WriteOpTests, TargetMultiOneShard) { // - // Multi-endpoint targeting test + // Multi-write targeting test where our query goes to one shard // NamespaceString nss( "foo.bar" ); - ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() ); - ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() ); + ShardEndpoint endpointA( "shardA", ChunkVersion(10, 0, OID()) ); + ShardEndpoint endpointB( "shardB", ChunkVersion(20, 0, OID()) ); + ShardEndpoint endpointC( "shardB", ChunkVersion(20, 0, OID()) ); vector mockRanges; mockRanges.push_back( new MockRange( endpointA, @@ -156,6 +157,63 @@ namespace { mockRanges.push_back( new MockRange( endpointB, nss, BSON( "x" << 0 ), + BSON( "x" << 10 ) ) ); + mockRanges.push_back( new MockRange( endpointC, + nss, + BSON( "x" << 10 ), + BSON( "x" << MAXKEY ) ) ); + + BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete ); + request.setNS( nss.ns() ); + // Only hits first 
shard + BSONObj query = BSON( "x" << GTE << -2 << LT << -1 ); + request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << query ) ) ); + + WriteOp writeOp( BatchItemRef( &request, 0 ) ); + ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Ready ); + + MockNSTargeter targeter; + targeter.init( mockRanges ); + + OwnedPointerVector targetedOwned; + vector& targeted = targetedOwned.mutableVector(); + Status status = writeOp.targetWrites( targeter, &targeted ); + + ASSERT( status.isOK() ); + ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Pending ); + ASSERT_EQUALS( targeted.size(), 1u ); + assertEndpointsEqual( targeted.front()->endpoint, endpointA ); + + writeOp.noteWriteComplete( *targeted.front() ); + + ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Completed ); + + } + + TEST(WriteOpTests, TargetMultiAllShards) { + + // + // Multi-write targeting test where our write goes to more than one shard + // + + NamespaceString nss( "foo.bar" ); + + ShardEndpoint endpointA( "shardA", ChunkVersion(10, 0, OID()) ); + ShardEndpoint endpointB( "shardB", ChunkVersion(20, 0, OID()) ); + ShardEndpoint endpointC( "shardB", ChunkVersion(20, 0, OID()) ); + + vector mockRanges; + mockRanges.push_back( new MockRange( endpointA, + nss, + BSON( "x" << MINKEY ), + BSON( "x" << 0 ) ) ); + mockRanges.push_back( new MockRange( endpointB, + nss, + BSON( "x" << 0 ), + BSON( "x" << 10 ) ) ); + mockRanges.push_back( new MockRange( endpointC, + nss, + BSON( "x" << 10 ), BSON( "x" << MAXKEY ) ) ); BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete ); @@ -177,13 +235,19 @@ namespace { ASSERT( status.isOK() ); ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Pending ); - ASSERT_EQUALS( targeted.size(), 2u ); + ASSERT_EQUALS( targeted.size(), 3u ); sortByEndpoint( &targeted ); - assertEndpointsEqual( targeted.front()->endpoint, endpointA ); - assertEndpointsEqual( targeted.back()->endpoint, endpointB ); + ASSERT_EQUALS( targeted[0]->endpoint.shardName, endpointA.shardName ); + ASSERT( ChunkVersion::isIgnoredVersion( targeted[0]->endpoint.shardVersion ) ); + ASSERT_EQUALS( targeted[1]->endpoint.shardName, endpointB.shardName ); + ASSERT( ChunkVersion::isIgnoredVersion( targeted[1]->endpoint.shardVersion ) ); + ASSERT_EQUALS( targeted[2]->endpoint.shardName, endpointC.shardName ); + ASSERT( ChunkVersion::isIgnoredVersion( targeted[2]->endpoint.shardVersion ) ); + + writeOp.noteWriteComplete( *targeted[0] ); + writeOp.noteWriteComplete( *targeted[1] ); + writeOp.noteWriteComplete( *targeted[2] ); - writeOp.noteWriteComplete( *targeted.front() ); - writeOp.noteWriteComplete( *targeted.back() ); ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Completed ); } -- cgit v1.2.1
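
The behavioral core of this patch is the fallback in write_op.cpp from query-based targeting to broadcast targeting. Below is a minimal standalone sketch of that decision logic under simplified assumptions: Endpoint, Targeter, targetMultiWrite and kIgnoredVersion are hypothetical stand-ins, not the real NSTargeter/ShardEndpoint/ChunkVersion API, and ownership and error handling are omitted.

// Sketch of the multi-write broadcast fallback (hypothetical simplified types).
#include <string>
#include <vector>

constexpr long long kIgnoredVersion = -1;   // stand-in for ChunkVersion::IGNORED()

struct Endpoint {
    std::string shardName;
    long long shardVersion;
};

struct Targeter {
    std::vector<Endpoint> byQuery;    // shards owning chunks that match the write's query
    std::vector<Endpoint> allShards;  // every shard in the cluster

    std::vector<Endpoint> targetQuery() const { return byQuery; }
    std::vector<Endpoint> targetAllShards() const { return allShards; }
};

std::vector<Endpoint> targetMultiWrite(const Targeter& targeter, bool isIndexInsert) {
    std::vector<Endpoint> endpoints = targeter.targetQuery();

    // More than one targeted shard: broadcast to *all* shards, because a
    // partially applied multi-write cannot currently be retried per shard.
    // Index inserts are excluded so collections are not implicitly created
    // on every shard.
    if (endpoints.size() > 1 && !isIndexInsert) {
        endpoints = targeter.targetAllShards();
    }

    // Multiple endpoints imply no versioning for the dispatched writes.
    if (endpoints.size() > 1) {
        for (Endpoint& ep : endpoints) {
            ep.shardVersion = kIgnoredVersion;
        }
    }
    return endpoints;
}

This mirrors the expectation in the TargetMultiAllShards test above: once a multi-write resolves to more than one endpoint, every shard is targeted and each dispatched write carries an ignored shard version.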