summaryrefslogtreecommitdiff
path: root/src/mongo/client/server_ping_monitor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/server_ping_monitor.cpp')
-rw-r--r--src/mongo/client/server_ping_monitor.cpp89
1 files changed, 46 insertions, 43 deletions
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp
index c748d5d0498..3cb7f793bf2 100644
--- a/src/mongo/client/server_ping_monitor.cpp
+++ b/src/mongo/client/server_ping_monitor.cpp
@@ -33,7 +33,6 @@
#include "mongo/client/server_ping_monitor.h"
-#include "mongo/client/sdam/sdam.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -51,7 +50,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle;
SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort,
sdam::TopologyListener* rttListener,
- Milliseconds pingFrequency,
+ Seconds pingFrequency,
std::shared_ptr<TaskExecutor> executor)
: _hostAndPort(hostAndPort),
_rttListener(rttListener),
@@ -100,19 +99,18 @@ void SingleServerPingMonitor::_scheduleServerPing() {
if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) {
LOGV2_DEBUG(23727,
- kLogLevel,
+ 1,
"Can't schedule ping for {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!schedulePingHandle.isOK()) {
- LOGV2_FATAL(23732,
+ LOGV2_FATAL(31434,
"Can't continue scheduling pings to {hostAndPort} due to "
"{schedulePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus()));
- fassertFailed(31434);
}
_pingHandle = std::move(schedulePingHandle.getValue());
@@ -156,18 +154,17 @@ void SingleServerPingMonitor::_doServerPing() {
if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) {
LOGV2_DEBUG(23728,
- kLogLevel,
+ 1,
"Can't ping {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!remotePingHandle.isOK()) {
- LOGV2_FATAL(23733,
+ LOGV2_FATAL(31435,
"Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus()));
- fassertFailed(31435);
}
// Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets
@@ -176,9 +173,11 @@ void SingleServerPingMonitor::_doServerPing() {
}
ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener,
- Milliseconds pingFrequency,
- std::shared_ptr<TaskExecutor> executor)
- : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {}
+ Seconds pingFrequency,
+ boost::optional<std::shared_ptr<TaskExecutor>> executor)
+ : _rttListener(rttListener),
+ _pingFrequency(pingFrequency),
+ _executor(executor.get_value_or({})) {}
ServerPingMonitor::~ServerPingMonitor() {
shutdown();
@@ -202,6 +201,25 @@ void ServerPingMonitor::shutdown() {
for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) {
singleMonitor->drop();
}
+
+ if (executor) {
+ executor->shutdown();
+ executor->join();
+ }
+}
+
+void ServerPingMonitor::_setupTaskExecutor_inlock() {
+ if (_isShutdown || _executor) {
+ // Do not restart the _executor if it is in shutdown or already provided from a test.
+ return;
+ } else {
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ auto net = executor::makeNetworkInterface(
+ "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList));
+ auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
+ _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _executor->startup();
+ }
}
void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
@@ -213,48 +231,33 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio
<< "' due to shutdown",
!_isShutdown);
- if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) {
- LOGV2_DEBUG(466811,
- kLogLevel + 1,
- "ServerPingMonitor already monitoring {address}",
- "address"_attr = address);
- return;
- }
+ _setupTaskExecutor_inlock();
+ invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end());
auto newSingleMonitor =
std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor);
_serverPingMonitorMap[address] = newSingleMonitor;
newSingleMonitor->init();
- LOGV2_DEBUG(23729,
- kLogLevel,
- "ServerPingMonitor is now monitoring {address}",
- "address"_attr = address);
+ LOGV2_DEBUG(
+ 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address);
}
-void ServerPingMonitor::onTopologyDescriptionChangedEvent(
- UUID topologyId,
- sdam::TopologyDescriptionPtr previousDescription,
- sdam::TopologyDescriptionPtr newDescription) {
+void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) {
stdx::lock_guard lk(_mutex);
if (_isShutdown) {
+ LOGV2_DEBUG(23730,
+ 1,
+ "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has "
+ "not already done so.",
+ "address"_attr = address);
return;
}
-
- // Remove monitors that are missing from the topology.
- auto it = _serverPingMonitorMap.begin();
- while (it != _serverPingMonitorMap.end()) {
- const auto& serverAddress = it->first;
- if (newDescription->findServerByAddress(serverAddress) == boost::none) {
- auto& singleMonitor = _serverPingMonitorMap[serverAddress];
- singleMonitor->drop();
- LOGV2_DEBUG(462899,
- kLogLevel,
- "ServerPingMonitor for host {addr} was removed from being monitored.",
- "addr"_attr = serverAddress);
- it = _serverPingMonitorMap.erase(it, ++it);
- } else {
- ++it;
- }
- }
+ auto it = _serverPingMonitorMap.find(address);
+ invariant(it != _serverPingMonitorMap.end());
+ it->second->drop();
+ _serverPingMonitorMap.erase(it);
+ LOGV2_DEBUG(
+ 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address);
}
+
} // namespace mongo