diff options
Diffstat (limited to 'src/mongo/client/server_ping_monitor.cpp')
-rw-r--r-- | src/mongo/client/server_ping_monitor.cpp | 89 |
1 files changed, 46 insertions, 43 deletions
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp index c748d5d0498..3cb7f793bf2 100644 --- a/src/mongo/client/server_ping_monitor.cpp +++ b/src/mongo/client/server_ping_monitor.cpp @@ -33,7 +33,6 @@ #include "mongo/client/server_ping_monitor.h" -#include "mongo/client/sdam/sdam.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -51,7 +50,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle; SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, + Seconds pingFrequency, std::shared_ptr<TaskExecutor> executor) : _hostAndPort(hostAndPort), _rttListener(rttListener), @@ -100,19 +99,18 @@ void SingleServerPingMonitor::_scheduleServerPing() { if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) { LOGV2_DEBUG(23727, - kLogLevel, + 1, "Can't schedule ping for {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!schedulePingHandle.isOK()) { - LOGV2_FATAL(23732, + LOGV2_FATAL(31434, "Can't continue scheduling pings to {hostAndPort} due to " "{schedulePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus())); - fassertFailed(31434); } _pingHandle = std::move(schedulePingHandle.getValue()); @@ -156,18 +154,17 @@ void SingleServerPingMonitor::_doServerPing() { if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) { LOGV2_DEBUG(23728, - kLogLevel, + 1, "Can't ping {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!remotePingHandle.isOK()) { - LOGV2_FATAL(23733, + LOGV2_FATAL(31435, "Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus())); - fassertFailed(31435); } // Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets @@ -176,9 +173,11 @@ void SingleServerPingMonitor::_doServerPing() { } ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, - std::shared_ptr<TaskExecutor> executor) - : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {} + Seconds pingFrequency, + boost::optional<std::shared_ptr<TaskExecutor>> executor) + : _rttListener(rttListener), + _pingFrequency(pingFrequency), + _executor(executor.get_value_or({})) {} ServerPingMonitor::~ServerPingMonitor() { shutdown(); @@ -202,6 +201,25 @@ void ServerPingMonitor::shutdown() { for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) { singleMonitor->drop(); } + + if (executor) { + executor->shutdown(); + executor->join(); + } +} + +void ServerPingMonitor::_setupTaskExecutor_inlock() { + if (_isShutdown || _executor) { + // Do not restart the _executor if it is in shutdown or already provided from a test. + return; + } else { + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + auto net = executor::makeNetworkInterface( + "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); + _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _executor->startup(); + } } void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -213,48 +231,33 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio << "' due to shutdown", !_isShutdown); - if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) { - LOGV2_DEBUG(466811, - kLogLevel + 1, - "ServerPingMonitor already monitoring {address}", - "address"_attr = address); - return; - } + _setupTaskExecutor_inlock(); + invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end()); auto newSingleMonitor = std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor); _serverPingMonitorMap[address] = newSingleMonitor; newSingleMonitor->init(); - LOGV2_DEBUG(23729, - kLogLevel, - "ServerPingMonitor is now monitoring {address}", - "address"_attr = address); + LOGV2_DEBUG( + 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address); } -void ServerPingMonitor::onTopologyDescriptionChangedEvent( - UUID topologyId, - sdam::TopologyDescriptionPtr previousDescription, - sdam::TopologyDescriptionPtr newDescription) { +void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) { stdx::lock_guard lk(_mutex); if (_isShutdown) { + LOGV2_DEBUG(23730, + 1, + "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has " + "not already done so.", + "address"_attr = address); return; } - - // Remove monitors that are missing from the topology. - auto it = _serverPingMonitorMap.begin(); - while (it != _serverPingMonitorMap.end()) { - const auto& serverAddress = it->first; - if (newDescription->findServerByAddress(serverAddress) == boost::none) { - auto& singleMonitor = _serverPingMonitorMap[serverAddress]; - singleMonitor->drop(); - LOGV2_DEBUG(462899, - kLogLevel, - "ServerPingMonitor for host {addr} was removed from being monitored.", - "addr"_attr = serverAddress); - it = _serverPingMonitorMap.erase(it, ++it); - } else { - ++it; - } - } + auto it = _serverPingMonitorMap.find(address); + invariant(it != _serverPingMonitorMap.end()); + it->second->drop(); + _serverPingMonitorMap.erase(it); + LOGV2_DEBUG( + 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address); } + } // namespace mongo |