diff options
Diffstat (limited to 'src/mongo/client/server_ping_monitor.cpp')
-rw-r--r-- | src/mongo/client/server_ping_monitor.cpp | 89 |
1 files changed, 43 insertions, 46 deletions
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp index 3cb7f793bf2..c748d5d0498 100644 --- a/src/mongo/client/server_ping_monitor.cpp +++ b/src/mongo/client/server_ping_monitor.cpp @@ -33,6 +33,7 @@ #include "mongo/client/server_ping_monitor.h" +#include "mongo/client/sdam/sdam.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -50,7 +51,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle; SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Seconds pingFrequency, + Milliseconds pingFrequency, std::shared_ptr<TaskExecutor> executor) : _hostAndPort(hostAndPort), _rttListener(rttListener), @@ -99,18 +100,19 @@ void SingleServerPingMonitor::_scheduleServerPing() { if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) { LOGV2_DEBUG(23727, - 1, + kLogLevel, "Can't schedule ping for {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!schedulePingHandle.isOK()) { - LOGV2_FATAL(31434, + LOGV2_FATAL(23732, "Can't continue scheduling pings to {hostAndPort} due to " "{schedulePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus())); + fassertFailed(31434); } _pingHandle = std::move(schedulePingHandle.getValue()); @@ -154,17 +156,18 @@ void SingleServerPingMonitor::_doServerPing() { if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) { LOGV2_DEBUG(23728, - 1, + kLogLevel, "Can't ping {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!remotePingHandle.isOK()) { - LOGV2_FATAL(31435, + LOGV2_FATAL(23733, "Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus())); + fassertFailed(31435); } // Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets @@ -173,11 +176,9 @@ void SingleServerPingMonitor::_doServerPing() { } ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener, - Seconds pingFrequency, - boost::optional<std::shared_ptr<TaskExecutor>> executor) - : _rttListener(rttListener), - _pingFrequency(pingFrequency), - _executor(executor.get_value_or({})) {} + Milliseconds pingFrequency, + std::shared_ptr<TaskExecutor> executor) + : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {} ServerPingMonitor::~ServerPingMonitor() { shutdown(); @@ -201,25 +202,6 @@ void ServerPingMonitor::shutdown() { for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) { singleMonitor->drop(); } - - if (executor) { - executor->shutdown(); - executor->join(); - } -} - -void ServerPingMonitor::_setupTaskExecutor_inlock() { - if (_isShutdown || _executor) { - // Do not restart the _executor if it is in shutdown or already provided from a test. - return; - } else { - auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); - auto net = executor::makeNetworkInterface( - "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); - _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); - _executor->startup(); - } } void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -231,33 +213,48 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio << "' due to shutdown", !_isShutdown); - _setupTaskExecutor_inlock(); - invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end()); + if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) { + LOGV2_DEBUG(466811, + kLogLevel + 1, + "ServerPingMonitor already monitoring {address}", + "address"_attr = address); + return; + } auto newSingleMonitor = std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor); _serverPingMonitorMap[address] = newSingleMonitor; newSingleMonitor->init(); - LOGV2_DEBUG( - 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address); + LOGV2_DEBUG(23729, + kLogLevel, + "ServerPingMonitor is now monitoring {address}", + "address"_attr = address); } -void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) { +void ServerPingMonitor::onTopologyDescriptionChangedEvent( + UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) { stdx::lock_guard lk(_mutex); if (_isShutdown) { - LOGV2_DEBUG(23730, - 1, - "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has " - "not already done so.", - "address"_attr = address); return; } - auto it = _serverPingMonitorMap.find(address); - invariant(it != _serverPingMonitorMap.end()); - it->second->drop(); - _serverPingMonitorMap.erase(it); - LOGV2_DEBUG( - 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address); -} + // Remove monitors that are missing from the topology. + auto it = _serverPingMonitorMap.begin(); + while (it != _serverPingMonitorMap.end()) { + const auto& serverAddress = it->first; + if (newDescription->findServerByAddress(serverAddress) == boost::none) { + auto& singleMonitor = _serverPingMonitorMap[serverAddress]; + singleMonitor->drop(); + LOGV2_DEBUG(462899, + kLogLevel, + "ServerPingMonitor for host {addr} was removed from being monitored.", + "addr"_attr = serverAddress); + it = _serverPingMonitorMap.erase(it, ++it); + } else { + ++it; + } + } +} } // namespace mongo |