summaryrefslogtreecommitdiff
path: root/src/mongo/client/server_ping_monitor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/server_ping_monitor.cpp')
-rw-r--r--src/mongo/client/server_ping_monitor.cpp89
1 files changed, 43 insertions, 46 deletions
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp
index 3cb7f793bf2..c748d5d0498 100644
--- a/src/mongo/client/server_ping_monitor.cpp
+++ b/src/mongo/client/server_ping_monitor.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/server_ping_monitor.h"
+#include "mongo/client/sdam/sdam.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -50,7 +51,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle;
SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort,
sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
+ Milliseconds pingFrequency,
std::shared_ptr<TaskExecutor> executor)
: _hostAndPort(hostAndPort),
_rttListener(rttListener),
@@ -99,18 +100,19 @@ void SingleServerPingMonitor::_scheduleServerPing() {
if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) {
LOGV2_DEBUG(23727,
- 1,
+ kLogLevel,
"Can't schedule ping for {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!schedulePingHandle.isOK()) {
- LOGV2_FATAL(31434,
+ LOGV2_FATAL(23732,
"Can't continue scheduling pings to {hostAndPort} due to "
"{schedulePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus()));
+ fassertFailed(31434);
}
_pingHandle = std::move(schedulePingHandle.getValue());
@@ -154,17 +156,18 @@ void SingleServerPingMonitor::_doServerPing() {
if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) {
LOGV2_DEBUG(23728,
- 1,
+ kLogLevel,
"Can't ping {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!remotePingHandle.isOK()) {
- LOGV2_FATAL(31435,
+ LOGV2_FATAL(23733,
"Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus()));
+ fassertFailed(31435);
}
// Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets
@@ -173,11 +176,9 @@ void SingleServerPingMonitor::_doServerPing() {
}
ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
- boost::optional<std::shared_ptr<TaskExecutor>> executor)
- : _rttListener(rttListener),
- _pingFrequency(pingFrequency),
- _executor(executor.get_value_or({})) {}
+ Milliseconds pingFrequency,
+ std::shared_ptr<TaskExecutor> executor)
+ : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {}
ServerPingMonitor::~ServerPingMonitor() {
shutdown();
@@ -201,25 +202,6 @@ void ServerPingMonitor::shutdown() {
for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) {
singleMonitor->drop();
}
-
- if (executor) {
- executor->shutdown();
- executor->join();
- }
-}
-
-void ServerPingMonitor::_setupTaskExecutor_inlock() {
- if (_isShutdown || _executor) {
- // Do not restart the _executor if it is in shutdown or already provided from a test.
- return;
- } else {
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- auto net = executor::makeNetworkInterface(
- "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList));
- auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
- _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
- _executor->startup();
- }
}
void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
@@ -231,33 +213,48 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio
<< "' due to shutdown",
!_isShutdown);
- _setupTaskExecutor_inlock();
- invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end());
+ if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) {
+ LOGV2_DEBUG(466811,
+ kLogLevel + 1,
+ "ServerPingMonitor already monitoring {address}",
+ "address"_attr = address);
+ return;
+ }
auto newSingleMonitor =
std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor);
_serverPingMonitorMap[address] = newSingleMonitor;
newSingleMonitor->init();
- LOGV2_DEBUG(
- 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address);
+ LOGV2_DEBUG(23729,
+ kLogLevel,
+ "ServerPingMonitor is now monitoring {address}",
+ "address"_attr = address);
}
-void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) {
+void ServerPingMonitor::onTopologyDescriptionChangedEvent(
+ UUID topologyId,
+ sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) {
stdx::lock_guard lk(_mutex);
if (_isShutdown) {
- LOGV2_DEBUG(23730,
- 1,
- "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has "
- "not already done so.",
- "address"_attr = address);
return;
}
- auto it = _serverPingMonitorMap.find(address);
- invariant(it != _serverPingMonitorMap.end());
- it->second->drop();
- _serverPingMonitorMap.erase(it);
- LOGV2_DEBUG(
- 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address);
-}
+ // Remove monitors that are missing from the topology.
+ auto it = _serverPingMonitorMap.begin();
+ while (it != _serverPingMonitorMap.end()) {
+ const auto& serverAddress = it->first;
+ if (newDescription->findServerByAddress(serverAddress) == boost::none) {
+ auto& singleMonitor = _serverPingMonitorMap[serverAddress];
+ singleMonitor->drop();
+ LOGV2_DEBUG(462899,
+ kLogLevel,
+ "ServerPingMonitor for host {addr} was removed from being monitored.",
+ "addr"_attr = serverAddress);
+ it = _serverPingMonitorMap.erase(it, ++it);
+ } else {
+ ++it;
+ }
+ }
+}
} // namespace mongo