summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2016-07-12 17:51:20 -0400
committerSpencer T Brody <spencer@mongodb.com>2016-07-13 11:39:22 -0400
commit8ea795c3be185102d80e75257189e0385ee476fe (patch)
treeb325d1c0a0fd35abaa2b52da47b289e0127deb90
parentff285b342fd98416a458a070f05e62c274028c89 (diff)
downloadmongo-8ea795c3be185102d80e75257189e0385ee476fe.tar.gz
SERVER-24817 Separate out logic for loading cluster ID and accessing it
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp8
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp9
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp9
-rw-r--r--src/mongo/s/cluster_identity_loader.cpp20
-rw-r--r--src/mongo/s/cluster_identity_loader.h22
-rw-r--r--src/mongo/s/cluster_identity_loader_test.cpp34
-rw-r--r--src/mongo/s/sharding_initialization.cpp9
7 files changed, 57 insertions, 54 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index f140580fbcc..84019da8db1 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -502,8 +502,12 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat
// readConcern in drain mode because the global lock prevents replication. This is
// safe, since if the clusterId write is rolled back, any writes that depend on it will
// also be rolled back.
- ClusterIdentityLoader::get(txn)->getClusterId(
- txn, repl::ReadConcernLevel::kLocalReadConcern);
+ // Since we *just* wrote the cluster ID to the config.version document (via
+ // ShardingCatalogManager::initializeConfigDatabaseIfNeeded), this should always
+ // succeed.
+ fassertStatusOK(40217,
+ ClusterIdentityLoader::get(txn)->loadClusterId(
+ txn, repl::ReadConcernLevel::kLocalReadConcern));
}
// Free any leftover locks from previous instantiations
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
index ac7d4dd759e..f3405100655 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp
@@ -92,14 +92,13 @@ protected:
// TODO: use kLocalReadConcern once this test is switched to using the
// ConfigServerTestFixture.
auto future = launchAsync([&] {
- auto clusterId =
- assertGet(ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(),
- repl::ReadConcernLevel::kMajorityReadConcern));
- ASSERT_EQUALS(_clusterId, clusterId);
+ ASSERT_OK(ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ repl::ReadConcernLevel::kMajorityReadConcern));
});
expectGetConfigVersion();
future.timed_get(kFutureTimeout);
+ ASSERT_EQUALS(_clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
}
/**
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index b61829a76ab..422821a03df 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -499,18 +499,11 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
shardType.setMaxSizeMB(maxSize);
}
- auto clusterIdentity = ClusterIdentityLoader::get(txn);
- auto clusterId =
- clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern);
- if (!clusterId.isOK()) {
- return clusterId.getStatus();
- }
-
ShardIdentityType shardIdentity;
shardIdentity.setConfigsvrConnString(
Grid::get(txn)->shardRegistry()->getConfigServerConnectionString());
shardIdentity.setShardName(shardType.getName());
- shardIdentity.setClusterId(clusterId.getValue());
+ shardIdentity.setClusterId(ClusterIdentityLoader::get(txn)->getClusterId());
auto validateStatus = shardIdentity.validate();
if (!validateStatus.isOK()) {
return validateStatus;
diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp
index 2e9a4d228c4..dc2ae335f94 100644
--- a/src/mongo/s/cluster_identity_loader.cpp
+++ b/src/mongo/s/cluster_identity_loader.cpp
@@ -54,26 +54,32 @@ ClusterIdentityLoader* ClusterIdentityLoader::get(OperationContext* operationCon
return ClusterIdentityLoader::get(operationContext->getServiceContext());
}
-StatusWith<OID> ClusterIdentityLoader::getClusterId(
- OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) {
+OID ClusterIdentityLoader::getClusterId() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ invariant(_initializationState == InitializationState::kInitialized && _lastLoadResult.isOK());
+ return _lastLoadResult.getValue();
+}
+
+Status ClusterIdentityLoader::loadClusterId(OperationContext* txn,
+ const repl::ReadConcernLevel& readConcernLevel) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_initializationState == InitializationState::kInitialized) {
invariant(_lastLoadResult.isOK());
- return _lastLoadResult;
+ return Status::OK();
}
if (_initializationState == InitializationState::kLoading) {
while (_initializationState == InitializationState::kLoading) {
_inReloadCV.wait(lk);
}
- return _lastLoadResult;
+ return _lastLoadResult.getStatus();
}
invariant(_initializationState == InitializationState::kUninitialized);
_initializationState = InitializationState::kLoading;
lk.unlock();
- auto loadStatus = _loadClusterId(txn, readConcernLevel);
+ auto loadStatus = _fetchClusterIdFromConfig(txn, readConcernLevel);
lk.lock();
invariant(_initializationState == InitializationState::kLoading);
@@ -84,10 +90,10 @@ StatusWith<OID> ClusterIdentityLoader::getClusterId(
_initializationState = InitializationState::kUninitialized;
}
_inReloadCV.notify_all();
- return _lastLoadResult;
+ return _lastLoadResult.getStatus();
}
-StatusWith<OID> ClusterIdentityLoader::_loadClusterId(
+StatusWith<OID> ClusterIdentityLoader::_fetchClusterIdFromConfig(
OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) {
auto catalogClient = Grid::get(txn)->catalogClient(txn);
auto loadResult = catalogClient->getConfigVersion(txn, readConcernLevel);
diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h
index 33c3bc06c00..25645111ef1 100644
--- a/src/mongo/s/cluster_identity_loader.h
+++ b/src/mongo/s/cluster_identity_loader.h
@@ -59,15 +59,19 @@ public:
static ClusterIdentityLoader* get(ServiceContext* serviceContext);
static ClusterIdentityLoader* get(OperationContext* operationContext);
+ /*
+ * Returns the cached cluster ID. Invalid to call unless loadClusterId has previously been
+ * called and returned success.
+ */
+ OID getClusterId();
+
/**
- * Returns the cluster ID. If the cluster ID has been successfully loaded in the past, will
- * return the cached version which will be stored in _lastLoadResult. If we've never
- * successfully loaded the cluster ID, will attempt to load it from the config.version
- * collection on the config servers, or if another thread is already in the process of loading
- * it, will wait for that thread to finish and then return its results.
+ * Loads the cluster ID from the config server's config.version collection and stores it into
+ * _lastLoadResult. If the cluster ID has previously been successfully loaded, this is a no-op.
+ * If another thread is already in the process of loading the cluster ID, concurrent calls will
+ * wait for that thread to finish and then return its results.
*/
- StatusWith<OID> getClusterId(OperationContext* txn,
- const repl::ReadConcernLevel& readConcernLevel);
+ Status loadClusterId(OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel);
private:
enum class InitializationState {
@@ -80,8 +84,8 @@ private:
* Queries the config.version collection on the config server, extracts the cluster ID from
* the version document, and returns it.
*/
- StatusWith<OID> _loadClusterId(OperationContext* txn,
- const repl::ReadConcernLevel& readConcernLevel);
+ StatusWith<OID> _fetchClusterIdFromConfig(OperationContext* txn,
+ const repl::ReadConcernLevel& readConcernLevel);
stdx::mutex _mutex;
stdx::condition_variable _inReloadCV;
diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp
index 7797a437e32..c3aff614524 100644
--- a/src/mongo/s/cluster_identity_loader_test.cpp
+++ b/src/mongo/s/cluster_identity_loader_test.cpp
@@ -111,9 +111,9 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) {
auto future = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
expectConfigVersionLoad(clusterId);
@@ -122,11 +122,9 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) {
// Subsequent requests for the cluster ID should not require any network traffic as we consult
// the cached version.
- auto clusterIdStatus =
+ ASSERT_OK(
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
- ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern));
}
TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) {
@@ -135,23 +133,23 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) {
auto future1 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
auto future2 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
auto future3 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
expectConfigVersionLoad(clusterId);
@@ -167,7 +165,7 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) {
auto future = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
});
@@ -180,9 +178,9 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) {
future = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
- ASSERT_EQUALS(clusterId, clusterIdStatus.getValue());
+ ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
expectConfigVersionLoad(clusterId);
@@ -196,19 +194,19 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadFailure) {
auto future1 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
});
auto future2 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
});
auto future3 = launchAsync([&] {
auto clusterIdStatus =
ClusterIdentityLoader::get(operationContext())
- ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
});
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index c7999aaace5..62f6232952f 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -201,13 +201,12 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) {
}
try {
- auto clusterIdentity = ClusterIdentityLoader::get(txn);
- auto clusterId =
- clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern);
- if (!clusterId.isOK()) {
+ auto status = ClusterIdentityLoader::get(txn)->loadClusterId(
+ txn, repl::ReadConcernLevel::kMajorityReadConcern);
+ if (!status.isOK()) {
warning()
<< "Error initializing sharding state, sleeping for 2 seconds and trying again"
- << causedBy(clusterId.getStatus());
+ << causedBy(status);
sleepmillis(2000);
continue;
}