diff options
Diffstat (limited to 'src/mongo/s/sharding_initialization.cpp')
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 29da54a9a30..4dc7372960e 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -44,9 +44,12 @@
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
 #include "mongo/db/time_proof_service.h"
+#include "mongo/executor/async_multicaster.h"
 #include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/scoped_task_executor.h"
 #include "mongo/executor/task_executor.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/executor/thread_pool_task_executor.h"
@@ -56,12 +59,14 @@
 #include "mongo/s/catalog/dist_lock_catalog_impl.h"
 #include "mongo/s/catalog/replset_dist_lock_manager.h"
 #include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/catalog_cache.h"
 #include "mongo/s/client/num_hosts_targeted_metrics.h"
 #include "mongo/s/client/shard_factory.h"
 #include "mongo/s/client/sharding_network_connection_hook.h"
 #include "mongo/s/cluster_identity_loader.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/pre_warm_connection_pool_impl.h"
 #include "mongo/s/query/cluster_cursor_manager.h"
 #include "mongo/s/sharding_task_executor.h"
 #include "mongo/s/sharding_task_executor_pool_controller.h"
@@ -141,6 +146,31 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
     return executorPool;
 }
 
+/**
+ * Sends a "ping" command against the "admin" database to every host in 'allHosts' through an
+ * AsyncMulticaster running on an arbitrary executor from the Grid's executor pool, in order to
+ * establish ShardingTaskExecutorPoolMinSize connections. The host list is only read, so it is
+ * taken by const reference.
+ *
+ * The multicast is given gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS as its
+ * timeout. This does not wait for the connections to be established nor does it check how many
+ * were established.
+ */
+void preWarmConnections(OperationContext* opCtx, const std::vector<HostAndPort>& allHosts) {
+    auto const grid = Grid::get(opCtx);
+    auto arbi = grid->getExecutorPool()->getArbitraryExecutor();
+    auto executor = executor::ScopedTaskExecutor(arbi);
+    executor::AsyncMulticaster::Options options;
+
+    // Only the side effect of contacting each host matters; the replies are not inspected.
+    executor::AsyncMulticaster(*executor, options)
+        .multicast(allHosts,
+                   "admin",
+                   BSON("ping" << 1),
+                   opCtx,
+                   Milliseconds(gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS));
+}
+
 } // namespace
 
 std::unique_ptr<executor::TaskExecutor> makeShardingTaskExecutor(
@@ -245,4 +275,84 @@ Status waitForShardRegistryReload(OperationContext* opCtx) {
     return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"};
 }
 
+/**
+ * Best-effort warm-up of the pooled connections to every shard host returned by the catalog
+ * client.
+ *
+ * Returns OK without pinging any host when
+ * gWarmMinConnectionsInShardingTaskExecutorPoolOnStartup is false, when this node is a config
+ * server, or when the shard list yields no hosts. Must not be called on a shard server; that is
+ * enforced with an invariant.
+ *
+ * Otherwise every host of every shard's connection string is pinged through
+ * preWarmConnections() under a deadline of
+ * gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS from now. Hitting that deadline
+ * is not an error.
+ *
+ * Returns OK on success or timeout; otherwise the error from fetching the shard list, from
+ * parsing a shard's host string, or from any other DBException thrown while pinging.
+ */
+Status preWarmConnectionPool(OperationContext* opCtx) {
+    if (!gWarmMinConnectionsInShardingTaskExecutorPoolOnStartup) {
+        return Status::OK();
+    }
+
+    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+        log() << "Pre-warming pooled connections for config server is disabled";
+        return Status::OK();
+    }
+
+    // Should not be called by mongod
+    invariant(serverGlobalParams.clusterRole != ClusterRole::ShardServer);
+
+    Timer timer;
+    auto const grid = Grid::get(opCtx);
+    auto allShardsStatus =
+        grid->catalogClient()->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
+    if (!allShardsStatus.isOK()) {
+        return allShardsStatus.getStatus();
+    }
+    // 'allShardsStatus' stays alive for the whole function, so bind the shard list by reference
+    // instead of copying it.
+    const auto& allShards = allShardsStatus.getValue().value;
+
+    std::vector<HostAndPort> allHosts;
+    for (const auto& shard : allShards) {
+        auto connStrStatus = ConnectionString::parse(shard.getHost());
+        if (!connStrStatus.isOK()) {
+            return connStrStatus.getStatus();
+        }
+        const auto& servers = connStrStatus.getValue().getServers();
+        allHosts.insert(allHosts.end(), servers.begin(), servers.end());
+    }
+
+    if (allHosts.empty()) {
+        log() << "No hosts found to pre-warm connections to";
+        return Status::OK();
+    }
+    log() << "Pre-warming connections to " << allHosts.size() << " hosts";
+
+    try {
+        opCtx->runWithDeadline(
+            opCtx->getServiceContext()->getPreciseClockSource()->now() +
+                Milliseconds(gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS),
+            ErrorCodes::ExceededTimeLimit,
+            [&] { preWarmConnections(opCtx, allHosts); });
+    } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+        // if we've timed out, eat the exception and continue
+    } catch (const DBException& ex) {
+        Status status = ex.toStatus();
+        warning() << "Connection pool pre-warm failure " << causedBy(status) << ", timeMs "
+                  << timer.millis();
+        return status;
+    }
+
+    // Sample the timer once so the logged duration is the same value that was compared.
+    const auto elapsedMs = timer.millis();
+    if (elapsedMs > gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS / 2) {
+        log() << "Slow connection pool pre-warm, timeMs " << elapsedMs;
+    }
+    return Status::OK();
+}
+
 } // namespace mongo