diff options
24 files changed, 585 insertions, 112 deletions
diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js index a616f03bcc3..9b7045c5dca 100644 --- a/jstests/sharding/shard_aware_on_add_shard.js +++ b/jstests/sharding/shard_aware_on_add_shard.js @@ -13,18 +13,18 @@ }); }; - var checkShardingStateInitialized = function(conn, configConnStr, shardName) { + var checkShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) { var res = conn.getDB('admin').runCommand({shardingState: 1}); assert.commandWorked(res); assert(res.enabled); assert.eq(configConnStr, res.configServer); assert.eq(shardName, res.shardName); - // TODO SERVER-23096: How should the clusterId be obtained externally? - // assert.eq(clusterId, res.clusterId); + assert.eq(clusterId, res.clusterId); }; // Create the cluster to test adding shards to. var st = new ShardingTest({shards: 1}); + var clusterId = st.s.getDB('config').getCollection('version').findOne().clusterId; // Add a shard that is a standalone mongod. @@ -34,7 +34,7 @@ jsTest.log("Going to add standalone as shard: " + standaloneConn); var newShardName = "newShard"; assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName})); - checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName); + checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName, clusterId); MongoRunner.stopMongod(standaloneConn.port); @@ -45,9 +45,10 @@ replTest.initiate(); waitForIsMaster(replTest.getPrimary()); - jsTest.log("Going to add replica set as shard: " + replTest); + jsTest.log("Going to add replica set as shard: " + tojson(replTest)); assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()})); - checkShardingStateInitialized(replTest.getPrimary(), st.configRS.getURL(), replTest.getURL()); + checkShardingStateInitialized( + replTest.getPrimary(), st.configRS.getURL(), replTest.getURL(), clusterId); replTest.stopSet(); diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index dd1685fc53e..66acaa27bfb 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -168,6 +168,7 @@ error_code("CommandNotSupportedOnView", 166) error_code("OptionNotSupportedOnView", 167) error_code("OperatorNotSupportedOnView", 168) error_code("CommandOnShardedViewNotSupportedOnMongos", 169) +error_code("TooManyMatchingDocuments", 170) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index af9e67fdbfe..226a5d65af1 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -164,6 +164,7 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i if (auto idElem = insertedDoc["_id"]) { if (idElem.str() == ShardIdentityType::IdName) { auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); + uassertStatusOK(shardIdentityDoc.validate()); txn->recoveryUnit()->registerChange( new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc))); } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index f54eafc6ae4..a1c7b4eea7b 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -433,6 +433,15 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, return Status::OK(); } + Status validationStatus = shardIdentity.validate(); + if (!validationStatus.isOK()) { + return Status( + validationStatus.code(), + str::stream() + << "Invalid shard identity document found when initializing sharding state: " + << validationStatus.reason()); + } + log() << "initializing sharding state with: " << shardIdentity; stdx::unique_lock<stdx::mutex> lk(_mutex); diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index ebd26413091..7320dd06c94 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -329,33 +329,6 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) { ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); } -TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) { - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName("a"); - shardIdentity.setClusterId(OID()); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max())); - - ShardIdentityType shardIdentity2; - shardIdentity2.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName("a"); - shardIdentity2.setClusterId(OID::gen()); - - shardingState()->setGlobalInitMethodForTest( - [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) { - return Status{ErrorCodes::InternalError, "should not reach here"}; - }); - - ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max())); - - ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ("a", shardingState()->getShardName()); - ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString()); -} - TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp index d34f4975cfa..6188f08a109 100644 --- a/src/mongo/db/s/type_shard_identity.cpp +++ b/src/mongo/db/s/type_shard_identity.cpp @@ -136,10 +136,7 @@ Status ShardIdentityType::validate() const { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"}; } - // TODO SERVER-23096: Once ShardRegistry::_clusterId is loaded from the config servers rather - // than initialized in the ShardRegistry constructor in each process, the isSet() check can - // be re-added. - if (!_clusterId /*|| !_clusterId->isSet()*/) { + if (!_clusterId || !_clusterId->isSet()) { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId() << " field"}; } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 4b0cc679867..2cbaf9e5118 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -258,6 +258,7 @@ env.Library( 'catalog/catalog_cache.cpp', 'chunk.cpp', 'chunk_manager.cpp', + 'cluster_identity_loader.cpp', 'config.cpp', 'config_server_client.cpp', 'grid.cpp', @@ -333,6 +334,17 @@ env.CppUnitTest( ] ) +env.CppUnitTest( + target='cluster_identity_loader_test', + source=[ + 'cluster_identity_loader_test.cpp', + ], + LIBDEPS=[ + 'coreshard', + 'sharding_test_fixture', + ] +) + env.Library( target='local_sharding_info', source=[ diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index a09fe9834b4..baa3ad957da 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -40,12 +40,15 @@ #include "mongo/db/s/type_shard_identity.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/replset/sharding_catalog_test_fixture.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/catalog/type_changelog.h" +#include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/s/write_ops/batched_insert_request.h" #include "mongo/s/write_ops/batched_update_document.h" @@ -83,10 +86,17 @@ protected: _configHost = _configConnStr.getServers().front(); configTargeter()->setFindHostReturnValue(_configHost); - // TODO SERVER-23096: Change this to OID::gen() once clusterId is loaded from the config - // servers into the ShardRegistry instead of created by the ShardRegistry within each - // process. - _clusterId = OID(); + _clusterId = OID::gen(); + + // Ensure the cluster ID has been loaded and cached so that future requests for the cluster + // ID will not require any network traffic. + auto future = launchAsync([&] { + auto clusterId = assertGet( + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext())); + ASSERT_EQUALS(_clusterId, clusterId); + }); + expectGetConfigVersion(); + future.timed_get(kFutureTimeout); } /** @@ -138,6 +148,36 @@ protected: } /** + * Intercepts a query on config.version and returns a basic config.version document containing + * _clusterId + */ + void expectGetConfigVersion() { + VersionType version; + version.setCurrentVersion(CURRENT_CONFIG_VERSION); + version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION); + version.setClusterId(_clusterId); + + onFindCommand([this, &version](const RemoteCommandRequest& request) { + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.toString(), VersionType::ConfigNS); + + auto queryResult = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false); + ASSERT_OK(queryResult.getStatus()); + + const auto& query = queryResult.getValue(); + ASSERT_EQ(query->ns(), VersionType::ConfigNS); + + ASSERT_EQ(query->getFilter(), BSONObj()); + ASSERT_EQ(query->getSort(), BSONObj()); + ASSERT_FALSE(query->getLimit().is_initialized()); + + checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm); + + return std::vector<BSONObj>{version.toBSON()}; + }); + } + + /** * Waits for a request for the shardIdentity document to be upserted into a shard from the * config server on addShard. */ diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp index 386f44a5387..5e11ad1313c 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp @@ -54,10 +54,12 @@ #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_changelog.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" @@ -909,6 +911,53 @@ StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContex return docs.front(); } +StatusWith<VersionType> ShardingCatalogClientImpl::getConfigVersion( + OperationContext* txn, repl::ReadConcernLevel readConcern) { + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + kConfigReadSelector, + readConcern, + NamespaceString(VersionType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none /* no limit */); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + auto queryResults = findStatus.getValue().docs; + + if (queryResults.size() > 1) { + return {ErrorCodes::TooManyMatchingDocuments, + str::stream() << "should only have 1 document in " << VersionType::ConfigNS}; + } + + if (queryResults.empty()) { + VersionType versionInfo; + versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion); + versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion); + versionInfo.setClusterId(OID{}); + return versionInfo; + } + + BSONObj versionDoc = queryResults.front(); + auto versionTypeResult = VersionType::fromBSON(versionDoc); + if (!versionTypeResult.isOK()) { + return Status(ErrorCodes::UnsupportedFormat, + str::stream() << "invalid config.version document: " << versionDoc + << causedBy(versionTypeResult.getStatus())); + } + + auto validationStatus = versionTypeResult.getValue().validate(); + if (!validationStatus.isOK()) { + return Status(validationStatus.code(), + str::stream() << "invalid config.version document: " << versionDoc + << causedBy(validationStatus.reason())); + } + + return versionTypeResult.getValue(); +} + Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* txn, const ShardId& shardId, vector<string>* dbs) { diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h index 9d9d060a8e8..e98474a10a2 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h @@ -151,6 +151,9 @@ public: StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override; + StatusWith<VersionType> getConfigVersion(OperationContext* txn, + repl::ReadConcernLevel readConcern) override; + void writeConfigServerDirect(OperationContext* txn, const BatchedCommandRequest& request, BatchedCommandResponse* response) override; diff --git a/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp index ca4f04bda77..56330375b21 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp @@ -126,7 +126,7 @@ TEST_F(ConfigInitializationTest, InitClusterMultipleVersionDocs) { BSON("_id" << "a second document"))); - ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion, + ASSERT_EQ(ErrorCodes::TooManyMatchingDocuments, catalogManager()->initializeConfigDatabaseIfNeeded(operationContext())); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index 39e0a6bae52..c956e25e4df 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -61,6 +61,7 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/set_shard_version_request.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -498,11 +499,17 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( shardType.setMaxSizeMB(maxSize); } + auto clusterIdentity = ClusterIdentityLoader::get(txn); + auto clusterId = clusterIdentity->getClusterId(txn); + if (!clusterId.isOK()) { + return clusterId.getStatus(); + } + ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); shardIdentity.setShardName(shardType.getName()); - shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId()); + shardIdentity.setClusterId(clusterId.getValue()); auto validateStatus = shardIdentity.validate(); if (!validateStatus.isOK()) { return validateStatus; @@ -749,47 +756,9 @@ Status ShardingCatalogManagerImpl::initializeConfigDatabaseIfNeeded(OperationCon return Status::OK(); } -StatusWith<VersionType> ShardingCatalogManagerImpl::_getConfigVersion(OperationContext* txn) { - auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - kConfigReadSelector, - // Use local read concern as we're likely to follow this up with a write. - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(VersionType::ConfigNS), - BSONObj(), - BSONObj(), - boost::none /* no limit */); - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - auto queryResults = findStatus.getValue().docs; - - if (queryResults.size() > 1) { - return {ErrorCodes::IncompatibleShardingConfigVersion, - str::stream() << "should only have 1 document in " << VersionType::ConfigNS}; - } - - if (queryResults.empty()) { - VersionType versionInfo; - versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion); - versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion); - return versionInfo; - } - - BSONObj versionDoc = queryResults.front(); - auto versionTypeResult = VersionType::fromBSON(versionDoc); - if (!versionTypeResult.isOK()) { - return Status(ErrorCodes::UnsupportedFormat, - str::stream() << "invalid config version document: " << versionDoc - << versionTypeResult.getStatus().toString()); - } - - return versionTypeResult.getValue(); -} - Status ShardingCatalogManagerImpl::_initConfigVersion(OperationContext* txn) { - auto versionStatus = _getConfigVersion(txn); + auto versionStatus = + _catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kLocalReadConcern); if (!versionStatus.isOK()) { return versionStatus.getStatus(); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index bfacef70a57..8e9ab963851 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -122,11 +122,6 @@ private: const BSONObj& cmdObj); /** - * Returns the current cluster schema/protocol version. - */ - StatusWith<VersionType> _getConfigVersion(OperationContext* txn); - - /** * Performs the necessary checks for version compatibility and creates a new config.version * document if the current cluster config is empty. */ diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index bbf59af157a..816ef6adff6 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -62,6 +62,7 @@ class Status; template <typename T> class StatusWith; class TagsType; +class VersionType; namespace executor { struct ConnectionPoolStats; @@ -345,6 +346,13 @@ public: virtual StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) = 0; /** + * Returns the contents of the config.version document - containing the current cluster schema + * version as well as the clusterID. + */ + virtual StatusWith<VersionType> getConfigVersion(OperationContext* txn, + repl::ReadConcernLevel readConcern) = 0; + + /** * Directly sends the specified command to the config server and returns the response. * * NOTE: Usage of this function is disallowed in new code, which should instead go through diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 1ba10b51582..4fb48226572 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -33,6 +33,7 @@ #include "mongo/base/status.h" #include "mongo/db/repl/optime.h" #include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/stdx/memory.h" @@ -184,6 +185,11 @@ StatusWith<BSONObj> ShardingCatalogClientMock::getGlobalSettings(OperationContex return {ErrorCodes::InternalError, "Method not implemented"}; } +StatusWith<VersionType> ShardingCatalogClientMock::getConfigVersion( + OperationContext* txn, repl::ReadConcernLevel readConcern) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + void ShardingCatalogClientMock::writeConfigServerDirect(OperationContext* txn, const BatchedCommandRequest& request, BatchedCommandResponse* response) {} diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 125c6b36ca7..b9ab3a57274 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -129,6 +129,9 @@ public: StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override; + StatusWith<VersionType> getConfigVersion(OperationContext* txn, + repl::ReadConcernLevel readConcern) override; + void writeConfigServerDirect(OperationContext* txn, const BatchedCommandRequest& request, BatchedCommandResponse* response) override; diff --git a/src/mongo/s/catalog/type_config_version.cpp b/src/mongo/s/catalog/type_config_version.cpp index 92710ca7638..68fa14829ad 100644 --- a/src/mongo/s/catalog/type_config_version.cpp +++ b/src/mongo/s/catalog/type_config_version.cpp @@ -84,6 +84,10 @@ Status VersionType::validate() const { return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId.name() << " field"}; } + if (!_clusterId->isSet()) { + return {ErrorCodes::NotYetInitialized, "Cluster ID cannot be empty"}; + } + return Status::OK(); } @@ -210,7 +214,6 @@ void VersionType::setCurrentVersion(const int currentVersion) { } void VersionType::setClusterId(const OID& clusterId) { - invariant(clusterId.isSet()); _clusterId = clusterId; } diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 13135db1142..9b8a9c9b9f2 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -59,12 +59,9 @@ using std::string; using std::unique_ptr; using std::vector; -// TODO SERVER-23096: Initializing an empty _clusterId is a temporary hack. The _clusterId should -// be queried from the config servers on sharding initialization and passed to the ShardRegistry -// constructor. ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, const ConnectionString& configServerCS) - : _shardFactory(std::move(shardFactory)), _clusterId(), _data() { + : _shardFactory(std::move(shardFactory)), _data() { _initConfigServerCS = configServerCS; } @@ -72,10 +69,6 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } -const OID& ShardRegistry::getClusterId() const { - return _clusterId; -} - void ShardRegistry::rebuildConfigShard() { _data.rebuildConfigShard(_shardFactory.get()); invariant(_data.getConfigShard()); diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index e384a53feea..17dd8367edd 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -158,11 +158,6 @@ public: ConnectionString getConfigServerConnectionString() const; /** - * Returns the cluster id from the config shard. - */ - const OID& getClusterId() const; - - /** * Reloads the ShardRegistry based on the contents of the config server's config.shards * collection. Returns true if this call performed a reload and false if this call only waited * for another thread to perform the reload and did not actually reload. Because of this, it is @@ -246,12 +241,6 @@ private: */ ConnectionString _initConfigServerCS; - /** - * The id for the cluster, obtained from the config servers on sharding initialization. The - * config servers are the authority on the clusterId. - */ - const OID _clusterId; - ShardRegistryData _data; // Protects the _reloadState and _initConfigServerCS during startup. diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp new file mode 100644 index 00000000000..b97c30bab08 --- /dev/null +++ b/src/mongo/s/cluster_identity_loader.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/cluster_identity_loader.h" + +#include "mongo/base/status_with.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace { + +const auto getClusterIdentity = ServiceContext::declareDecoration<ClusterIdentityLoader>(); + +} // namespace + +ClusterIdentityLoader* ClusterIdentityLoader::get(ServiceContext* serviceContext) { + return &getClusterIdentity(serviceContext); +} + +ClusterIdentityLoader* ClusterIdentityLoader::get(OperationContext* operationContext) { + return ClusterIdentityLoader::get(operationContext->getServiceContext()); +} + +StatusWith<OID> ClusterIdentityLoader::getClusterId(OperationContext* txn) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_initializationState == InitializationState::kInitialized) { + invariant(_lastLoadResult.isOK()); + return _lastLoadResult; + } + + if (_initializationState == InitializationState::kLoading) { + while (_initializationState == InitializationState::kLoading) { + _inReloadCV.wait(lk); + } + return _lastLoadResult; + } + + invariant(_initializationState == InitializationState::kUninitialized); + _initializationState = InitializationState::kLoading; + + lk.unlock(); + auto loadStatus = _loadClusterId(txn); + lk.lock(); + + invariant(_initializationState == InitializationState::kLoading); + _lastLoadResult = std::move(loadStatus); + if (_lastLoadResult.isOK()) { + _initializationState = InitializationState::kInitialized; + } else { + _initializationState = InitializationState::kUninitialized; + } + _inReloadCV.notify_all(); + return _lastLoadResult; +} + +StatusWith<OID> ClusterIdentityLoader::_loadClusterId(OperationContext* txn) { + auto catalogClient = Grid::get(txn)->catalogClient(txn); + auto loadResult = + catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kMajorityReadConcern); + if (!loadResult.isOK()) { + return Status(loadResult.getStatus().code(), + str::stream() << "Error loading clusterID" + << causedBy(loadResult.getStatus().reason())); + } + return loadResult.getValue().getClusterId(); +} + +} // namespace mongo diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h new file mode 100644 index 00000000000..c1cd798186b --- /dev/null +++ b/src/mongo/s/cluster_identity_loader.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/base/disallow_copying.h" +#include "mongo/bson/oid.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { + +class OperationContext; +class ServiceContext; +template <typename T> +class StatusWith; + +/** + * Decoration on ServiceContext used by any process in a sharded cluster to access the cluster ID. + */ +class ClusterIdentityLoader { + MONGO_DISALLOW_COPYING(ClusterIdentityLoader); + +public: + ClusterIdentityLoader() = default; + ~ClusterIdentityLoader() = default; + + /** + * Retrieves the ClusterIdentity object associated with the given service context. + */ + static ClusterIdentityLoader* get(ServiceContext* serviceContext); + static ClusterIdentityLoader* get(OperationContext* operationContext); + + /** + * Returns the cluster ID. If the cluster ID has been successfully loaded in the past, will + * return the cached version which will be stored in _lastLoadResult. If we've never + * successfully loaded the cluster ID, will attempt to load it from the config.version + * collection on the config servers, or if another thread is already in the process of loading + * it, will wait for that thread to finish and then return its results. + */ + StatusWith<OID> getClusterId(OperationContext* txn); + +private: + enum class InitializationState { + kUninitialized, // We have never successfully loaded the cluster ID + kLoading, // One thread is in the process of attempting to load the cluster ID + kInitialized, // We have been able to successfully load the cluster ID. + }; + + /** + * Queries the config.version collection on the config server, extracts the cluster ID from + * the version document, and returns it. + */ + StatusWith<OID> _loadClusterId(OperationContext* txn); + + stdx::mutex _mutex; + stdx::condition_variable _inReloadCV; + + // Used to ensure that only one thread at a time attempts to reload the cluster ID from the + // config.version collection + InitializationState _initializationState{InitializationState::kUninitialized}; + + // Stores the result of the last call to _loadClusterId. Used to cache the cluster ID once it + // has been successfully loaded, as well as to report failures in loading across threads. + StatusWith<OID> _lastLoadResult{Status{ErrorCodes::InternalError, "cluster ID never loaded"}}; +}; + +} // namespace mongo diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp new file mode 100644 index 00000000000..19ced5e34fc --- /dev/null +++ b/src/mongo/s/cluster_identity_loader_test.cpp @@ -0,0 +1,213 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/client/remote_command_targeter_mock.h" +#include "mongo/db/commands.h" +#include "mongo/db/query/query_request.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/task_executor.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/catalog/config_server_version.h" +#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h" +#include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" +#include "mongo/s/sharding_test_fixture.h" +#include "mongo/stdx/future.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::TaskExecutor; +using stdx::async; +using unittest::assertGet; + +const BSONObj kReplSecondaryOkMetadata{[] { + BSONObjBuilder o; + o.appendElements(rpc::ServerSelectionMetadata(true, boost::none).toBSON()); + o.append(rpc::kReplSetMetadataFieldName, 1); + return o.obj(); +}()}; + +class ClusterIdentityTest : public ShardingTestFixture { +public: + void setUp() override { + ShardingTestFixture::setUp(); + + configTargeter()->setFindHostReturnValue(configHost); + } + + void expectConfigVersionLoad(StatusWith<OID> result) { + onFindCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQUALS(configHost, request.target); + ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata); + + const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String()); + ASSERT_EQ(nss.ns(), "config.version"); + + auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false)); + + ASSERT_EQ(query->ns(), "config.version"); + ASSERT_EQ(query->getFilter(), BSONObj()); + ASSERT_FALSE(query->getLimit().is_initialized()); + + if (result.isOK()) { + VersionType version; + version.setCurrentVersion(CURRENT_CONFIG_VERSION); + version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION); + version.setClusterId(result.getValue()); + + return StatusWith<std::vector<BSONObj>>{{version.toBSON()}}; + } else { + return StatusWith<std::vector<BSONObj>>{result.getStatus()}; + } + }); + } + +protected: + OID clusterId{OID::gen()}; + HostAndPort configHost{"TestHost1"}; +}; + +TEST_F(ClusterIdentityTest, BasicLoadSuccess) { + + // The first time you ask for the cluster ID it will have to be loaded from the config servers. + auto future = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + }); + + expectConfigVersionLoad(clusterId); + + future.timed_get(kFutureTimeout); + + // Subsequent requests for the cluster ID should not require any network traffic as we consult + // the cached version. + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); +} + +TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) { + // Check that multiple threads calling getClusterId at once still results in only one network + // operation. + auto future1 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + }); + auto future2 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + }); + auto future3 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + }); + + expectConfigVersionLoad(clusterId); + + future1.timed_get(kFutureTimeout); + future2.timed_get(kFutureTimeout); + future3.timed_get(kFutureTimeout); +} + +TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { + + // The first time you ask for the cluster ID it will have to be loaded from the config servers. + auto future = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); + }); + + expectConfigVersionLoad(Status(ErrorCodes::Interrupted, "interrupted")); + + future.timed_get(kFutureTimeout); + + // After a failure to load the cluster ID, subsequent attempts to get the cluster ID should + // retry loading it. + future = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_OK(clusterIdStatus); + ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + }); + + expectConfigVersionLoad(clusterId); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ClusterIdentityTest, MultipleThreadsLoadFailure) { + // Check that multiple threads calling getClusterId at once still results in only one network + // operation. + auto future1 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); + }); + auto future2 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); + }); + auto future3 = launchAsync([&] { + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); + }); + + expectConfigVersionLoad(Status(ErrorCodes::Interrupted, "interrupted")); + + future1.timed_get(kFutureTimeout); + future2.timed_get(kFutureTimeout); + future3.timed_get(kFutureTimeout); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index cdca5486b55..9d256fa4229 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -55,6 +55,7 @@ #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" +#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/sharding_egress_metadata_hook.h" @@ -200,6 +201,16 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) { } try { + auto clusterIdentity = ClusterIdentityLoader::get(txn); + auto clusterId = clusterIdentity->getClusterId(txn); + if (!clusterId.isOK()) { + warning() + << "Error initializing sharding state, sleeping for 2 seconds and trying again" + << causedBy(clusterId.getStatus()); + sleepmillis(2000); + continue; + } + grid.shardRegistry()->reload(txn); return Status::OK(); } catch (const DBException& ex) { @@ -211,8 +222,9 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) { // servers. grid.shardRegistry()->rebuildConfigShard(); } - log() << "Error initializing sharding state, sleeping for 2 seconds and trying again" - << causedBy(status); + warning() + << "Error initializing sharding state, sleeping for 2 seconds and trying again" + << causedBy(status); sleepmillis(2000); continue; } diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index 0ecdd2a3508..94973f0f24f 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -78,8 +78,8 @@ Status initializeGlobalShardingState(OperationContext* txn, ShardingCatalogManagerBuilder catalogManagerBuilder); /** - * Tries to contact the config server and reload the shard registry until it succeeds or - * is interrupted. + * Tries to contact the config server and reload the shard registry and the cluster ID until it + * succeeds or is interrupted. */ Status reloadShardRegistryUntilSuccess(OperationContext* txn); |