diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-07-12 17:51:20 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-07-13 11:39:22 -0400 |
commit | 8ea795c3be185102d80e75257189e0385ee476fe (patch) | |
tree | b325d1c0a0fd35abaa2b52da47b289e0127deb90 | |
parent | ff285b342fd98416a458a070f05e62c274028c89 (diff) | |
download | mongo-8ea795c3be185102d80e75257189e0385ee476fe.tar.gz |
SERVER-24817 Separate out logic for loading cluster ID and accessing it
7 files changed, 57 insertions, 54 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index f140580fbcc..84019da8db1 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -502,8 +502,12 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat // readConcern in drain mode because the global lock prevents replication. This is // safe, since if the clusterId write is rolled back, any writes that depend on it will // also be rolled back. - ClusterIdentityLoader::get(txn)->getClusterId( - txn, repl::ReadConcernLevel::kLocalReadConcern); + // Since we *just* wrote the cluster ID to the config.version document (via + // ShardingCatalogManager::initializeConfigDatabaseIfNeeded), this should always + // succeed. + fassertStatusOK(40217, + ClusterIdentityLoader::get(txn)->loadClusterId( + txn, repl::ReadConcernLevel::kLocalReadConcern)); } // Free any leftover locks from previous instantiations diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index ac7d4dd759e..f3405100655 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -92,14 +92,13 @@ protected: // TODO: use kLocalReadConcern once this test is switched to using the // ConfigServerTestFixture. auto future = launchAsync([&] { - auto clusterId = - assertGet(ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), - repl::ReadConcernLevel::kMajorityReadConcern)); - ASSERT_EQUALS(_clusterId, clusterId); + ASSERT_OK(ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), + repl::ReadConcernLevel::kMajorityReadConcern)); }); expectGetConfigVersion(); future.timed_get(kFutureTimeout); + ASSERT_EQUALS(_clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); } /** diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index b61829a76ab..422821a03df 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -499,18 +499,11 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( shardType.setMaxSizeMB(maxSize); } - auto clusterIdentity = ClusterIdentityLoader::get(txn); - auto clusterId = - clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern); - if (!clusterId.isOK()) { - return clusterId.getStatus(); - } - ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( Grid::get(txn)->shardRegistry()->getConfigServerConnectionString()); shardIdentity.setShardName(shardType.getName()); - shardIdentity.setClusterId(clusterId.getValue()); + shardIdentity.setClusterId(ClusterIdentityLoader::get(txn)->getClusterId()); auto validateStatus = shardIdentity.validate(); if (!validateStatus.isOK()) { return validateStatus; diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp index 2e9a4d228c4..dc2ae335f94 100644 --- a/src/mongo/s/cluster_identity_loader.cpp +++ b/src/mongo/s/cluster_identity_loader.cpp @@ -54,26 +54,32 @@ ClusterIdentityLoader* ClusterIdentityLoader::get(OperationContext* operationCon return ClusterIdentityLoader::get(operationContext->getServiceContext()); } -StatusWith<OID> ClusterIdentityLoader::getClusterId( - OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) { +OID ClusterIdentityLoader::getClusterId() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_initializationState == InitializationState::kInitialized && _lastLoadResult.isOK()); + return _lastLoadResult.getValue(); +} + +Status ClusterIdentityLoader::loadClusterId(OperationContext* txn, + const repl::ReadConcernLevel& readConcernLevel) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_initializationState == InitializationState::kInitialized) { invariant(_lastLoadResult.isOK()); - return _lastLoadResult; + return Status::OK(); } if (_initializationState == InitializationState::kLoading) { while (_initializationState == InitializationState::kLoading) { _inReloadCV.wait(lk); } - return _lastLoadResult; + return _lastLoadResult.getStatus(); } invariant(_initializationState == InitializationState::kUninitialized); _initializationState = InitializationState::kLoading; lk.unlock(); - auto loadStatus = _loadClusterId(txn, readConcernLevel); + auto loadStatus = _fetchClusterIdFromConfig(txn, readConcernLevel); lk.lock(); invariant(_initializationState == InitializationState::kLoading); @@ -84,10 +90,10 @@ StatusWith<OID> ClusterIdentityLoader::getClusterId( _initializationState = InitializationState::kUninitialized; } _inReloadCV.notify_all(); - return _lastLoadResult; + return _lastLoadResult.getStatus(); } -StatusWith<OID> ClusterIdentityLoader::_loadClusterId( +StatusWith<OID> ClusterIdentityLoader::_fetchClusterIdFromConfig( OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) { auto catalogClient = Grid::get(txn)->catalogClient(txn); auto loadResult = catalogClient->getConfigVersion(txn, readConcernLevel); diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h index 33c3bc06c00..25645111ef1 100644 --- a/src/mongo/s/cluster_identity_loader.h +++ b/src/mongo/s/cluster_identity_loader.h @@ -59,15 +59,19 @@ public: static ClusterIdentityLoader* get(ServiceContext* serviceContext); static ClusterIdentityLoader* get(OperationContext* operationContext); + /* + * Returns the cached cluster ID. Invalid to call unless loadClusterId has previously been + * called and returned success. + */ + OID getClusterId(); + /** - * Returns the cluster ID. If the cluster ID has been successfully loaded in the past, will - * return the cached version which will be stored in _lastLoadResult. If we've never - * successfully loaded the cluster ID, will attempt to load it from the config.version - * collection on the config servers, or if another thread is already in the process of loading - * it, will wait for that thread to finish and then return its results. + * Loads the cluster ID from the config server's config.version collection and stores it into + * _lastLoadResult. If the cluster ID has previously been successfully loaded, this is a no-op. + * If another thread is already in the process of loading the cluster ID, concurrent calls will + * wait for that thread to finish and then return its results. */ - StatusWith<OID> getClusterId(OperationContext* txn, - const repl::ReadConcernLevel& readConcernLevel); + Status loadClusterId(OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel); private: enum class InitializationState { @@ -80,8 +84,8 @@ private: * Queries the config.version collection on the config server, extracts the cluster ID from * the version document, and returns it. */ - StatusWith<OID> _loadClusterId(OperationContext* txn, - const repl::ReadConcernLevel& readConcernLevel); + StatusWith<OID> _fetchClusterIdFromConfig(OperationContext* txn, + const repl::ReadConcernLevel& readConcernLevel); stdx::mutex _mutex; stdx::condition_variable _inReloadCV; diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp index 7797a437e32..c3aff614524 100644 --- a/src/mongo/s/cluster_identity_loader_test.cpp +++ b/src/mongo/s/cluster_identity_loader_test.cpp @@ -111,9 +111,9 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { auto future = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); expectConfigVersionLoad(clusterId); @@ -122,11 +122,9 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { // Subsequent requests for the cluster ID should not require any network traffic as we consult // the cached version. - auto clusterIdStatus = + ASSERT_OK( ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); - ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern)); } TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) { @@ -135,23 +133,23 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) { auto future1 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); auto future2 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); auto future3 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); expectConfigVersionLoad(clusterId); @@ -167,7 +165,7 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { auto future = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); @@ -180,9 +178,9 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { future = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); - ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); + ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); expectConfigVersionLoad(clusterId); @@ -196,19 +194,19 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadFailure) { auto future1 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); auto future2 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); auto future3 = launchAsync([&] { auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index c7999aaace5..62f6232952f 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -201,13 +201,12 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) { } try { - auto clusterIdentity = ClusterIdentityLoader::get(txn); - auto clusterId = - clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern); - if (!clusterId.isOK()) { + auto status = ClusterIdentityLoader::get(txn)->loadClusterId( + txn, repl::ReadConcernLevel::kMajorityReadConcern); + if (!status.isOK()) { warning() << "Error initializing sharding state, sleeping for 2 seconds and trying again" - << causedBy(clusterId.getStatus()); + << causedBy(status); sleepmillis(2000); continue; } |