summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/shard_aware_on_add_shard.js13
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/s/collection_sharding_state.cpp1
-rw-r--r--src/mongo/db/s/sharding_state.cpp9
-rw-r--r--src/mongo/db/s/sharding_state_test.cpp27
-rw-r--r--src/mongo/db/s/type_shard_identity.cpp5
-rw-r--r--src/mongo/s/SConscript12
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp48
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp49
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_client_impl.h3
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp2
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp51
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h5
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client.h8
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_mock.cpp6
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_mock.h3
-rw-r--r--src/mongo/s/catalog/type_config_version.cpp5
-rw-r--r--src/mongo/s/client/shard_registry.cpp9
-rw-r--r--src/mongo/s/client/shard_registry.h11
-rw-r--r--src/mongo/s/cluster_identity_loader.cpp101
-rw-r--r--src/mongo/s/cluster_identity_loader.h95
-rw-r--r--src/mongo/s/cluster_identity_loader_test.cpp213
-rw-r--r--src/mongo/s/sharding_initialization.cpp16
-rw-r--r--src/mongo/s/sharding_initialization.h4
24 files changed, 585 insertions, 112 deletions
diff --git a/jstests/sharding/shard_aware_on_add_shard.js b/jstests/sharding/shard_aware_on_add_shard.js
index a616f03bcc3..9b7045c5dca 100644
--- a/jstests/sharding/shard_aware_on_add_shard.js
+++ b/jstests/sharding/shard_aware_on_add_shard.js
@@ -13,18 +13,18 @@
});
};
- var checkShardingStateInitialized = function(conn, configConnStr, shardName) {
+ var checkShardingStateInitialized = function(conn, configConnStr, shardName, clusterId) {
var res = conn.getDB('admin').runCommand({shardingState: 1});
assert.commandWorked(res);
assert(res.enabled);
assert.eq(configConnStr, res.configServer);
assert.eq(shardName, res.shardName);
- // TODO SERVER-23096: How should the clusterId be obtained externally?
- // assert.eq(clusterId, res.clusterId);
+ assert.eq(clusterId, res.clusterId);
};
// Create the cluster to test adding shards to.
var st = new ShardingTest({shards: 1});
+ var clusterId = st.s.getDB('config').getCollection('version').findOne().clusterId;
// Add a shard that is a standalone mongod.
@@ -34,7 +34,7 @@
jsTest.log("Going to add standalone as shard: " + standaloneConn);
var newShardName = "newShard";
assert.commandWorked(st.s.adminCommand({addShard: standaloneConn.name, name: newShardName}));
- checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName);
+ checkShardingStateInitialized(standaloneConn, st.configRS.getURL(), newShardName, clusterId);
MongoRunner.stopMongod(standaloneConn.port);
@@ -45,9 +45,10 @@
replTest.initiate();
waitForIsMaster(replTest.getPrimary());
- jsTest.log("Going to add replica set as shard: " + replTest);
+ jsTest.log("Going to add replica set as shard: " + tojson(replTest));
assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: replTest.getURL()}));
- checkShardingStateInitialized(replTest.getPrimary(), st.configRS.getURL(), replTest.getURL());
+ checkShardingStateInitialized(
+ replTest.getPrimary(), st.configRS.getURL(), replTest.getURL(), clusterId);
replTest.stopSet();
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index dd1685fc53e..66acaa27bfb 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -168,6 +168,7 @@ error_code("CommandNotSupportedOnView", 166)
error_code("OptionNotSupportedOnView", 167)
error_code("OperatorNotSupportedOnView", 168)
error_code("CommandOnShardedViewNotSupportedOnMongos", 169)
+error_code("TooManyMatchingDocuments", 170)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index af9e67fdbfe..226a5d65af1 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -164,6 +164,7 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i
if (auto idElem = insertedDoc["_id"]) {
if (idElem.str() == ShardIdentityType::IdName) {
auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc));
+ uassertStatusOK(shardIdentityDoc.validate());
txn->recoveryUnit()->registerChange(
new ShardIdentityLopOpHandler(txn, std::move(shardIdentityDoc)));
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index f54eafc6ae4..a1c7b4eea7b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -433,6 +433,15 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn,
return Status::OK();
}
+ Status validationStatus = shardIdentity.validate();
+ if (!validationStatus.isOK()) {
+ return Status(
+ validationStatus.code(),
+ str::stream()
+ << "Invalid shard identity document found when initializing sharding state: "
+ << validationStatus.reason());
+ }
+
log() << "initializing sharding state with: " << shardIdentity;
stdx::unique_lock<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index ebd26413091..7320dd06c94 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -329,33 +329,6 @@ TEST_F(ShardingStateTest, InitializeAgainWithDifferentShardNameFails) {
ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString());
}
-TEST_F(ShardingStateTest, InitializeAgainWithPreviouslyUnsetClusterIdSucceeds) {
- ShardIdentityType shardIdentity;
- shardIdentity.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity.setShardName("a");
- shardIdentity.setClusterId(OID());
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity, Date_t::max()));
-
- ShardIdentityType shardIdentity2;
- shardIdentity2.setConfigsvrConnString(
- ConnectionString(ConnectionString::SET, "a:1,b:2", "config"));
- shardIdentity2.setShardName("a");
- shardIdentity2.setClusterId(OID::gen());
-
- shardingState()->setGlobalInitMethodForTest(
- [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
- return Status{ErrorCodes::InternalError, "should not reach here"};
- });
-
- ASSERT_OK(shardingState()->initializeFromShardIdentity(txn(), shardIdentity2, Date_t::max()));
-
- ASSERT_TRUE(shardingState()->enabled());
- ASSERT_EQ("a", shardingState()->getShardName());
- ASSERT_EQ("config/a:1,b:2", shardingState()->getConfigServer(txn()).toString());
-}
-
TEST_F(ShardingStateTest, InitializeAgainWithDifferentClusterIdFails) {
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
diff --git a/src/mongo/db/s/type_shard_identity.cpp b/src/mongo/db/s/type_shard_identity.cpp
index d34f4975cfa..6188f08a109 100644
--- a/src/mongo/db/s/type_shard_identity.cpp
+++ b/src/mongo/db/s/type_shard_identity.cpp
@@ -136,10 +136,7 @@ Status ShardIdentityType::validate() const {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << shardName() << " field"};
}
- // TODO SERVER-23096: Once ShardRegistry::_clusterId is loaded from the config servers rather
- // than initialized in the ShardRegistry constructor in each process, the isSet() check can
- // be re-added.
- if (!_clusterId /*|| !_clusterId->isSet()*/) {
+ if (!_clusterId || !_clusterId->isSet()) {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId() << " field"};
}
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 4b0cc679867..2cbaf9e5118 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -258,6 +258,7 @@ env.Library(
'catalog/catalog_cache.cpp',
'chunk.cpp',
'chunk_manager.cpp',
+ 'cluster_identity_loader.cpp',
'config.cpp',
'config_server_client.cpp',
'grid.cpp',
@@ -333,6 +334,17 @@ env.CppUnitTest(
]
)
+env.CppUnitTest(
+ target='cluster_identity_loader_test',
+ source=[
+ 'cluster_identity_loader_test.cpp',
+ ],
+ LIBDEPS=[
+ 'coreshard',
+ 'sharding_test_fixture',
+ ]
+)
+
env.Library(
target='local_sharding_info',
source=[
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index a09fe9834b4..baa3ad957da 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -40,12 +40,15 @@
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/replset/sharding_catalog_test_fixture.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_changelog.h"
+#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/batched_insert_request.h"
#include "mongo/s/write_ops/batched_update_document.h"
@@ -83,10 +86,17 @@ protected:
_configHost = _configConnStr.getServers().front();
configTargeter()->setFindHostReturnValue(_configHost);
- // TODO SERVER-23096: Change this to OID::gen() once clusterId is loaded from the config
- // servers into the ShardRegistry instead of created by the ShardRegistry within each
- // process.
- _clusterId = OID();
+ _clusterId = OID::gen();
+
+ // Ensure the cluster ID has been loaded and cached so that future requests for the cluster
+ // ID will not require any network traffic.
+ auto future = launchAsync([&] {
+ auto clusterId = assertGet(
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()));
+ ASSERT_EQUALS(_clusterId, clusterId);
+ });
+ expectGetConfigVersion();
+ future.timed_get(kFutureTimeout);
}
/**
@@ -138,6 +148,36 @@ protected:
}
/**
+ * Intercepts a query on config.version and returns a basic config.version document containing
+ * _clusterId
+ */
+ void expectGetConfigVersion() {
+ VersionType version;
+ version.setCurrentVersion(CURRENT_CONFIG_VERSION);
+ version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
+ version.setClusterId(_clusterId);
+
+ onFindCommand([this, &version](const RemoteCommandRequest& request) {
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.toString(), VersionType::ConfigNS);
+
+ auto queryResult = QueryRequest::makeFromFindCommand(nss, request.cmdObj, false);
+ ASSERT_OK(queryResult.getStatus());
+
+ const auto& query = queryResult.getValue();
+ ASSERT_EQ(query->ns(), VersionType::ConfigNS);
+
+ ASSERT_EQ(query->getFilter(), BSONObj());
+ ASSERT_EQ(query->getSort(), BSONObj());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ checkReadConcern(request.cmdObj, Timestamp(0, 0), repl::OpTime::kUninitializedTerm);
+
+ return std::vector<BSONObj>{version.toBSON()};
+ });
+ }
+
+ /**
* Waits for a request for the shardIdentity document to be upserted into a shard from the
* config server on addShard.
*/
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
index 386f44a5387..5e11ad1313c 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.cpp
@@ -54,10 +54,12 @@
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
@@ -909,6 +911,53 @@ StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContex
return docs.front();
}
+StatusWith<VersionType> ShardingCatalogClientImpl::getConfigVersion(
+ OperationContext* txn, repl::ReadConcernLevel readConcern) {
+ auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ kConfigReadSelector,
+ readConcern,
+ NamespaceString(VersionType::ConfigNS),
+ BSONObj(),
+ BSONObj(),
+ boost::none /* no limit */);
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ auto queryResults = findStatus.getValue().docs;
+
+ if (queryResults.size() > 1) {
+ return {ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
+ }
+
+ if (queryResults.empty()) {
+ VersionType versionInfo;
+ versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
+ versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
+ versionInfo.setClusterId(OID{});
+ return versionInfo;
+ }
+
+ BSONObj versionDoc = queryResults.front();
+ auto versionTypeResult = VersionType::fromBSON(versionDoc);
+ if (!versionTypeResult.isOK()) {
+ return Status(ErrorCodes::UnsupportedFormat,
+ str::stream() << "invalid config.version document: " << versionDoc
+ << causedBy(versionTypeResult.getStatus()));
+ }
+
+ auto validationStatus = versionTypeResult.getValue().validate();
+ if (!validationStatus.isOK()) {
+ return Status(validationStatus.code(),
+ str::stream() << "invalid config.version document: " << versionDoc
+ << causedBy(validationStatus.reason()));
+ }
+
+ return versionTypeResult.getValue();
+}
+
Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* txn,
const ShardId& shardId,
vector<string>* dbs) {
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
index 9d9d060a8e8..e98474a10a2 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_client_impl.h
@@ -151,6 +151,9 @@ public:
StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override;
+ StatusWith<VersionType> getConfigVersion(OperationContext* txn,
+ repl::ReadConcernLevel readConcern) override;
+
void writeConfigServerDirect(OperationContext* txn,
const BatchedCommandRequest& request,
BatchedCommandResponse* response) override;
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp
index ca4f04bda77..56330375b21 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_config_initialization_test.cpp
@@ -126,7 +126,7 @@ TEST_F(ConfigInitializationTest, InitClusterMultipleVersionDocs) {
BSON("_id"
<< "a second document")));
- ASSERT_EQ(ErrorCodes::IncompatibleShardingConfigVersion,
+ ASSERT_EQ(ErrorCodes::TooManyMatchingDocuments,
catalogManager()->initializeConfigDatabaseIfNeeded(operationContext()));
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 39e0a6bae52..c956e25e4df 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -61,6 +61,7 @@
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -498,11 +499,17 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
shardType.setMaxSizeMB(maxSize);
}
+ auto clusterIdentity = ClusterIdentityLoader::get(txn);
+ auto clusterId = clusterIdentity->getClusterId(txn);
+ if (!clusterId.isOK()) {
+ return clusterId.getStatus();
+ }
+
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
shardIdentity.setShardName(shardType.getName());
- shardIdentity.setClusterId(Grid::get(txn)->shardRegistry()->getClusterId());
+ shardIdentity.setClusterId(clusterId.getValue());
auto validateStatus = shardIdentity.validate();
if (!validateStatus.isOK()) {
return validateStatus;
@@ -749,47 +756,9 @@ Status ShardingCatalogManagerImpl::initializeConfigDatabaseIfNeeded(OperationCon
return Status::OK();
}
-StatusWith<VersionType> ShardingCatalogManagerImpl::_getConfigVersion(OperationContext* txn) {
- auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn,
- kConfigReadSelector,
- // Use local read concern as we're likely to follow this up with a write.
- repl::ReadConcernLevel::kLocalReadConcern,
- NamespaceString(VersionType::ConfigNS),
- BSONObj(),
- BSONObj(),
- boost::none /* no limit */);
- if (!findStatus.isOK()) {
- return findStatus.getStatus();
- }
-
- auto queryResults = findStatus.getValue().docs;
-
- if (queryResults.size() > 1) {
- return {ErrorCodes::IncompatibleShardingConfigVersion,
- str::stream() << "should only have 1 document in " << VersionType::ConfigNS};
- }
-
- if (queryResults.empty()) {
- VersionType versionInfo;
- versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion);
- versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion);
- return versionInfo;
- }
-
- BSONObj versionDoc = queryResults.front();
- auto versionTypeResult = VersionType::fromBSON(versionDoc);
- if (!versionTypeResult.isOK()) {
- return Status(ErrorCodes::UnsupportedFormat,
- str::stream() << "invalid config version document: " << versionDoc
- << versionTypeResult.getStatus().toString());
- }
-
- return versionTypeResult.getValue();
-}
-
Status ShardingCatalogManagerImpl::_initConfigVersion(OperationContext* txn) {
- auto versionStatus = _getConfigVersion(txn);
+ auto versionStatus =
+ _catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kLocalReadConcern);
if (!versionStatus.isOK()) {
return versionStatus.getStatus();
}
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index bfacef70a57..8e9ab963851 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -122,11 +122,6 @@ private:
const BSONObj& cmdObj);
/**
- * Returns the current cluster schema/protocol version.
- */
- StatusWith<VersionType> _getConfigVersion(OperationContext* txn);
-
- /**
* Performs the necessary checks for version compatibility and creates a new config.version
* document if the current cluster config is empty.
*/
diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h
index bbf59af157a..816ef6adff6 100644
--- a/src/mongo/s/catalog/sharding_catalog_client.h
+++ b/src/mongo/s/catalog/sharding_catalog_client.h
@@ -62,6 +62,7 @@ class Status;
template <typename T>
class StatusWith;
class TagsType;
+class VersionType;
namespace executor {
struct ConnectionPoolStats;
@@ -345,6 +346,13 @@ public:
virtual StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) = 0;
/**
+ * Returns the contents of the config.version document - containing the current cluster schema
+ * version as well as the clusterID.
+ */
+ virtual StatusWith<VersionType> getConfigVersion(OperationContext* txn,
+ repl::ReadConcernLevel readConcern) = 0;
+
+ /**
* Directly sends the specified command to the config server and returns the response.
*
* NOTE: Usage of this function is disallowed in new code, which should instead go through
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
index 1ba10b51582..4fb48226572 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/status.h"
#include "mongo/db/repl/optime.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/stdx/memory.h"
@@ -184,6 +185,11 @@ StatusWith<BSONObj> ShardingCatalogClientMock::getGlobalSettings(OperationContex
return {ErrorCodes::InternalError, "Method not implemented"};
}
+StatusWith<VersionType> ShardingCatalogClientMock::getConfigVersion(
+ OperationContext* txn, repl::ReadConcernLevel readConcern) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
void ShardingCatalogClientMock::writeConfigServerDirect(OperationContext* txn,
const BatchedCommandRequest& request,
BatchedCommandResponse* response) {}
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h
index 125c6b36ca7..b9ab3a57274 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h
@@ -129,6 +129,9 @@ public:
StatusWith<BSONObj> getGlobalSettings(OperationContext* txn, StringData key) override;
+ StatusWith<VersionType> getConfigVersion(OperationContext* txn,
+ repl::ReadConcernLevel readConcern) override;
+
void writeConfigServerDirect(OperationContext* txn,
const BatchedCommandRequest& request,
BatchedCommandResponse* response) override;
diff --git a/src/mongo/s/catalog/type_config_version.cpp b/src/mongo/s/catalog/type_config_version.cpp
index 92710ca7638..68fa14829ad 100644
--- a/src/mongo/s/catalog/type_config_version.cpp
+++ b/src/mongo/s/catalog/type_config_version.cpp
@@ -84,6 +84,10 @@ Status VersionType::validate() const {
return {ErrorCodes::NoSuchKey, str::stream() << "missing " << clusterId.name() << " field"};
}
+ if (!_clusterId->isSet()) {
+ return {ErrorCodes::NotYetInitialized, "Cluster ID cannot be empty"};
+ }
+
return Status::OK();
}
@@ -210,7 +214,6 @@ void VersionType::setCurrentVersion(const int currentVersion) {
}
void VersionType::setClusterId(const OID& clusterId) {
- invariant(clusterId.isSet());
_clusterId = clusterId;
}
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 13135db1142..9b8a9c9b9f2 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -59,12 +59,9 @@ using std::string;
using std::unique_ptr;
using std::vector;
-// TODO SERVER-23096: Initializing an empty _clusterId is a temporary hack. The _clusterId should
-// be queried from the config servers on sharding initialization and passed to the ShardRegistry
-// constructor.
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
const ConnectionString& configServerCS)
- : _shardFactory(std::move(shardFactory)), _clusterId(), _data() {
+ : _shardFactory(std::move(shardFactory)), _data() {
_initConfigServerCS = configServerCS;
}
@@ -72,10 +69,6 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const {
return getConfigShard()->getConnString();
}
-const OID& ShardRegistry::getClusterId() const {
- return _clusterId;
-}
-
void ShardRegistry::rebuildConfigShard() {
_data.rebuildConfigShard(_shardFactory.get());
invariant(_data.getConfigShard());
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index e384a53feea..17dd8367edd 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -158,11 +158,6 @@ public:
ConnectionString getConfigServerConnectionString() const;
/**
- * Returns the cluster id from the config shard.
- */
- const OID& getClusterId() const;
-
- /**
* Reloads the ShardRegistry based on the contents of the config server's config.shards
* collection. Returns true if this call performed a reload and false if this call only waited
* for another thread to perform the reload and did not actually reload. Because of this, it is
@@ -246,12 +241,6 @@ private:
*/
ConnectionString _initConfigServerCS;
- /**
- * The id for the cluster, obtained from the config servers on sharding initialization. The
- * config servers are the authority on the clusterId.
- */
- const OID _clusterId;
-
ShardRegistryData _data;
// Protects the _reloadState and _initConfigServerCS during startup.
diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp
new file mode 100644
index 00000000000..b97c30bab08
--- /dev/null
+++ b/src/mongo/s/cluster_identity_loader.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/cluster_identity_loader.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_config_version.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+const auto getClusterIdentity = ServiceContext::declareDecoration<ClusterIdentityLoader>();
+
+} // namespace
+
+ClusterIdentityLoader* ClusterIdentityLoader::get(ServiceContext* serviceContext) {
+ return &getClusterIdentity(serviceContext);
+}
+
+ClusterIdentityLoader* ClusterIdentityLoader::get(OperationContext* operationContext) {
+ return ClusterIdentityLoader::get(operationContext->getServiceContext());
+}
+
+StatusWith<OID> ClusterIdentityLoader::getClusterId(OperationContext* txn) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (_initializationState == InitializationState::kInitialized) {
+ invariant(_lastLoadResult.isOK());
+ return _lastLoadResult;
+ }
+
+ if (_initializationState == InitializationState::kLoading) {
+ while (_initializationState == InitializationState::kLoading) {
+ _inReloadCV.wait(lk);
+ }
+ return _lastLoadResult;
+ }
+
+ invariant(_initializationState == InitializationState::kUninitialized);
+ _initializationState = InitializationState::kLoading;
+
+ lk.unlock();
+ auto loadStatus = _loadClusterId(txn);
+ lk.lock();
+
+ invariant(_initializationState == InitializationState::kLoading);
+ _lastLoadResult = std::move(loadStatus);
+ if (_lastLoadResult.isOK()) {
+ _initializationState = InitializationState::kInitialized;
+ } else {
+ _initializationState = InitializationState::kUninitialized;
+ }
+ _inReloadCV.notify_all();
+ return _lastLoadResult;
+}
+
+StatusWith<OID> ClusterIdentityLoader::_loadClusterId(OperationContext* txn) {
+ auto catalogClient = Grid::get(txn)->catalogClient(txn);
+ auto loadResult =
+ catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kMajorityReadConcern);
+ if (!loadResult.isOK()) {
+ return Status(loadResult.getStatus().code(),
+ str::stream() << "Error loading clusterID"
+ << causedBy(loadResult.getStatus().reason()));
+ }
+ return loadResult.getValue().getClusterId();
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h
new file mode 100644
index 00000000000..c1cd798186b
--- /dev/null
+++ b/src/mongo/s/cluster_identity_loader.h
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/oid.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+template <typename T>
+class StatusWith;
+
+/**
+ * Decoration on ServiceContext used by any process in a sharded cluster to access the cluster ID.
+ */
+class ClusterIdentityLoader {
+ MONGO_DISALLOW_COPYING(ClusterIdentityLoader);
+
+public:
+ ClusterIdentityLoader() = default;
+ ~ClusterIdentityLoader() = default;
+
+ /**
+ * Retrieves the ClusterIdentity object associated with the given service context.
+ */
+ static ClusterIdentityLoader* get(ServiceContext* serviceContext);
+ static ClusterIdentityLoader* get(OperationContext* operationContext);
+
+ /**
+ * Returns the cluster ID. If the cluster ID has been successfully loaded in the past, will
+ * return the cached version which will be stored in _lastLoadResult. If we've never
+ * successfully loaded the cluster ID, will attempt to load it from the config.version
+ * collection on the config servers, or if another thread is already in the process of loading
+ * it, will wait for that thread to finish and then return its results.
+ */
+ StatusWith<OID> getClusterId(OperationContext* txn);
+
+private:
+ enum class InitializationState {
+ kUninitialized, // We have never successfully loaded the cluster ID
+ kLoading, // One thread is in the process of attempting to load the cluster ID
+ kInitialized, // We have been able to successfully load the cluster ID.
+ };
+
+ /**
+ * Queries the config.version collection on the config server, extracts the cluster ID from
+ * the version document, and returns it.
+ */
+ StatusWith<OID> _loadClusterId(OperationContext* txn);
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _inReloadCV;
+
+ // Used to ensure that only one thread at a time attempts to reload the cluster ID from the
+ // config.version collection
+ InitializationState _initializationState{InitializationState::kUninitialized};
+
+ // Stores the result of the last call to _loadClusterId. Used to cache the cluster ID once it
+ // has been successfully loaded, as well as to report failures in loading across threads.
+ StatusWith<OID> _lastLoadResult{Status{ErrorCodes::InternalError, "cluster ID never loaded"}};
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp
new file mode 100644
index 00000000000..19ced5e34fc
--- /dev/null
+++ b/src/mongo/s/cluster_identity_loader_test.cpp
@@ -0,0 +1,213 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/catalog/config_server_version.h"
+#include "mongo/s/catalog/replset/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_config_version.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_identity_loader.h"
+#include "mongo/s/sharding_test_fixture.h"
+#include "mongo/stdx/future.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandRequest;
+using executor::TaskExecutor;
+using stdx::async;
+using unittest::assertGet;
+
+const BSONObj kReplSecondaryOkMetadata{[] {
+ BSONObjBuilder o;
+ o.appendElements(rpc::ServerSelectionMetadata(true, boost::none).toBSON());
+ o.append(rpc::kReplSetMetadataFieldName, 1);
+ return o.obj();
+}()};
+
+class ClusterIdentityTest : public ShardingTestFixture {
+public:
+ void setUp() override {
+ ShardingTestFixture::setUp();
+
+ configTargeter()->setFindHostReturnValue(configHost);
+ }
+
+ void expectConfigVersionLoad(StatusWith<OID> result) {
+ onFindCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQUALS(configHost, request.target);
+ ASSERT_EQUALS(kReplSecondaryOkMetadata, request.metadata);
+
+ const NamespaceString nss(request.dbname, request.cmdObj.firstElement().String());
+ ASSERT_EQ(nss.ns(), "config.version");
+
+ auto query = assertGet(QueryRequest::makeFromFindCommand(nss, request.cmdObj, false));
+
+ ASSERT_EQ(query->ns(), "config.version");
+ ASSERT_EQ(query->getFilter(), BSONObj());
+ ASSERT_FALSE(query->getLimit().is_initialized());
+
+ if (result.isOK()) {
+ VersionType version;
+ version.setCurrentVersion(CURRENT_CONFIG_VERSION);
+ version.setMinCompatibleVersion(MIN_COMPATIBLE_CONFIG_VERSION);
+ version.setClusterId(result.getValue());
+
+ return StatusWith<std::vector<BSONObj>>{{version.toBSON()}};
+ } else {
+ return StatusWith<std::vector<BSONObj>>{result.getStatus()};
+ }
+ });
+ }
+
+protected:
+ OID clusterId{OID::gen()};
+ HostAndPort configHost{"TestHost1"};
+};
+
+TEST_F(ClusterIdentityTest, BasicLoadSuccess) {
+
+ // The first time you ask for the cluster ID it will have to be loaded from the config servers.
+ auto future = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ });
+
+ expectConfigVersionLoad(clusterId);
+
+ future.timed_get(kFutureTimeout);
+
+ // Subsequent requests for the cluster ID should not require any network traffic as we consult
+ // the cached version.
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+}
+
+TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) {
+ // Check that multiple threads calling getClusterId at once still results in only one network
+ // operation.
+ auto future1 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ });
+ auto future2 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ });
+ auto future3 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ });
+
+ expectConfigVersionLoad(clusterId);
+
+ future1.timed_get(kFutureTimeout);
+ future2.timed_get(kFutureTimeout);
+ future3.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) {
+
+ // The first time you ask for the cluster ID it will have to be loaded from the config servers.
+ auto future = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
+ });
+
+ expectConfigVersionLoad(Status(ErrorCodes::Interrupted, "interrupted"));
+
+ future.timed_get(kFutureTimeout);
+
+ // After a failure to load the cluster ID, subsequent attempts to get the cluster ID should
+ // retry loading it.
+ future = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_OK(clusterIdStatus);
+ ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ });
+
+ expectConfigVersionLoad(clusterId);
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterIdentityTest, MultipleThreadsLoadFailure) {
+ // Check that multiple threads calling getClusterId at once still results in only one network
+ // operation.
+ auto future1 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
+ });
+ auto future2 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
+ });
+ auto future3 = launchAsync([&] {
+ auto clusterIdStatus =
+ ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
+ });
+
+ expectConfigVersionLoad(Status(ErrorCodes::Interrupted, "interrupted"));
+
+ future1.timed_get(kFutureTimeout);
+ future2.timed_get(kFutureTimeout);
+ future3.timed_get(kFutureTimeout);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index cdca5486b55..9d256fa4229 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -55,6 +55,7 @@
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
+#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_egress_metadata_hook.h"
@@ -200,6 +201,16 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) {
}
try {
+ auto clusterIdentity = ClusterIdentityLoader::get(txn);
+ auto clusterId = clusterIdentity->getClusterId(txn);
+ if (!clusterId.isOK()) {
+ warning()
+ << "Error initializing sharding state, sleeping for 2 seconds and trying again"
+ << causedBy(clusterId.getStatus());
+ sleepmillis(2000);
+ continue;
+ }
+
grid.shardRegistry()->reload(txn);
return Status::OK();
} catch (const DBException& ex) {
@@ -211,8 +222,9 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) {
// servers.
grid.shardRegistry()->rebuildConfigShard();
}
- log() << "Error initializing sharding state, sleeping for 2 seconds and trying again"
- << causedBy(status);
+ warning()
+ << "Error initializing sharding state, sleeping for 2 seconds and trying again"
+ << causedBy(status);
sleepmillis(2000);
continue;
}
diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h
index 0ecdd2a3508..94973f0f24f 100644
--- a/src/mongo/s/sharding_initialization.h
+++ b/src/mongo/s/sharding_initialization.h
@@ -78,8 +78,8 @@ Status initializeGlobalShardingState(OperationContext* txn,
ShardingCatalogManagerBuilder catalogManagerBuilder);
/**
- * Tries to contact the config server and reload the shard registry until it succeeds or
- * is interrupted.
+ * Tries to contact the config server and reload the shard registry and the cluster ID until it
+ * succeeds or is interrupted.
*/
Status reloadShardRegistryUntilSuccess(OperationContext* txn);