summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2016-04-22 14:10:33 -0400
committerRandolph Tan <randolph@10gen.com>2016-05-05 15:45:46 -0400
commitd5b670a01622ff5b9cd8dc1a988321022948182b (patch)
tree3eaf298e2815f11c5fefbaddec9cf0011efd0755
parent9f91d422fc826d08b8eca8d323276c22d63ba436 (diff)
downloadmongo-d5b670a01622ff5b9cd8dc1a988321022948182b.tar.gz
SERVER-23765 Update config string of shardIdentity document
-rw-r--r--jstests/sharding/shard_identity_config_update.js116
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h6
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp16
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp1
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp9
-rw-r--r--src/mongo/db/s/sharding_state.cpp75
-rw-r--r--src/mongo/db/s/sharding_state.h9
-rw-r--r--src/mongo/db/s/type_shard_identity.cpp14
-rw-r--r--src/mongo/db/s/type_shard_identity.h6
-rw-r--r--src/mongo/db/s/type_shard_identity_test.cpp7
13 files changed, 251 insertions, 13 deletions
diff --git a/jstests/sharding/shard_identity_config_update.js b/jstests/sharding/shard_identity_config_update.js
new file mode 100644
index 00000000000..b2952780d80
--- /dev/null
+++ b/jstests/sharding/shard_identity_config_update.js
@@ -0,0 +1,116 @@
+/**
+ * Tests that the config server connection string in the shard identity document of both the
+ * primary and secondary will get updated whenever the config server membership changes.
+ */
+(function() {
+ "use strict";
+
+ load('jstests/replsets/rslib.js');
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 2}}});
+
+ // TODO: remove crutch once all node shard aware is complete.
+ var setupShardIdentity = function(conn, configConnStr) {
+ var shardIdentityDoc = {
+ _id: 'shardIdentity',
+ configsvrConnectionString: configConnStr,
+ shardName: 'newShard',
+ clusterId: ObjectId()
+ };
+
+ var res = conn.getDB('admin')
+ .system.version.update({_id: 'shardIdentity'}, shardIdentityDoc, true);
+ assert.eq(1, res.nUpserted);
+ };
+
+ var shardPri = st.rs0.getPrimary();
+ setupShardIdentity(st.rs0.getPrimary(), st.configRS.getURL());
+
+ // Note: Adding new replica set member by hand because of SERVER-24011.
+
+ var newNode = MongoRunner.runMongod(
+ {configsvr: '', replSet: st.configRS.name, storageEngine: 'wiredTiger'});
+
+ var replConfig = st.configRS.getReplSetConfigFromNode();
+ replConfig.version += 1;
+ replConfig.members.push({_id: 3, host: newNode.host});
+
+ reconfig(st.configRS, replConfig);
+
+ /**
+ * Returns true if the shardIdentity document has all the replica set member nodes in the
+ * expectedConfigStr.
+ */
+ var checkConfigStrUpdated = function(conn, expectedConfigStr) {
+ var shardIdentity = conn.getDB('admin').system.version.findOne({_id: 'shardIdentity'});
+
+ var shardConfigsvrStr = shardIdentity.configsvrConnectionString;
+ var shardConfigReplName = shardConfigsvrStr.split('/')[0];
+ var expectedReplName = expectedConfigStr.split('/')[0];
+
+ assert.eq(expectedReplName, shardConfigReplName);
+
+ var expectedHostList = expectedConfigStr.split('/')[1].split(',');
+ var shardConfigHostList = shardConfigsvrStr.split('/')[1].split(',');
+
+ if (expectedHostList.length != shardConfigHostList.length) {
+ return false;
+ }
+
+ for (var x = 0; x < expectedHostList.length; x++) {
+ if (shardConfigsvrStr.indexOf(expectedHostList[x]) == -1) {
+ return false;
+ }
+ }
+
+ return true;
+ };
+
+ var origConfigConnStr = st.configRS.getURL();
+ var expectedConfigStr = origConfigConnStr + ',' + newNode.host;
+ assert.soon(function() {
+ return checkConfigStrUpdated(st.rs0.getPrimary(), expectedConfigStr);
+ });
+
+ var secConn = st.rs0.getSecondary();
+ secConn.setSlaveOk(true);
+ assert.soon(function() {
+ return checkConfigStrUpdated(secConn, expectedConfigStr);
+ });
+
+ //
+ // Remove the newly added member from the config replSet while the shards are down.
+ // Check that the shard identity document will be updated with the new replSet connection
+ // string when they come back up.
+ //
+
+ st.rs0.stop(0);
+ st.rs0.stop(1);
+
+ MongoRunner.stopMongod(newNode.port);
+
+ replConfig = st.configRS.getReplSetConfigFromNode();
+ replConfig.version += 1;
+ replConfig.members.pop();
+
+ reconfig(st.configRS, replConfig);
+
+ st.rs0.restart(0, {shardsvr: ''});
+ st.rs0.restart(1, {shardsvr: ''});
+
+ st.rs0.waitForMaster();
+ st.rs0.awaitSecondaryNodes();
+
+ assert.soon(function() {
+ return checkConfigStrUpdated(st.rs0.getPrimary(), origConfigConnStr);
+ });
+
+ secConn = st.rs0.getSecondary();
+ secConn.setSlaveOk(true);
+ assert.soon(function() {
+ return checkConfigStrUpdated(secConn, origConfigConnStr);
+ });
+
+ st.stop();
+
+})();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 5bbef20bd25..7be2d475eda 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -208,6 +208,12 @@ public:
virtual void recoverShardingState(OperationContext* txn) = 0;
/**
+ * Called when the instance transitions to primary in order to update the config server
+ * connection string of the shard identity document.
+ */
+ virtual void updateShardIdentityConfigString(OperationContext* txn) = 0;
+
+ /**
* Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
*/
virtual void signalApplierToChooseNewSyncSource() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ef4900c1135..0fdbe0857d0 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -63,6 +63,8 @@
#include "mongo/db/s/sharding_state_recovery.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/executor/network_interface.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
@@ -405,6 +407,20 @@ void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationCont
ShardingState::get(txn)->clearCollectionMetadata();
}
+void ReplicationCoordinatorExternalStateImpl::updateShardIdentityConfigString(
+ OperationContext* txn) {
+ if (ShardingState::get(txn)->enabled()) {
+ const auto configsvrConnStr =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->getConnString();
+ auto status = ShardingState::get(txn)
+ ->updateShardIdentityConfigString(txn, configsvrConnStr.toString());
+ if (!status.isOK()) {
+ warning() << "error encountered while trying to update config connection string to "
+ << configsvrConnStr << causedBy(status);
+ }
+ }
+}
+
void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
auto bgsync = BackgroundSync::get();
if (bgsync) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 78969cbb292..e08cf1fb75e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -76,6 +76,7 @@ public:
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
virtual void recoverShardingState(OperationContext* txn);
+ virtual void updateShardIdentityConfigString(OperationContext* txn) override;
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual void dropAllTempCollections(OperationContext* txn);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 3ed6023fd71..0396bb37b5f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -199,6 +199,9 @@ void ReplicationCoordinatorExternalStateMock::clearShardingState() {}
void ReplicationCoordinatorExternalStateMock::recoverShardingState(OperationContext* txn) {}
+void ReplicationCoordinatorExternalStateMock::updateShardIdentityConfigString(
+ OperationContext* txn) {}
+
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 57970b239d5..311de9c5e58 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -76,6 +76,7 @@ public:
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
virtual void recoverShardingState(OperationContext* txn);
+ virtual void updateShardIdentityConfigString(OperationContext* txn) override;
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual void dropAllTempCollections(OperationContext* txn);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index ab70282600f..36e0ea67444 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -751,6 +751,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
_drainFinishedCond.notify_all();
lk.unlock();
+ _externalState->updateShardIdentityConfigString(txn);
_externalState->dropAllTempCollections(txn);
// This is done for compatibility with PV0 replicas wrt how "n" ops are processed.
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 2a7299b3b25..a8743a183a2 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -180,15 +180,6 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i
void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
- if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
- _nss == NamespaceString::kConfigCollectionNamespace) {
- if (auto idElem = updatedDoc["_id"]) {
- uassert(40069,
- "cannot update shardIdentity document while in --shardsvr mode",
- idElem.str() != ShardIdentityType::IdName);
- }
- }
-
checkShardVersionOrThrow(txn);
if (_sourceMgr) {
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 4f9c69c3e4c..7b8bd9c61e6 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -41,7 +41,10 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/metadata_loader.h"
@@ -139,6 +142,32 @@ Date_t getDeadlineFromMaxTimeMS(OperationContext* txn) {
return Date_t::now() + Microseconds(remainingTime);
}
+/**
+ * Updates the config server field of the shardIdentity document with the given connection string
+ * if setName is equal to the config server replica set name.
+ *
+ * Note: This is intended to be used on a new thread that hasn't called Client::initThread.
+ * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
+ * to replica set membership.
+ */
+void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) {
+ auto configsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString();
+ if (configsvrConnStr.getSetName() != setName) {
+ // Ignore all change notification for other sets that are not the config server.
+ return;
+ }
+
+ Client::initThread("updateShardIdentityConfigConnString");
+ auto uniqOpCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+
+ auto status = ShardingState::get(uniqOpCtx.get())
+ ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString);
+ if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
+ warning() << "error encountered while trying to update config connection string to "
+ << newConnectionString << causedBy(status);
+ }
+}
+
} // namespace
//
@@ -524,8 +553,6 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard
if (_getInitializationState() == InitializationState::kNew) {
ShardedConnectionInfo::addHook();
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ConfigServer::replicaSetChangeShardRegistryUpdateHook);
try {
Status status = _globalInit(configSvrConnStr);
@@ -533,6 +560,10 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard
// For backwards compatibility with old style inits from metadata commands.
if (status.isOK()) {
_setInitializationState_inlock(InitializationState::kInitialized);
+ ReplicaSetMonitor::setSynchronousConfigChangeHook(
+ &ConfigServer::replicaSetChangeShardRegistryUpdateHook);
+ ReplicaSetMonitor::setAsynchronousConfigChangeHook(
+ &updateShardIdentityConfigStringCB);
} else {
_initializationStatus = status;
_setInitializationState_inlock(InitializationState::kError);
@@ -560,12 +591,18 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) {
// Do this initialization outside of the lock, since we are already protected by having entered
// the kInitializing state.
ShardedConnectionInfo::addHook();
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ConfigServer::replicaSetChangeShardRegistryUpdateHook);
try {
Status status = _globalInit(configSvr);
+
+ if (status.isOK()) {
+ ReplicaSetMonitor::setSynchronousConfigChangeHook(
+ &ConfigServer::replicaSetChangeShardRegistryUpdateHook);
+ ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
+ }
+
_signalInitializationComplete(status);
+
} catch (const DBException& ex) {
_signalInitializationComplete(ex.toStatus());
}
@@ -970,6 +1007,36 @@ ShardingState::ScopedRegisterMigration::~ScopedRegisterMigration() {
ShardingState::get(_txn)->_clearMigration();
}
+Status ShardingState::updateShardIdentityConfigString(OperationContext* txn,
+ const std::string& newConnectionString) {
+ BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString));
+
+ UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace);
+ updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName));
+ updateReq.setUpdates(updateObj);
+ UpdateLifecycleImpl updateLifecycle(NamespaceString::kConfigCollectionNamespace);
+ updateReq.setLifecycle(&updateLifecycle);
+
+ OpDebug opDebug;
+
+ try {
+ AutoGetOrCreateDb autoDb(txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
+
+ auto result = update(txn, autoDb.getDb(), updateReq, &opDebug);
+ if (result.numMatched == 0) {
+ warning() << "failed to update config string of shard identity document because "
+ << "it does not exist. This shard could have been removed from the cluster";
+ } else {
+ LOG(2) << "Updated config server connection string in shardIdentity document to"
+ << newConnectionString;
+ }
+ } catch (const DBException& exception) {
+ return exception.toStatus();
+ }
+
+ return Status::OK();
+}
+
/**
* Global free function.
*/
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 672642eedde..08ff629a703 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -219,6 +219,15 @@ public:
std::shared_ptr<CollectionMetadata> getCollectionMetadata(const std::string& ns);
/**
+ * Updates the config server field of the shardIdentity document with the given connection
+ * string.
+ *
+ * Note: this can return NotMaster error.
+ */
+ Status updateShardIdentityConfigString(OperationContext* txn,
+ const std::string& newConnectionString);
+
+ /**
* TESTING ONLY
* Uninstalls the metadata for a given collection.
*/
diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp
index f776e8cd573..7ef675ad334 100644
--- a/src/mongo/db/s/type_shard_identity.cpp
+++ b/src/mongo/db/s/type_shard_identity.cpp
@@ -30,16 +30,22 @@
#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/util/assert_util.h"
namespace mongo {
const std::string ShardIdentityType::IdName("shardIdentity");
+
+namespace {
+
const BSONField<std::string> configsvrConnString("configsvrConnectionString");
const BSONField<std::string> shardName("shardName");
const BSONField<OID> clusterId("clusterId");
+} // unnamed namespace
+
StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) {
if (!source.hasField("_id")) {
return {ErrorCodes::NoSuchKey,
@@ -198,4 +204,12 @@ void ShardIdentityType::setClusterId(OID clusterId) {
_clusterId = std::move(clusterId);
}
+BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) {
+ BSONObjBuilder builder;
+ BSONObjBuilder setConfigBuilder(builder.subobjStart("$set"));
+ setConfigBuilder.append(configsvrConnString(), newConnString);
+ setConfigBuilder.doneFast();
+ return builder.obj();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h
index 52d31c0c39f..30e2c832dff 100644
--- a/src/mongo/db/s/type_shard_identity.h
+++ b/src/mongo/db/s/type_shard_identity.h
@@ -77,6 +77,12 @@ public:
const OID& getClusterId() const;
void setClusterId(OID clusterId);
+ /**
+ * Returns an update object that can be used to update the config server field of the
+ * shardIdentity document with the new connection string.
+ */
+ static BSONObj createConfigServerUpdateObject(const std::string& newConnString);
+
private:
// Convention: (M)andatory, (O)ptional, (S)pecial rule.
diff --git a/src/mongo/db/s/type_shard_identity_test.cpp b/src/mongo/db/s/type_shard_identity_test.cpp
index 0b31e4388d3..8a2382e4bf7 100644
--- a/src/mongo/db/s/type_shard_identity_test.cpp
+++ b/src/mongo/db/s/type_shard_identity_test.cpp
@@ -134,5 +134,12 @@ TEST(ShardIdentityType, NonReplSetConnectionString) {
ASSERT_EQ(ErrorCodes::UnsupportedFormat, ShardIdentityType::fromBSON(doc).getStatus());
}
+TEST(ShardIdentityType, CreateUpdateObject) {
+ auto updateObj = ShardIdentityType::createConfigServerUpdateObject("test/a:1,b:2");
+ auto expectedObj = BSON("$set" << BSON("configsvrConnectionString"
+ << "test/a:1,b:2"));
+ ASSERT_EQ(expectedObj, updateObj);
+}
+
} // namespace mongo
} // unnamed namespace