author     Esha Maharishi <esha.maharishi@mongodb.com>    2017-01-20 18:58:49 -0500
committer  Esha Maharishi <esha.maharishi@mongodb.com>    2017-01-23 15:19:28 -0500
commit     ac5d193edf5c1e170119871dd4bfdc5a839fc1cf (patch)
tree       a428118e57471324f362dd602306e375424e0fcc
parent     7480e053bb992f869bf83c8e54ee088afa199bb9 (diff)
download   mongo-ac5d193edf5c1e170119871dd4bfdc5a839fc1cf.tar.gz
SERVER-27625 remove dead ANSA and setShardVersion code
30 files changed, 228 insertions, 1986 deletions
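The removed code paths let a shard lazily initialize its sharding state from the arguments of versioned commands (setShardVersion, moveChunk, mergeChunks, _recvChunkStart); after this change a shard must instead be initialized from the shardIdentity document that the config server upserts when the shard is added, and shard-only commands simply check ShardingState::canAcceptShardedCommands(). Tests that previously suppressed the shardIdentity upsert at startup via a setParameter now toggle the dontUpsertShardIdentityOnNewShards failpoint at runtime, as the updated addshard2.js below does. The following is a minimal sketch of that pattern in jstest style, not taken verbatim from the change; the cluster shape and the placeholder test body are illustrative assumptions:

```javascript
(function() {
    'use strict';

    var st = new ShardingTest({shards: 1, mongos: 1});

    // Stop the config server from upserting a shardIdentity document onto newly
    // added shards, so the same shard host can be re-used across addShard calls.
    assert.commandWorked(st.configRS.getPrimary().adminCommand(
        {configureFailPoint: "dontUpsertShardIdentityOnNewShards", mode: "alwaysOn"}));

    // ... test body that adds and removes shards while the failpoint is active ...

    // Turn the failpoint back off before routing writes through mongos: with this
    // change a shard rejects versioned requests until its sharding state has been
    // initialized from a shardIdentity document.
    assert.commandWorked(st.configRS.getPrimary().adminCommand(
        {configureFailPoint: "dontUpsertShardIdentityOnNewShards", mode: "off"}));

    st.stop();
})();
```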
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index 462d8cff458..ea23207f1f2 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -99,7 +99,6 @@ selector: - jstests/sharding/mongos_no_replica_set_refresh.js - jstests/sharding/movechunk_interrupt_at_primary_stepdown.js - jstests/sharding/primary_config_server_blackholed_from_mongos.js - - jstests/sharding/shard_aware_on_config_election.js # Nothing is affected by config server step down - jstests/sharding/basic_sharding_params.js # ShardingTest is never used, so continuous step down thread never starts diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 361cb97c127..06655890f37 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -451,7 +451,7 @@ }, skipSharded: true, expectFailure: true, - expectedErrorCode: null, + expectedErrorCode: 193, isAdminCommand: true, }, splitVector: { diff --git a/jstests/sharding/addshard2.js b/jstests/sharding/addshard2.js index a7912c8762d..30b7d4365ff 100644 --- a/jstests/sharding/addshard2.js +++ b/jstests/sharding/addshard2.js @@ -51,18 +51,9 @@ }, "removeShard never completed for shard " + shardName); }; - // Enable the failpoint that prevents the config server from upserting a shardIdentity on new - // shards so that the same shard host can be re-used for multiple addShard calls without being - // restarted in between each addShard (the shardIdentity cannot be deleted while the shard host - // is running with --shardsvr). var st = new ShardingTest({ shards: 0, mongos: 1, - other: { - configOptions: { - setParameter: "failpoint.dontUpsertShardIdentityOnNewShards={'mode':'alwaysOn'}" - } - } }); // Add one shard since the last shard cannot be removed. @@ -72,6 +63,13 @@ // Allocate a port that can be used to test adding invalid hosts. var portWithoutHostRunning = allocatePort(); + // Enable the failpoint that prevents the config server from upserting a shardIdentity on new + // shards so that the same shard host can be re-used for multiple addShard calls without being + // restarted in between each addShard (the shardIdentity cannot be deleted while the shard host + // is running with --shardsvr). + st.configRS.getPrimary().adminCommand( + {configureFailPoint: "dontUpsertShardIdentityOnNewShards", mode: "alwaysOn"}); + // 1. Test adding a *standalone* // 1.a. with or without specifying the shardName. @@ -169,15 +167,34 @@ // 4. Test that a replica set whose *set name* is "admin" can be written to (SERVER-17232). + // Turn off the dontUpsertShardIdentityOnNewShards failpoint, since mongos will send + // setShardVersion when trying to do the write, and the setShardVersion will fail if the + // sharding state will not be enabled. + assert.commandWorked(st.configRS.getPrimary().adminCommand( + {configureFailPoint: "dontUpsertShardIdentityOnNewShards", mode: "off"})); + rst = new ReplSetTest({name: "admin", nodes: 1}); rst.startSet({shardsvr: ''}); rst.initiate(); jsTest.log("A replica set whose set name is 'admin' should be able to be written to."); + addShardRes = st.s.adminCommand({addShard: rst.getURL()}); assertAddShardSucceeded(addShardRes); + + // Ensure the write goes to the newly added shard. 
+ assert.commandWorked(st.s.getDB('test').runCommand({create: "foo"})); + var res = st.s.getDB('config').getCollection('databases').findOne({_id: 'test'}); + assert.neq(null, res); + if (res.primary != addShardRes.shardAdded) { + assert.commandWorked(st.s.adminCommand({movePrimary: 'test', to: addShardRes.shardAdded})); + } + assert.writeOK(st.s.getDB('test').foo.insert({x: 1})); + assert.neq(null, rst.getPrimary().getDB('test').foo.findOne()); + assert.commandWorked(st.s.getDB('test').runCommand({dropDatabase: 1})); + removeShardWithName(addShardRes.shardAdded); rst.stopSet(); diff --git a/jstests/sharding/delayed_shard_identity_upsert.js b/jstests/sharding/delayed_shard_identity_upsert.js deleted file mode 100644 index d2d3a59054f..00000000000 --- a/jstests/sharding/delayed_shard_identity_upsert.js +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Tests that a variety of operations from a mongos to a shard succeed even during the period when - * the shard has yet to receive the shardIdentity from the config server. - */ -(function() { - 'use strict'; - - // Simulate that the insert of the shardIdentity doc from the config to a new shard gets - // "delayed" by using the dontUpsertShardIdentityOnNewShards failpoint on the configs. - var st = new ShardingTest({ - shards: 3, - mongos: 1, - other: { - rs: true, - rsOptions: {nodes: 1}, - configOptions: { - setParameter: "failpoint.dontUpsertShardIdentityOnNewShards={'mode':'alwaysOn'}" - } - } - }); - - var testDB = st.s.getDB("test"); - assert.commandWorked(testDB.adminCommand({enableSharding: testDB.getName()})); - st.ensurePrimaryShard(testDB.getName(), st.shard0.shardName); - - // Create a collection sharded on {a: 1}. Add 2dsphere index to test geoNear. - var coll = testDB.getCollection("sharded"); - coll.drop(); - assert.commandWorked(coll.createIndex({a: 1})); - assert.commandWorked(coll.createIndex({geo: "2dsphere"})); - assert.commandWorked(testDB.adminCommand({shardCollection: coll.getFullName(), key: {a: 1}})); - - // Split the collection. - // shard0000: { "a" : { "$minKey" : 1 } } -->> { "a" : 1 } - // shard0001: { "a" : 1 } -->> { "a" : 10 } - // shard0002: { "a" : 10 } -->> { "a" : { "$maxKey" : 1 }} - var chunk2Min = 1; - var chunk3Min = 10; - assert.commandWorked(testDB.adminCommand({split: coll.getFullName(), middle: {a: chunk2Min}})); - assert.commandWorked(testDB.adminCommand({split: coll.getFullName(), middle: {a: chunk3Min}})); - assert.commandWorked(testDB.adminCommand( - {moveChunk: coll.getFullName(), find: {a: 5}, to: st.shard1.shardName})); - assert.commandWorked(testDB.adminCommand( - {moveChunk: coll.getFullName(), find: {a: 15}, to: st.shard2.shardName})); - - // Put data on each shard. - // Note that the balancer is off by default, so the chunks will stay put. - // shard0000: {a: 0} - // shard0001: {a: 2}, {a: 4} - // shard0002: {a: 15} - // Include geo field to test geoNear. - var a_0 = {_id: 0, a: 0, geo: {type: "Point", coordinates: [0, 0]}}; - var a_2 = {_id: 1, a: 2, geo: {type: "Point", coordinates: [0, 0]}}; - var a_4 = {_id: 2, a: 4, geo: {type: "Point", coordinates: [0, 0]}}; - var a_15 = {_id: 3, a: 15, geo: {type: "Point", coordinates: [0, 0]}}; - assert.writeOK(coll.insert(a_0)); - assert.writeOK(coll.insert(a_2)); - assert.writeOK(coll.insert(a_4)); - assert.writeOK(coll.insert(a_15)); - - // Aggregate and aggregate explain. 
- assert.eq(3, coll.aggregate([{$match: {a: {$lt: chunk3Min}}}]).itcount()); - assert.commandWorked(coll.explain().aggregate([{$match: {a: {$lt: chunk3Min}}}])); - - // Count and count explain. - assert.eq(3, coll.find({a: {$lt: chunk3Min}}).count()); - assert.commandWorked(coll.explain().find({a: {$lt: chunk3Min}}).count()); - - // Distinct and distinct explain. - assert.eq(3, coll.distinct("_id", {a: {$lt: chunk3Min}}).length); - assert.commandWorked(coll.explain().distinct("_id", {a: {$lt: chunk3Min}})); - - // Find and find explain. - assert.eq(3, coll.find({a: {$lt: chunk3Min}}).itcount()); - assert.commandWorked(coll.find({a: {$lt: chunk3Min}}).explain()); - - // FindAndModify and findAndModify explain. - assert.eq(0, coll.findAndModify({query: {a: 0}, update: {$set: {b: 1}}}).a); - assert.commandWorked(coll.explain().findAndModify({query: {a: 0}, update: {$set: {b: 1}}})); - - // GeoNear. - assert.eq(3, - assert - .commandWorked(testDB.runCommand({ - geoNear: coll.getName(), - near: {type: "Point", coordinates: [0, 0]}, - spherical: true, - query: {a: {$lt: chunk3Min}}, - })) - .results.length); - - // MapReduce. - assert.eq(3, - assert - .commandWorked(coll.mapReduce( - function() { - emit(this.a, 1); - }, - function(key, values) { - return Array.sum(values); - }, - {out: {inline: 1}, query: {a: {$lt: chunk3Min}}})) - .results.length); - - // Remove and remove explain. - var writeRes = coll.remove({a: {$lt: chunk3Min}}); - assert.writeOK(writeRes); - assert.eq(3, writeRes.nRemoved); - assert.commandWorked(coll.explain().remove({a: {$lt: chunk3Min}})); - assert.writeOK(coll.insert(a_0)); - assert.writeOK(coll.insert(a_2)); - assert.writeOK(coll.insert(a_4)); - - // Update and update explain. - writeRes = coll.update({a: {$lt: chunk3Min}}, {$set: {b: 1}}, {multi: true}); - assert.writeOK(writeRes); - assert.eq(3, writeRes.nMatched); - assert.commandWorked( - coll.explain().update({a: {$lt: chunk3Min}}, {$set: {b: 1}}, {multi: true})); - - // Assert that the shardIdentity document has still not "reached" any shard, meaning all of the - // above commands indeed succeeded during the period that the shardIdentity insert was - // "delayed." - for (shard in st.shards) { - var res = shard.getDB("admin").getCollection("system.version").find({_id: "shardIdentity"}); - assert.eq(null, res); - } - - st.stop(); - -})(); diff --git a/jstests/sharding/replmonitor_bad_seed.js b/jstests/sharding/replmonitor_bad_seed.js index e619af509e1..efb84783bd4 100644 --- a/jstests/sharding/replmonitor_bad_seed.js +++ b/jstests/sharding/replmonitor_bad_seed.js @@ -25,12 +25,12 @@ // The cluster now has the shard information. Then kill the replica set so when mongos restarts // and tries to create a ReplSetMonitor for that shard, it will not be able to connect to any of // the seed servers. - replTest.stopSet(); + // Don't clear the data directory so that the shardIdentity is not deleted. 
+ replTest.stopSet(undefined /* send default signal */, true /* don't clear data directory */); st.restartMongos(0); - replTest.startSet({restart: true}); - replTest.initiate(); + replTest.startSet({restart: true, noCleanData: true}); replTest.awaitSecondaryNodes(); // Verify that the replSetMonitor can reach the restarted set diff --git a/jstests/sharding/shard_aware_on_config_election.js b/jstests/sharding/shard_aware_on_config_election.js deleted file mode 100644 index a885a37455a..00000000000 --- a/jstests/sharding/shard_aware_on_config_election.js +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Tests that, on transition to primary, a config server initializes sharding awareness on all - * shards not marked as sharding aware in config.shards. - * - * This test restarts shard and config server nodes. - * @tags: [requires_persistence] - */ - -(function() { - "use strict"; - - var waitForIsMaster = function(conn) { - assert.soon(function() { - var res = conn.getDB('admin').runCommand({isMaster: 1}); - return res.ismaster; - }); - }; - - var checkShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) { - var res = conn.getDB('admin').runCommand({shardingState: 1}); - assert.commandWorked(res); - assert(res.enabled); - assert.eq(configConnStr, res.configServer); - assert.eq(shardName, res.shardName); - assert(clusterId.equals(res.clusterId), - 'cluster id: ' + tojson(clusterId) + ' != ' + tojson(res.clusterId)); - }; - - var checkShardMarkedAsShardAware = function(mongosConn, shardName) { - var res = mongosConn.getDB('config').getCollection('shards').findOne({_id: shardName}); - assert.neq(null, res, "Could not find new shard " + shardName + " in config.shards"); - assert.eq(1, res.state); - }; - - var waitUntilShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) { - assert.soon(function() { - var res = conn.getDB('admin').runCommand({shardingState: 1}); - assert.commandWorked(res); - if (res.enabled && (configConnStr === res.configServer) && - (shardName === res.shardName) && (clusterId.equals(res.clusterId))) { - return true; - } - return false; - }); - }; - - var waitUntilShardMarkedAsShardAware = function(mongosConn, shardName) { - assert.soon(function() { - var res = mongosConn.getDB('config').getCollection('shards').findOne({_id: shardName}); - assert.neq(null, res, "Could not find new shard " + shardName + " in config.shards"); - if (res.state && res.state === 1) { - return true; - } - return false; - }); - }; - - var numShards = 2; - var st = new ShardingTest({shards: numShards, other: {rs: true}}); - var clusterId = st.s.getDB('config').getCollection('version').findOne().clusterId; - - var restartedShards = []; - for (var i = 0; i < numShards; i++) { - var rst = st["rs" + i]; - - jsTest.log("Assert that shard " + rst.name + - " is sharding aware and was marked as sharding aware in config.shards"); - checkShardingStateInitialized(rst.getPrimary(), st.configRS.getURL(), rst.name, clusterId); - checkShardMarkedAsShardAware(st.s, rst.name); - - jsTest.log("Restart " + rst.name + " without --shardsvr to clear its sharding awareness"); - for (var nodeId = 0; nodeId < rst.nodes.length; nodeId++) { - var rstOpts = rst.nodes[nodeId].fullOptions; - delete rstOpts.shardsvr; - rst.restart(nodeId, rstOpts); - } - rst.awaitNodesAgreeOnPrimary(); - - jsTest.log("Manually delete the shardIdentity document from " + rst.name); - // Use writeConcern: { w: majority } so that the write cannot be lost when the shard is - // restarted again with --shardsvr. 
- assert.writeOK(rst.getPrimary() - .getDB("admin") - .getCollection("system.version") - .remove({"_id": "shardIdentity"}, {writeConcern: {w: "majority"}})); - - jsTest.log("Manually unset the state field from " + rst.name + "'s entry in config.shards"); - // Use writeConcern: { w: majority } so that the write cannot be rolled back when the - // current primary is stepped down. - assert.writeOK(st.s.getDB("config").getCollection("shards").update( - {"_id": rst.name}, {$unset: {"state": ""}}, {writeConcern: {w: "majority"}})); - - // Make sure shardIdentity delete replicated to all nodes before restarting them with - // --shardsvr since if they try to replicate that delete while runnning with --shardsvr - // they will crash. - rst.awaitReplication(); - jsTest.log("Restart " + rst.name + - " with --shardsvr to allow initializing its sharding awareness"); - for (var nodeId = 0; nodeId < rst.nodes.length; nodeId++) { - var rstOpts = rst.nodes[nodeId].fullOptions; - rstOpts.shardsvr = ""; - rst.restart(nodeId, rstOpts); - } - rst.awaitNodesAgreeOnPrimary(); - } - - jsTest.log("Step down the primary config server"); - // Step down the primary config server so that the newly elected primary performs sharding - // initialization on shards not marked as shard aware. - assert.throws(function() { - st.configRS.getPrimary().getDB("admin").runCommand({replSetStepDown: 10}); - }); - - jsTest.log("Wait for a new primary config server to be elected."); - st.configRS.awaitNodesAgreeOnPrimary(); - - for (var i = 0; i < numShards; i++) { - var rst = st["rs" + i]; - jsTest.log("Assert that shard " + rst.name + - " became sharding aware and marked as sharding aware in config.shards again"); - waitUntilShardingStateInitialized( - rst.getPrimary(), st.configRS.getURL(), rst.name, clusterId); - waitUntilShardMarkedAsShardAware(st.s, rst.name); - } - - st.stop(); - -})(); diff --git a/jstests/sharding/shard_aware_on_set_shard_version.js b/jstests/sharding/shard_aware_on_set_shard_version.js deleted file mode 100644 index 94d7e081097..00000000000 --- a/jstests/sharding/shard_aware_on_set_shard_version.js +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Tests the correctness of sharding initialization through setShardVersion. - * - * Though sharding initialization is typically done: - * - * 1) when the config server inserts the shardIdentity document on a new shard, or - * 2) when the shard starts up with a shardIdentity document already on disk - * - * the initialization may be done through setShardVersion if a sharded connection from a mongos or - * config is made to the new shard before the shardIdentity insert triggers sharding initialization. - */ -(function() { - 'use strict'; - - // Prevent a config primary from upserting the shardIdentity document into the shards by using - // the dontUpsertShardIdentityOnNewShards failpoint. - var st = new ShardingTest({ - shards: 1, - mongos: 1, - other: { - rs: true, - rsOptions: {nodes: 1}, - configOptions: { - setParameter: - {"failpoint.dontUpsertShardIdentityOnNewShards": "{'mode':'alwaysOn'}"} - } - } - }); - - st.configRS.awaitReplication(); - var configVersion = st.s.getDB('config').getCollection('version').findOne(); - assert.neq(null, configVersion); - var clusterId = configVersion.clusterId; - assert.neq(null, clusterId); - - // The balancer, even when disabled, initiates a sharded connection to each new shard through - // its periodic check that no shards' process OIDs clash. 
Expect that this check will send - // setShardVersion and trigger sharding initialization on the new shard soon. - var fiveMinutes = 30000; - assert.soon(function() { - var res = st.rs0.getPrimary().adminCommand({shardingState: 1}); - assert.commandWorked(res); - if (res.enabled) { - // If sharding state was initialized, make sure all fields are correct. Note, the - // clusterId field is not initialized through setShardVersion. - return (st.configRS.getURL() === res.configServer) && (st.rs0.name === res.shardName) && - (!clusterId.equals(res.clusterId)); - } else { - return false; - } - }, "Shard failed to initialize sharding awareness after being added as a shard", fiveMinutes); - - // Assert that the shardIdentity document was not somehow inserted on the shard, triggering - // sharding initialization unexpectedly. - var res = st.rs0.getPrimary().getDB("admin").getCollection("system.version").findOne({ - _id: "shardIdentity" - }); - assert.eq(null, res); - - st.stop(); - -})(); diff --git a/jstests/sharding/ssv_config_check.js b/jstests/sharding/ssv_config_check.js index b909f98f1f6..969e915aaea 100644 --- a/jstests/sharding/ssv_config_check.js +++ b/jstests/sharding/ssv_config_check.js @@ -11,7 +11,6 @@ testDB.adminCommand({enableSharding: 'test'}); testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}); - // Initialize version on shard. testDB.user.insert({x: 1}); var directConn = new Mongo(st.d0.host); @@ -22,6 +21,7 @@ var shardDoc = st.s.getDB('config').shards.findOne(); + jsTest.log("Verify that the obsolete init form of setShardVersion succeeds on shards."); assert.commandWorked(adminDB.runCommand({ setShardVersion: '', init: true, @@ -31,14 +31,18 @@ shardHost: shardDoc.host })); - assert.commandFailed(adminDB.runCommand({ + var configAdmin = st.c0.getDB('admin'); + + jsTest.log("Verify that setShardVersion fails on the config server"); + // Even if shardName sent is 'config' and connstring sent is config server's actual connstring. + assert.commandFailedWithCode(configAdmin.runCommand({ setShardVersion: '', init: true, authoritative: true, - configdb: 'bad-rs/local:12,local:34', - shard: shardDoc._id, - shardHost: shardDoc.host - })); + configdb: configStr, + shard: 'config' + }), + ErrorCodes.NoShardingEnabled); st.stop(); })(); diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index efedba69baa..3e7e489bf03 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -200,6 +200,7 @@ error_code("ReplicaSetMonitorRemoved", 199) error_code("ChunkRangeCleanupPending", 200) error_code("CannotBuildIndexKeys", 201) error_code("NetworkInterfaceExceededTimeLimit", 202) +error_code("ShardingStateNotInitialized", 203) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index 40733d822b1..7b5714b95e0 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -1532,7 +1532,6 @@ void mongo::execCommandDatabase(OperationContext* txn, } } - if (command->adminOnly()) { LOG(2) << "command: " << request.getCommandName(); } @@ -1564,39 +1563,17 @@ void mongo::execCommandDatabase(OperationContext* txn, // Operations are only versioned against the primary. We also make sure not to redo shard // version handling if this command was issued via the direct client. 
if (iAmPrimary && !txn->getClient()->isInDirectClient()) { - // Handle shard version and config optime information that may have been sent along with - // the command. - auto& oss = OperationShardingState::get(txn); - + // Handle a shard version that may have been sent along with the command. auto commandNS = NamespaceString(command->parseNs(dbname, request.getCommandArgs())); + auto& oss = OperationShardingState::get(txn); oss.initializeShardVersion(commandNS, extractedFields[kShardVersionFieldIdx]); - auto shardingState = ShardingState::get(txn); - if (oss.hasShardVersion()) { - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - uassertStatusOK( - {ErrorCodes::NoShardingEnabled, - "Cannot accept sharding commands if not started with --shardsvr"}); - } else if (!shardingState->enabled()) { - // TODO(esha): Once 3.4 ships, we no longer need to support initializing - // sharding awareness through commands, so just reject all sharding commands. - if (!shardingState->commandInitializesShardingAwareness( - request.getCommandName().toString())) { - uassertStatusOK({ErrorCodes::NoShardingEnabled, - str::stream() - << "Received a command with sharding chunk version " - "information but this node is not sharding aware: " - << request.getCommandArgs().jsonString()}); - } - } + uassertStatusOK(shardingState->canAcceptShardedCommands()); } - if (shardingState->enabled()) { - // TODO(spencer): Do this unconditionally once all nodes are sharding aware - // by default. - uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(txn)); - } + // Handle config optime information that may have been sent along with the command. + uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(txn)); } // Can throw diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 3ff852d0372..c1267979332 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -773,16 +773,6 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook fassertStatusOK(40217, status); } - // For upgrade from 3.2 to 3.4, check if any shards in config.shards are not yet marked as - // shard aware, and attempt to initialize sharding awareness on them. - auto shardAwareInitializationStatus = - Grid::get(txn)->catalogManager()->initializeShardingAwarenessOnUnawareShards(txn); - if (!shardAwareInitializationStatus.isOK()) { - warning() << "Error while attempting to initialize sharding awareness on sharding " - "unaware shards " - << causedBy(shardAwareInitializationStatus); - } - // Free any leftover locks from previous instantiations. auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager(); distLockManager->unlockAll(txn, distLockManager->getProcessID()); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index ce015fa245c..e10d654d4ef 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -81,51 +81,6 @@ private: const ShardIdentityType _shardIdentity; }; -/** - * Used by the config server for backwards compatibility with 3.2 mongos to upsert a shardIdentity - * document (and thereby perform shard aware initialization) on a newly added shard. - * - * Warning: Only a config server primary should perform this upsert. 
Callers should ensure that - * they are primary before registering this RecoveryUnit. - */ -class LegacyAddShardLogOpHandler final : public RecoveryUnit::Change { -public: - LegacyAddShardLogOpHandler(OperationContext* txn, ShardType shardType) - : _txn(txn), _shardType(std::move(shardType)) {} - - void commit() override { - uassertStatusOK( - Grid::get(_txn)->catalogManager()->upsertShardIdentityOnShard(_txn, _shardType)); - } - - void rollback() override {} - -private: - OperationContext* _txn; - const ShardType _shardType; -}; - -/** - * Used by the config server for backwards compatibility. Cancels a pending addShard task (if there - * is one) for the shard with id shardId that was initiated by catching the insert to config.shards - * from a 3.2 mongos doing addShard. - */ -class RemoveShardLogOpHandler final : public RecoveryUnit::Change { -public: - RemoveShardLogOpHandler(OperationContext* txn, ShardId shardId) - : _txn(txn), _shardId(std::move(shardId)) {} - - void commit() override { - Grid::get(_txn)->catalogManager()->cancelAddShardTaskIfNeeded(_shardId); - } - - void rollback() override {} - -private: - OperationContext* _txn; - const ShardId _shardId; -}; - } // unnamed namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) @@ -243,22 +198,6 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i } } - // For backwards compatibility with 3.2 mongos, perform share aware initialization on a newly - // added shard on inserts to config.shards missing the "state" field. (On addShard, a 3.2 - // mongos performs the insert into config.shards without a "state" field.) - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - _nss == ShardType::ConfigNS) { - // Only the primary should complete the addShard process by upserting the shardIdentity on - // the new shard. This guards against inserts on non-primaries due to oplog application in - // steady state, rollback, or recovering. - if (repl::getGlobalReplicationCoordinator()->getMemberState().primary() && - insertedDoc[ShardType::state.name()].eoo()) { - const auto shardType = uassertStatusOK(ShardType::fromBSON(insertedDoc)); - txn->recoveryUnit()->registerChange( - new LegacyAddShardLogOpHandler(txn, std::move(shardType))); - } - } - checkShardVersionOrThrow(txn); if (_sourceMgr) { @@ -299,19 +238,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* txn, } if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (_nss == ShardType::ConfigNS) { - // For backwards compatibility, cancel a pending asynchronous addShard task created on - // the primary config as a result of a 3.2 mongos doing addShard for the shard with id - // deletedDocId. - BSONElement idElement = deleteState.idDoc["_id"]; - invariant(!idElement.eoo()); - auto shardIdStr = idElement.valuestrsafe(); - // Though the asynchronous addShard task should only be started on a primary, we - // should cancel a pending addShard task (if one exists for this shardId) even while - // non-primary, since it guarantees we cleanup any pending tasks on stepdown. 
- txn->recoveryUnit()->registerChange( - new RemoveShardLogOpHandler(txn, ShardId(std::move(shardIdStr)))); - } else if (_nss == VersionType::ConfigNS) { + if (_nss == VersionType::ConfigNS) { if (!repl::ReplicationCoordinator::get(txn)->getMemberState().rollback()) { uasserted(40302, "cannot delete config.version document while in --configsvr mode"); } else { diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index e7728f27560..242c9d5fc7f 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -325,8 +325,7 @@ public: void help(stringstream& h) const override { h << "Merge Chunks command\n" << "usage: { mergeChunks : <ns>, bounds : [ <min key>, <max key> ]," - << " (opt) epoch : <epoch>, (opt) config : <configdb string>," - << " (opt) shardName : <shard name> }"; + << " (opt) epoch : <epoch> }"; } Status checkAuthForCommand(Client* client, @@ -357,11 +356,9 @@ public: // Required static BSONField<string> nsField; static BSONField<vector<BSONObj>> boundsField; + // Optional, if the merge is only valid for a particular epoch static BSONField<OID> epochField; - // Optional, if our sharding state has not previously been initializeed - static BSONField<string> shardNameField; - static BSONField<string> configField; bool run(OperationContext* txn, const string& dbname, @@ -369,6 +366,8 @@ public: int, string& errmsg, BSONObjBuilder& result) override { + uassertStatusOK(ShardingState::get(txn)->canAcceptShardedCommands()); + string ns = parseNs(dbname, cmdObj); if (ns.size() == 0) { @@ -404,38 +403,7 @@ public: return false; } - // - // This might be the first call from mongos, so we may need to pass the config and shard - // information to initialize the sharding state. 
- // - - ShardingState* gss = ShardingState::get(txn); - if (!gss->enabled()) { - string configConnString; - FieldParser::FieldState extracted = - FieldParser::extract(cmdObj, configField, &configConnString, &errmsg); - if (!extracted || extracted == FieldParser::FIELD_NONE) { - errmsg = - "sharding state must be enabled or " - "config server specified to merge chunks"; - return false; - } - - string shardName; - extracted = FieldParser::extract(cmdObj, shardNameField, &shardName, &errmsg); - if (!extracted) { - errmsg = - "shard name must be specified to merge chunks if sharding state not enabled"; - return false; - } - - gss->initializeFromConfigConnString(txn, configConnString, shardName); - } - - // // Epoch is optional, and if not set indicates we should use the latest epoch - // - OID epoch; if (!FieldParser::extract(cmdObj, epochField, &epoch, &errmsg)) { return false; @@ -448,9 +416,6 @@ public: BSONField<string> MergeChunksCommand::nsField("mergeChunks"); BSONField<vector<BSONObj>> MergeChunksCommand::boundsField("bounds"); - -BSONField<string> MergeChunksCommand::configField("config"); -BSONField<string> MergeChunksCommand::shardNameField("shardName"); BSONField<OID> MergeChunksCommand::epochField("epoch"); } // namespace diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index a20db670259..c55a317e99f 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -91,27 +91,12 @@ public: int, string& errmsg, BSONObjBuilder& result) { - ShardingState* const shardingState = ShardingState::get(txn); + auto shardingState = ShardingState::get(txn); + uassertStatusOK(shardingState->canAcceptShardedCommands()); const ShardId toShard(cmdObj["toShardName"].String()); const ShardId fromShard(cmdObj["fromShardName"].String()); - if (!shardingState->enabled()) { - if (!cmdObj["configServer"].eoo()) { - dassert(cmdObj["configServer"].type() == String); - shardingState->initializeFromConfigConnString( - txn, cmdObj["configServer"].String(), toShard.toString()); - } else { - errmsg = str::stream() - << "cannot start recv'ing chunk, " - << "sharding is not enabled and no config server was provided"; - - warning() << errmsg; - return false; - } - } - - const NamespaceString nss(cmdObj.firstElement().String()); const auto chunkRange = uassertStatusOK(ChunkRange::fromBSON(cmdObj)); diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index d00b54b80c4..03a03c285bb 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -116,18 +116,12 @@ public: int options, string& errmsg, BSONObjBuilder& result) override { + auto shardingState = ShardingState::get(txn); + uassertStatusOK(shardingState->canAcceptShardedCommands()); + const MoveChunkRequest moveChunkRequest = uassertStatusOK( MoveChunkRequest::createFromCommand(NamespaceString(parseNs(dbname, cmdObj)), cmdObj)); - ShardingState* const shardingState = ShardingState::get(txn); - - if (!shardingState->enabled()) { - shardingState->initializeFromConfigConnString( - txn, - moveChunkRequest.getConfigServerCS().toString(), - moveChunkRequest.getFromShardId().toString()); - } - // Make sure we're as up-to-date as possible with shard information. This catches the case // where we might have changed a shard's host by removing/adding a shard with the same name. 
grid.shardRegistry()->reload(txn); diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index 0427aa9f81c..6261ebc0481 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -94,24 +94,43 @@ public: int options, string& errmsg, BSONObjBuilder& result) { - - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - uassertStatusOK({ErrorCodes::NoShardingEnabled, - "Cannot accept sharding commands if not started with --shardsvr"}); - } + auto shardingState = ShardingState::get(txn); + uassertStatusOK(shardingState->canAcceptShardedCommands()); // Steps - // 1. check basic config - // 2. extract params from command - // 3. fast check - // 4. slow check (LOCKS) + // 1. As long as the command does not have noConnectionVersioning set, register a + // ShardedConnectionInfo for this client connection (this is for clients using + // ShardConnection). Registering the ShardedConnectionInfo guarantees that we will check + // the shardVersion on all requests from this client connection. The connection's version + // will be updated on each subsequent setShardVersion sent on this connection. + // + // 2. If we have received the init form of setShardVersion, vacuously return true. + // The init form of setShardVersion was used to initialize sharding awareness on a shard, + // but was made obsolete in v3.4 by making nodes sharding-aware when they are added to a + // cluster. The init form was kept in v3.4 shards for compatibility with mixed-version + // 3.2/3.4 clusters, but was deprecated and made to vacuously return true in v3.6. + // + // 3. Validate all command parameters against the info in our ShardingState, and return an + // error if they do not match. + // + // 4. If the sent shardVersion is compatible with our shardVersion, update the shardVersion + // in this client's ShardedConnectionInfo if needed. + // + // 5. If the sent shardVersion indicates a drop, jump to step 7. + // + // 6. If the sent shardVersion is staler than ours, return a stale config error. + // + // 7. If the sent shardVersion is newer than ours (or indicates a drop), reload our metadata + // and compare the sent shardVersion with what we reloaded. If the versions are now + // compatible, update the shardVersion in this client's ShardedConnectionInfo, as in + // step 4. If the sent shardVersion is staler than what we reloaded, return a stale + // config error, as in step 6. // Step 1 + Client* client = txn->getClient(); LastError::get(client).disable(); - ShardingState* shardingState = ShardingState::get(txn); - const bool authoritative = cmdObj.getBoolField("authoritative"); const bool noConnectionVersioning = cmdObj.getBoolField("noConnectionVersioning"); @@ -123,51 +142,74 @@ public: info = ShardedConnectionInfo::get(client, true); } - const auto configDBStr = cmdObj["configdb"].str(); - string shardName = cmdObj["shard"].str(); - const auto isInit = cmdObj["init"].trueValue(); + // Step 2 - const string ns = cmdObj["setShardVersion"].valuestrsafe(); - if (shardName.empty()) { - if (isInit && ns.empty()) { - // Note: v3.0 mongos ConfigCoordinator doesn't set the shard field when sending - // setShardVersion to config. - shardName = "config"; - } else { - errmsg = "shard name cannot be empty if not init"; - return false; - } + // The init form of setShardVersion was deprecated in v3.6. For backwards compatibility with + // pre-v3.6 mongos, return true. 
+ const auto isInit = cmdObj["init"].trueValue(); + if (isInit) { + result.append("initialized", true); + return true; } - if (!_checkConfigOrInit(txn, configDBStr, shardName, authoritative, errmsg, result)) { + // Step 3 + + // Validate shardName parameter. + string shardName = cmdObj["shard"].str(); + auto storedShardName = ShardingState::get(txn)->getShardName(); + uassert(ErrorCodes::BadValue, + str::stream() << "received shardName " << shardName + << " which differs from stored shardName " + << storedShardName, + storedShardName == shardName); + + // Validate config connection string parameter. + + const auto configdb = cmdObj["configdb"].str(); + if (configdb.size() == 0) { + errmsg = "no configdb"; return false; } - // Handle initial shard connection - if (cmdObj["version"].eoo() && isInit) { - result.append("initialized", true); + auto givenConnStrStatus = ConnectionString::parse(configdb); + uassertStatusOK(givenConnStrStatus); - // TODO: SERVER-21397 remove post v3.3. - // Send back wire version to let mongos know what protocol we can speak - result.append("minWireVersion", WireSpec::instance().incoming.minWireVersion); - result.append("maxWireVersion", WireSpec::instance().incoming.maxWireVersion); + const auto& givenConnStr = givenConnStrStatus.getValue(); + if (givenConnStr.type() != ConnectionString::SET) { + errmsg = str::stream() << "given config server string is not of type SET"; + return false; + } - return true; + ConnectionString storedConnStr = ShardingState::get(txn)->getConfigServer(txn); + if (givenConnStr.getSetName() != storedConnStr.getSetName()) { + errmsg = str::stream() + << "given config server set name: " << givenConnStr.getSetName() + << " differs from known set name: " << storedConnStr.getSetName(); + + return false; } + // Validate namespace parameter. + + const string ns = cmdObj["setShardVersion"].valuestrsafe(); if (ns.size() == 0) { errmsg = "need to specify namespace"; return false; } - const NamespaceString nss(ns); - // Backwards compatibility for SERVER-23119 + const NamespaceString nss(ns); if (!nss.isValid()) { warning() << "Invalid namespace used for setShardVersion: " << ns; return true; } + // Validate chunk version parameter. + const ChunkVersion requestedVersion = + uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj)); + + // Step 4 + // we can run on a slave up to here if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nss.db())) { result.append("errmsg", "not master"); @@ -175,11 +217,6 @@ public: return false; } - // step 2 - const ChunkVersion requestedVersion = - uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj)); - - // step 3 - Actual version checking const ChunkVersion connectionVersion = info->getVersion(ns); connectionVersion.addToBSON(result, "oldVersion"); @@ -203,12 +240,18 @@ public: if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) { // mongos and mongod agree! + // Now we should update the connection's version if it's not compatible with the + // request's version. This could happen if the shard's metadata has changed, but + // the remote client has already refreshed its view of the metadata since the last + // time it sent anything over this connection. if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) { + // A migration occurred. 
if (connectionVersion < collectionShardVersion && connectionVersion.epoch() == collectionShardVersion.epoch()) { info->setVersion(ns, requestedVersion); - } else if (authoritative) { - // this means there was a drop and our version is reset + } + // The collection was dropped and recreated. + else if (authoritative) { info->setVersion(ns, requestedVersion); } else { result.append("ns", ns); @@ -221,8 +264,8 @@ public: return true; } - // step 4 - // Cases below all either return OR fall-through to remote metadata reload. + // Step 5 + const bool isDropRequested = !requestedVersion.isSet() && collectionShardVersion.isSet(); @@ -239,6 +282,8 @@ public: } else { // Not Dropping + // Step 6 + // TODO: Refactor all of this if (requestedVersion < connectionVersion && requestedVersion.epoch() == connectionVersion.epoch()) { @@ -298,6 +343,8 @@ public: } } + // Step 7 + Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion); { @@ -362,90 +409,6 @@ public: return true; } -private: - /** - * Checks if this server has already been initialized. If yes, then checks that the configdb - * settings matches the initialized settings. Otherwise, initializes the server with the given - * settings. - */ - bool _checkConfigOrInit(OperationContext* txn, - const string& configdb, - const string& shardName, - bool authoritative, - string& errmsg, - BSONObjBuilder& result) { - if (configdb.size() == 0) { - errmsg = "no configdb"; - return false; - } - - auto givenConnStrStatus = ConnectionString::parse(configdb); - if (!givenConnStrStatus.isOK()) { - errmsg = str::stream() << "error parsing given config string: " << configdb - << causedBy(givenConnStrStatus.getStatus()); - return false; - } - - const auto& givenConnStr = givenConnStrStatus.getValue(); - ConnectionString storedConnStr; - - if (shardName == "config") { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!_configStr.isValid()) { - _configStr = givenConnStr; - return true; - } else { - storedConnStr = _configStr; - } - } else if (ShardingState::get(txn)->enabled()) { - invariant(!_configStr.isValid()); - storedConnStr = ShardingState::get(txn)->getConfigServer(txn); - } - - if (storedConnStr.isValid()) { - if (givenConnStr.type() == ConnectionString::SET && - storedConnStr.type() == ConnectionString::SET) { - if (givenConnStr.getSetName() != storedConnStr.getSetName()) { - errmsg = str::stream() - << "given config server set name: " << givenConnStr.getSetName() - << " differs from known set name: " << storedConnStr.getSetName(); - - return false; - } - - return true; - } - - const auto& storedRawConfigString = storedConnStr.toString(); - if (storedRawConfigString == configdb) { - return true; - } - - result.append("configdb", - BSON("stored" << storedRawConfigString << "given" << configdb)); - - errmsg = str::stream() << "mongos specified a different config database string : " - << "stored : " << storedRawConfigString - << " vs given : " << configdb; - return false; - } - - invariant(shardName != "config"); - - if (!authoritative) { - result.appendBool("need_authoritative", true); - errmsg = "first setShardVersion"; - return false; - } - - ShardingState::get(txn)->initializeFromConfigConnString(txn, configdb, shardName); - return true; - } - - // Only for servers that are running as a config server. 
- stdx::mutex _mutex; - ConnectionString _configStr; - } setShardVersionCmd; } // namespace diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 74849ffb860..dda78ad8ae8 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -117,9 +117,6 @@ void updateShardIdentityConfigStringCB(const string& setName, const string& newC } // namespace -const std::set<std::string> ShardingState::_commandsThatInitializeShardingAwareness{ - "_recvChunkStart", "mergeChunks", "moveChunk", "setShardVersion", "splitChunk"}; - ShardingState::ShardingState() : _initializationState(static_cast<uint32_t>(InitializationState::kNew)), _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")), @@ -141,6 +138,19 @@ bool ShardingState::enabled() const { return _getInitializationState() == InitializationState::kInitialized; } +Status ShardingState::canAcceptShardedCommands() const { + if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { + return {ErrorCodes::NoShardingEnabled, + "Cannot accept sharding commands if not started with --shardsvr"}; + } else if (!enabled()) { + return {ErrorCodes::ShardingStateNotInitialized, + "Cannot accept sharding commands if sharding state has not " + "been initialized with a shardIdentity document"}; + } else { + return Status::OK(); + } +} + ConnectionString ShardingState::getConfigServer(OperationContext* txn) { invariant(enabled()); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -154,37 +164,16 @@ string ShardingState::getShardName() { } void ShardingState::shutDown(OperationContext* txn) { - bool mustEnterShutdownState = false; - - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - while (_getInitializationState() == InitializationState::kInitializing) { - _initializationFinishedCondition.wait(lk); - } - - if (_getInitializationState() == InitializationState::kNew) { - _setInitializationState_inlock(InitializationState::kInitializing); - mustEnterShutdownState = true; - } - } - - // Initialization completion must be signalled outside of the mutex - if (mustEnterShutdownState) { - _signalInitializationComplete( - Status(ErrorCodes::ShutdownInProgress, - "Sharding state unavailable because the system is shutting down")); - } - - if (_getInitializationState() == InitializationState::kInitialized) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (enabled()) { grid.getExecutorPool()->shutdownAndJoin(); grid.catalogClient(txn)->shutDown(txn); } } Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* txn) { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - // Nothing to do if we're a config server ourselves. + if (!enabled()) { + // Nothing to do if sharding state has not been initialized. 
return Status::OK(); } @@ -298,33 +287,6 @@ Status ShardingState::refreshMetadataNow(OperationContext* txn, return Status::OK(); } -void ShardingState::initializeFromConfigConnString(OperationContext* txn, - const string& configSvr, - const string shardName) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_getInitializationState() == InitializationState::kNew) { - uassert(18509, - "Unable to obtain host name during sharding initialization.", - !getHostName().empty()); - - ConnectionString configSvrConnStr = uassertStatusOK(ConnectionString::parse(configSvr)); - - _setInitializationState_inlock(InitializationState::kInitializing); - - stdx::thread thread([this, configSvrConnStr, shardName] { - _initializeImpl(configSvrConnStr, shardName); - }); - thread.detach(); - } - } - - uassertStatusOK(_waitForInitialization(txn->getDeadline())); - uassertStatusOK(reloadShardRegistryUntilSuccess(txn)); - uassertStatusOK(updateConfigServerOpTimeFromMetadata(txn)); -} - // NOTE: This method can be called inside a database lock so it should never take any database // locks, perform I/O, or any long running operations. Status ShardingState::initializeFromShardIdentity(OperationContext* txn, @@ -344,168 +306,57 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, stdx::unique_lock<stdx::mutex> lk(_mutex); - // TODO: remove after v3.4. - // This is for backwards compatibility with old style initialization through metadata - // commands/setShardVersion, which can happen concurrently with an insert of a - // shardIdentity document to admin.system.version. - if (_getInitializationState() == InitializationState::kInitializing) { - auto waitStatus = _waitForInitialization_inlock(Date_t::max(), lk); - if (!waitStatus.isOK()) { - return waitStatus; - } - } - - if (_getInitializationState() == InitializationState::kError) { - return {ErrorCodes::ManualInterventionRequired, - str::stream() << "Server's sharding metadata manager failed to initialize and will " - "remain in this state until the instance is manually reset" - << causedBy(_initializationStatus)}; - } - auto configSvrConnStr = shardIdentity.getConfigsvrConnString(); - // TODO: remove after v3.4. - // This is for backwards compatibility with old style initialization through metadata - // commands/setShardVersion, which sets the shardName and configsvrConnectionString. 
- if (_getInitializationState() == InitializationState::kInitialized) { - if (_shardName != shardIdentity.getShardName()) { - return {ErrorCodes::InconsistentShardIdentity, - str::stream() << "shard name previously set as " << _shardName - << " is different from stored: " - << shardIdentity.getShardName()}; - } + if (enabled()) { + invariant(!_shardName.empty()); + fassert(40372, _shardName == shardIdentity.getShardName()); auto prevConfigsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString(); - if (prevConfigsvrConnStr.type() != ConnectionString::SET) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "config server connection string was previously initialized as" - " something that is not a replica set: " - << prevConfigsvrConnStr.toString()}; - } + invariant(prevConfigsvrConnStr.type() == ConnectionString::SET); + fassert(40373, prevConfigsvrConnStr.getSetName() == configSvrConnStr.getSetName()); - if (prevConfigsvrConnStr.getSetName() != configSvrConnStr.getSetName()) { - return {ErrorCodes::InconsistentShardIdentity, - str::stream() << "config server connection string previously set as " - << prevConfigsvrConnStr.toString() - << " is different from stored: " - << configSvrConnStr.toString()}; - } - - // The clusterId will only be unset if sharding state was initialized via the sharding - // metadata commands. - if (!_clusterId.isSet()) { - _clusterId = shardIdentity.getClusterId(); - } else if (_clusterId != shardIdentity.getClusterId()) { - return {ErrorCodes::InconsistentShardIdentity, - str::stream() << "cluster id previously set as " << _clusterId - << " is different from stored: " - << shardIdentity.getClusterId()}; - } + invariant(_clusterId.isSet()); + fassert(40374, _clusterId == shardIdentity.getClusterId()); return Status::OK(); } - if (_getInitializationState() == InitializationState::kNew) { - ShardedConnectionInfo::addHook(); - - try { - Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn)); - - // TODO: remove after v3.4. - // This is for backwards compatibility with old style initialization through metadata - // commands/setShardVersion, which can happen concurrently with an insert of a - // shardIdentity document to admin.system.version. 
- if (status.isOK()) { - _setInitializationState_inlock(InitializationState::kInitialized); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); - ReplicaSetMonitor::setAsynchronousConfigChangeHook( - &updateShardIdentityConfigStringCB); - } else { - _initializationStatus = status; - _setInitializationState_inlock(InitializationState::kError); - } - - _shardName = shardIdentity.getShardName(); - _clusterId = shardIdentity.getClusterId(); - - _initializeRangeDeleterTaskExecutor(); - - return status; - } catch (const DBException& ex) { - auto errorStatus = ex.toStatus(); - _setInitializationState_inlock(InitializationState::kError); - _initializationStatus = errorStatus; - return errorStatus; - } + if (_getInitializationState() == InitializationState::kError) { + return {ErrorCodes::ManualInterventionRequired, + str::stream() << "Server's sharding metadata manager failed to initialize and will " + "remain in this state until the instance is manually reset" + << causedBy(_initializationStatus)}; } - MONGO_UNREACHABLE; -} - -void ShardingState::_initializeImpl(ConnectionString configSvr, string shardName) { - Client::initThread("ShardingState initialization"); - auto txn = cc().makeOperationContext(); - - // Do this initialization outside of the lock, since we are already protected by having entered - // the kInitializing state. ShardedConnectionInfo::addHook(); try { - Status status = _globalInit(txn.get(), configSvr, generateDistLockProcessId(txn.get())); - + Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn)); if (status.isOK()) { + log() << "initialized sharding components"; + _setInitializationState(InitializationState::kInitialized); ReplicaSetMonitor::setSynchronousConfigChangeHook( &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); + _setInitializationState(InitializationState::kInitialized); - _initializeRangeDeleterTaskExecutor(); - - _shardName = shardName; + } else { + log() << "failed to initialize sharding components" << causedBy(status); + _initializationStatus = status; + _setInitializationState(InitializationState::kError); } + _shardName = shardIdentity.getShardName(); + _clusterId = shardIdentity.getClusterId(); - _signalInitializationComplete(status); + _initializeRangeDeleterTaskExecutor(); + return status; } catch (const DBException& ex) { - _signalInitializationComplete(ex.toStatus()); - } -} - -Status ShardingState::_waitForInitialization(Date_t deadline) { - if (enabled()) - return Status::OK(); - - stdx::unique_lock<stdx::mutex> lk(_mutex); - return _waitForInitialization_inlock(deadline, lk); -} - -Status ShardingState::_waitForInitialization_inlock(Date_t deadline, - stdx::unique_lock<stdx::mutex>& lk) { - { - while (_getInitializationState() == InitializationState::kInitializing || - _getInitializationState() == InitializationState::kNew) { - if (deadline == Date_t::max()) { - _initializationFinishedCondition.wait(lk); - } else if (stdx::cv_status::timeout == - _initializationFinishedCondition.wait_until(lk, - deadline.toSystemTimePoint())) { - return Status(ErrorCodes::ExceededTimeLimit, - "Initializing sharding state exceeded time limit"); - } - } - } - - auto initializationState = _getInitializationState(); - if (initializationState == InitializationState::kInitialized) { - fassertStatusOK(34349, _initializationStatus); - return Status::OK(); - } - if (initializationState == 
InitializationState::kError) { - return Status(ErrorCodes::ManualInterventionRequired, - str::stream() - << "Server's sharding metadata manager failed to initialize and will " - "remain in this state until the instance is manually reset" - << causedBy(_initializationStatus)); + auto errorStatus = ex.toStatus(); + _initializationStatus = errorStatus; + _setInitializationState(InitializationState::kError); + return errorStatus; } MONGO_UNREACHABLE; @@ -515,26 +366,10 @@ ShardingState::InitializationState ShardingState::_getInitializationState() cons return static_cast<InitializationState>(_initializationState.load()); } -void ShardingState::_setInitializationState_inlock(InitializationState newState) { +void ShardingState::_setInitializationState(InitializationState newState) { _initializationState.store(static_cast<uint32_t>(newState)); } -void ShardingState::_signalInitializationComplete(Status status) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - invariant(_getInitializationState() == InitializationState::kInitializing); - - if (!status.isOK()) { - _initializationStatus = status; - _setInitializationState_inlock(InitializationState::kError); - } else { - _initializationStatus = Status::OK(); - _setInitializationState_inlock(InitializationState::kInitialized); - } - - _initializationFinishedCondition.notify_all(); -} - StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* txn) { // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* // a shardIdentity document to be passed through --overrideShardIdentity. @@ -587,18 +422,19 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon // Load the shardIdentity document from disk. invariant(!txn->lockState()->isLocked()); BSONObj shardIdentityBSON; + bool foundShardIdentity = false; try { AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS); - Helpers::findOne(txn, - autoColl.getCollection(), - BSON("_id" << ShardIdentityType::IdName), - shardIdentityBSON); + foundShardIdentity = Helpers::findOne(txn, + autoColl.getCollection(), + BSON("_id" << ShardIdentityType::IdName), + shardIdentityBSON); } catch (const DBException& ex) { return ex.toStatus(); } if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (shardIdentityBSON.isEmpty()) { + if (!foundShardIdentity) { warning() << "Started with --shardsvr, but no shardIdentity document was found on " "disk in " << NamespaceString::kConfigCollectionNamespace @@ -606,6 +442,9 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon "sharded cluster."; return false; } + + invariant(!shardIdentityBSON.isEmpty()); + auto swShardIdentity = ShardIdentityType::fromBSON(shardIdentityBSON); if (!swShardIdentity.isOK()) { return swShardIdentity.getStatus(); @@ -632,11 +471,7 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( OperationContext* txn, const NamespaceString& nss, const CollectionMetadata* metadataForDiff) { invariant(!txn->lockState()->isLocked()); - { - Status status = _waitForInitialization(txn->getDeadline()); - if (!status.isOK()) - return status; - } + invariant(enabled()); // We can't reload if a shard name has not yet been set { diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 8481c282bed..6686dc8deca 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -93,8 +93,24 @@ public: static ShardingState* get(ServiceContext* serviceContext); 
static ShardingState* get(OperationContext* operationContext); + /** + * Returns true if ShardingState has been successfully initialized. + * + * Code that needs to perform extra actions if sharding is initialized, but does not need to + * error if not, should use this. Alternatively, see ShardingState::canAcceptShardedCommands(). + */ bool enabled() const; + /** + * Returns Status::OK if the ShardingState is enabled; if not, returns an error describing + * whether the ShardingState is just not yet initialized, or if this shard is not running with + * --shardsvr at all. + * + * Code that should error if sharding state has not been initialized should use this to report + * a more descriptive error. Alternatively, see ShardingState::enabled(). + */ + Status canAcceptShardedCommands() const; + ConnectionString getConfigServer(OperationContext* txn); std::string getShardName(); @@ -104,22 +120,6 @@ public: } /** - * Initializes sharding state and begins authenticating outgoing connections and handling shard - * versions. If this is not run before sharded operations occur auth will not work and versions - * will not be tracked. This method is deprecated and is mainly used for initialization from - * mongos metadata commands like moveChunk, splitChunk, mergeChunk and setShardVersion. - * - * Throws if initialization fails for any reason and the sharding state object becomes unusable - * afterwards. Any sharding state operations afterwards will fail. - * - * Note that this will also try to connect to the config servers and will block until it - * succeeds. - */ - void initializeFromConfigConnString(OperationContext* txn, - const std::string& configSvr, - const std::string shardName); - - /** * Initializes the sharding state of this server from the shard identity document argument. */ Status initializeFromShardIdentity(OperationContext* txn, @@ -268,16 +268,6 @@ public: */ StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* txn); - /** - * Check if a command is one of the whitelisted commands that can be accepted with shardVersion - * information before this node is sharding aware, because the command initializes sharding - * awareness. - */ - static bool commandInitializesShardingAwareness(const std::string& commandName) { - return _commandsThatInitializeShardingAwareness.find(commandName) != - _commandsThatInitializeShardingAwareness.end(); - } - private: // Map from a namespace into the sharding state for each collection we have typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>> @@ -291,10 +281,6 @@ private: // recovey document is found or stay in it until initialize has been called. kNew, - // The sharding state has been recovered (or doesn't need to be recovered) and the catalog - // manager is currently being initialized by one of the threads. - kInitializing, - // Sharding state is fully usable. kInitialized, @@ -304,44 +290,14 @@ private: }; /** - * Initializes the sharding infrastructure (connection hook, catalog manager, etc) and - * optionally recovers its minimum optime. Must not be called while holding the sharding state - * mutex. - * - * Doesn't throw, only updates the initialization state variables. - * - * Runs in a new thread so that if all config servers are down initialization can continue - * retrying in the background even if the operation that kicked off the initialization has - * terminated. - * - * @param configSvr Connection string of the config server to use. 
- * @param shardName the name of the shard in config.shards - */ - void _initializeImpl(ConnectionString configSvr, std::string shardName); - - /** - * Must be called only when the current state is kInitializing. Sets the current state to - * kInitialized if the status is OK or to kError otherwise. - */ - void _signalInitializationComplete(Status status); - - /** - * Blocking method, which waits for the initialization state to become kInitialized or kError - * and returns the initialization status. - */ - Status _waitForInitialization(Date_t deadline); - Status _waitForInitialization_inlock(Date_t deadline, stdx::unique_lock<stdx::mutex>& lk); - - /** - * Simple wrapper to cast the initialization state atomic uint64 to InitializationState value - * without doing any locking. + * Returns the initialization state. */ InitializationState _getInitializationState() const; /** - * Updates the initialization state. Must be called while holding _mutex. + * Updates the initialization state. */ - void _setInitializationState_inlock(InitializationState newState); + void _setInitializationState(InitializationState newState); /** * Refreshes collection metadata by asking the config server for the latest information and @@ -390,10 +346,6 @@ private: // The id for the cluster this shard belongs to. OID _clusterId; - // A whitelist of sharding commands that are allowed when running with --shardsvr but not yet - // shard aware, because they initialize sharding awareness. - static const std::set<std::string> _commandsThatInitializeShardingAwareness; - // Function for initializing the external sharding state components not owned here. GlobalInitFunc _globalInit; diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp index eedb9d1ea6b..7f17b748a90 100644 --- a/src/mongo/db/s/sharding_state_recovery.cpp +++ b/src/mongo/db/s/sharding_state_recovery.cpp @@ -270,17 +270,8 @@ Status ShardingStateRecovery::recover(OperationContext* txn) { log() << "Sharding state recovery process found document " << redact(recoveryDoc.toBSON()); - // Make sure the sharding state is initialized ShardingState* const shardingState = ShardingState::get(txn); - - // For backwards compatibility. Shards added by v3.4 cluster should have been initialized by - // the shard identity document. - // TODO(SERER-25276): Remove this after 3.4 since 3.4 shards should always have ShardingState - // initialized by this point. 
- if (!shardingState->enabled()) { - shardingState->initializeFromConfigConnString( - txn, recoveryDoc.getConfigsvr().toString(), recoveryDoc.getShardName()); - } + invariant(shardingState->enabled()); if (!recoveryDoc.getMinOpTimeUpdaters()) { // Treat the minOpTime as up-to-date diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index e4953f3488c..b42af89e774 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -252,93 +252,6 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); } -TEST_F(ShardingStateTest, InitializeAgainWithDifferentReplSetNameFails) { - auto clusterID = OID::gen(); - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(clusterID); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "configRS")); - shardIdentity2.setShardName(shardName()); - shardIdentity2.setClusterId(clusterID); - - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); - - auto status = shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2); - ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); - - ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); - ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); -} - -TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { - auto clusterID = OID::gen(); - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(clusterID); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); - - ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName("b"); - shardIdentity2.setClusterId(clusterID); - - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); - - auto status = shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2); - ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); - - ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); - ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); -} - -TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); 
- - ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName(shardName()); - shardIdentity2.setClusterId(OID::gen()); - - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); - - auto status = shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2); - ASSERT_EQ(ErrorCodes::InconsistentShardIdentity, status); - - ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); - ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(operationContext()).toString()); -} - - // The below tests check for compatible startup parameters for --shardsvr, --overrideShardIdentity, // and queryableBackup (readOnly) mode. diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp index d9ffe75245a..09d7f147b27 100644 --- a/src/mongo/db/s/split_chunk_command.cpp +++ b/src/mongo/db/s/split_chunk_command.cpp @@ -173,6 +173,9 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) override { + auto shardingState = ShardingState::get(txn); + uassertStatusOK(shardingState->canAcceptShardedCommands()); + // // Check whether parameters passed to splitChunk are sound // @@ -221,24 +224,6 @@ public: if (!parseShardNameStatus.isOK()) return appendCommandStatus(result, parseShardNameStatus); - // - // Get sharding state up-to-date - // - ShardingState* const shardingState = ShardingState::get(txn); - - // This could be the first call that enables sharding - make sure we initialize the - // sharding state for this shard. - if (!shardingState->enabled()) { - if (cmdObj["configdb"].type() != String) { - errmsg = "sharding not enabled"; - warning() << errmsg; - return false; - } - - const string configdb = cmdObj["configdb"].String(); - shardingState->initializeFromConfigConnString(txn, configdb, shardName); - } - log() << "received splitChunk request: " << redact(cmdObj); // diff --git a/src/mongo/s/catalog/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/sharding_catalog_add_shard_test.cpp index 8ae7d068d40..df94c520318 100644 --- a/src/mongo/s/catalog/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/sharding_catalog_add_shard_test.cpp @@ -1054,423 +1054,6 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { assertChangeWasLogged(expectedShard); } -TEST_F(AddShardTest, CompatibilityAddShardSuccess) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. 
- ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // The shardIdentity doc inserted into the admin.system.version collection on the shard. - expectShardIdentityUpsertReturnSuccess(shardTarget, shardName); - - // Since the shardIdentity upsert succeeded, the entry in config.shards should have been - // updated to reflect that the shard is now shard aware. - addedShard.setState(ShardType::ShardState::kShardAware); - - // Ensure that the shard document was properly added to config.shards. - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardRetryOnGenericFailures) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Simulate several failures upserting the shardIdentity doc on the shard. The upsert should - // be rescheduled and retried until it succeeds. - - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::HostUnreachable, "host unreachable"}); - // Since the upsert returned failure, a (local) task to reschedule the upsert will be scheduled - // to run after an interval. Forward the network to just past the end of the interval so that - // the (local) task runs, at which point the upsert will be rescheduled to run immediately on - // the network. - forwardAddShardNetwork(networkForAddShard()->now() + - ShardingCatalogManager::getAddShardTaskRetryInterval() + - Milliseconds(10)); - - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::WriteConcernFailed, "write concern failed"}); - forwardAddShardNetwork(networkForAddShard()->now() + - ShardingCatalogManager::getAddShardTaskRetryInterval() + - Milliseconds(10)); - - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::RemoteChangeDetected, "remote change detected"}); - forwardAddShardNetwork(networkForAddShard()->now() + - ShardingCatalogManager::getAddShardTaskRetryInterval() + - Milliseconds(10)); - - // Finally, respond with success. 
- expectShardIdentityUpsertReturnSuccess(shardTarget, shardName); - - // Since the shardIdentity upsert succeeded, the entry in config.shards should have been - // updated to reflect that the shard is now shard aware. - addedShard.setState(ShardType::ShardState::kShardAware); - - // Ensure that the shard document was properly added to config.shards. - assertShardExists(addedShard); -} - -// Note: This test is separated from the generic failures one because there is a special code path -// to handle DuplicateKey errors, even though the server's actual behavior is the same. -TEST_F(AddShardTest, CompatibilityAddShardRetryOnDuplicateKeyFailure) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Simulate several DuplicateKeyError failures while the shardIdentity document on the shard - // has not yet been manually deleted. - for (int i = 0; i < 3; i++) { - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::DuplicateKey, "duplicate key"}); - // Since the upsert returned failure, a (local) task to reschedule the upsert will be - // scheduled to run after an interval. Forward the network to just past the end of the - // interval so that the (local) task runs, at which point the upsert will be rescheduled - // to run immediately on the network. - forwardAddShardNetwork(networkForAddShard()->now() + - ShardingCatalogManager::getAddShardTaskRetryInterval() + - Milliseconds(10)); - } - - // Finally, respond with success (simulating that conflicting the shardIdentity document has - // been deleted from the shard, and the new shardIdentity document was able to be inserted). - expectShardIdentityUpsertReturnSuccess(shardTarget, shardName); - - // Since the shardIdentity upsert succeeded, the entry in config.shards should have been - // updated to reflect that the shard is now shard aware. - addedShard.setState(ShardType::ShardState::kShardAware); - - // Ensure that the shard document was properly added to config.shards. - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardCancelRequestCallbackBeforeTaskCompletes) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. 
- repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Cancel the addShard task directly rather than via the OpObserver for deletes to config.shards - // so that we can check that the shard entry did not get updated after the callback ran - // (meaning the addShard task was successfully canceled and handled as such). - catalogManager()->cancelAddShardTaskIfNeeded(addedShard.getName()); - - // Run ready network operations manually to deliver the CallbackCanceled response to the - // callback. - networkForAddShard()->enterNetwork(); - networkForAddShard()->runReadyNetworkOperations(); - networkForAddShard()->exitNetwork(); - - // If the shard exists without the "state: 1" field, the callback did not run, as expected. - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardCancelRequestCallbackAfterTaskCompletes) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Manually progress the network to schedule a response for the upsert, but stop before the - // response is delivered to the callback. - // Note: this does all the steps in NetworkTestEnv::onCommand except runReadyNetworkOperations. - - networkForAddShard()->enterNetwork(); - auto noi = networkForAddShard()->getNextReadyRequest(); - auto request = noi->getRequest(); - - // Build a success response. 
- BatchedCommandResponse responseValue; - responseValue.setOk(true); - responseValue.setNModified(1); - Status responseStatus = Status::OK(); - BSONObjBuilder result; - result.appendElements(responseValue.toBSON()); - Command::appendCommandStatus(result, responseStatus); - const RemoteCommandResponse response(result.obj(), BSONObj(), Milliseconds(1)); - - networkForAddShard()->scheduleResponse(noi, networkForAddShard()->now(), response); - networkForAddShard()->exitNetwork(); - - // Cancel the addShard task directly rather than via the OpObserver for deletes to config.shards - // so that we can check that the shard entry did not get updated after the callback ran - // (meaning the addShard task was successfully canceled and handled as such). - catalogManager()->cancelAddShardTaskIfNeeded(addedShard.getName()); - - // Now allow the network to run the callback with the response we scheduled earlier. - networkForAddShard()->enterNetwork(); - networkForAddShard()->runReadyNetworkOperations(); - networkForAddShard()->exitNetwork(); - - // If the shard exists without the "state: 1" field, the callback did not run, as expected. - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardCancelRequestCallbackReAddShard) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Cancel the addShard task directly rather than via the OpObserver for deletes to config.shards - // so that we can check that the shard entry did not get updated after the callback ran - // (meaning the addShard task was successfully canceled and handled as such). - catalogManager()->cancelAddShardTaskIfNeeded(addedShard.getName()); - - // Before delivering the CallbackCanceled response, simulate another addShard request for the - // same shard directly. - ASSERT_OK(catalogManager()->upsertShardIdentityOnShard(operationContext(), addedShard)); - - // Run ready network operations manually to deliver the CallbackCanceled response to the - // callback. - networkForAddShard()->enterNetwork(); - networkForAddShard()->runReadyNetworkOperations(); - networkForAddShard()->exitNetwork(); - - // Ensure the shard entry's state field was not updated. - assertShardExists(addedShard); - - // Make the shard respond with success to the second addShard task. 
- expectShardIdentityUpsertReturnSuccess(shardTarget, shardName); - - addedShard.setState(ShardType::ShardState::kShardAware); - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardCancelRescheduledCallback) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Make the network respond with failure so that the request is rescheduled. - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::HostUnreachable, "host unreachable"}); - - // Cancel the addShard task directly rather than via the OpObserver for deletes to config.shards - // so that we can check that the shard entry did not get updated after the callback ran - // (meaning the addShard task was successfully canceled and handled as such). - // Note: Since the task being canceled was not a network request, the callback is run as soon - // as the callback is canceled by the task executor, so we do not need to run ready network - // requests. - catalogManager()->cancelAddShardTaskIfNeeded(addedShard.getName()); - - assertShardExists(addedShard); -} - -TEST_F(AddShardTest, CompatibilityAddShardCancelRescheduledCallbackReAddShard) { - // This is a hack to set the ReplicationCoordinator's MemberState to primary, since this test - // relies on behavior guarded by a check that we are a primary. - repl::ReplicationCoordinator::get(getGlobalServiceContext()) - ->setFollowerMode(repl::MemberState::RS_PRIMARY); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - HostAndPort shardTarget("StandaloneHost:12345"); - targeter->setConnectionStringReturnValue(ConnectionString(shardTarget)); - targeter->setFindHostReturnValue(shardTarget); - targeterFactory()->addTargeterToReturn(ConnectionString(shardTarget), std::move(targeter)); - - std::string shardName = "StandaloneShard"; - - // The shard doc inserted into the config.shards collection on the config server. - ShardType addedShard; - addedShard.setName(shardName); - addedShard.setHost(shardTarget.toString()); - addedShard.setMaxSizeMB(100); - - // Add the shard to config.shards to trigger the OpObserver that performs shard aware - // initialization. 
- ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Make the network respond with failure so that the request is rescheduled. - expectShardIdentityUpsertReturnFailure( - shardTarget, shardName, {ErrorCodes::HostUnreachable, "host unreachable"}); - - // Simulate a removeShard by deleting the shard's entry in config.shards. This will trigger - // canceling the addShard task via the OpObserver. - // Note: Since the task being canceled was not a network request, the callback is run as soon - // as the callback is canceled by the task executor, so we do not need to run ready network - // requests. - ASSERT_OK(catalogClient()->removeConfigDocuments(operationContext(), - ShardType::ConfigNS, - BSON("_id" << addedShard.getName()), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Another addShard request for the same shard should succeed (simulated by re-inserting the - // same shard entry into config.shards). - ASSERT_OK(catalogClient()->insertConfigDocument(operationContext(), - ShardType::ConfigNS, - addedShard.toBSON(), - ShardingCatalogClient::kMajorityWriteConcern)); - - // Make the shard respond with success to the second addShard task. - expectShardIdentityUpsertReturnSuccess(shardTarget, shardName); - - addedShard.setState(ShardType::ShardState::kShardAware); - assertShardExists(addedShard); -} - // Tests both that trying to add a shard with the same host as an existing shard but with different // options fails, and that adding a shard with the same host as an existing shard with the *same* // options succeeds. diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index bb7946de096..e3cae76b7e3 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -190,21 +190,6 @@ public: virtual void discardCachedConfigDatabaseInitializationState() = 0; /** - * For upgrade from 3.2 to 3.4, for each shard in config.shards that is not marked as sharding - * aware, schedules a task to upsert a shardIdentity doc into the shard and mark the shard as - * sharding aware. - */ - virtual Status initializeShardingAwarenessOnUnawareShards(OperationContext* txn) = 0; - - /** - * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous - * task against addShard executor to upsert a shardIdentity doc into the new shard described by - * shardType. On failure to upsert the doc on the shard, the task reschedules itself with a - * delay indefinitely, and is canceled only when a removeShard is called. - */ - virtual Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) = 0; - - /** * Returns a BSON representation of an update request that can be used to insert a * shardIdentity doc into the shard for the given shardType (or update the shard's existing * shardIdentity doc's configsvrConnString if the _id, shardName, and clusterId do not @@ -214,12 +199,6 @@ public: const std::string& shardName) = 0; /** - * For rolling upgrade and backwards compatibility, cancels a pending addShard task to upsert - * a shardIdentity document into the shard with id shardId (if there is such a task pending). - */ - virtual void cancelAddShardTaskIfNeeded(const ShardId& shardId) = 0; - - /** * Runs the setFeatureCompatibilityVersion command on all shards. 
*/ virtual Status setFeatureCompatibilityVersionOnShards(OperationContext* txn, diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/sharding_catalog_manager_impl.h index 5d81313446c..842b11f6920 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.h @@ -103,15 +103,9 @@ public: const ConnectionString& shardConnectionString, const long long maxSize) override; - Status initializeShardingAwarenessOnUnawareShards(OperationContext* txn) override; - - Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override; - BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, const std::string& shardName) override; - void cancelAddShardTaskIfNeeded(const ShardId& shardId) override; - Status setFeatureCompatibilityVersionOnShards(OperationContext* txn, const std::string& version) override; @@ -180,71 +174,6 @@ private: const std::string& dbName, const BSONObj& cmdObj); - /** - * Retrieves all shards that are not marked as sharding aware (state = 1) in this cluster. - */ - StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn); - - /** - * Callback function used when rescheduling an addShard task after the first attempt failed. - * Checks if the callback has been canceled, and if not, proceeds to call - * _scheduleAddShardTask. - */ - void _scheduleAddShardTaskUnlessCanceled(const executor::TaskExecutor::CallbackArgs& cbArgs, - const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest); - - /** - * For rolling upgrade and backwards compatibility with 3.2 mongos, schedules an asynchronous - * task against the addShard executor to upsert a shardIdentity doc into the new shard - * described by shardType. If there is an existing such task for this shardId (as tracked by - * the _addShardHandles map), a new task is not scheduled. There could be an existing such task - * if addShard was called previously, but the upsert has not yet succeeded on the shard. - */ - void _scheduleAddShardTask(const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest, - const bool isRetry); - - /** - * Callback function for the asynchronous upsert of the shardIdentity doc scheduled by - * scheduleAddShardTaskIfNeeded. Checks the response from the shard, and updates config.shards - * to mark the shard as shardAware on success. On failure to perform the upsert, this callback - * schedules scheduleAddShardTaskIfNeeded to be called again after a delay. - */ - void _handleAddShardTaskResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbArgs, - ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter); - - /** - * Checks if a running or scheduled addShard task exists for the shard with id shardId. - * The caller must hold _addShardHandlesMutex. - */ - bool _hasAddShardHandle_inlock(const ShardId& shardId); - - /** - * Returns the CallbackHandle associated with the addShard task for the shard with id shardId. - * Invariants that there is a handle being tracked for that shard. - */ - const executor::TaskExecutor::CallbackHandle& _getAddShardHandle_inlock(const ShardId& shardId); - - /** - * Adds CallbackHandle handle for the shard with id shardId to the map of running or scheduled - * addShard tasks. - * The caller must hold _addShardHandlesMutex. 
- */ - void _trackAddShardHandle_inlock( - const ShardId shardId, const StatusWith<executor::TaskExecutor::CallbackHandle>& handle); - - /** - * Removes the handle to a running or scheduled addShard task callback for the shard with id - * shardId. - * The caller must hold _addShardHandlesMutex. - */ - void _untrackAddShardHandle_inlock(const ShardId& shardId); - // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -270,19 +199,6 @@ private: // to servers that are not yet in the ShardRegistry. std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R) - // For rolling upgrade and backwards compatibility with 3.2 mongos, maintains a mapping of - // a shardId to an outstanding addShard task scheduled against the _executorForAddShard. - // A "addShard" task upserts the shardIdentity document into the new shard. Such a task is - // scheduled: - // 1) on a config server's transition to primary for each shard in config.shards that is not - // marked as sharding aware - // 2) on a direct insert to the config.shards collection (usually from a 3.2 mongos). - // This map tracks that only one such task per shard can be running at a time. - std::map<ShardId, executor::TaskExecutor::CallbackHandle> _addShardHandles; - - // Protects the _addShardHandles map. - stdx::mutex _addShardHandlesMutex; - /** * Lock for shard zoning operations. This should be acquired when doing any operations that * can affect the config.tags collection or the tags field of the config.shards collection. diff --git a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp index 3a7aef9c9df..b42ed6bb69e 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp @@ -704,308 +704,6 @@ void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolS _executorForAddShard->appendConnectionStats(stats); } -Status ShardingCatalogManagerImpl::initializeShardingAwarenessOnUnawareShards( - OperationContext* txn) { - auto swShards = _getAllShardingUnawareShards(txn); - if (!swShards.isOK()) { - return swShards.getStatus(); - } else { - auto shards = std::move(swShards.getValue()); - for (const auto& shard : shards) { - auto status = upsertShardIdentityOnShard(txn, shard); - if (!status.isOK()) { - return status; - } - } - } - - // Note: this OK status means only that tasks to initialize sharding awareness on the shards - // were scheduled against the task executor, not that the tasks actually succeeded. 
- return Status::OK(); -} - -StatusWith<std::vector<ShardType>> ShardingCatalogManagerImpl::_getAllShardingUnawareShards( - OperationContext* txn) { - std::vector<ShardType> shards; - auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ShardType::ConfigNS), - BSON( - "state" << BSON("$ne" << static_cast<std::underlying_type<ShardType::ShardState>::type>( - ShardType::ShardState::kShardAware))), // shard is sharding unaware - BSONObj(), // no sort - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - for (const BSONObj& doc : findStatus.getValue().docs) { - auto shardRes = ShardType::fromBSON(doc); - if (!shardRes.isOK()) { - return {ErrorCodes::FailedToParse, - str::stream() << "Failed to parse shard " << causedBy(shardRes.getStatus()) - << doc}; - } - - Status validateStatus = shardRes.getValue().validate(); - if (!validateStatus.isOK()) { - return {validateStatus.code(), - str::stream() << "Failed to validate shard " << causedBy(validateStatus) - << doc}; - } - - shards.push_back(shardRes.getValue()); - } - - return shards; -} - -Status ShardingCatalogManagerImpl::upsertShardIdentityOnShard(OperationContext* txn, - ShardType shardType) { - - auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); - - auto swConnString = ConnectionString::parse(shardType.getHost()); - if (!swConnString.isOK()) { - return swConnString.getStatus(); - } - - // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter - // instead. - const std::shared_ptr<Shard> shard{ - Grid::get(txn)->shardRegistry()->createConnection(swConnString.getValue())}; - invariant(shard); - auto targeter = shard->getTargeter(); - - _scheduleAddShardTask( - std::move(shardType), std::move(targeter), std::move(commandRequest), false); - - return Status::OK(); -} - -void ShardingCatalogManagerImpl::cancelAddShardTaskIfNeeded(const ShardId& shardId) { - stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); - if (_hasAddShardHandle_inlock(shardId)) { - auto cbHandle = _getAddShardHandle_inlock(shardId); - _executorForAddShard->cancel(cbHandle); - // Untrack the handle here so that if this shard is re-added before the CallbackCanceled - // status is delivered to the callback, a new addShard task for the shard will be - // created. - _untrackAddShardHandle_inlock(shardId); - } -} - -void ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled( - const CallbackArgs& cbArgs, - const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest) { - if (cbArgs.status == ErrorCodes::CallbackCanceled) { - return; - } - _scheduleAddShardTask( - std::move(shardType), std::move(targeter), std::move(commandRequest), true); -} - -void ShardingCatalogManagerImpl::_scheduleAddShardTask( - const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest, - const bool isRetry) { - stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); - - if (isRetry) { - // Untrack the handle from scheduleWorkAt, and schedule a new addShard task. - _untrackAddShardHandle_inlock(shardType.getName()); - } else { - // We should never be able to schedule an addShard task while one is running, because - // there is a unique index on the _id field in config.shards. 
- invariant(!_hasAddShardHandle_inlock(shardType.getName())); - } - - // Schedule the shardIdentity upsert request to run immediately, and track the handle. - - auto swHost = targeter->findHostWithMaxWait(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - Milliseconds(kDefaultFindHostMaxWaitTime)); - if (!swHost.isOK()) { - // A 3.2 mongos must have previously successfully communicated with hosts in this shard, - // so a failure to find a host here is probably transient, and it is safe to retry. - warning() << "Failed to find host for shard " << shardType - << " when trying to upsert a shardIdentity document, " - << causedBy(swHost.getStatus()); - const Date_t now = _executorForAddShard->now(); - const Date_t when = now + getAddShardTaskRetryInterval(); - _trackAddShardHandle_inlock( - shardType.getName(), - _executorForAddShard->scheduleWorkAt( - when, - stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter), - std::move(commandRequest)))); - return; - } - - executor::RemoteCommandRequest request( - swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); - - const RemoteCommandCallbackFn callback = - stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter)); - - if (isRetry) { - log() << "Retrying upsert of shardIdentity document into shard " << shardType.getName(); - } - _trackAddShardHandle_inlock(shardType.getName(), - _executorForAddShard->scheduleRemoteCommand(request, callback)); -} - -void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( - const RemoteCommandCallbackArgs& cbArgs, - ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter) { - stdx::unique_lock<stdx::mutex> lk(_addShardHandlesMutex); - - // If the callback has been canceled (either due to shutdown or the shard being removed), we - // do not need to reschedule the task or update config.shards. - Status responseStatus = cbArgs.response.status; - if (responseStatus == ErrorCodes::CallbackCanceled) { - return; - } - - // If the handle no longer exists, the shard must have been removed, but the callback must not - // have been canceled until after the task had completed. In this case as well, we do not need - // to reschedule the task or update config.shards. - if (!_hasAddShardHandle_inlock(shardType.getName())) { - return; - } - - // Untrack the handle from scheduleRemoteCommand regardless of whether the command - // succeeded. If it failed, we will track the handle for the rescheduled task before - // releasing the mutex. - _untrackAddShardHandle_inlock(shardType.getName()); - - // Examine the response to determine if the upsert succeeded. - - bool rescheduleTask = false; - - auto swResponse = cbArgs.response; - if (!swResponse.isOK()) { - warning() << "Failed to upsert shardIdentity document during addShard into shard " - << shardType.getName() << "(" << shardType.getHost() - << "). The shardIdentity upsert will continue to be retried. " - << causedBy(swResponse.status); - rescheduleTask = true; - } else { - // Create a CommandResponse object in order to use processBatchWriteResponse. 
- BSONObj responseObj = swResponse.data.getOwned(); - BSONObj responseMetadata = swResponse.metadata.getOwned(); - Status commandStatus = getStatusFromCommandResult(responseObj); - Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); - Shard::CommandResponse commandResponse(std::move(responseObj), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)); - - BatchedCommandResponse batchResponse; - auto batchResponseStatus = - Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); - if (!batchResponseStatus.isOK()) { - if (batchResponseStatus == ErrorCodes::DuplicateKey) { - warning() - << "Received duplicate key error when inserting the shardIdentity " - "document into " - << shardType.getName() << "(" << shardType.getHost() - << "). This means the shard has a shardIdentity document with a clusterId " - "that differs from this cluster's clusterId. It may still belong to " - "or not have been properly removed from another cluster. The " - "shardIdentity upsert will continue to be retried."; - } else { - warning() << "Failed to upsert shardIdentity document into shard " - << shardType.getName() << "(" << shardType.getHost() - << ") during addShard. The shardIdentity upsert will continue to be " - "retried. " - << causedBy(batchResponseStatus); - } - rescheduleTask = true; - } - } - - if (rescheduleTask) { - // If the command did not succeed, schedule the upsert shardIdentity task again with a - // delay. - const Date_t now = _executorForAddShard->now(); - const Date_t when = now + getAddShardTaskRetryInterval(); - - // Track the handle from scheduleWorkAt. - _trackAddShardHandle_inlock( - shardType.getName(), - _executorForAddShard->scheduleWorkAt( - when, - stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter), - std::move(cbArgs.request.cmdObj)))); - return; - } - - // If the command succeeded, update config.shards to mark the shard as shardAware. - - // Release the _addShardHandlesMutex before updating config.shards, since it involves disk - // I/O. - // At worst, a redundant addShard task will be scheduled by a new primary if the current - // primary fails during that write. - lk.unlock(); - - // This thread is part of a thread pool owned by the addShard TaskExecutor. Threads in that - // pool are not created with Client objects associated with them, so a Client is created and - // attached here to do the local update. The Client is destroyed at the end of the scope, - // leaving the thread state as it was before. - Client::initThread(getThreadName().c_str()); - ON_BLOCK_EXIT([&] { Client::destroy(); }); - - // Use the thread's Client to create an OperationContext to perform the local write to - // config.shards. This OperationContext will automatically be destroyed when it goes out of - // scope at the end of this code block. - auto txnPtr = cc().makeOperationContext(); - - // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a - // failed response anyway. If the write is rolled back, the new config primary will attempt to - // initialize sharding awareness on this shard again, and this update to config.shards will - // be automatically retried then. If it fails because the shard was removed through the normal - // removeShard path (so the entry in config.shards was deleted), no new addShard task will - // get scheduled on the next transition to primary. 
- auto updateStatus = - Grid::get(txnPtr.get()) - ->catalogClient(txnPtr.get()) - ->updateConfigDocument( - txnPtr.get(), - ShardType::ConfigNS, - BSON(ShardType::name(shardType.getName())), - BSON("$set" << BSON( - ShardType::state() - << static_cast<std::underlying_type<ShardType::ShardState>::type>( - ShardType::ShardState::kShardAware))), - false, - kNoWaitWriteConcern); - - if (!updateStatus.isOK()) { - warning() << "Failed to mark shard " << shardType.getName() << "(" << shardType.getHost() - << ") as shardAware in config.shards. This will be retried the next time a " - "config server transitions to primary. " - << causedBy(updateStatus.getStatus()); - } -} - BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard( OperationContext* txn, const std::string& shardName) { std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); @@ -1035,29 +733,4 @@ BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard( return commandRequest.toBSON(); } -bool ShardingCatalogManagerImpl::_hasAddShardHandle_inlock(const ShardId& shardId) { - return _addShardHandles.find(shardId) != _addShardHandles.end(); -} - -const CallbackHandle& ShardingCatalogManagerImpl::_getAddShardHandle_inlock( - const ShardId& shardId) { - invariant(_hasAddShardHandle_inlock(shardId)); - return _addShardHandles.find(shardId)->second; -} - -void ShardingCatalogManagerImpl::_trackAddShardHandle_inlock( - const ShardId shardId, const StatusWith<CallbackHandle>& swHandle) { - if (swHandle.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(40219, swHandle.getStatus()); - _addShardHandles.insert(std::pair<ShardId, CallbackHandle>(shardId, swHandle.getValue())); -} - -void ShardingCatalogManagerImpl::_untrackAddShardHandle_inlock(const ShardId& shardId) { - auto it = _addShardHandles.find(shardId); - invariant(it != _addShardHandles.end()); - _addShardHandles.erase(shardId); -} - } // namespace mongo diff --git a/src/mongo/s/client/sharding_network_connection_hook.cpp b/src/mongo/s/client/sharding_network_connection_hook.cpp index e3544a4617b..9133a97e697 100644 --- a/src/mongo/s/client/sharding_network_connection_hook.cpp +++ b/src/mongo/s/client/sharding_network_connection_hook.cpp @@ -90,31 +90,11 @@ Status ShardingNetworkConnectionHook::validateHostImpl( StatusWith<boost::optional<executor::RemoteCommandRequest>> ShardingNetworkConnectionHook::makeRequest(const HostAndPort& remoteHost) { - auto shard = grid.shardRegistry()->getShardForHostNoReload(remoteHost); - if (!shard) { - return {ErrorCodes::ShardNotFound, - str::stream() << "No shard found for host: " << remoteHost.toString()}; - } - if (shard->isConfig()) { - // No need to initialize sharding metadata if talking to a config server - return {boost::none}; - } - - SetShardVersionRequest ssv = SetShardVersionRequest::makeForInitNoPersist( - grid.shardRegistry()->getConfigServerConnectionString(), - shard->getId(), - shard->getConnString()); - executor::RemoteCommandRequest request; - request.dbname = "admin"; - request.target = remoteHost; - request.timeout = Seconds{30}; - request.cmdObj = ssv.toBSON(); - - return {request}; + return {boost::none}; } Status ShardingNetworkConnectionHook::handleReply(const HostAndPort& remoteHost, executor::RemoteCommandResponse&& response) { - return getStatusFromCommandResult(response.data); + MONGO_UNREACHABLE; } } // namespace mongo diff --git a/src/mongo/s/commands/cluster_move_primary_cmd.cpp b/src/mongo/s/commands/cluster_move_primary_cmd.cpp index 
02b8bfc43db..a482971ebbd 100644 --- a/src/mongo/s/commands/cluster_move_primary_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_primary_cmd.cpp @@ -51,7 +51,6 @@ #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" -#include "mongo/s/set_shard_version_request.h" #include "mongo/util/log.h" namespace mongo { @@ -180,19 +179,6 @@ public: ScopedDbConnection toconn(toShard->getConnString()); - { - // Make sure the target node is sharding aware. - auto ssvRequest = SetShardVersionRequest::makeForInitNoPersist( - shardRegistry->getConfigServerConnectionString(), - toShard->getId(), - toShard->getConnString()); - BSONObj res; - bool ok = toconn->runCommand("admin", ssvRequest.toBSON(), res); - if (!ok) { - return appendCommandStatus(result, getStatusFromCommandResult(res)); - } - } - // TODO ERH - we need a clone command which replays operations from clone start to now // can just use local.oplog.$main BSONObj cloneRes; diff --git a/src/mongo/s/set_shard_version_request.cpp b/src/mongo/s/set_shard_version_request.cpp index 979a5f5f915..267bf91eb98 100644 --- a/src/mongo/s/set_shard_version_request.cpp +++ b/src/mongo/s/set_shard_version_request.cpp @@ -83,15 +83,6 @@ SetShardVersionRequest SetShardVersionRequest::makeForInit( return SetShardVersionRequest(configServer, shardName, shardConnectionString); } -SetShardVersionRequest SetShardVersionRequest::makeForInitNoPersist( - const ConnectionString& configServer, - const ShardId& shardName, - const ConnectionString& shardConnectionString) { - auto ssv = SetShardVersionRequest(configServer, shardName, shardConnectionString); - ssv._noConnectionVersioning = true; - return ssv; -} - SetShardVersionRequest SetShardVersionRequest::makeForVersioning( const ConnectionString& configServer, const ShardId& shardName, diff --git a/src/mongo/s/set_shard_version_request.h b/src/mongo/s/set_shard_version_request.h index 857866a36a9..7f6374d17d0 100644 --- a/src/mongo/s/set_shard_version_request.h +++ b/src/mongo/s/set_shard_version_request.h @@ -60,19 +60,6 @@ public: const ConnectionString& shardConnectionString); /** - * Constructs a new set shard version request, which is of the "init" type, meaning it has no - * namespace or version information associated with it and the init flag is set. In - * addition, the request will contain the "noConnectionVersioning" field, which means that the - * connection WILL NOT be marked as "versioned". DO NOT USE except on connections only used - * with operations that do per-operation versioning, and do not depend on the connection being - * marked as sharded. - */ - static SetShardVersionRequest makeForInitNoPersist( - const ConnectionString& configServer, - const ShardId& shardName, - const ConnectionString& shardConnectionString); - - /** * Constructs a new set shard version request, which is of the "versioning" type, meaning it has * both initialization data and namespace and version information associated with it. 
* diff --git a/src/mongo/s/set_shard_version_request_test.cpp b/src/mongo/s/set_shard_version_request_test.cpp index baae7c77cb7..7e209414765 100644 --- a/src/mongo/s/set_shard_version_request_test.cpp +++ b/src/mongo/s/set_shard_version_request_test.cpp @@ -284,36 +284,6 @@ TEST(SetShardVersionRequest, ToSSVCommandInit) { << 30000)); } -TEST(SetShardVersionRequest, ToSSVCommandInitNoConnectionVersioning) { - SetShardVersionRequest ssv = - SetShardVersionRequest::makeForInitNoPersist(configCS, ShardId("TestShard"), shardCS); - - ASSERT(ssv.isInit()); - ASSERT(ssv.isAuthoritative()); - ASSERT(ssv.getNoConnectionVersioning()); - ASSERT_EQ(ssv.getConfigServer().toString(), configCS.toString()); - ASSERT_EQ(ssv.getShardName(), "TestShard"); - ASSERT_EQ(ssv.getShardConnectionString().toString(), shardCS.toString()); - - ASSERT_BSONOBJ_EQ(ssv.toBSON(), - BSON("setShardVersion" - << "" - << "init" - << true - << "authoritative" - << true - << "configdb" - << configCS.toString() - << "shard" - << "TestShard" - << "shardHost" - << shardCS.toString() - << "maxTimeMS" - << 30000 - << "noConnectionVersioning" - << true)); -} - TEST(SetShardVersionRequest, ToSSVCommandFull) { const ChunkVersion chunkVersion(1, 2, OID::gen()); |
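For readers following the rewritten ShardingState::initializeFromShardIdentity earlier in this patch: below is a standalone, simplified analogue of the new control flow, in which the kInitializing state, background initialization thread, and condition-variable waiting are gone, and an atomic state moves directly from kNew to kInitialized or kError under the caller's control. MiniShardingState, its members, and main() are invented for illustration; only the state names, the error-on-kError behavior, and the atomic store/load pattern come from the patch itself.

    // Standalone sketch (not MongoDB code) of the post-patch initialization flow.
    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <string>

    enum class InitializationState : std::uint32_t { kNew, kInitialized, kError };

    struct Status {
        bool ok;
        std::string reason;
    };

    class MiniShardingState {
    public:
        Status initialize(bool globalInitSucceeds) {
            if (getState() == InitializationState::kError) {
                // Mirrors the ManualInterventionRequired error returned by the patch.
                return {false, "previous initialization failed; manual intervention required"};
            }
            if (getState() == InitializationState::kInitialized) {
                // Treat repeat initialization as a no-op in this sketch.
                return {true, ""};
            }
            // Stand-in for the _globalInit call; succeeds or fails synchronously.
            Status status = globalInitSucceeds ? Status{true, ""}
                                               : Status{false, "global init failed"};
            if (status.ok) {
                setState(InitializationState::kInitialized);
            } else {
                _initializationStatus = status;
                setState(InitializationState::kError);
            }
            return status;
        }

        InitializationState getState() const {
            return static_cast<InitializationState>(_state.load());
        }

    private:
        void setState(InitializationState s) {
            _state.store(static_cast<std::uint32_t>(s));
        }

        std::atomic<std::uint32_t> _state{
            static_cast<std::uint32_t>(InitializationState::kNew)};
        Status _initializationStatus{true, ""};
    };

    int main() {
        MiniShardingState state;
        std::cout << state.initialize(true).ok << "\n";  // 1: moved to kInitialized
    }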
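Likewise, a minimal sketch of the command precondition that replaces the removed lazy-initialization path in split_chunk_command.cpp: shard-only commands now fail fast through canAcceptShardedCommands() instead of initializing sharding state from a "configdb" field in the command object. The types, function names, and exception-based error reporting below are illustrative stand-ins under that assumption, not the real Status-returning MongoDB API; only the two documented failure modes (not started with --shardsvr, or not yet shard aware) come from the header comments in this patch.

    // Standalone sketch (not MongoDB code) of the new shard-command precondition.
    #include <iostream>
    #include <stdexcept>

    enum class ClusterRole { None, ShardServer };

    struct MiniShardingState {
        bool enabled = false;                    // set once a shardIdentity document is processed
        ClusterRole role = ClusterRole::None;

        // Analogue of the documented contract of canAcceptShardedCommands(): OK when
        // enabled, otherwise an error saying whether this node is not a shard server
        // at all or simply not yet shard aware.
        void canAcceptShardedCommands() const {
            if (role != ClusterRole::ShardServer) {
                throw std::runtime_error(
                    "cannot accept sharding commands: not started with --shardsvr");
            }
            if (!enabled) {
                throw std::runtime_error(
                    "cannot accept sharding commands: sharding state not yet initialized");
            }
        }
    };

    // Analogue of a shard-only command such as splitChunk: check the precondition
    // up front (uassertStatusOK in the real command), then do the actual work.
    bool runSplitChunkLikeCommand(const MiniShardingState& state) {
        state.canAcceptShardedCommands();
        // ... parse arguments and perform the split ...
        return true;
    }

    int main() {
        MiniShardingState state;
        state.enabled = true;
        state.role = ClusterRole::ShardServer;
        std::cout << runSplitChunkLikeCommand(state) << "\n";  // 1
    }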