summaryrefslogtreecommitdiff
path: root/src/mongo/s/sharding_initialization.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/sharding_initialization.cpp')
-rw-r--r--src/mongo/s/sharding_initialization.cpp85
1 files changed, 85 insertions, 0 deletions
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 29da54a9a30..4dc7372960e 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -44,9 +44,12 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/time_proof_service.h"
+#include "mongo/executor/async_multicaster.h"
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/scoped_task_executor.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -56,12 +59,14 @@
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/num_hosts_targeted_metrics.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
+#include "mongo/s/pre_warm_connection_pool_impl.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
#include "mongo/s/sharding_task_executor_pool_controller.h"
@@ -141,6 +146,26 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
return executorPool;
}
+/**
+ * Uses an AsyncMulticaster to ping all of the hosts in order to establish
+ * ShardingTaskExecutorPoolMinSize connections. This does not wait
+ * for the connections to be established nor does it check how many were established.
+ */
+void preWarmConnections(OperationContext* opCtx, std::vector<HostAndPort> allHosts) {
+ auto const grid = Grid::get(opCtx);
+ auto arbi = grid->getExecutorPool()->getArbitraryExecutor();
+ auto executor = executor::ScopedTaskExecutor(arbi);
+ executor::AsyncMulticaster::Options options;
+
+ auto results =
+ executor::AsyncMulticaster(*executor, options)
+ .multicast(allHosts,
+ "admin",
+ BSON("ping" << 1),
+ opCtx,
+ Milliseconds(gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS));
+}
+
} // namespace
std::unique_ptr<executor::TaskExecutor> makeShardingTaskExecutor(
@@ -245,4 +270,64 @@ Status waitForShardRegistryReload(OperationContext* opCtx) {
return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"};
}
+Status preWarmConnectionPool(OperationContext* opCtx) {
+ if (!gWarmMinConnectionsInShardingTaskExecutorPoolOnStartup) {
+ return Status::OK();
+ }
+
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ log() << "Pre-warming pooled connections for config server is disabled";
+ return Status::OK();
+ }
+
+ // Should not be called by mongod
+ invariant(serverGlobalParams.clusterRole != ClusterRole::ShardServer);
+
+ Timer timer;
+ std::vector<HostAndPort> allHosts;
+ auto const grid = Grid::get(opCtx);
+ auto allShardsStatus =
+ grid->catalogClient()->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
+ if (!allShardsStatus.isOK()) {
+ return allShardsStatus.getStatus();
+ }
+ auto allShards = allShardsStatus.getValue().value;
+
+ for (auto& shard : allShards) {
+ auto connStrStatus = ConnectionString::parse(shard.getHost());
+ if (!connStrStatus.isOK()) {
+ return connStrStatus.getStatus();
+ }
+ auto connStr = connStrStatus.getValue();
+ for (const auto& hostEntry : connStr.getServers()) {
+ allHosts.push_back(hostEntry);
+ }
+ }
+
+ if (allHosts.empty()) {
+ log() << "No hosts found to pre-warm connections to";
+ return Status::OK();
+ }
+ log() << "Pre-warming connections to " << allHosts.size() << " hosts";
+
+ try {
+ opCtx->runWithDeadline(
+ opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ Milliseconds(gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { preWarmConnections(opCtx, allHosts); });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ // if we've timed out, eat the exception and continue
+ } catch (const DBException& ex) {
+ warning() << "Connection pool pre-warm failure " << causedBy(ex.toStatus()) << ", timeMs "
+ << timer.millis();
+ return ex.toStatus();
+ }
+
+ if (timer.millis() > gWarmMinConnectionsInShardingTaskExecutorPoolOnStartupWaitMS / 2) {
+ log() << "Slow connection pool pre-warm, timeMs " << timer.millis();
+ }
+ return Status::OK();
+}
+
} // namespace mongo