diff options
author | Randolph Tan <randolph@10gen.com> | 2016-04-22 14:10:33 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2016-05-05 15:45:46 -0400 |
commit | d5b670a01622ff5b9cd8dc1a988321022948182b (patch) | |
tree | 3eaf298e2815f11c5fefbaddec9cf0011efd0755 | |
parent | 9f91d422fc826d08b8eca8d323276c22d63ba436 (diff) | |
download | mongo-d5b670a01622ff5b9cd8dc1a988321022948182b.tar.gz |
SERVER-23765 Update config string of shardIdentity document
-rw-r--r-- | jstests/sharding/shard_identity_config_update.js | 116 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/type_shard_identity_test.cpp | 7 |
13 files changed, 251 insertions, 13 deletions
diff --git a/jstests/sharding/shard_identity_config_update.js b/jstests/sharding/shard_identity_config_update.js new file mode 100644 index 00000000000..b2952780d80 --- /dev/null +++ b/jstests/sharding/shard_identity_config_update.js @@ -0,0 +1,116 @@ +/** + * Tests that the config server connection string in the shard identity document of both the + * primary and secondary will get updated whenever the config server membership changes. + */ +(function() { + "use strict"; + + load('jstests/replsets/rslib.js'); + + var st = new ShardingTest({shards: {rs0: {nodes: 2}}}); + + // TODO: remove crutch once all node shard aware is complete. + var setupShardIdentity = function(conn, configConnStr) { + var shardIdentityDoc = { + _id: 'shardIdentity', + configsvrConnectionString: configConnStr, + shardName: 'newShard', + clusterId: ObjectId() + }; + + var res = conn.getDB('admin') + .system.version.update({_id: 'shardIdentity'}, shardIdentityDoc, true); + assert.eq(1, res.nUpserted); + }; + + var shardPri = st.rs0.getPrimary(); + setupShardIdentity(st.rs0.getPrimary(), st.configRS.getURL()); + + // Note: Adding new replica set member by hand because of SERVER-24011. + + var newNode = MongoRunner.runMongod( + {configsvr: '', replSet: st.configRS.name, storageEngine: 'wiredTiger'}); + + var replConfig = st.configRS.getReplSetConfigFromNode(); + replConfig.version += 1; + replConfig.members.push({_id: 3, host: newNode.host}); + + reconfig(st.configRS, replConfig); + + /** + * Returns true if the shardIdentity document has all the replica set member nodes in the + * expectedConfigStr. + */ + var checkConfigStrUpdated = function(conn, expectedConfigStr) { + var shardIdentity = conn.getDB('admin').system.version.findOne({_id: 'shardIdentity'}); + + var shardConfigsvrStr = shardIdentity.configsvrConnectionString; + var shardConfigReplName = shardConfigsvrStr.split('/')[0]; + var expectedReplName = expectedConfigStr.split('/')[0]; + + assert.eq(expectedReplName, shardConfigReplName); + + var expectedHostList = expectedConfigStr.split('/')[1].split(','); + var shardConfigHostList = shardConfigsvrStr.split('/')[1].split(','); + + if (expectedHostList.length != shardConfigHostList.length) { + return false; + } + + for (var x = 0; x < expectedHostList.length; x++) { + if (shardConfigsvrStr.indexOf(expectedHostList[x]) == -1) { + return false; + } + } + + return true; + }; + + var origConfigConnStr = st.configRS.getURL(); + var expectedConfigStr = origConfigConnStr + ',' + newNode.host; + assert.soon(function() { + return checkConfigStrUpdated(st.rs0.getPrimary(), expectedConfigStr); + }); + + var secConn = st.rs0.getSecondary(); + secConn.setSlaveOk(true); + assert.soon(function() { + return checkConfigStrUpdated(secConn, expectedConfigStr); + }); + + // + // Remove the newly added member from the config replSet while the shards are down. + // Check that the shard identity document will be updated with the new replSet connection + // string when they come back up. + // + + st.rs0.stop(0); + st.rs0.stop(1); + + MongoRunner.stopMongod(newNode.port); + + replConfig = st.configRS.getReplSetConfigFromNode(); + replConfig.version += 1; + replConfig.members.pop(); + + reconfig(st.configRS, replConfig); + + st.rs0.restart(0, {shardsvr: ''}); + st.rs0.restart(1, {shardsvr: ''}); + + st.rs0.waitForMaster(); + st.rs0.awaitSecondaryNodes(); + + assert.soon(function() { + return checkConfigStrUpdated(st.rs0.getPrimary(), origConfigConnStr); + }); + + secConn = st.rs0.getSecondary(); + secConn.setSlaveOk(true); + assert.soon(function() { + return checkConfigStrUpdated(secConn, origConfigConnStr); + }); + + st.stop(); + +})(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 5bbef20bd25..7be2d475eda 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -208,6 +208,12 @@ public: virtual void recoverShardingState(OperationContext* txn) = 0; /** + * Called when the instance transitions to primary in order to update the config server + * connection string of the shard identity document. + */ + virtual void updateShardIdentityConfigString(OperationContext* txn) = 0; + + /** * Notifies the bgsync and syncSourceFeedback threads to choose a new sync source. */ virtual void signalApplierToChooseNewSyncSource() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ef4900c1135..0fdbe0857d0 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -63,6 +63,8 @@ #include "mongo/db/s/sharding_state_recovery.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" +#include "mongo/s/grid.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" @@ -405,6 +407,20 @@ void ReplicationCoordinatorExternalStateImpl::recoverShardingState(OperationCont ShardingState::get(txn)->clearCollectionMetadata(); } +void ReplicationCoordinatorExternalStateImpl::updateShardIdentityConfigString( + OperationContext* txn) { + if (ShardingState::get(txn)->enabled()) { + const auto configsvrConnStr = + Grid::get(txn)->shardRegistry()->getConfigShard()->getConnString(); + auto status = ShardingState::get(txn) + ->updateShardIdentityConfigString(txn, configsvrConnStr.toString()); + if (!status.isOK()) { + warning() << "error encountered while trying to update config connection string to " + << configsvrConnStr << causedBy(status); + } + } +} + void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { auto bgsync = BackgroundSync::get(); if (bgsync) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 78969cbb292..e08cf1fb75e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -76,6 +76,7 @@ public: virtual void killAllUserOperations(OperationContext* txn); virtual void clearShardingState(); virtual void recoverShardingState(OperationContext* txn); + virtual void updateShardIdentityConfigString(OperationContext* txn) override; virtual void signalApplierToChooseNewSyncSource(); virtual void signalApplierToCancelFetcher(); virtual void dropAllTempCollections(OperationContext* txn); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 3ed6023fd71..0396bb37b5f 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -199,6 +199,9 @@ void ReplicationCoordinatorExternalStateMock::clearShardingState() {} void ReplicationCoordinatorExternalStateMock::recoverShardingState(OperationContext* txn) {} +void ReplicationCoordinatorExternalStateMock::updateShardIdentityConfigString( + OperationContext* txn) {} + void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {} void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 57970b239d5..311de9c5e58 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -76,6 +76,7 @@ public: virtual void killAllUserOperations(OperationContext* txn); virtual void clearShardingState(); virtual void recoverShardingState(OperationContext* txn); + virtual void updateShardIdentityConfigString(OperationContext* txn) override; virtual void signalApplierToChooseNewSyncSource(); virtual void signalApplierToCancelFetcher(); virtual void dropAllTempCollections(OperationContext* txn); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index ab70282600f..36e0ea67444 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -751,6 +751,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { _drainFinishedCond.notify_all(); lk.unlock(); + _externalState->updateShardIdentityConfigString(txn); _externalState->dropAllTempCollections(txn); // This is done for compatibility with PV0 replicas wrt how "n" ops are processed. diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 2a7299b3b25..a8743a183a2 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -180,15 +180,6 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (txn->writesAreReplicated() && serverGlobalParams.clusterRole == ClusterRole::ShardServer && - _nss == NamespaceString::kConfigCollectionNamespace) { - if (auto idElem = updatedDoc["_id"]) { - uassert(40069, - "cannot update shardIdentity document while in --shardsvr mode", - idElem.str() != ShardIdentityType::IdName); - } - } - checkShardVersionOrThrow(txn); if (_sourceMgr) { diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 4f9c69c3e4c..7b8bd9c61e6 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -41,7 +41,10 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/metadata_loader.h" @@ -139,6 +142,32 @@ Date_t getDeadlineFromMaxTimeMS(OperationContext* txn) { return Date_t::now() + Microseconds(remainingTime); } +/** + * Updates the config server field of the shardIdentity document with the given connection string + * if setName is equal to the config server replica set name. + * + * Note: This is intended to be used on a new thread that hasn't called Client::initThread. + * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes + * to replica set membership. + */ +void updateShardIdentityConfigStringCB(const string& setName, const string& newConnectionString) { + auto configsvrConnStr = grid.shardRegistry()->getConfigServerConnectionString(); + if (configsvrConnStr.getSetName() != setName) { + // Ignore all change notification for other sets that are not the config server. + return; + } + + Client::initThread("updateShardIdentityConfigConnString"); + auto uniqOpCtx = getGlobalServiceContext()->makeOperationContext(&cc()); + + auto status = ShardingState::get(uniqOpCtx.get()) + ->updateShardIdentityConfigString(uniqOpCtx.get(), newConnectionString); + if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { + warning() << "error encountered while trying to update config connection string to " + << newConnectionString << causedBy(status); + } +} + } // namespace // @@ -524,8 +553,6 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard if (_getInitializationState() == InitializationState::kNew) { ShardedConnectionInfo::addHook(); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ConfigServer::replicaSetChangeShardRegistryUpdateHook); try { Status status = _globalInit(configSvrConnStr); @@ -533,6 +560,10 @@ Status ShardingState::initializeFromShardIdentity(const ShardIdentityType& shard // For backwards compatibility with old style inits from metadata commands. if (status.isOK()) { _setInitializationState_inlock(InitializationState::kInitialized); + ReplicaSetMonitor::setSynchronousConfigChangeHook( + &ConfigServer::replicaSetChangeShardRegistryUpdateHook); + ReplicaSetMonitor::setAsynchronousConfigChangeHook( + &updateShardIdentityConfigStringCB); } else { _initializationStatus = status; _setInitializationState_inlock(InitializationState::kError); @@ -560,12 +591,18 @@ void ShardingState::_initializeImpl(ConnectionString configSvr) { // Do this initialization outside of the lock, since we are already protected by having entered // the kInitializing state. ShardedConnectionInfo::addHook(); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ConfigServer::replicaSetChangeShardRegistryUpdateHook); try { Status status = _globalInit(configSvr); + + if (status.isOK()) { + ReplicaSetMonitor::setSynchronousConfigChangeHook( + &ConfigServer::replicaSetChangeShardRegistryUpdateHook); + ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); + } + _signalInitializationComplete(status); + } catch (const DBException& ex) { _signalInitializationComplete(ex.toStatus()); } @@ -970,6 +1007,36 @@ ShardingState::ScopedRegisterMigration::~ScopedRegisterMigration() { ShardingState::get(_txn)->_clearMigration(); } +Status ShardingState::updateShardIdentityConfigString(OperationContext* txn, + const std::string& newConnectionString) { + BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString)); + + UpdateRequest updateReq(NamespaceString::kConfigCollectionNamespace); + updateReq.setQuery(BSON("_id" << ShardIdentityType::IdName)); + updateReq.setUpdates(updateObj); + UpdateLifecycleImpl updateLifecycle(NamespaceString::kConfigCollectionNamespace); + updateReq.setLifecycle(&updateLifecycle); + + OpDebug opDebug; + + try { + AutoGetOrCreateDb autoDb(txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X); + + auto result = update(txn, autoDb.getDb(), updateReq, &opDebug); + if (result.numMatched == 0) { + warning() << "failed to update config string of shard identity document because " + << "it does not exist. This shard could have been removed from the cluster"; + } else { + LOG(2) << "Updated config server connection string in shardIdentity document to" + << newConnectionString; + } + } catch (const DBException& exception) { + return exception.toStatus(); + } + + return Status::OK(); +} + /** * Global free function. */ diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 672642eedde..08ff629a703 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -219,6 +219,15 @@ public: std::shared_ptr<CollectionMetadata> getCollectionMetadata(const std::string& ns); /** + * Updates the config server field of the shardIdentity document with the given connection + * string. + * + * Note: this can return NotMaster error. + */ + Status updateShardIdentityConfigString(OperationContext* txn, + const std::string& newConnectionString); + + /** * TESTING ONLY * Uninstalls the metadata for a given collection. */ diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp index f776e8cd573..7ef675ad334 100644 --- a/src/mongo/db/s/type_shard_identity.cpp +++ b/src/mongo/db/s/type_shard_identity.cpp @@ -30,16 +30,22 @@ #include "mongo/db/s/type_shard_identity.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/util/assert_util.h" namespace mongo { const std::string ShardIdentityType::IdName("shardIdentity"); + +namespace { + const BSONField<std::string> configsvrConnString("configsvrConnectionString"); const BSONField<std::string> shardName("shardName"); const BSONField<OID> clusterId("clusterId"); +} // unnamed namespace + StatusWith<ShardIdentityType> ShardIdentityType::fromBSON(const BSONObj& source) { if (!source.hasField("_id")) { return {ErrorCodes::NoSuchKey, @@ -198,4 +204,12 @@ void ShardIdentityType::setClusterId(OID clusterId) { _clusterId = std::move(clusterId); } +BSONObj ShardIdentityType::createConfigServerUpdateObject(const std::string& newConnString) { + BSONObjBuilder builder; + BSONObjBuilder setConfigBuilder(builder.subobjStart("$set")); + setConfigBuilder.append(configsvrConnString(), newConnString); + setConfigBuilder.doneFast(); + return builder.obj(); +} + } // namespace mongo diff --git a/src/mongo/db/s/type_shard_identity.h b/src/mongo/db/s/type_shard_identity.h index 52d31c0c39f..30e2c832dff 100644 --- a/src/mongo/db/s/type_shard_identity.h +++ b/src/mongo/db/s/type_shard_identity.h @@ -77,6 +77,12 @@ public: const OID& getClusterId() const; void setClusterId(OID clusterId); + /** + * Returns an update object that can be used to update the config server field of the + * shardIdentity document with the new connection string. + */ + static BSONObj createConfigServerUpdateObject(const std::string& newConnString); + private: // Convention: (M)andatory, (O)ptional, (S)pecial rule. diff --git a/src/mongo/db/s/type_shard_identity_test.cpp b/src/mongo/db/s/type_shard_identity_test.cpp index 0b31e4388d3..8a2382e4bf7 100644 --- a/src/mongo/db/s/type_shard_identity_test.cpp +++ b/src/mongo/db/s/type_shard_identity_test.cpp @@ -134,5 +134,12 @@ TEST(ShardIdentityType, NonReplSetConnectionString) { ASSERT_EQ(ErrorCodes::UnsupportedFormat, ShardIdentityType::fromBSON(doc).getStatus()); } +TEST(ShardIdentityType, CreateUpdateObject) { + auto updateObj = ShardIdentityType::createConfigServerUpdateObject("test/a:1,b:2"); + auto expectedObj = BSON("$set" << BSON("configsvrConnectionString" + << "test/a:1,b:2")); + ASSERT_EQ(expectedObj, updateObj); +} + } // namespace mongo } // unnamed namespace |