author    Esha Maharishi <esha.maharishi@mongodb.com>  2016-06-13 11:02:02 -0400
committer Esha Maharishi <esha.maharishi@mongodb.com>  2016-06-13 11:40:18 -0400
commit    677c25dbc453e90c0d2ef6b77b4284ea34c1b949 (patch)
tree      65fe3b62d3e681d9fc908b7b1cc2763f036a033e
parent    5121174a62246efed908deb8757039f103a368cd (diff)
download  mongo-677c25dbc453e90c0d2ef6b77b4284ea34c1b949.tar.gz
SERVER-24126 fix shared library compile
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml     1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml             2
-rw-r--r--  jstests/sharding/min_optime_recovery.js                                       94
-rw-r--r--  jstests/sharding/shard_aware_on_add_shard.js                                  56
-rw-r--r--  jstests/sharding/shard_identity_config_update.js                              15
-rw-r--r--  jstests/sharding/sharding_state_after_stepdown.js                             160
-rw-r--r--  jstests/sharding/startup_with_all_configs_down.js                             9
-rw-r--r--  src/mongo/db/s/SConscript                                                     1
-rw-r--r--  src/mongo/db/s/type_shard_identity.cpp                                        32
-rw-r--r--  src/mongo/db/s/type_shard_identity.h                                          10
-rw-r--r--  src/mongo/s/catalog/replset/SConscript                                        1
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp                  95
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set.h                     8
-rw-r--r--  src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp   120
-rw-r--r--  src/mongo/s/client/shard_registry.cpp                                         10
-rw-r--r--  src/mongo/s/client/shard_registry.h                                           11
16 files changed, 337 insertions, 288 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index ef52c8a5ea7..d7c91283678 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -96,7 +96,6 @@ selector:
- jstests/sharding/startup_with_all_configs_down.js
- jstests/sharding/lagged_config_secondary.js
- jstests/sharding/autodiscover_config_rs_from_secondary.js
- - jstests/sharding/sharding_state_after_stepdown.js
- jstests/sharding/rs_stepdown_and_pooling.js
- jstests/sharding/mongos_no_replica_set_refresh.js
- jstests/sharding/primary_config_server_blackholed_from_mongos.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml
index e92c1d84fbc..0e8c3f150ff 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos.yml
@@ -7,7 +7,9 @@ selector:
- jstests/sharding/error_propagation.js
# New features for v3.4
- jstests/sharding/shard_aware_init.js
+ - jstests/sharding/shard_aware_on_add_shard.js
- jstests/sharding/shard_aware_primary_failover.js
+ - jstests/sharding/shard_identity_config_update.js
# v3.4 changes $in shard key extraction.
- jstests/sharding/upsert_sharded.js
- jstests/sharding/update_sharded.js
diff --git a/jstests/sharding/min_optime_recovery.js b/jstests/sharding/min_optime_recovery.js
index 69ccc78f02c..4f8595feb69 100644
--- a/jstests/sharding/min_optime_recovery.js
+++ b/jstests/sharding/min_optime_recovery.js
@@ -1,85 +1,37 @@
/**
- * Basic testing for minOpTimeRecovery document. Tests that it will be created after a migration
- * only if the config server is a replica set and recovery will not run when disabled.
- *
- * This test restarts a shard and the shard will attempt to read a document that was saved before
- * the restart, so it cannot be run on ephemeral storage engines.
- * @tags: [requires_persistence]
+ * Tests that the minOpTimeRecovery document will be created after a migration.
*/
(function() {
"use strict";
- var runTest = function(withRecovery) {
- var st = new ShardingTest({shards: 2});
+ var st = new ShardingTest({shards: 2});
- var testDB = st.s.getDB('test');
- testDB.adminCommand({enableSharding: 'test'});
- st.ensurePrimaryShard('test', 'shard0000');
- testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}});
+ var testDB = st.s.getDB('test');
+ testDB.adminCommand({enableSharding: 'test'});
+ st.ensurePrimaryShard('test', 'shard0000');
+ testDB.adminCommand({shardCollection: 'test.user', key: {x: 1}});
- var opTimeBeforeMigrate = null;
- if (st.configRS) {
- var priConn = st.configRS.getPrimary();
- var replStatus = priConn.getDB('admin').runCommand({replSetGetStatus: 1});
- replStatus.members.forEach(function(memberState) {
- if (memberState.state == 1) { // if primary
- opTimeBeforeMigrate = memberState.optime;
-
- assert.neq(null, opTimeBeforeMigrate);
- assert.neq(null, opTimeBeforeMigrate.ts);
- assert.neq(null, opTimeBeforeMigrate.t);
- }
- });
- }
-
- testDB.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: 'shard0001'});
-
- var shardAdmin = st.d0.getDB('admin');
- var doc = shardAdmin.system.version.findOne();
-
- if (st.configRS) {
- assert.neq(null, doc);
- assert.eq('minOpTimeRecovery', doc._id);
- assert.eq(st.configRS.getURL(), doc.configsvrConnectionString);
- assert.eq('shard0000', doc.shardName);
- assert.gt(doc.minOpTime.ts.getTime(), 0);
- } else {
- assert.eq(null, doc);
+ var priConn = st.configRS.getPrimary();
+ var replStatus = priConn.getDB('admin').runCommand({replSetGetStatus: 1});
+ replStatus.members.forEach(function(memberState) {
+ if (memberState.state == 1) { // if primary
+ assert.neq(null, memberState.optime);
+ assert.neq(null, memberState.optime.ts);
+ assert.neq(null, memberState.optime.t);
}
+ });
- var restartCmdLineOptions = Object.merge(st.d0.fullOptions, {
- setParameter: 'recoverShardingState=' + (withRecovery ? 'true' : 'false'),
- restart: true
- });
+ testDB.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: 'shard0001'});
- // Restart the shard that donated a chunk to trigger the optime recovery logic.
- st.stopMongod(0);
- var newMongod = MongoRunner.runMongod(restartCmdLineOptions);
- var shardingSection = newMongod.getDB('admin').runCommand({serverStatus: 1}).sharding;
-
- if (st.configRS && withRecovery) {
- assert.neq(null, shardingSection);
-
- // Confirm that the config server string points to an actual config server replica set.
- var configConnStr = shardingSection.configsvrConnectionString;
- var configConn = new Mongo(configConnStr);
- var configIsMaster = configConn.getDB('admin').runCommand({isMaster: 1});
- assert.gt(configConnStr.indexOf('/'), 0);
- assert.eq(1, configIsMaster.configsvr); // If it's a shard, this field won't exist.
-
- var configOpTimeObj = shardingSection.lastSeenConfigServerOpTime;
- assert.neq(null, configOpTimeObj);
- assert.gte(configOpTimeObj.ts.getTime(), opTimeBeforeMigrate.ts.getTime());
- assert.gte(configOpTimeObj.t, opTimeBeforeMigrate.t);
- } else {
- assert.eq(null, shardingSection);
- }
+ var shardAdmin = st.d0.getDB('admin');
+ var minOpTimeRecoveryDoc = shardAdmin.system.version.findOne({_id: 'minOpTimeRecovery'});
- MongoRunner.stopMongod(newMongod.port);
- st.stop();
- };
+ assert.neq(null, minOpTimeRecoveryDoc);
+ assert.eq('minOpTimeRecovery', minOpTimeRecoveryDoc._id);
+ assert.eq(st.configRS.getURL(), minOpTimeRecoveryDoc.configsvrConnectionString);
+ assert.eq('shard0000', minOpTimeRecoveryDoc.shardName);
+ assert.gt(minOpTimeRecoveryDoc.minOpTime.ts.getTime(), 0);
- runTest(true);
- runTest(false);
+ st.stop();
})();
diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js
new file mode 100644
index 00000000000..a616f03bcc3
--- /dev/null
+++ b/jstests/sharding/shard_aware_on_add_shard.js
@@ -0,0 +1,56 @@
+/**
+ * Tests that the addShard process initializes sharding awareness on an added standalone or
+ * replica set shard that was started with --shardsvr.
+ */
+
+(function() {
+ "use strict";
+
+ var waitForIsMaster = function(conn) {
+ assert.soon(function() {
+ var res = conn.getDB('admin').runCommand({isMaster: 1});
+ return res.ismaster;
+ });
+ };
+
+ var checkShardingStateInitialized = function(conn, configConnStr, shardName) {
+ var res = conn.getDB('admin').runCommand({shardingState: 1});
+ assert.commandWorked(res);
+ assert(res.enabled);
+ assert.eq(configConnStr, res.configServer);
+ assert.eq(shardName, res.shardName);
+ // TODO SERVER-23096: How should the clusterId be obtained externally?
+ // assert.eq(clusterId, res.clusterId);
+ };
+
+ // Create the cluster to test adding shards to.
+ var st = new ShardingTest({shards: 1});
+
+ // Add a shard that is a standalone mongod.
+
+ var standaloneConn = MongoRunner.runMongod({shardsvr: ''});
+ waitForIsMaster(standaloneConn);
+
+ jsTest.log("Going to add standalone as shard: " + standaloneConn);
+ var newShardName = "newShard";
+ assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName}));
+ checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName);
+
+ MongoRunner.stopMongod(standaloneConn.port);
+
+ // Add a shard that is a replica set.
+
+ var replTest = new ReplSetTest({nodes: 1});
+ replTest.startSet({shardsvr: ''});
+ replTest.initiate();
+ waitForIsMaster(replTest.getPrimary());
+
+ jsTest.log("Going to add replica set as shard: " + replTest);
+ assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()}));
+ checkShardingStateInitialized(replTest.getPrimary(), st.configRS.getURL(), replTest.getURL());
+
+ replTest.stopSet();
+
+ st.stop();
+
+})();
diff --git a/jstests/sharding/shard_identity_config_update.js b/jstests/sharding/shard_identity_config_update.js
index 678a04c79fa..6d3b01deb41 100644
--- a/jstests/sharding/shard_identity_config_update.js
+++ b/jstests/sharding/shard_identity_config_update.js
@@ -10,22 +10,7 @@
var st = new ShardingTest({shards: {rs0: {nodes: 2}}});
- // TODO: remove crutch once all node shard aware is complete.
- var setupShardIdentity = function(conn, configConnStr) {
- var shardIdentityDoc = {
- _id: 'shardIdentity',
- configsvrConnectionString: configConnStr,
- shardName: 'newShard',
- clusterId: ObjectId()
- };
-
- var res = conn.getDB('admin').system.version.update(
- {_id: 'shardIdentity'}, shardIdentityDoc, true);
- assert.eq(1, res.nUpserted);
- };
-
var shardPri = st.rs0.getPrimary();
- setupShardIdentity(st.rs0.getPrimary(), st.configRS.getURL());
// Note: Adding new replica set member by hand because of SERVER-24011.
diff --git a/jstests/sharding/sharding_state_after_stepdown.js b/jstests/sharding/sharding_state_after_stepdown.js
deleted file mode 100644
index 6bd2f4927cc..00000000000
--- a/jstests/sharding/sharding_state_after_stepdown.js
+++ /dev/null
@@ -1,160 +0,0 @@
-// Tests the state of sharding data after a replica set reconfig
-//
-// This test involves restarting a single-node replica set, so cannot be run on ephemeral storage
-// engines. A single node replica set that is using an ephemeral engine will have no knowledge of
-// the replica set configuration once restarted, so will not elect itself as primary.
-// @tags: [requires_persistence]
-
-(function() {
- 'use strict';
-
- var st = new ShardingTest({shards: 2, mongos: 1, other: {rs: true, rsOptions: {nodes: 1}}});
-
- var mongos = st.s0;
- var admin = mongos.getDB("admin");
- var shards = mongos.getCollection("config.shards").find().toArray();
-
- var coll = mongos.getCollection("foo.bar");
- var collSharded = mongos.getCollection("foo.barSharded");
-
- assert.commandWorked(admin.runCommand({enableSharding: coll.getDB() + ""}));
- st.ensurePrimaryShard(coll.getDB() + "", shards[0]._id);
- assert.commandWorked(
- admin.runCommand({shardCollection: collSharded.toString(), key: {_id: 1}}));
- assert.commandWorked(
- admin.runCommand({moveChunk: collSharded.toString(), find: {_id: 0}, to: shards[1]._id}));
-
- assert.writeOK(coll.insert({some: "data"}));
- assert.writeOK(collSharded.insert({some: "data"}));
- assert.eq(2, mongos.adminCommand({getShardVersion: collSharded.toString()}).version.t);
-
- st.printShardingStatus();
-
- // Restart both primaries to reset our sharding data
- var restartPrimaries = function() {
- var rs0Primary = st.rs0.getPrimary();
- var rs1Primary = st.rs1.getPrimary();
-
- st.rs0.stop(rs0Primary);
- st.rs1.stop(rs1Primary);
-
- ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ok: false});
-
- st.rs0.start(rs0Primary, Object.extend(rs0Primary.savedOptions, {restart: true}));
- st.rs1.start(rs1Primary, Object.extend(rs1Primary.savedOptions, {restart: true}));
-
- ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ismaster: true});
- };
-
- restartPrimaries();
-
- // Sharding data gets initialized either when shards are hit by an unsharded query or if some
- // metadata operation was run before the step down, which wrote a minOpTime recovery record
- // (CSRS
- // only). In this case we did a moveChunk above from shard0 to shard1, so we will have this
- // record
- // on shard0.
- if (st.configRS) {
- assert.neq(
- "", st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
- } else {
- assert.eq(
- "", st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
- }
- assert.eq("",
- st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
-
- // Doing a find only accesses the primary (rs0), which is already recovered. Ensure that the
- // secondary still has no sharding knowledge.
- assert.neq(null, coll.findOne({}));
- assert.eq("",
- st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
-
- //
- //
- // Sharding data initialized when shards are hit by a sharded query
- assert.neq(null, collSharded.findOne({}));
- assert.neq("",
- st.rs0.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
- assert.neq("",
- st.rs1.getPrimary().adminCommand({getShardVersion: coll.toString()}).configServer);
-
- // Stepdown both primaries to reset our sharding data
- var stepDownPrimaries = function() {
-
- var rs0Primary = st.rs0.getPrimary();
- var rs1Primary = st.rs1.getPrimary();
-
- try {
- rs0Primary.adminCommand({replSetStepDown: 1000 * 1000, force: true});
- assert(false);
- } catch (ex) {
- // Expected connection exception, will check for stepdown later
- }
-
- try {
- rs1Primary.adminCommand({replSetStepDown: 1000 * 1000, force: true});
- assert(false);
- } catch (ex) {
- // Expected connection exception, will check for stepdown later
- }
-
- ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {secondary: true});
-
- assert.commandWorked(new Mongo(rs0Primary.host).adminCommand({replSetFreeze: 0}));
- assert.commandWorked(new Mongo(rs1Primary.host).adminCommand({replSetFreeze: 0}));
-
- rs0Primary = st.rs0.getPrimary();
- rs1Primary = st.rs1.getPrimary();
-
- // Flush connections to avoid transient issues with conn pooling
- assert.commandWorked(rs0Primary.adminCommand({connPoolSync: true}));
- assert.commandWorked(rs1Primary.adminCommand({connPoolSync: true}));
-
- ReplSetTest.awaitRSClientHosts(mongos, [rs0Primary, rs1Primary], {ismaster: true});
- };
-
- stepDownPrimaries();
-
- //
- //
- // No sharding metadata until shards are hit by a metadata operation
- assert.eq({},
- st.rs0.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
- assert.eq({},
- st.rs1.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
-
- //
- //
- // Metadata commands should enable sharding data implicitly
- assert.commandWorked(mongos.adminCommand({split: collSharded.toString(), middle: {_id: 0}}));
- assert.eq({},
- st.rs0.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
- assert.neq({},
- st.rs1.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
-
- //
- //
- // MoveChunk command should enable sharding data implicitly on TO-shard
- assert.commandWorked(mongos.adminCommand(
- {moveChunk: collSharded.toString(), find: {_id: 0}, to: shards[0]._id}));
- assert.neq({},
- st.rs0.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
- assert.neq({},
- st.rs1.getPrimary()
- .adminCommand({getShardVersion: collSharded.toString(), fullMetadata: true})
- .metadata);
-
- st.stop();
-
-})();
diff --git a/jstests/sharding/startup_with_all_configs_down.js b/jstests/sharding/startup_with_all_configs_down.js
index 32609a04484..a85c9595bea 100644
--- a/jstests/sharding/startup_with_all_configs_down.js
+++ b/jstests/sharding/startup_with_all_configs_down.js
@@ -11,9 +11,10 @@
/**
* Restarts the mongod backing the specified shard instance, without restarting the mongobridge.
*/
- function restartShard(shard) {
+ function restartShard(shard, waitForConnect) {
MongoRunner.stopMongod(shard);
shard.restart = true;
+ shard.waitForConnect = waitForConnect;
MongoRunner.runMongod(shard);
}
@@ -50,7 +51,7 @@
});
jsTestLog("Restarting a shard while there are no config servers up");
- restartShard(st.shard1);
+ restartShard(st.shard1, false);
jsTestLog("Queries should fail because the shard can't initialize sharding state");
var error = assert.throws(function() {
@@ -67,8 +68,8 @@
// TODO: SERVER-23192 - restart all shards and mongos because their replica set monitor has
// deemed the CSRS config server set as unusable.
- restartShard(st.shard0);
- restartShard(st.shard1);
+ restartShard(st.shard0, true);
+ restartShard(st.shard1, true);
st.restartMongos(0);
jsTestLog("Queries against the original mongos should work again");
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c622e482fc7..9b709976bd1 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -37,6 +37,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/s/write_ops/batch_write_types',
'$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/s/common',
],
diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp
index 7ef675ad334..d34f4975cfa 100644
--- a/src/mongo/db/s/type_shard_identity.cpp
+++ b/src/mongo/db/s/type_shard_identity.cpp
@@ -32,6 +32,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
+#include "mongo/s/write_ops/batched_update_document.h"
+#include "mongo/s/write_ops/batched_update_request.h"
#include "mongo/util/assert_util.h"
namespace mongo {
@@ -134,7 +136,10 @@ Status ShardIdentityType::validate() const {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"};
}
- if (!_clusterId || _clusterId->isSet()) {
+ // TODO SERVER-23096: Once ShardRegistry::_clusterId is loaded from the config servers rather
+ // than initialized in the ShardRegistry constructor in each process, the isSet() check can
+ // be re-added.
+ if (!_clusterId /*|| !_clusterId->isSet()*/) {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId() << " field"};
}
@@ -204,6 +209,31 @@ void ShardIdentityType::setClusterId(OID clusterId) {
_clusterId = std::move(clusterId);
}
+std::unique_ptr<BatchedUpdateRequest> ShardIdentityType::createUpsertForAddShard() const {
+ invariant(validate().isOK());
+
+ std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument());
+
+ BSONObjBuilder query;
+ query.append("_id", "shardIdentity");
+ query.append("shardName", getShardName());
+ query.append("clusterId", getClusterId());
+ updateDoc->setQuery(query.obj());
+
+ BSONObjBuilder update;
+ BSONObjBuilder setConfigBuilder(update.subobjStart("$set"));
+ setConfigBuilder.append(configsvrConnString(), getConfigsvrConnString().toString());
+ setConfigBuilder.doneFast();
+ updateDoc->setUpdateExpr(update.obj());
+
+ updateDoc->setUpsert(true);
+
+ std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest());
+ updateRequest->addToUpdates(updateDoc.release());
+
+ return updateRequest;
+}
+
BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) {
BSONObjBuilder builder;
BSONObjBuilder setConfigBuilder(builder.subobjStart("$set"));
diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h
index 30e2c832dff..9c51913fe0f 100644
--- a/src/mongo/db/s/type_shard_identity.h
+++ b/src/mongo/db/s/type_shard_identity.h
@@ -32,6 +32,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/db/jsobj.h"
+#include "mongo/s/write_ops/batched_update_request.h"
namespace mongo {
@@ -40,6 +41,8 @@ namespace mongo {
*/
class ShardIdentityType {
public:
+ ShardIdentityType() = default;
+
// The _id value for this document type.
static const std::string IdName;
@@ -78,6 +81,13 @@ public:
void setClusterId(OID clusterId);
/**
+ * Returns an update object that can be used to insert a shardIdentity doc on a shard or update
+ * the existing shardIdentity doc's configsvrConnString (if the _id, shardName, and clusterId
+ * match those in this ShardIdentityType instance).
+ */
+ std::unique_ptr<BatchedUpdateRequest> createUpsertForAddShard() const;
+
+ /**
* Returns an update object that can be used to update the config server field of the
* shardIdentity document with the new connection string.
*/
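For readers skimming the diff, here is a minimal sketch (not part of the commit) of the single update that createUpsertForAddShard() builds against the namespace referenced above as NamespaceString::kConfigCollectionNamespace on the new shard. The function name, the shard name "shard0000", and the config server hosts are illustrative sample values; the real ones come from addShard and the ShardRegistry.

```cpp
#include "mongo/db/jsobj.h"

namespace mongo {

// Sketch only: reproduces the query/update shape assembled in
// ShardIdentityType::createUpsertForAddShard() with hard-coded sample values.
BSONObj exampleShardIdentityUpsert() {
    const OID clusterId;  // unset OID for now; see the SERVER-23096 TODOs above

    // Match an existing shardIdentity doc only if _id, shardName, and clusterId agree.
    BSONObj query = BSON("_id"
                         << "shardIdentity"
                         << "shardName"
                         << "shard0000"
                         << "clusterId" << clusterId);

    // Set (or insert, via upsert: true) the config server connection string.
    BSONObj update = BSON("$set" << BSON("configsvrConnectionString"
                                         << "configRS/host1:23456,host2:23456,host3:23456"));

    // createUpsertForAddShard() wraps this pair in a BatchedUpdateDocument with
    // upsert enabled and returns it inside a BatchedUpdateRequest.
    return BSON("q" << query << "u" << update << "upsert" << true);
}

}  // namespace mongo
```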
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 582b9a26aea..3d482b50403 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -66,6 +66,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/read_concern_args',
+ '$BUILD_DIR/mongo/db/s/type_shard_identity',
'$BUILD_DIR/mongo/executor/network_interface',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
'$BUILD_DIR/mongo/s/client/sharding_client',
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 3c9b7ce9809..8237359140e 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -197,9 +198,10 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard(
// Check for mongos and older version mongod connections, and whether the hosts
// can be found for the user specified replset.
- auto cmdStatus = _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
- if (!cmdStatus.isOK()) {
- if (cmdStatus == ErrorCodes::RPCProtocolNegotiationFailed) {
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1));
+ if (!swCommandResponse.isOK()) {
+ if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) {
// Mongos to mongos commands are no longer supported in the wire protocol
// (because mongos does not support OP_COMMAND), similarly for a new mongos
// and an old mongod. So the call will fail in such cases.
@@ -211,19 +213,20 @@ StatusWith<ShardType> CatalogManagerReplicaSet::_validateHostAsShard(
<< " likely because it contains a node that is a mongos or an old"
<< " version of mongod."};
} else {
- return cmdStatus.getStatus();
+ return swCommandResponse.getStatus();
}
}
// Check for a command response error
- BSONObj resIsMaster = cmdStatus.getValue();
- Status resIsMasterStatus = getStatusFromCommandResult(resIsMaster);
+ auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus);
if (!resIsMasterStatus.isOK()) {
return {resIsMasterStatus.code(),
str::stream() << "Error running isMaster against " << shardConn->toString() << ": "
<< causedBy(resIsMasterStatus)};
}
+ auto resIsMaster = std::move(swCommandResponse.getValue().response);
+
// Check whether there is a master. If there isn't, the replica set may not have been
// initiated. If the connection is a standalone, it will return true for isMaster.
bool isMaster;
@@ -350,19 +353,19 @@ StatusWith<std::vector<std::string>> CatalogManagerReplicaSet::_getDBNamesListFr
shardRegistry->createConnection(connectionString).release()};
invariant(shardConn);
- auto cmdStatus = _runCommandForAddShard(
+ auto swCommandResponse = _runCommandForAddShard(
txn, shardConn->getTargeter().get(), "admin", BSON("listDatabases" << 1));
- if (!cmdStatus.isOK()) {
- return cmdStatus.getStatus();
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
}
- const BSONObj& cmdResult = cmdStatus.getValue();
-
- Status cmdResultStatus = getStatusFromCommandResult(cmdResult);
- if (!cmdResultStatus.isOK()) {
- return cmdResultStatus;
+ auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus);
+ if (!cmdStatus.isOK()) {
+ return cmdStatus;
}
+ auto cmdResult = std::move(swCommandResponse.getValue().response);
+
vector<string> dbNames;
for (const auto& dbEntry : cmdResult["databases"].Obj()) {
@@ -408,7 +411,7 @@ void CatalogManagerReplicaSet::shutDown(OperationContext* txn) {
_executorForAddShard->join();
}
-StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
+StatusWith<Shard::CommandResponse> CatalogManagerReplicaSet::_runCommandForAddShard(
OperationContext* txn,
RemoteCommandTargeter* targeter,
const std::string& dbName,
@@ -421,12 +424,12 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
executor::RemoteCommandRequest request(
host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
- StatusWith<executor::RemoteCommandResponse> responseStatus =
+ StatusWith<executor::RemoteCommandResponse> swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
auto callStatus = _executorForAddShard->scheduleRemoteCommand(
- request, [&responseStatus](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- responseStatus = args.response;
+ request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ swResponse = args.response;
});
if (!callStatus.isOK()) {
return callStatus.getStatus();
@@ -435,11 +438,22 @@ StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommandForAddShard(
// Block until the command is carried out
_executorForAddShard->wait(callStatus.getValue());
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
+ if (!swResponse.isOK()) {
+ if (swResponse.getStatus().compareCode(ErrorCodes::ExceededTimeLimit)) {
+ LOG(0) << "Operation for addShard timed out with status " << swResponse.getStatus();
+ }
+ return swResponse.getStatus();
}
- return responseStatus.getValue().data.getOwned();
+ BSONObj responseObj = swResponse.getValue().data.getOwned();
+ BSONObj responseMetadata = swResponse.getValue().metadata.getOwned();
+ Status commandStatus = getStatusFromCommandResult(responseObj);
+ Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj);
+
+ return Shard::CommandResponse(std::move(responseObj),
+ std::move(responseMetadata),
+ std::move(commandStatus),
+ std::move(writeConcernStatus));
}
StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
@@ -497,7 +511,44 @@ StatusWith<string> CatalogManagerReplicaSet::addShard(OperationContext* txn,
shardType.setMaxSizeMB(maxSize);
}
- log() << "going to add shard: " << shardType.toString();
+ ShardIdentityType shardIdentity;
+ shardIdentity.setConfigsvrConnString(
+ Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
+ shardIdentity.setShardName(shardType.getName());
+ shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId());
+ auto validateStatus = shardIdentity.validate();
+ if (!validateStatus.isOK()) {
+ return validateStatus;
+ }
+
+ log() << "going to insert shardIdentity document into shard: " << shardIdentity.toString();
+
+ auto updateRequest = shardIdentity.createUpsertForAddShard();
+ BatchedCommandRequest commandRequest(updateRequest.release());
+ commandRequest.setNS(NamespaceString::kConfigCollectionNamespace);
+ commandRequest.setWriteConcern(kMajorityWriteConcern.toBSON());
+
+ const std::shared_ptr<Shard> shardConn{
+ Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)};
+ invariant(shardConn);
+ auto targeter = shardConn->getTargeter();
+
+ auto swCommandResponse =
+ _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest.toBSON());
+
+ if (!swCommandResponse.isOK()) {
+ return swCommandResponse.getStatus();
+ }
+
+ auto commandResponse = std::move(swCommandResponse.getValue());
+
+ BatchedCommandResponse batchResponse;
+ auto batchResponseStatus = _processBatchWriteResponse(commandResponse, &batchResponse);
+ if (!batchResponseStatus.isOK()) {
+ return batchResponseStatus;
+ }
+
+ log() << "going to insert new entry for shard into config.shards: " << shardType.toString();
Status result = insertConfigDocument(txn, ShardType::ConfigNS, shardType.toBSON());
if (!result.isOK()) {
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index b7251dc2a37..a26bf941ab2 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -276,10 +276,10 @@ private:
* Runs a command against a "shard" that is not yet in the cluster and thus not present in the
* ShardRegistry.
*/
- StatusWith<BSONObj> _runCommandForAddShard(OperationContext* txn,
- RemoteCommandTargeter* targeter,
- const std::string& dbName,
- const BSONObj& cmdObj);
+ StatusWith<Shard::CommandResponse> _runCommandForAddShard(OperationContext* txn,
+ RemoteCommandTargeter* targeter,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig(
OperationContext* txn,
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
index 54f414c79cb..5c3e1ccfb06 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_add_shard_test.cpp
@@ -32,10 +32,12 @@
#include <vector>
+#include "mongo/client/connection_string.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
#include "mongo/db/query/query_request.h"
+#include "mongo/db/s/type_shard_identity.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/catalog/replset/catalog_manager_replica_set.h"
@@ -75,7 +77,16 @@ protected:
CatalogManagerReplSetTestFixture::setUp();
getMessagingPort()->setRemote(HostAndPort("FakeRemoteClient:34567"));
- configTargeter()->setFindHostReturnValue(configHost);
+
+ configTargeter()->setConnectionStringReturnValue(_configConnStr);
+
+ _configHost = _configConnStr.getServers().front();
+ configTargeter()->setFindHostReturnValue(_configHost);
+
+ // TODO SERVER-23096: Change this to OID::gen() once clusterId is loaded from the config
+ // servers into the ShardRegistry instead of created by the ShardRegistry within each
+ // process.
+ _clusterId = OID();
}
/**
@@ -127,12 +138,77 @@ protected:
}
/**
+ * Waits for a request for the shardIdentity document to be upserted into a shard from the
+ * config server on addShard.
+ */
+ void expectShardIdentityUpsert(const HostAndPort& expectedHost,
+ const std::string& expectedShardName) {
+
+ ShardIdentityType expectedShardIdentity;
+ expectedShardIdentity.setShardName(expectedShardName);
+ expectedShardIdentity.setClusterId(_clusterId);
+ expectedShardIdentity.setConfigsvrConnString(_configConnStr);
+ invariant(expectedShardIdentity.validate().isOK());
+
+ auto updateRequest = expectedShardIdentity.createUpsertForAddShard();
+ expectUpdates(expectedHost,
+ NamespaceString(NamespaceString::kConfigCollectionNamespace),
+ updateRequest.get());
+ }
+
+ /**
+ * Waits for a set of batched updates and ensures that the host, namespace, and updates exactly
+ * match what's expected. Responds with a success status.
+ */
+ void expectUpdates(const HostAndPort& expectedHost,
+ const NamespaceString& expectedNss,
+ BatchedUpdateRequest* expectedBatchedUpdates) {
+ onCommandForAddShard([&](const RemoteCommandRequest& request) {
+
+ ASSERT_EQUALS(expectedHost, request.target);
+
+ // Check that the db name in the request matches the expected db name.
+ ASSERT_EQUALS(expectedNss.db(), request.dbname);
+
+ BatchedUpdateRequest actualBatchedUpdates;
+ std::string errmsg;
+ ASSERT_TRUE(actualBatchedUpdates.parseBSON(request.dbname, request.cmdObj, &errmsg));
+
+ // Check that the db and collection names in the BatchedUpdateRequest match the
+ // expected.
+ ASSERT_EQUALS(expectedNss, actualBatchedUpdates.getNS());
+
+ auto expectedUpdates = expectedBatchedUpdates->getUpdates();
+ auto actualUpdates = actualBatchedUpdates.getUpdates();
+
+ ASSERT_EQUALS(expectedUpdates.size(), actualUpdates.size());
+
+ auto itExpected = expectedUpdates.begin();
+ auto itActual = actualUpdates.begin();
+
+ for (; itActual != actualUpdates.end(); itActual++, itExpected++) {
+ ASSERT_EQ((*itExpected)->getUpsert(), (*itActual)->getUpsert());
+ ASSERT_EQ((*itExpected)->getMulti(), (*itActual)->getMulti());
+ ASSERT_EQ((*itExpected)->getQuery(), (*itActual)->getQuery());
+ ASSERT_EQ((*itExpected)->getUpdateExpr(), (*itActual)->getUpdateExpr());
+ }
+
+ BatchedCommandResponse response;
+ response.setOk(true);
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+ }
+
+
+ /**
* Wait for a single update request and ensure that the items being updated exactly match the
* expected items. Responds with a success status.
*/
void expectDatabaseUpdate(const DatabaseType& dbtExpected) {
onCommand([this, &dbtExpected](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
BatchedUpdateRequest actualBatchedUpdate;
@@ -162,10 +238,10 @@ protected:
*/
void expectAddShardChangeLog(const std::string& shardName, const std::string& shardHost) {
// Expect the change log collection to be created
- expectChangeLogCreate(configHost, BSON("ok" << 1));
+ expectChangeLogCreate(_configHost, BSON("ok" << 1));
// Expect the change log operation
- expectChangeLogInsert(configHost,
+ expectChangeLogInsert(_configHost,
network()->now(),
"addShard",
"",
@@ -177,7 +253,7 @@ protected:
// Do it twice when there is no response set because getDatabase retries if it can't
// find a database
onFindCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
if (i == 0) {
ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
} else if (i == 1) {
@@ -205,7 +281,11 @@ protected:
}
}
- const HostAndPort configHost{HostAndPort("ConfigHost:23456")};
+ const ConnectionString _configConnStr{ConnectionString::forReplicaSet(
+ "configRS",
+ {HostAndPort("host1:23456"), HostAndPort("host2:23456"), HostAndPort("host3:23456")})};
+ HostAndPort _configHost;
+ OID _clusterId;
};
TEST_F(AddShardTest, Standalone) {
@@ -250,7 +330,10 @@ TEST_F(AddShardTest, Standalone) {
expectGetDatabase("TestDB1", boost::none);
expectGetDatabase("TestDB2", boost::none);
- // The new shard is being inserted
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType expectedShard;
expectedShard.setName(expectedShardName);
expectedShard.setHost("StandaloneHost:12345");
@@ -347,7 +430,10 @@ TEST_F(AddShardTest, StandaloneGenerateName) {
return vector<BSONObj>{existingShard.toBSON()};
});
- // The new shard is being inserted
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType expectedShard;
expectedShard.setName(expectedShardName);
expectedShard.setHost(shardTarget.toString());
@@ -784,6 +870,10 @@ TEST_F(AddShardTest, ReAddExistingShard) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -849,6 +939,10 @@ TEST_F(AddShardTest, SuccessfullyAddReplicaSet) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -906,6 +1000,10 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
expectGetDatabase("shardDB", boost::none);
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
@@ -922,7 +1020,7 @@ TEST_F(AddShardTest, AddShardSucceedsEvenIfAddingDBsFromNewShardFails) {
// Ensure that even if upserting the database discovered on the shard fails, the addShard
// operation succeeds.
onCommand([this, &shardDB](const RemoteCommandRequest& request) {
- ASSERT_EQ(request.target, configHost);
+ ASSERT_EQ(request.target, _configHost);
ASSERT_EQUALS(BSON(rpc::kReplSetMetadataFieldName << 1), request.metadata);
BatchedUpdateRequest actualBatchedUpdate;
@@ -983,6 +1081,10 @@ TEST_F(AddShardTest, ReplicaSetExtraHostsDiscovered) {
expectListDatabases(shardTarget, {});
+ // The shardIdentity doc inserted into the config.version collection on the shard.
+ expectShardIdentityUpsert(shardTarget, expectedShardName);
+
+ // The shard doc inserted into the config.shards collection on the config server.
ShardType newShard;
newShard.setName(expectedShardName);
newShard.setMaxSizeMB(100);
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index fb9339b660c..11f0e7e7c2c 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/s/catalog/catalog_manager.h"
+#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_connection.h"
@@ -59,9 +60,12 @@ using std::string;
using std::unique_ptr;
using std::vector;
+// TODO SERVER-23096: Initializing an empty _clusterId is a temporary hack. The _clusterId should
+// be queried from the config servers on sharding initialization and passed to the ShardRegistry
+// constructor.
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
const ConnectionString& configServerCS)
- : _shardFactory(std::move(shardFactory)), _data() {
+ : _shardFactory(std::move(shardFactory)), _clusterId(), _data() {
_initConfigServerCS = configServerCS;
}
@@ -69,6 +73,10 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const {
return getConfigShard()->getConnString();
}
+const OID& ShardRegistry::getClusterId() const {
+ return _clusterId;
+}
+
void ShardRegistry::rebuildConfigShard() {
_data.rebuildConfigShard(_shardFactory.get());
invariant(_data.getConfigShard());
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 89502ff507a..df6605b5e73 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -158,6 +158,11 @@ public:
ConnectionString getConfigServerConnectionString() const;
/**
+ * Returns the cluster id from the config shard.
+ */
+ const OID& getClusterId() const;
+
+ /**
* Reloads the ShardRegistry based on the contents of the config server's config.shards
* collection. Returns true if this call performed a reload and false if this call only waited
* for another thread to perform the reload and did not actually reload. Because of this, it is
@@ -241,6 +246,12 @@ private:
*/
ConnectionString _initConfigServerCS;
+ /**
+ * The id for the cluster, obtained from the config servers on sharding initialization. The
+ * config servers are the authority on the clusterId.
+ */
+ const OID _clusterId;
+
ShardRegistryData _data;
// Protects the _reloadState and _initConfigServerCS during startup.