author     Esha Maharishi <esha.maharishi@mongodb.com>   2016-06-13 11:02:02 -0400
committer  Esha Maharishi <esha.maharishi@mongodb.com>   2016-06-13 11:40:18 -0400
commit     677c25dbc453e90c0d2ef6b77b4284ea34c1b949 (patch)
tree       65fe3b62d3e681d9fc908b7b1cc2763f036a033e
parent     5121174a62246efed908deb8757039f103a368cd (diff)
download   mongo-677c25dbc453e90c0d2ef6b77b4284ea34c1b949.tar.gz
SERVER-24126 fix shared library compile
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml | 2
-rw-r--r--  jstests/sharding/min_optime_recovery.js | 94
-rw-r--r--  jstests/sharding/shard_aware_on_add_shard.js | 56
-rw-r--r--  jstests/sharding/shard_identity_config_update.js | 15
-rw-r--r--  jstests/sharding/sharding_state_after_stepdown.js | 160
-rw-r--r--  jstests/sharding/startup_with_all_configs_down.js | 9
-rw-r--r--  src/mongo/db/s/SConscript | 1
-rw-r--r--  src/mongo/db/s/type_shard_identity.cpp | 32
-rw-r--r--  src/mongo/db/s/type_shard_identity.h | 10
-rw-r--r--  src/mongo/s/catalog/replset/SConscript | 1
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 95
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.h | 8
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp | 120
-rw-r--r--  src/mongo/s/client/shard_registry.cpp | 10
-rw-r--r--  src/mongo/s/client/shard_registry.h | 11
16 files changed, 337 insertions, 288 deletions
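As the diffstat above shows, most of this change makes addShard initialize sharding awareness on the new shard: CatalogManagerReplicaSet::addShard now builds an upsert via ShardIdentityType::createUpsertForAddShard() and sends it to the shard's admin.system.version collection before inserting the new entry into config.shards. The mongo shell snippet below is only a rough sketch of what that upsert looks like; the connection string, shard name, and clusterId values are placeholders, and the real command is issued internally by the config server as a batched update rather than from the shell.

    // Rough shell equivalent of the upsert built by ShardIdentityType::createUpsertForAddShard().
    // All three values below are placeholders for illustration only.
    var configConnStr = "configRS/host1:23456,host2:23456,host3:23456";
    var newShardName = "newShard";
    var clusterId = ObjectId();  // see the SERVER-23096 TODOs below for how this is currently produced

    db.getSiblingDB("admin").system.version.update(
        {_id: "shardIdentity", shardName: newShardName, clusterId: clusterId},
        {$set: {configsvrConnectionString: configConnStr}},
        {upsert: true});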
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index ef52c8a5ea7..d7c91283678 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -96,7 +96,6 @@ selector: - jstests/sharding/startup_with_all_configs_down.js - jstests/sharding/lagged_config_secondary.js - jstests/sharding/autodiscover_config_rs_from_secondary.js - - jstests/sharding/sharding_state_after_stepdown.js - jstests/sharding/rs_stepdown_and_pooling.js - jstests/sharding/mongos_no_replica_set_refresh.js - jstests/sharding/primary_config_server_blackholed_from_mongos.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml index e92c1d84fbc..0e8c3f150ff 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml @@ -7,7 +7,9 @@ selector: - jstests/sharding/error_propagation.js # New features for v3.4 - jstests/sharding/shard_aware_init.js + - jstests/sharding/shard_aware_on_add_shard.js - jstests/sharding/shard_aware_primary_failover.js + - jstests/sharding/shard_identity_config_update.js # v3.4 changes $in shard key extraction. - jstests/sharding/upsert_sharded.js - jstests/sharding/update_sharded.js diff --git a/jstests/sharding/min_optime_recovery.js b/jstests/sharding/min_optime_recovery.js index 69ccc78f02c..4f8595feb69 100644 --- a/jstests/sharding/min_optime_recovery.js +++ b/jstests/sharding/min_optime_recovery.js @@ -1,85 +1,37 @@ /** - * Basic testing for minOpTimeRecovery document. Tests that it will be created after a migration - * only if the config server is a replica set and recovery will not run when disabled. - * - * This test restarts a shard and the shard will attempt to read a document that was saved before - * the restart, so it cannot be run on ephemeral storage engines. - * @tags: [requires_persistence] + * Tests that the minOpTimeRecovery document will be created after a migration. 
*/ (function() { "use strict"; - var runTest = function(withRecovery) { - var st = new ShardingTest({shards: 2}); + var st = new ShardingTest({shards: 2}); - var testDB = st.s.getDB('test'); - testDB.adminCommand({enableSharding: 'test'}); - st.ensurePrimaryShard('test', 'shard0000'); - testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}); + var testDB = st.s.getDB('test'); + testDB.adminCommand({enableSharding: 'test'}); + st.ensurePrimaryShard('test', 'shard0000'); + testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}}); - var opTimeBeforeMigrate = null; - if (st.configRS) { - var priConn = st.configRS.getPrimary(); - var replStatus = priConn.getDB('admin').runCommand({replSetGetStatus: 1}); - replStatus.members.forEach(function(memberState) { - if (memberState.state == 1) { // if primary - opTimeBeforeMigrate = memberState.optime; - - assert.neq(null, opTimeBeforeMigrate); - assert.neq(null, opTimeBeforeMigrate.ts); - assert.neq(null, opTimeBeforeMigrate.t); - } - }); - } - - testDB.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: 'shard0001'}); - - var shardAdmin = st.d0.getDB('admin'); - var doc = shardAdmin.system.version.findOne(); - - if (st.configRS) { - assert.neq(null, doc); - assert.eq('minOpTimeRecovery', doc._id); - assert.eq(st.configRS.getURL(), doc.configsvrConnectionString); - assert.eq('shard0000', doc.shardName); - assert.gt(doc.minOpTime.ts.getTime(), 0); - } else { - assert.eq(null, doc); + var priConn = st.configRS.getPrimary(); + var replStatus = priConn.getDB('admin').runCommand({replSetGetStatus: 1}); + replStatus.members.forEach(function(memberState) { + if (memberState.state == 1) { // if primary + assert.neq(null, memberState.optime); + assert.neq(null, memberState.optime.ts); + assert.neq(null, memberState.optime.t); } + }); - var restartCmdLineOptions = Object.merge(st.d0.fullOptions, { - setParameter: 'recoverShardingState=' + (withRecovery ? 'true' : 'false'), - restart: true - }); + testDB.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: 'shard0001'}); - // Restart the shard that donated a chunk to trigger the optime recovery logic. - st.stopMongod(0); - var newMongod = MongoRunner.runMongod(restartCmdLineOptions); - var shardingSection = newMongod.getDB('admin').runCommand({serverStatus: 1}).sharding; - - if (st.configRS && withRecovery) { - assert.neq(null, shardingSection); - - // Confirm that the config server string points to an actual config server replica set. - var configConnStr = shardingSection.configsvrConnectionString; - var configConn = new Mongo(configConnStr); - var configIsMaster = configConn.getDB('admin').runCommand({isMaster: 1}); - assert.gt(configConnStr.indexOf('/'), 0); - assert.eq(1, configIsMaster.configsvr); // If it's a shard, this field won't exist. 
- - var configOpTimeObj = shardingSection.lastSeenConfigServerOpTime; - assert.neq(null, configOpTimeObj); - assert.gte(configOpTimeObj.ts.getTime(), opTimeBeforeMigrate.ts.getTime()); - assert.gte(configOpTimeObj.t, opTimeBeforeMigrate.t); - } else { - assert.eq(null, shardingSection); - } + var shardAdmin = st.d0.getDB('admin'); + var minOpTimeRecoveryDoc = shardAdmin.system.version.findOne({_id: 'minOpTimeRecovery'}); - MongoRunner.stopMongod(newMongod.port); - st.stop(); - }; + assert.neq(null, minOpTimeRecoveryDoc); + assert.eq('minOpTimeRecovery', minOpTimeRecoveryDoc._id); + assert.eq(st.configRS.getURL(), minOpTimeRecoveryDoc.configsvrConnectionString); + assert.eq('shard0000', minOpTimeRecoveryDoc.shardName); + assert.gt(minOpTimeRecoveryDoc.minOpTime.ts.getTime(), 0); - runTest(true); - runTest(false); + st.stop(); })(); diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js new file mode 100644 index 00000000000..a616f03bcc3 --- /dev/null +++ b/jstests/sharding/shard_aware_on_add_shard.js @@ -0,0 +1,56 @@ +/** + * Tests that the addShard process initializes sharding awareness on an added standalone or + * replica set shard that was started with --shardsvr. + */ + +(function() { + "use strict"; + + var waitForIsMaster = function(conn) { + assert.soon(function() { + var res = conn.getDB('admin').runCommand({isMaster: 1}); + return res.ismaster; + }); + }; + + var checkShardingStateInitialized = function(conn, configConnStr, shardName) { + var res = conn.getDB('admin').runCommand({shardingState: 1}); + assert.commandWorked(res); + assert(res.enabled); + assert.eq(configConnStr, res.configServer); + assert.eq(shardName, res.shardName); + // TODO SERVER-23096: How should the clusterId be obtained externally? + // assert.eq(clusterId, res.clusterId); + }; + + // Create the cluster to test adding shards to. + var st = new ShardingTest({shards: 1}); + + // Add a shard that is a standalone mongod. + + var standaloneConn = MongoRunner.runMongod({shardsvr: ''}); + waitForIsMaster(standaloneConn); + + jsTest.log("Going to add standalone as shard: " + standaloneConn); + var newShardName = "newShard"; + assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName})); + checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName); + + MongoRunner.stopMongod(standaloneConn.port); + + // Add a shard that is a replica set. + + var replTest = new ReplSetTest({nodes: 1}); + replTest.startSet({shardsvr: ''}); + replTest.initiate(); + waitForIsMaster(replTest.getPrimary()); + + jsTest.log("Going to add replica set as shard: " + replTest); + assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()})); + checkShardingStateInitialized(replTest.getPrimary(), st.configRS.getURL(), replTest.getURL()); + + replTest.stopSet(); + + st.stop(); + +})(); diff --git a/jstests/sharding/shard_identity_config_update.js b/jstests/sharding/shard_identity_config_update.js index 678a04c79fa..6d3b01deb41 100644 --- a/jstests/sharding/shard_identity_config_update.js +++ b/jstests/sharding/shard_identity_config_update.js @@ -10,22 +10,7 @@ var st = new ShardingTest({shards: {rs0: {nodes: 2}}}); - // TODO: remove crutch once all node shard aware is complete. 
- var setupShardIdentity = function(conn, configConnStr) { - var shardIdentityDoc = { - _id: 'shardIdentity', - configsvrConnectionString: configConnStr, - shardName: 'newShard', - clusterId: ObjectId() - }; - - var res = conn.getDB('admin').system.version.update( - {_id: 'shardIdentity'}, shardIdentityDoc, true); - assert.eq(1, res.nUpserted); - }; - var shardPri = st.rs0.getPrimary(); - setupShardIdentity(st.rs0.getPrimary(), st.configRS.getURL()); // Note: Adding new replica set member by hand because of SERVER-24011. diff --git a/jstests/sharding/sharding_state_after_stepdown.js b/jstests/sharding/sharding_state_after_stepdown.js deleted file mode 100644 index 6bd2f4927cc..00000000000 --- a/jstests/sharding/sharding_state_after_stepdown.js +++ /dev/null @@ -1,160 +0,0 @@ -// Tests the state of sharding data after a replica set reconfig -// -// This test involves restarting a single-node replica set, so cannot be run on ephemeral storage -// engines. A single node replica set that is using an ephemeral engine will have no knowledge of -// the replica set configuration once restarted, so will not elect itself as primary. -// @tags: [requires_persistence] - -(function() { - 'use strict'; - - var st = new ShardingTest({shards: 2, mongos: 1, other: {rs: true, rsOptions: {nodes: 1}}}); - - var mongos = st.s0; - var admin = mongos.getDB("admin"); - var shards = mongos.getCollection("config.shards").find().toArray(); - - var coll = mongos.getCollection("foo.bar"); - var collSharded = mongos.getCollection("foo.barSharded"); - - assert.commandWorked(admin.runCommand({enableSharding: coll.getDB() + ""})); - st.ensurePrimaryShard(coll.getDB() + "", shards[0]._id); - assert.commandWorked( - admin.runCommand({shardCollection: collSharded.toString(), key: {_id: 1}})); - assert.commandWorked( - admin.runCommand({moveChunk: collSharded.toString(), find: {_id: 0}, to: shards[1]._id})); - - assert.writeOK(coll.insert({some: "data"})); - assert.writeOK(collSharded.insert({some: "data"})); - assert.eq(2, mongos.adminCommand({getShardVersion: collSharded.toString()}).version.t); - - st.printShardingStatus(); - - // Restart both primaries to reset our sharding data - var restartPrimaries = function() { - var rs0Primary = st.rs0.getPrimary(); - var rs1Primary = st.rs1.getPrimary(); - - st.rs0.stop(rs0Primary); - st.rs1.stop(rs1Primary); - - ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ok: false}); - - st.rs0.start(rs0Primary, Object.extend(rs0Primary.savedOptions, {restart: true})); - st.rs1.start(rs1Primary, Object.extend(rs1Primary.savedOptions, {restart: true})); - - ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ismaster: true}); - }; - - restartPrimaries(); - - // Sharding data gets initialized either when shards are hit by an unsharded query or if some - // metadata operation was run before the step down, which wrote a minOpTime recovery record - // (CSRS - // only). In this case we did a moveChunk above from shard0 to shard1, so we will have this - // record - // on shard0. - if (st.configRS) { - assert.neq( - "", st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - } else { - assert.eq( - "", st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - } - assert.eq("", - st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - - // Doing a find only accesses the primary (rs0), which is already recovered. Ensure that the - // secondary still has no sharding knowledge. 
- assert.neq(null, coll.findOne({})); - assert.eq("", - st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - - // - // - // Sharding data initialized when shards are hit by a sharded query - assert.neq(null, collSharded.findOne({})); - assert.neq("", - st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - assert.neq("", - st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer); - - // Stepdown both primaries to reset our sharding data - var stepDownPrimaries = function() { - - var rs0Primary = st.rs0.getPrimary(); - var rs1Primary = st.rs1.getPrimary(); - - try { - rs0Primary.adminCommand({replSetStepDown: 1000 * 1000, force: true}); - assert(false); - } catch (ex) { - // Expected connection exception, will check for stepdown later - } - - try { - rs1Primary.adminCommand({replSetStepDown: 1000 * 1000, force: true}); - assert(false); - } catch (ex) { - // Expected connection exception, will check for stepdown later - } - - ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {secondary: true}); - - assert.commandWorked(new Mongo(rs0Primary.host).adminCommand({replSetFreeze: 0})); - assert.commandWorked(new Mongo(rs1Primary.host).adminCommand({replSetFreeze: 0})); - - rs0Primary = st.rs0.getPrimary(); - rs1Primary = st.rs1.getPrimary(); - - // Flush connections to avoid transient issues with conn pooling - assert.commandWorked(rs0Primary.adminCommand({connPoolSync: true})); - assert.commandWorked(rs1Primary.adminCommand({connPoolSync: true})); - - ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ismaster: true}); - }; - - stepDownPrimaries(); - - // - // - // No sharding metadata until shards are hit by a metadata operation - assert.eq({}, - st.rs0.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - assert.eq({}, - st.rs1.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - - // - // - // Metadata commands should enable sharding data implicitly - assert.commandWorked(mongos.adminCommand({split: collSharded.toString(), middle: {_id: 0}})); - assert.eq({}, - st.rs0.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - assert.neq({}, - st.rs1.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - - // - // - // MoveChunk command should enable sharding data implicitly on TO-shard - assert.commandWorked(mongos.adminCommand( - {moveChunk: collSharded.toString(), find: {_id: 0}, to: shards[0]._id})); - assert.neq({}, - st.rs0.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - assert.neq({}, - st.rs1.getPrimary() - .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true}) - .metadata); - - st.stop(); - -})(); diff --git a/jstests/sharding/startup_with_all_configs_down.js b/jstests/sharding/startup_with_all_configs_down.js index 32609a04484..a85c9595bea 100644 --- a/jstests/sharding/startup_with_all_configs_down.js +++ b/jstests/sharding/startup_with_all_configs_down.js @@ -11,9 +11,10 @@ /** * Restarts the mongod backing the specified shard instance, without restarting the mongobridge. 
*/ - function restartShard(shard) { + function restartShard(shard, waitForConnect) { MongoRunner.stopMongod(shard); shard.restart = true; + shard.waitForConnect = waitForConnect; MongoRunner.runMongod(shard); } @@ -50,7 +51,7 @@ }); jsTestLog("Restarting a shard while there are no config servers up"); - restartShard(st.shard1); + restartShard(st.shard1, false); jsTestLog("Queries should fail because the shard can't initialize sharding state"); var error = assert.throws(function() { @@ -67,8 +68,8 @@ // TODO: SERVER-23192 - restart all shards and mongos because their replica set monitor has // deemed the CSRS config server set as unusable. - restartShard(st.shard0); - restartShard(st.shard1); + restartShard(st.shard0, true); + restartShard(st.shard1, true); st.restartMongos(0); jsTestLog("Queries against the original mongos should work again"); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c622e482fc7..9b709976bd1 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -37,6 +37,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/s/write_ops/batch_write_types', '$BUILD_DIR/mongo/bson/util/bson_extract', '$BUILD_DIR/mongo/s/common', ], diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp index 7ef675ad334..d34f4975cfa 100644 --- a/src/mongo/db/s/type_shard_identity.cpp +++ b/src/mongo/db/s/type_shard_identity.cpp @@ -32,6 +32,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/s/write_ops/batched_update_document.h" +#include "mongo/s/write_ops/batched_update_request.h" #include "mongo/util/assert_util.h" namespace mongo { @@ -134,7 +136,10 @@ Status ShardIdentityType::validate() const { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"}; } - if (!_clusterId || _clusterId->isSet()) { + // TODO SERVER-23096: Once ShardRegistry::_clusterId is loaded from the config servers rather + // than initialized in the ShardRegistry constructor in each process, the isSet() check can + // be re-added. 
+ if (!_clusterId /*|| !_clusterId->isSet()*/) { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId() << " field"}; } @@ -204,6 +209,31 @@ void ShardIdentityType::setClusterId(OID clusterId) { _clusterId = std::move(clusterId); } +std::unique_ptr<BatchedUpdateRequest> ShardIdentityType::createUpsertForAddShard() const { + invariant(validate().isOK()); + + std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); + + BSONObjBuilder query; + query.append("_id", "shardIdentity"); + query.append("shardName", getShardName()); + query.append("clusterId", getClusterId()); + updateDoc->setQuery(query.obj()); + + BSONObjBuilder update; + BSONObjBuilder setConfigBuilder(update.subobjStart("$set")); + setConfigBuilder.append(configsvrConnString(), getConfigsvrConnString().toString()); + setConfigBuilder.doneFast(); + updateDoc->setUpdateExpr(update.obj()); + + updateDoc->setUpsert(true); + + std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); + updateRequest->addToUpdates(updateDoc.release()); + + return updateRequest; +} + BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) { BSONObjBuilder builder; BSONObjBuilder setConfigBuilder(builder.subobjStart("$set")); diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h index 30e2c832dff..9c51913fe0f 100644 --- a/src/mongo/db/s/type_shard_identity.h +++ b/src/mongo/db/s/type_shard_identity.h @@ -32,6 +32,7 @@ #include "mongo/client/connection_string.h" #include "mongo/db/jsobj.h" +#include "mongo/s/write_ops/batched_update_request.h" namespace mongo { @@ -40,6 +41,8 @@ namespace mongo { */ class ShardIdentityType { public: + ShardIdentityType() = default; + // The _id value for this document type. static const std::string IdName; @@ -78,6 +81,13 @@ public: void setClusterId(OID clusterId); /** + * Returns an update object that can be used to insert a shardIdentity doc on a shard or update + * the existing shardIdentity doc's configsvrConnString (if the _id, shardName, and clusterId + * match those in this ShardIdentityType instance). + */ + std::unique_ptr<BatchedUpdateRequest> createUpsertForAddShard() const; + + /** * Returns an update object that can be used to update the config server field of the * shardIdentity document with the new connection string. 
*/ diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 582b9a26aea..3d482b50403 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -66,6 +66,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/read_concern_args', + '$BUILD_DIR/mongo/db/s/type_shard_identity', '$BUILD_DIR/mongo/executor/network_interface', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', '$BUILD_DIR/mongo/s/client/sharding_client', diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 3c9b7ce9809..8237359140e 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -49,6 +49,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/type_shard_identity.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -197,9 +198,10 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard( // Check for mongos and older version mongod connections, and whether the hosts // can be found for the user specified replset. - auto cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); - if (!cmdStatus.isOK()) { - if (cmdStatus == ErrorCodes::RPCProtocolNegotiationFailed) { + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); + if (!swCommandResponse.isOK()) { + if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) { // Mongos to mongos commands are no longer supported in the wire protocol // (because mongos does not support OP_COMMAND), similarly for a new mongos // and an old mongod. So the call will fail in such cases. @@ -211,19 +213,20 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard( << " likely because it contains a node that is a mongos or an old" << " version of mongod."}; } else { - return cmdStatus.getStatus(); + return swCommandResponse.getStatus(); } } // Check for a command response error - BSONObj resIsMaster = cmdStatus.getValue(); - Status resIsMasterStatus = getStatusFromCommandResult(resIsMaster); + auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus); if (!resIsMasterStatus.isOK()) { return {resIsMasterStatus.code(), str::stream() << "Error running isMaster against " << shardConn->toString() << ": " << causedBy(resIsMasterStatus)}; } + auto resIsMaster = std::move(swCommandResponse.getValue().response); + // Check whether there is a master. If there isn't, the replica set may not have been // initiated. If the connection is a standalone, it will return true for isMaster. 
bool isMaster; @@ -350,19 +353,19 @@ StatusWith<std::vector<std::string>> CatalogManagerReplicaSet::_getDBNamesListFr shardRegistry->createConnection(connectionString).release()}; invariant(shardConn); - auto cmdStatus = _runCommandForAddShard( + auto swCommandResponse = _runCommandForAddShard( txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1)); - if (!cmdStatus.isOK()) { - return cmdStatus.getStatus(); + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); } - const BSONObj& cmdResult = cmdStatus.getValue(); - - Status cmdResultStatus = getStatusFromCommandResult(cmdResult); - if (!cmdResultStatus.isOK()) { - return cmdResultStatus; + auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus); + if (!cmdStatus.isOK()) { + return cmdStatus; } + auto cmdResult = std::move(swCommandResponse.getValue().response); + vector<string> dbNames; for (const auto& dbEntry : cmdResult["databases"].Obj()) { @@ -408,7 +411,7 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn) { _executorForAddShard->join(); } -StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard( +StatusWith<Shard::CommandResponse> CatalogManagerReplicaSet::_runCommandForAddShard( OperationContext* txn, RemoteCommandTargeter* targeter, const std::string& dbName, @@ -421,12 +424,12 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard( executor::RemoteCommandRequest request( host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30)); - StatusWith<executor::RemoteCommandResponse> responseStatus = + StatusWith<executor::RemoteCommandResponse> swResponse = Status(ErrorCodes::InternalError, "Internal error running command"); auto callStatus = _executorForAddShard->scheduleRemoteCommand( - request, [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - responseStatus = args.response; + request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + swResponse = args.response; }); if (!callStatus.isOK()) { return callStatus.getStatus(); @@ -435,11 +438,22 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard( // Block until the command is carried out _executorForAddShard->wait(callStatus.getValue()); - if (!responseStatus.isOK()) { - return responseStatus.getStatus(); + if (!swResponse.isOK()) { + if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) { + LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus(); + } + return swResponse.getStatus(); } - return responseStatus.getValue().data.getOwned(); + BSONObj responseObj = swResponse.getValue().data.getOwned(); + BSONObj responseMetadata = swResponse.getValue().metadata.getOwned(); + Status commandStatus = getStatusFromCommandResult(responseObj); + Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); + + return Shard::CommandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)); } StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn, @@ -497,7 +511,44 @@ StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn, shardType.setMaxSizeMB(maxSize); } - log() << "going to add shard: " << shardType.toString(); + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); + shardIdentity.setShardName(shardType.getName()); + 
shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId()); + auto validateStatus = shardIdentity.validate(); + if (!validateStatus.isOK()) { + return validateStatus; + } + + log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString(); + + auto updateRequest = shardIdentity.createUpsertForAddShard(); + BatchedCommandRequest commandRequest(updateRequest.release()); + commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); + commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON()); + + const std::shared_ptr<Shard> shardConn{ + Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; + invariant(shardConn); + auto targeter = shardConn->getTargeter(); + + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON()); + + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); + } + + auto commandResponse = std::move(swCommandResponse.getValue()); + + BatchedCommandResponse batchResponse; + auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse); + if (!batchResponseStatus.isOK()) { + return batchResponseStatus; + } + + log() << "going to insert new entry for shard into config.shards: " << shardType.toString(); Status result = insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON()); if (!result.isOK()) { diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index b7251dc2a37..a26bf941ab2 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -276,10 +276,10 @@ private: * Runs a command against a "shard" that is not yet in the cluster and thus not present in the * ShardRegistry. 
*/ - StatusWith<BSONObj> _runCommandForAddShard(OperationContext* txn, - RemoteCommandTargeter* targeter, - const std::string& dbName, - const BSONObj& cmdObj); + StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn, + RemoteCommandTargeter* targeter, + const std::string& dbName, + const BSONObj& cmdObj); StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig( OperationContext* txn, diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp index 54f414c79cb..5c3e1ccfb06 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp @@ -32,10 +32,12 @@ #include <vector> +#include "mongo/client/connection_string.h" #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands.h" #include "mongo/db/query/query_request.h" +#include "mongo/db/s/type_shard_identity.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" @@ -75,7 +77,16 @@ protected: CatalogManagerReplSetTestFixture::setUp(); getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567")); - configTargeter()->setFindHostReturnValue(configHost); + + configTargeter()->setConnectionStringReturnValue(_configConnStr); + + _configHost = _configConnStr.getServers().front(); + configTargeter()->setFindHostReturnValue(_configHost); + + // TODO SERVER-23096: Change this to OID::gen() once clusterId is loaded from the config + // servers into the ShardRegistry instead of created by the ShardRegistry within each + // process. + _clusterId = OID(); } /** @@ -127,12 +138,77 @@ protected: } /** + * Waits for a request for the shardIdentity document to be upserted into a shard from the + * config server on addShard. + */ + void expectShardIdentityUpsert(const HostAndPort& expectedHost, + const std::string& expectedShardName) { + + ShardIdentityType expectedShardIdentity; + expectedShardIdentity.setShardName(expectedShardName); + expectedShardIdentity.setClusterId(_clusterId); + expectedShardIdentity.setConfigsvrConnString(_configConnStr); + invariant(expectedShardIdentity.validate().isOK()); + + auto updateRequest = expectedShardIdentity.createUpsertForAddShard(); + expectUpdates(expectedHost, + NamespaceString(NamespaceString::kConfigCollectionNamespace), + updateRequest.get()); + } + + /** + * Waits for a set of batched updates and ensures that the host, namespace, and updates exactly + * match what's expected. Responds with a success status. + */ + void expectUpdates(const HostAndPort& expectedHost, + const NamespaceString& expectedNss, + BatchedUpdateRequest* expectedBatchedUpdates) { + onCommandForAddShard([&](const RemoteCommandRequest& request) { + + ASSERT_EQUALS(expectedHost, request.target); + + // Check that the db name in the request matches the expected db name. + ASSERT_EQUALS(expectedNss.db(), request.dbname); + + BatchedUpdateRequest actualBatchedUpdates; + std::string errmsg; + ASSERT_TRUE(actualBatchedUpdates.parseBSON(request.dbname, request.cmdObj, &errmsg)); + + // Check that the db and collection names in the BatchedUpdateRequest match the + // expected. 
+ ASSERT_EQUALS(expectedNss, actualBatchedUpdates.getNS()); + + auto expectedUpdates = expectedBatchedUpdates->getUpdates(); + auto actualUpdates = actualBatchedUpdates.getUpdates(); + + ASSERT_EQUALS(expectedUpdates.size(), actualUpdates.size()); + + auto itExpected = expectedUpdates.begin(); + auto itActual = actualUpdates.begin(); + + for (; itActual != actualUpdates.end(); itActual++, itExpected++) { + ASSERT_EQ((*itExpected)->getUpsert(), (*itActual)->getUpsert()); + ASSERT_EQ((*itExpected)->getMulti(), (*itActual)->getMulti()); + ASSERT_EQ((*itExpected)->getQuery(), (*itActual)->getQuery()); + ASSERT_EQ((*itExpected)->getUpdateExpr(), (*itActual)->getUpdateExpr()); + } + + BatchedCommandResponse response; + response.setOk(true); + response.setNModified(1); + + return response.toBSON(); + }); + } + + + /** * Wait for a single update request and ensure that the items being updated exactly match the * expected items. Responds with a success status. */ void expectDatabaseUpdate(const DatabaseType& dbtExpected) { onCommand([this, &dbtExpected](const RemoteCommandRequest& request) { - ASSERT_EQ(request.target, configHost); + ASSERT_EQ(request.target, _configHost); ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); BatchedUpdateRequest actualBatchedUpdate; @@ -162,10 +238,10 @@ protected: */ void expectAddShardChangeLog(const std::string& shardName, const std::string& shardHost) { // Expect the change log collection to be created - expectChangeLogCreate(configHost, BSON("ok" << 1)); + expectChangeLogCreate(_configHost, BSON("ok" << 1)); // Expect the change log operation - expectChangeLogInsert(configHost, + expectChangeLogInsert(_configHost, network()->now(), "addShard", "", @@ -177,7 +253,7 @@ protected: // Do it twice when there is no response set because getDatabase retries if it can't // find a database onFindCommand([&](const RemoteCommandRequest& request) { - ASSERT_EQ(request.target, configHost); + ASSERT_EQ(request.target, _configHost); if (i == 0) { ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata); } else if (i == 1) { @@ -205,7 +281,11 @@ protected: } } - const HostAndPort configHost{HostAndPort("ConfigHost:23456")}; + const ConnectionString _configConnStr{ConnectionString::forReplicaSet( + "configRS", + {HostAndPort("host1:23456"), HostAndPort("host2:23456"), HostAndPort("host3:23456")})}; + HostAndPort _configHost; + OID _clusterId; }; TEST_F(AddShardTest, Standalone) { @@ -250,7 +330,10 @@ TEST_F(AddShardTest, Standalone) { expectGetDatabase("TestDB1", boost::none); expectGetDatabase("TestDB2", boost::none); - // The new shard is being inserted + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. ShardType expectedShard; expectedShard.setName(expectedShardName); expectedShard.setHost("StandaloneHost:12345"); @@ -347,7 +430,10 @@ TEST_F(AddShardTest, StandaloneGenerateName) { return vector<BSONObj>{existingShard.toBSON()}; }); - // The new shard is being inserted + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. 
ShardType expectedShard; expectedShard.setName(expectedShardName); expectedShard.setHost(shardTarget.toString()); @@ -784,6 +870,10 @@ TEST_F(AddShardTest, ReAddExistingShard) { expectGetDatabase("shardDB", boost::none); + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. ShardType newShard; newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); @@ -849,6 +939,10 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) { expectGetDatabase("shardDB", boost::none); + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. ShardType newShard; newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); @@ -906,6 +1000,10 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { expectGetDatabase("shardDB", boost::none); + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. ShardType newShard; newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); @@ -922,7 +1020,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) { // Ensure that even if upserting the database discovered on the shard fails, the addShard // operation succeeds. onCommand([this, &shardDB](const RemoteCommandRequest& request) { - ASSERT_EQ(request.target, configHost); + ASSERT_EQ(request.target, _configHost); ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata); BatchedUpdateRequest actualBatchedUpdate; @@ -983,6 +1081,10 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) { expectListDatabases(shardTarget, {}); + // The shardIdentity doc inserted into the config.version collection on the shard. + expectShardIdentityUpsert(shardTarget, expectedShardName); + + // The shard doc inserted into the config.shards collection on the config server. ShardType newShard; newShard.setName(expectedShardName); newShard.setMaxSizeMB(100); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index fb9339b660c..11f0e7e7c2c 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -39,6 +39,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_connection.h" @@ -59,9 +60,12 @@ using std::string; using std::unique_ptr; using std::vector; +// TODO SERVER-23096: Initializing an empty _clusterId is a temporary hack. The _clusterId should +// be queried from the config servers on sharding initialization and passed to the ShardRegistry +// constructor. 
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, const ConnectionString& configServerCS) - : _shardFactory(std::move(shardFactory)), _data() { + : _shardFactory(std::move(shardFactory)), _clusterId(), _data() { _initConfigServerCS = configServerCS; } @@ -69,6 +73,10 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } +const OID& ShardRegistry::getClusterId() const { + return _clusterId; +} + void ShardRegistry::rebuildConfigShard() { _data.rebuildConfigShard(_shardFactory.get()); invariant(_data.getConfigShard()); diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 89502ff507a..df6605b5e73 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -158,6 +158,11 @@ public: ConnectionString getConfigServerConnectionString() const; /** + * Returns the cluster id from the config shard. + */ + const OID& getClusterId() const; + + /** * Reloads the ShardRegistry based on the contents of the config server's config.shards * collection. Returns true if this call performed a reload and false if this call only waited * for another thread to perform the reload and did not actually reload. Because of this, it is @@ -241,6 +246,12 @@ private: */ ConnectionString _initConfigServerCS; + /** + * The id for the cluster, obtained from the config servers on sharding initialization. The + * config servers are the authority on the clusterId. + */ + const OID _clusterId; + ShardRegistryData _data; // Protects the _reloadState and _initConfigServerCS during startup. |
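A shard added this way should come up with its sharding state already initialized. The sketch below mirrors checkShardingStateInitialized() from the new jstests/sharding/shard_aware_on_add_shard.js above; it is illustrative only, and the config server connection string and shard name shown are placeholder values.

    // Run against a newly added shard to confirm addShard made it sharding-aware.
    var res = db.getSiblingDB("admin").runCommand({shardingState: 1});
    assert.commandWorked(res);
    assert(res.enabled);
    assert.eq("configRS/host1:23456,host2:23456,host3:23456", res.configServer);  // placeholder
    assert.eq("newShard", res.shardName);                                         // placeholder
    // res.clusterId is not compared yet; see the SERVER-23096 TODO in the test above.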