diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 107 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.h | 11 |
3 files changed, 84 insertions, 42 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 9d8162067fc..c34fe99d4f7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -779,12 +779,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook const auto configsvrConnStr = Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString(); - status = ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString( - opCtx, configsvrConnStr); - if (!status.isOK()) { - warning() << "error encountered while trying to update config connection string to " - << configsvrConnStr << causedBy(status); - } + ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(opCtx, + configsvrConnStr); CatalogCacheLoader::get(_service).onStepUp(); ChunkSplitter::get(_service).onStepUp(); diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 1902d884cee..7bee0b6a789 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -67,6 +67,10 @@ #include "mongo/util/log.h" namespace mongo { + +// Failpoint for disabling updateShardIdentityConfigString calls on signaled RS nodes. +MONGO_FAIL_POINT_DEFINE(failUpdateShardIdentityConfigString); + namespace { const auto getInstance = ServiceContext::declareDecoration<ShardingInitializationMongoD>(); @@ -83,41 +87,73 @@ auto makeEgressHooksList(ServiceContext* service) { /** * Updates the config server field of the shardIdentity document with the given connection string if * setName is equal to the config server replica set name. - * - * NOTE: This is intended to be used on a new thread that hasn't called Client::initThread. - * - * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes - * to replica set membership. */ -void updateShardIdentityConfigStringCB(const std::string& setName, - const std::string& newConnectionString) { - auto configsvrConnStr = - Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString(); - if (configsvrConnStr.getSetName() != setName) { - // Ignore all change notification for other sets that are not the config server. - return; - } +class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener { +public: + ShardingReplicaSetChangeListener(ServiceContext* serviceContext) + : _serviceContext(serviceContext) {} + ~ShardingReplicaSetChangeListener() final = default; + + void onFoundSet(const Key&) final {} + + // Update the shard identy config string + void onConfirmedSet(const State& state) final { + auto connStr = state.connStr; + + auto fun = [ serviceContext = _serviceContext, connStr ](auto args) { + if (ErrorCodes::isCancelationError(args.status.code())) { + return; + } + uassertStatusOK(args.status); + + LOG(0) << "Updating config server with confirmed set " << connStr; + Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr); + + if (MONGO_FAIL_POINT(failUpdateShardIdentityConfigString)) { + return; + } + + auto configsvrConnStr = + Grid::get(serviceContext)->shardRegistry()->getConfigServerConnectionString(); - Client::initThread("updateShardIdentityConfigConnString"); - auto uniqOpCtx = Client::getCurrent()->makeOperationContext(); + // Only proceed if the notification is for the configsvr + if (configsvrConnStr.getSetName() != connStr.getSetName()) { + return; + } + + ThreadClient tc("updateShardIdentityConfigString", serviceContext); + auto opCtx = tc->makeOperationContext(); + + ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), connStr); + }; - auto status = ShardingInitializationMongoD::updateShardIdentityConfigString( - uniqOpCtx.get(), uassertStatusOK(ConnectionString::parse(newConnectionString))); - if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { - warning() << "Error encountered while trying to update config connection string to " - << newConnectionString << causedBy(redact(status)); + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus(); + if (ErrorCodes::isCancelationError(schedStatus.code())) { + LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus; + return; + } + uassertStatusOK(schedStatus); } -} + void onPossibleSet(const State& state) final { + Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); + } + void onDroppedSet(const Key&) final {} -void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, - const ShardIdentity& shardIdentity, - StringData distLockProcessId) { +private: + ServiceContext* _serviceContext; +}; + +} // namespace + +void ShardingInitializationMongoD::initializeShardingEnvironmentOnShardServer( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { initializeGlobalShardingStateForMongoD( opCtx, shardIdentity.getConfigsvrConnectionString(), distLockProcessId); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); - ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); + _replicaSetChangeListener = + ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>( + opCtx->getServiceContext()); // Determine primary/secondary/standalone state in order to properly initialize sharding // components. @@ -141,10 +177,10 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, << (isStandaloneOrPrimary ? "primary" : "secondary") << " node."; } -} // namespace - ShardingInitializationMongoD::ShardingInitializationMongoD() - : _initFunc(initializeShardingEnvironmentOnShardServer) {} + : _initFunc([this](auto... args) { + this->initializeShardingEnvironmentOnShardServer(std::forward<decltype(args)>(args)...); + }) {} ShardingInitializationMongoD::~ShardingInitializationMongoD() = default; @@ -166,6 +202,7 @@ void ShardingInitializationMongoD::shutDown(OperationContext* opCtx) { grid->getExecutorPool()->shutdownAndJoin(); grid->catalogClient()->shutDown(opCtx); grid->shardRegistry()->shutdown(); + _replicaSetChangeListener.reset(); } bool ShardingInitializationMongoD::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { @@ -306,7 +343,7 @@ void ShardingInitializationMongoD::initializeFromShardIdentity( } } -Status ShardingInitializationMongoD::updateShardIdentityConfigString( +void ShardingInitializationMongoD::updateShardIdentityConfigString( OperationContext* opCtx, const ConnectionString& newConnectionString) { BSONObj updateObj( ShardIdentityType::createConfigServerUpdateObject(newConnectionString.toString())); @@ -328,10 +365,12 @@ Status ShardingInitializationMongoD::updateShardIdentityConfigString( << newConnectionString; } } catch (const DBException& exception) { - return exception.toStatus(); + auto status = exception.toStatus(); + if (!ErrorCodes::isNotMasterError(status.code())) { + warning() << "Error encountered while trying to update config connection string to " + << newConnectionString.toString() << causedBy(redact(status)); + } } - - return Status::OK(); } void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index e26302b4bb2..a205d68d1b2 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/base/string_data.h" +#include "mongo/client/replica_set_change_notifier.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/stdx/functional.h" @@ -60,6 +61,10 @@ public: static ShardingInitializationMongoD* get(OperationContext* opCtx); static ShardingInitializationMongoD* get(ServiceContext* service); + void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, + const ShardIdentity& shardIdentity, + StringData distLockProcessId); + /** * If started with --shardsvr, initializes sharding awareness from the shardIdentity document on * disk, if there is one. @@ -94,8 +99,8 @@ public: * Updates the config server field of the shardIdentity document with the given connection * string. */ - static Status updateShardIdentityConfigString(OperationContext* opCtx, - const ConnectionString& newConnectionString); + static void updateShardIdentityConfigString(OperationContext* opCtx, + const ConnectionString& newConnectionString); /** * For testing only. Mock the initialization method used by initializeFromConfigConnString and @@ -112,6 +117,8 @@ private: // Function for initializing the sharding environment components (i.e. everything on the Grid) ShardingEnvironmentInitFunc _initFunc; + + ReplicaSetChangeListenerHandle _replicaSetChangeListener; }; /** |