diff options
| field | value | date |
|---|---|---|
| author | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-07-11 18:43:09 -0400 |
| committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2016-07-12 15:11:40 -0400 |
| commit | 638e2163eb1f77b20a0373fc0f04a02715c1d3c8 (patch) | |
| tree | 6aecc1bbc044693375fc275505adcf963e034ef1 /src/mongo | |
| parent | 72d7a5572ff5b83800bb0615380d0da497d60084 (diff) | |
| download | mongo-638e2163eb1f77b20a0373fc0f04a02715c1d3c8.tar.gz | |
SERVER-25003 load clusterId into memory on config server transition to primary if needed
Diffstat (limited to 'src/mongo')
7 files changed, 54 insertions, 23 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 5d8f6469117..81ed9c16bf3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -71,6 +71,7 @@ #include "mongo/s/balancer/balancer.h" #include "mongo/s/catalog/sharding_catalog_manager.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" @@ -479,7 +480,7 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { status = Grid::get(txn)->catalogManager()->initializeConfigDatabaseIfNeeded(txn); - if (!status.isOK()) { + if (!status.isOK() && status != ErrorCodes::AlreadyInitialized) { if (status == ErrorCodes::ShutdownInProgress || status == ErrorCodes::InterruptedAtShutdown) { // Don't fassert if we're mid-shutdown, let the shutdown happen gracefully. @@ -494,6 +495,15 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(Operat << causedBy(status))); } + if (status != ErrorCodes::AlreadyInitialized) { + // Load the clusterId into memory. Use local readConcern, since we can't use majority + // readConcern in drain mode because the global lock prevents replication. This is + // safe, since if the clusterId write is rolled back, any writes that depend on it will + // also be rolled back. 
+ ClusterIdentityLoader::get(txn)->getClusterId( + txn, repl::ReadConcernLevel::kLocalReadConcern); + } + // Free any leftover locks from previous instantiations auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager(); distLockManager->unlockAll(txn, distLockManager->getProcessID()); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp index baa3ad957da..53aa481fdbc 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_add_shard_test.cpp @@ -90,9 +90,13 @@ protected: // Ensure the cluster ID has been loaded and cached so that future requests for the cluster // ID will not require any network traffic. + // TODO: use kLocalReadConcern once this test is switched to using the + // ConfigServerTestFixture. auto future = launchAsync([&] { - auto clusterId = assertGet( - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext())); + auto clusterId = + assertGet(ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), + repl::ReadConcernLevel::kMajorityReadConcern)); ASSERT_EQUALS(_clusterId, clusterId); }); expectGetConfigVersion(); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index c956e25e4df..b61829a76ab 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -500,7 +500,8 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( } auto clusterIdentity = ClusterIdentityLoader::get(txn); - auto clusterId = clusterIdentity->getClusterId(txn); + auto clusterId = + clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!clusterId.isOK()) { return clusterId.getStatus(); } @@ -736,7 +737,8 @@ Status 
ShardingCatalogManagerImpl::initializeConfigDatabaseIfNeeded(OperationCon { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_configInitialized) { - return Status::OK(); + return {ErrorCodes::AlreadyInitialized, + "Config database was previously loaded into memory"}; } } diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp index b97c30bab08..2e9a4d228c4 100644 --- a/src/mongo/s/cluster_identity_loader.cpp +++ b/src/mongo/s/cluster_identity_loader.cpp @@ -54,7 +54,8 @@ ClusterIdentityLoader* ClusterIdentityLoader::get(OperationContext* operationCon return ClusterIdentityLoader::get(operationContext->getServiceContext()); } -StatusWith<OID> ClusterIdentityLoader::getClusterId(OperationContext* txn) { +StatusWith<OID> ClusterIdentityLoader::getClusterId( + OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_initializationState == InitializationState::kInitialized) { invariant(_lastLoadResult.isOK()); @@ -72,7 +73,7 @@ StatusWith<OID> ClusterIdentityLoader::getClusterId(OperationContext* txn) { _initializationState = InitializationState::kLoading; lk.unlock(); - auto loadStatus = _loadClusterId(txn); + auto loadStatus = _loadClusterId(txn, readConcernLevel); lk.lock(); invariant(_initializationState == InitializationState::kLoading); @@ -86,10 +87,10 @@ StatusWith<OID> ClusterIdentityLoader::getClusterId(OperationContext* txn) { return _lastLoadResult; } -StatusWith<OID> ClusterIdentityLoader::_loadClusterId(OperationContext* txn) { +StatusWith<OID> ClusterIdentityLoader::_loadClusterId( + OperationContext* txn, const repl::ReadConcernLevel& readConcernLevel) { auto catalogClient = Grid::get(txn)->catalogClient(txn); - auto loadResult = - catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kMajorityReadConcern); + auto loadResult = catalogClient->getConfigVersion(txn, readConcernLevel); if (!loadResult.isOK()) { return 
Status(loadResult.getStatus().code(), str::stream() << "Error loading clusterID" diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h index c1cd798186b..33c3bc06c00 100644 --- a/src/mongo/s/cluster_identity_loader.h +++ b/src/mongo/s/cluster_identity_loader.h @@ -32,6 +32,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/oid.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" @@ -65,7 +66,8 @@ public: * collection on the config servers, or if another thread is already in the process of loading * it, will wait for that thread to finish and then return its results. */ - StatusWith<OID> getClusterId(OperationContext* txn); + StatusWith<OID> getClusterId(OperationContext* txn, + const repl::ReadConcernLevel& readConcernLevel); private: enum class InitializationState { @@ -78,7 +80,8 @@ private: * Queries the config.version collection on the config server, extracts the cluster ID from * the version document, and returns it. */ - StatusWith<OID> _loadClusterId(OperationContext* txn); + StatusWith<OID> _loadClusterId(OperationContext* txn, + const repl::ReadConcernLevel& readConcernLevel); stdx::mutex _mutex; stdx::condition_variable _inReloadCV; diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp index 19ced5e34fc..7797a437e32 100644 --- a/src/mongo/s/cluster_identity_loader_test.cpp +++ b/src/mongo/s/cluster_identity_loader_test.cpp @@ -110,7 +110,8 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { // The first time you ask for the cluster ID it will have to be loaded from the config servers. 
auto future = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); }); @@ -122,7 +123,8 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { // Subsequent requests for the cluster ID should not require any network traffic as we consult // the cached version. auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); } @@ -132,19 +134,22 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) { // operation. auto future1 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); }); auto future2 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); }); auto future3 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); 
ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); }); @@ -161,7 +166,8 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { // The first time you ask for the cluster ID it will have to be loaded from the config servers. auto future = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); @@ -173,7 +179,8 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { // retry loading it. future = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, clusterIdStatus.getValue()); }); @@ -188,17 +195,20 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadFailure) { // operation. 
auto future1 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); auto future2 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); auto future3 = launchAsync([&] { auto clusterIdStatus = - ClusterIdentityLoader::get(operationContext())->getClusterId(operationContext()); + ClusterIdentityLoader::get(operationContext()) + ->getClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 9d256fa4229..c7999aaace5 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -202,7 +202,8 @@ Status reloadShardRegistryUntilSuccess(OperationContext* txn) { try { auto clusterIdentity = ClusterIdentityLoader::get(txn); - auto clusterId = clusterIdentity->getClusterId(txn); + auto clusterId = + clusterIdentity->getClusterId(txn, repl::ReadConcernLevel::kMajorityReadConcern); if (!clusterId.isOK()) { warning() << "Error initializing sharding state, sleeping for 2 seconds and trying again" |