author     Greg Studer <greg@10gen.com>    2014-04-01 15:40:26 -0400
committer  Greg Studer <greg@10gen.com>    2014-04-01 20:30:27 -0400
commit     f93d14738c1abaa31b44a0af87700e72fd3979d3 (patch)
tree       7e5dcd81f67c5ea0570487b31ce7ce6e23e1faa6
parent     c7a673410edd0c9ff6a17ff333f72fb43c82fb2c (diff)
download   mongo-f93d14738c1abaa31b44a0af87700e72fd3979d3.tar.gz
SERVER-13448 broadcast updates/deletes to all shards when more than one targeted
(cherry picked from commit 9b17edd15ffb0ba3a3a9309fafd9bd4fa04a4d92)
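
In user-visible terms: a multi-update or multi-delete whose query resolves to more
than one shard is now broadcast to every shard (with the shard version ignored),
since a partially applied multi-write cannot currently be retried; a single-document
write whose query has neither the shard key nor _id still fails to target. A minimal
shell sketch of that behavior, mirroring the new jstest below:

    // Assume foo.bar is sharded on { skey : 1 } with chunks on more than one shard.
    var coll = db.getSiblingDB("foo").bar;

    // The query omits the shard key, so targeting resolves to several shards; with
    // this change the multi-update is broadcast to *all* shards.
    coll.update({ x : 1 }, { $set : { updated : true } }, { multi : true });
    printjson(coll.getDB().getLastErrorObj());    // aggregate result of the broadcast

    // A non-multi write without the shard key or _id is still rejected at targeting time.
    coll.update({ x : 1 }, { $set : { updated : true } }, { multi : false });
    printjson(coll.getDB().getLastErrorObj());    // contains a targeting error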
-rw-r--r--   jstests/sharding/multi_write_target.js            88
-rw-r--r--   src/mongo/s/chunk_manager_targeter.cpp            26
-rw-r--r--   src/mongo/s/chunk_manager_targeter.h               4
-rw-r--r--   src/mongo/s/mock_ns_targeter.h                    14
-rw-r--r--   src/mongo/s/ns_targeter.h                          9
-rw-r--r--   src/mongo/s/write_ops/batch_write_exec.cpp        25
-rw-r--r--   src/mongo/s/write_ops/batched_command_request.h   11
-rw-r--r--   src/mongo/s/write_ops/write_op.cpp                14
-rw-r--r--   src/mongo/s/write_ops/write_op_test.cpp           82
9 files changed, 258 insertions, 15 deletions
diff --git a/jstests/sharding/multi_write_target.js b/jstests/sharding/multi_write_target.js
new file mode 100644
index 00000000000..52ce36a83e0
--- /dev/null
+++ b/jstests/sharding/multi_write_target.js
@@ -0,0 +1,88 @@
+//
+// Tests that multi-writes (update/delete) target *all* shards, not just the shards that own chunks for the collection
+//
+
+var options = { separateConfig : true };
+
+var st = new ShardingTest({ shards : 3, mongos : 1, other : options });
+st.stopBalancer();
+
+var mongos = st.s0;
+var admin = mongos.getDB( "admin" );
+var shards = mongos.getCollection( "config.shards" ).find().toArray();
+var coll = mongos.getCollection( "foo.bar" );
+
+assert( admin.runCommand({ enableSharding : coll.getDB() + "" }).ok );
+printjson( admin.runCommand({ movePrimary : coll.getDB() + "", to : shards[0]._id }) );
+assert( admin.runCommand({ shardCollection : coll + "", key : { skey : 1 } }).ok );
+assert( admin.runCommand({ split : coll + "", middle : { skey : 0 } }).ok );
+assert( admin.runCommand({ moveChunk : coll + "",
+ find : { skey : 0 },
+ to : shards[1]._id }).ok );
+
+st.printShardingStatus();
+
+jsTest.log("Testing multi-update...");
+
+// Put data on all shards
+st.shard0.getCollection(coll.toString()).insert({ _id : 0, skey : -1, x : 1 });
+assert.gleOK(st.shard0.getCollection(coll.toString()).getDB().getLastErrorObj());
+st.shard1.getCollection(coll.toString()).insert({ _id : 1, skey : 1, x : 1 });
+assert.gleOK(st.shard1.getCollection(coll.toString()).getDB().getLastErrorObj());
+// Data not in chunks
+st.shard2.getCollection(coll.toString()).insert({ _id : 0, x : 1 });
+assert.gleOK(st.shard2.getCollection(coll.toString()).getDB().getLastErrorObj());
+
+// Non-multi-update doesn't work without shard key
+coll.update({ x : 1 }, { $set : { updated : true } }, { multi : false });
+assert.gleError(coll.getDB().getLastErrorObj());
+
+coll.update({ x : 1 }, { $set : { updated : true } }, { multi : true });
+assert.gleOK(coll.getDB().getLastErrorObj());
+
+// Ensure update goes to *all* shards
+assert.neq(null, st.shard0.getCollection(coll.toString()).findOne({ updated : true }));
+assert.neq(null, st.shard1.getCollection(coll.toString()).findOne({ updated : true }));
+assert.neq(null, st.shard2.getCollection(coll.toString()).findOne({ updated : true }));
+
+// _id update works, and goes to all shards
+coll.update({ _id : 0 }, { $set : { updatedById : true } }, { multi : false });
+assert.gleOK(coll.getDB().getLastErrorObj());
+
+// Ensure _id update goes to *all* shards
+assert.neq(null, st.shard0.getCollection(coll.toString()).findOne({ updatedById : true }));
+assert.neq(null, st.shard2.getCollection(coll.toString()).findOne({ updatedById : true }));
+
+jsTest.log("Testing multi-delete...");
+
+// non-multi-delete doesn't work without shard key
+coll.remove({ x : 1 }, { justOne : true });
+assert.gleError(coll.getDB().getLastErrorObj());
+
+coll.remove({ x : 1 }, { justOne : false });
+assert.gleOK(coll.getDB().getLastErrorObj());
+
+// Ensure delete goes to *all* shards
+assert.eq(null, st.shard0.getCollection(coll.toString()).findOne({ x : 1 }));
+assert.eq(null, st.shard1.getCollection(coll.toString()).findOne({ x : 1 }));
+assert.eq(null, st.shard2.getCollection(coll.toString()).findOne({ x : 1 }));
+
+// Put more on all shards
+st.shard0.getCollection(coll.toString()).insert({ _id : 0, skey : -1, x : 1 });
+assert.gleOK(st.shard0.getCollection(coll.toString()).getDB().getLastErrorObj());
+st.shard1.getCollection(coll.toString()).insert({ _id : 1, skey : 1, x : 1 });
+assert.gleOK(st.shard1.getCollection(coll.toString()).getDB().getLastErrorObj());
+// Data not in chunks
+st.shard2.getCollection(coll.toString()).insert({ _id : 0, x : 1 });
+assert.gleOK(st.shard2.getCollection(coll.toString()).getDB().getLastErrorObj());
+
+coll.remove({ _id : 0 }, { justOne : true });
+assert.gleOK(coll.getDB().getLastErrorObj());
+
+// Ensure _id delete goes to *all* shards
+assert.eq(null, st.shard0.getCollection(coll.toString()).findOne({ x : 1 }));
+assert.eq(null, st.shard2.getCollection(coll.toString()).findOne({ x : 1 }));
+
+jsTest.log( "DONE!" );
+
+st.stop();
diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp
index 3054d4ed700..ebd7a4eee68 100644
--- a/src/mongo/s/chunk_manager_targeter.cpp
+++ b/src/mongo/s/chunk_manager_targeter.cpp
@@ -307,7 +307,9 @@ namespace mongo {
return Status::OK();
}
- Status ChunkManagerTargeter::targetAll( vector<ShardEndpoint*>* endpoints ) const {
+
+
+ Status ChunkManagerTargeter::targetCollection( vector<ShardEndpoint*>* endpoints ) const {
if ( !_primary && !_manager ) {
return Status( ErrorCodes::NamespaceNotFound,
@@ -334,6 +336,28 @@ namespace mongo {
return Status::OK();
}
+ Status ChunkManagerTargeter::targetAllShards( vector<ShardEndpoint*>* endpoints ) const {
+
+ if ( !_primary && !_manager ) {
+ return Status( ErrorCodes::NamespaceNotFound,
+ str::stream() << "could not target every shard with versions for "
+ << getNS().ns()
+ << "; metadata not found" );
+ }
+
+ vector<Shard> shards;
+ Shard::getAllShards( shards );
+
+ for ( vector<Shard>::iterator it = shards.begin(); it != shards.end(); ++it ) {
+ endpoints->push_back( new ShardEndpoint( it->getName(),
+ _manager ?
+ _manager->getVersion( *it ) :
+ ChunkVersion::UNSHARDED() ) );
+ }
+
+ return Status::OK();
+ }
+
namespace {
//
diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h
index bb1f63e283b..bd9e46adb0a 100644
--- a/src/mongo/s/chunk_manager_targeter.h
+++ b/src/mongo/s/chunk_manager_targeter.h
@@ -73,7 +73,9 @@ namespace mongo {
Status targetDelete( const BatchedDeleteDocument& deleteDoc,
std::vector<ShardEndpoint*>* endpoints ) const;
- Status targetAll( std::vector<ShardEndpoint*>* endpoints ) const;
+ Status targetCollection( std::vector<ShardEndpoint*>* endpoints ) const;
+
+ Status targetAllShards( std::vector<ShardEndpoint*>* endpoints ) const;
void noteStaleResponse( const ShardEndpoint& endpoint, const BSONObj& staleInfo );
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h
index 248ccb445ac..d51ea5de3b6 100644
--- a/src/mongo/s/mock_ns_targeter.h
+++ b/src/mongo/s/mock_ns_targeter.h
@@ -119,12 +119,24 @@ namespace mongo {
return targetQuery( deleteDoc.getQuery(), endpoints );
}
- Status targetAll( std::vector<ShardEndpoint*>* endpoints ) const {
+ Status targetCollection( std::vector<ShardEndpoint*>* endpoints ) const {
// TODO: XXX
// No-op
return Status::OK();
}
+ Status targetAllShards( std::vector<ShardEndpoint*>* endpoints ) const {
+ const std::vector<MockRange*>& ranges = getRanges();
+ for ( std::vector<MockRange*>::const_iterator it = ranges.begin(); it != ranges.end();
+ ++it ) {
+
+ const MockRange* range = *it;
+ endpoints->push_back( new ShardEndpoint( range->endpoint ) );
+ }
+
+ return Status::OK();
+ }
+
void noteCouldNotTarget() {
// No-op
}
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index 4baae5bd6cf..1d5cc78f323 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -106,7 +106,14 @@ namespace mongo {
*
* Returns !OK with message if the full collection could not be targeted.
*/
- virtual Status targetAll( std::vector<ShardEndpoint*>* endpoints ) const = 0;
+ virtual Status targetCollection( std::vector<ShardEndpoint*>* endpoints ) const = 0;
+
+ /**
+ * Returns a vector of ShardEndpoints for all shards.
+ *
+ * Returns !OK with message if all shards could not be targeted.
+ */
+ virtual Status targetAllShards( std::vector<ShardEndpoint*>* endpoints ) const = 0;
/**
* Informs the targeter that a targeting failure occurred during one of the last targeting
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 3c2190add51..1ed043021c6 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -89,6 +89,10 @@ namespace mongo {
void BatchWriteExec::executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse ) {
+ LOG( 4 ) << "starting execution of write batch of size "
+ << static_cast<int>( clientRequest.sizeWriteOps() )
+ << " for " << clientRequest.getNS() << endl;
+
BatchWriteOp batchOp;
batchOp.initClientRequest( &clientRequest );
@@ -184,6 +188,10 @@ namespace mongo {
// cancel and retarget the batch
WriteErrorDetail error;
buildErrorFrom( resolveStatus, &error );
+
+ LOG( 4 ) << "unable to send write batch to " << shardHost.toString()
+ << causedBy( resolveStatus.toString() ) << endl;
+
batchOp.noteBatchError( *nextBatch, error );
// We're done with this batch
@@ -210,6 +218,9 @@ namespace mongo {
NamespaceString nss( request.getNS() );
request.setNS( nss.coll() );
+ LOG( 4 ) << "sending write batch to " << shardHost.toString() << ": "
+ << request.toString() << endl;
+
_dispatcher->addCommand( shardHost, nss.db(), request );
// Indicate we're done by setting the batch to NULL
@@ -246,6 +257,9 @@ namespace mongo {
TrackedErrors trackedErrors;
trackedErrors.startTracking( ErrorCodes::StaleShardVersion );
+ LOG( 4 ) << "write results received from " << shardHost.toString() << ": "
+ << response.toString() << endl;
+
// Dispatch was ok, note response
batchOp.noteBatchResponse( *batch, response, &trackedErrors );
@@ -283,6 +297,10 @@ namespace mongo {
WriteErrorDetail error;
buildErrorFrom( Status( ErrorCodes::RemoteResultsUnavailable, msg.str() ),
&error );
+
+ LOG( 4 ) << "unable to receive write results from " << shardHost.toString()
+ << causedBy( dispatchStatus.toString() ) << endl;
+
batchOp.noteBatchError( *batch, error );
}
}
@@ -341,6 +359,13 @@ namespace mongo {
}
batchOp.buildClientResponse( clientResponse );
+
+ LOG( 4 ) << "finished execution of write batch"
+ << ( clientResponse->isErrDetailsSet() ? " with write errors" : "")
+ << ( clientResponse->isErrDetailsSet() &&
+ clientResponse->isWriteConcernErrorSet() ? " and" : "" )
+ << ( clientResponse->isWriteConcernErrorSet() ? " with write concern error" : "" )
+ << " for " << clientRequest.getNS() << endl;
}
const BatchWriteExecStats& BatchWriteExec::getStats() {
diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h
index 1afd9c02b96..7b59268484f 100644
--- a/src/mongo/s/write_ops/batched_command_request.h
+++ b/src/mongo/s/write_ops/batched_command_request.h
@@ -207,6 +207,17 @@ namespace mongo {
return _request->getDeleteRequest()->getDeletesAt( _itemIndex );
}
+ BSONObj toBSON() const {
+ switch ( getOpType() ) {
+ case BatchedCommandRequest::BatchType_Insert:
+ return getDocument();
+ case BatchedCommandRequest::BatchType_Update:
+ return getUpdate()->toBSON();
+ default:
+ return getDelete()->toBSON();
+ }
+ }
+
private:
const BatchedCommandRequest* _request;
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 6245619567b..6eadc571449 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -83,7 +83,7 @@ namespace mongo {
}
else {
// TODO: Retry index writes with stale version?
- targetStatus = targeter.targetAll( &endpoints );
+ targetStatus = targeter.targetCollection( &endpoints );
}
if ( !targetStatus.isOK() ) {
@@ -95,6 +95,16 @@ namespace mongo {
if ( endpoint ) endpoints.push_back( endpoint );
}
+ // If we're targeting more than one endpoint with an update/delete, we have to target
+ // everywhere since we cannot currently retry partial results.
+ // NOTE: Index inserts are currently specially targeted only at the current collection to
+ // avoid creating collections everywhere.
+ if ( targetStatus.isOK() && endpoints.size() > 1u && !isIndexInsert ) {
+ endpointsOwned.clear();
+ invariant( endpoints.empty() );
+ targetStatus = targeter.targetAllShards( &endpoints );
+ }
+
// If we had an error, stop here
if ( !targetStatus.isOK() ) return targetStatus;
@@ -107,7 +117,7 @@ namespace mongo {
WriteOpRef ref( _itemRef.getItemIndex(), _childOps.size() - 1 );
- // For now, multiple endpoints imply no versioning
+ // For now, multiple endpoints imply no versioning - we can't retry half a multi-write
if ( endpoints.size() == 1u ) {
targetedWrites->push_back( new TargetedWrite( *endpoint, ref ) );
}
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index 9c0b92f41b7..c7712f5774b 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -137,16 +137,17 @@ namespace {
std::sort( writes->begin(), writes->end(), EndpointComp() );
}
- TEST(WriteOpTests, TargetMulti) {
+ TEST(WriteOpTests, TargetMultiOneShard) {
//
- // Multi-endpoint targeting test
+ // Multi-write targeting test where our query goes to one shard
//
NamespaceString nss( "foo.bar" );
- ShardEndpoint endpointA( "shardA", ChunkVersion::IGNORED() );
- ShardEndpoint endpointB( "shardB", ChunkVersion::IGNORED() );
+ ShardEndpoint endpointA( "shardA", ChunkVersion(10, 0, OID()) );
+ ShardEndpoint endpointB( "shardB", ChunkVersion(20, 0, OID()) );
+ ShardEndpoint endpointC( "shardB", ChunkVersion(20, 0, OID()) );
vector<MockRange*> mockRanges;
mockRanges.push_back( new MockRange( endpointA,
@@ -156,6 +157,63 @@ namespace {
mockRanges.push_back( new MockRange( endpointB,
nss,
BSON( "x" << 0 ),
+ BSON( "x" << 10 ) ) );
+ mockRanges.push_back( new MockRange( endpointC,
+ nss,
+ BSON( "x" << 10 ),
+ BSON( "x" << MAXKEY ) ) );
+
+ BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
+ request.setNS( nss.ns() );
+ // Only hits first shard
+ BSONObj query = BSON( "x" << GTE << -2 << LT << -1 );
+ request.getDeleteRequest()->addToDeletes( buildDeleteDoc( BSON( "q" << query ) ) );
+
+ WriteOp writeOp( BatchItemRef( &request, 0 ) );
+ ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Ready );
+
+ MockNSTargeter targeter;
+ targeter.init( mockRanges );
+
+ OwnedPointerVector<TargetedWrite> targetedOwned;
+ vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
+ Status status = writeOp.targetWrites( targeter, &targeted );
+
+ ASSERT( status.isOK() );
+ ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Pending );
+ ASSERT_EQUALS( targeted.size(), 1u );
+ assertEndpointsEqual( targeted.front()->endpoint, endpointA );
+
+ writeOp.noteWriteComplete( *targeted.front() );
+
+ ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Completed );
+
+ }
+
+ TEST(WriteOpTests, TargetMultiAllShards) {
+
+ //
+ // Multi-write targeting test where our write goes to more than one shard
+ //
+
+ NamespaceString nss( "foo.bar" );
+
+ ShardEndpoint endpointA( "shardA", ChunkVersion(10, 0, OID()) );
+ ShardEndpoint endpointB( "shardB", ChunkVersion(20, 0, OID()) );
+ ShardEndpoint endpointC( "shardB", ChunkVersion(20, 0, OID()) );
+
+ vector<MockRange*> mockRanges;
+ mockRanges.push_back( new MockRange( endpointA,
+ nss,
+ BSON( "x" << MINKEY ),
+ BSON( "x" << 0 ) ) );
+ mockRanges.push_back( new MockRange( endpointB,
+ nss,
+ BSON( "x" << 0 ),
+ BSON( "x" << 10 ) ) );
+ mockRanges.push_back( new MockRange( endpointC,
+ nss,
+ BSON( "x" << 10 ),
BSON( "x" << MAXKEY ) ) );
BatchedCommandRequest request( BatchedCommandRequest::BatchType_Delete );
@@ -177,13 +235,19 @@ namespace {
ASSERT( status.isOK() );
ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Pending );
- ASSERT_EQUALS( targeted.size(), 2u );
+ ASSERT_EQUALS( targeted.size(), 3u );
sortByEndpoint( &targeted );
- assertEndpointsEqual( targeted.front()->endpoint, endpointA );
- assertEndpointsEqual( targeted.back()->endpoint, endpointB );
+ ASSERT_EQUALS( targeted[0]->endpoint.shardName, endpointA.shardName );
+ ASSERT( ChunkVersion::isIgnoredVersion( targeted[0]->endpoint.shardVersion ) );
+ ASSERT_EQUALS( targeted[1]->endpoint.shardName, endpointB.shardName );
+ ASSERT( ChunkVersion::isIgnoredVersion( targeted[1]->endpoint.shardVersion ) );
+ ASSERT_EQUALS( targeted[2]->endpoint.shardName, endpointC.shardName );
+ ASSERT( ChunkVersion::isIgnoredVersion( targeted[2]->endpoint.shardVersion ) );
+
+ writeOp.noteWriteComplete( *targeted[0] );
+ writeOp.noteWriteComplete( *targeted[1] );
+ writeOp.noteWriteComplete( *targeted[2] );
- writeOp.noteWriteComplete( *targeted.front() );
- writeOp.noteWriteComplete( *targeted.back() );
ASSERT_EQUALS( writeOp.getWriteState(), WriteOpState_Completed );
}