summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-03-26 13:15:04 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-27 15:01:20 +0000
commit6694a0434f37db0d6671d05e37a54e78eb1f156b (patch)
tree7d51a47f1f03f2aa7168ae6463b0065066b3be84
parentaec50d8b634b3af53807f9c98a6afd371cb2fa89 (diff)
downloadmongo-6694a0434f37db0d6671d05e37a54e78eb1f156b.tar.gz
SERVER-46681 Integrate the ServerPingMonitor into the StreamableReplicaSetMonitor
-rw-r--r--src/mongo/client/sdam/sdam_datatypes.h21
-rw-r--r--src/mongo/client/sdam/sdam_json_test_runner.cpp3
-rw-r--r--src/mongo/client/sdam/server_description.cpp31
-rw-r--r--src/mongo/client/sdam/server_description.h3
-rw-r--r--src/mongo/client/sdam/server_description_test.cpp57
-rw-r--r--src/mongo/client/sdam/server_selector.h6
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp35
-rw-r--r--src/mongo/client/sdam/topology_listener_mock.cpp30
-rw-r--r--src/mongo/client/sdam/topology_listener_mock.h9
-rw-r--r--src/mongo/client/sdam/topology_manager.cpp21
-rw-r--r--src/mongo/client/sdam/topology_manager_test.cpp26
-rw-r--r--src/mongo/client/server_ping_monitor.cpp89
-rw-r--r--src/mongo/client/server_ping_monitor.h34
-rw-r--r--src/mongo/client/server_ping_monitor_test.cpp56
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp25
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h2
16 files changed, 288 insertions, 160 deletions
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h
index 520d517da87..0da61e14d09 100644
--- a/src/mongo/client/sdam/sdam_datatypes.h
+++ b/src/mongo/client/sdam/sdam_datatypes.h
@@ -79,8 +79,10 @@ class IsMasterOutcome {
IsMasterOutcome() = delete;
public:
- // success constructor
- IsMasterOutcome(ServerAddress server, BSONObj response, IsMasterRTT rtt)
+ // Success constructor.
+ IsMasterOutcome(ServerAddress server,
+ BSONObj response,
+ boost::optional<IsMasterRTT> rtt = boost::none)
: _server(std::move(server)), _success(true), _response(response), _rtt(rtt) {
const auto topologyVersionField = response.getField("topologyVersion");
if (topologyVersionField) {
@@ -89,7 +91,7 @@ public:
}
}
- // failure constructor
+ // Failure constructor.
IsMasterOutcome(ServerAddress server, BSONObj response, std::string errorMsg)
: _server(std::move(server)), _success(false), _errorMsg(errorMsg) {
const auto topologyVersionField = response.getField("topologyVersion");
@@ -108,16 +110,17 @@ public:
private:
ServerAddress _server;
- // indicating the success or failure of the attempt
+ // Indicates the success or failure of the attempt.
bool _success;
- // an error message in case of failure
+ // An error message in case of failure.
std::string _errorMsg;
- // a document containing the command response (or boost::none if it failed)
+ // A document containing the command response (or boost::none if it failed).
boost::optional<BSONObj> _response;
- // the round trip time to execute the command (or null if it failed)
+ // The round trip time to execute the command (or boost::none if it failed or is not the outcome
+ // from an initial handshake exchange).
boost::optional<IsMasterRTT> _rtt;
- // indicates how fresh the topology information in this reponse is (or boost::none if it failed
- // or the response did not include this)
+ // Indicates how fresh the topology information in this response is (or boost::none if it failed
+ // or the response did not include this).
boost::optional<TopologyVersion> _topologyVersion;
};
diff --git a/src/mongo/client/sdam/sdam_json_test_runner.cpp b/src/mongo/client/sdam/sdam_json_test_runner.cpp
index 9c750ad977b..b2a6cd0b011 100644
--- a/src/mongo/client/sdam/sdam_json_test_runner.cpp
+++ b/src/mongo/client/sdam/sdam_json_test_runner.cpp
@@ -192,7 +192,8 @@ public:
if (bsonIsMaster.nFields() == 0) {
_isMasterResponses.push_back(IsMasterOutcome(address, BSONObj(), "network error"));
} else {
- _isMasterResponses.push_back(IsMasterOutcome(address, bsonIsMaster, kLatency));
+ _isMasterResponses.push_back(
+ IsMasterOutcome(address, bsonIsMaster, duration_cast<IsMasterRTT>(kLatency)));
}
}
_topologyOutcome = phase["outcome"].Obj();
diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp
index 62092a0de6c..79b9cafdecf 100644
--- a/src/mongo/client/sdam/server_description.cpp
+++ b/src/mongo/client/sdam/server_description.cpp
@@ -62,7 +62,7 @@ ServerDescription::ServerDescription(ClockSource* clockSource,
// type must be parsed before RTT is calculated.
parseTypeFromIsMaster(response);
- calculateRtt(*isMasterOutcome.getRtt(), lastRtt);
+ calculateRtt(isMasterOutcome.getRtt(), lastRtt);
_lastUpdateTime = clockSource->now();
_minWireVersion = response["minWireVersion"].numberInt();
@@ -160,7 +160,7 @@ void ServerDescription::saveStreamable(BSONElement streamableField) {
_streamable = streamableField && streamableField.Bool();
}
-void ServerDescription::calculateRtt(const IsMasterRTT currentRtt,
+void ServerDescription::calculateRtt(const boost::optional<IsMasterRTT> currentRtt,
const boost::optional<IsMasterRTT> lastRtt) {
if (getType() == ServerType::kUnknown) {
// if a server's type is Unknown, it's RTT is null
@@ -169,12 +169,28 @@ void ServerDescription::calculateRtt(const IsMasterRTT currentRtt,
return;
}
- if (lastRtt) {
+ if (currentRtt == boost::none) {
+ // An onServerHeartbeatSucceededEvent occurred. Note: This should not be reached by an
+ // onServerHeartbeatFailedEvent. Upon the failed event, the type is set to
+ // ServerType::Unknown.
+
+ // The ServerType is no longer ServerType::Unknown, but the ServerPingMonitor has not
+ // updated the RTT yet. Set the _rtt to max() until the ServerPingMonitor provides the
+ // accurate RTT measurement.
+ if (lastRtt == boost::none) {
+ _rtt = IsMasterRTT::max();
+ return;
+ }
+
+ // Do not update the RTT upon an onServerHeartbeatSucceededEvent.
+ _rtt = lastRtt;
+ } else if (lastRtt == boost::none || lastRtt == IsMasterRTT::max()) {
+ // The lastRtt either does not exist or is not accurate. Discard it and use the currentRtt.
+ _rtt = currentRtt;
+ } else {
// new_rtt = alpha * x + (1 - alpha) * old_rtt
- _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.count() +
+ _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.get().count() +
(1 - kRttAlpha) * lastRtt.get().count()));
- } else {
- _rtt = currentRtt;
}
}
@@ -429,7 +445,8 @@ std::string ServerDescription::toString() const {
ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) {
auto newServerDescription = std::make_shared<ServerDescription>(*this);
- newServerDescription->_rtt = rtt;
+ auto lastRtt = newServerDescription->getRtt();
+ newServerDescription->calculateRtt(rtt, lastRtt);
return newServerDescription;
}
diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h
index eedc5d37599..159d7948738 100644
--- a/src/mongo/client/sdam/server_description.h
+++ b/src/mongo/client/sdam/server_description.h
@@ -117,7 +117,8 @@ private:
void parseTypeFromIsMaster(const BSONObj isMaster);
- void calculateRtt(const IsMasterRTT currentRtt, const boost::optional<IsMasterRTT> lastRtt);
+ void calculateRtt(const boost::optional<IsMasterRTT> currentRtt,
+ const boost::optional<IsMasterRTT> lastRtt);
void saveLastWriteInfo(BSONObj lastWriteBson);
void storeHostListIfPresent(const std::string key,
diff --git a/src/mongo/client/sdam/server_description_test.cpp b/src/mongo/client/sdam/server_description_test.cpp
index 1c9e1156707..8ed40911927 100644
--- a/src/mongo/client/sdam/server_description_test.cpp
+++ b/src/mongo/client/sdam/server_description_test.cpp
@@ -364,35 +364,41 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreRTTNullWhenServerTypeIsUnknown)
}
TEST_F(ServerDescriptionTestFixture,
- ShouldStoreMovingAverageRTTWhenChangingFromOneKnownServerTypeToAnother) {
- auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40));
+ ShouldStoreConstantRTTWhenChangingFromOneKnownServerTypeToAnother) {
+ // Simulate a non-ping monitoring response.
+ auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary);
auto lastServerDescription = ServerDescriptionBuilder()
.withType(ServerType::kRSSecondary)
.withRtt(mongo::Milliseconds(20))
.instance();
+
+ // Check the RTT is unchanged since the IsMasterOutcome does not contain an RTT.
auto description = ServerDescription(clockSource, response, lastServerDescription->getRtt());
- ASSERT_EQUALS(24, durationCount<mongo::Milliseconds>(*description.getRtt()));
+ ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description.getRtt()));
- auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(30));
+ auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary);
auto description2 = ServerDescription(clockSource, response2, description.getRtt());
- ASSERT_EQUALS(25, durationCount<mongo::Milliseconds>(*description2.getRtt()));
+ ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description2.getRtt()));
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreLastWriteDate) {
- auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kLastWriteDate, description.getLastWriteDate());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreOpTime) {
- auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kOpTime, description.getOpTime());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) {
auto testStart = clockSource->now();
- auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonRsPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_GREATER_THAN_OR_EQUALS(description.getLastUpdateTime(), testStart);
}
@@ -400,7 +406,8 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) {
// Disabling these tests since this causes jstest failures when
// running on a host with a mixed case hostname.
// TEST_F(ServerDescriptionTestFixture, ShouldStoreHostNamesAsLowercase) {
-// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, mongo::Milliseconds(40));
+// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames,
+// duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
// auto description = ServerDescription(clockSource, response);
//
// ASSERT_EQUALS("foo:1234", description.getAddress());
@@ -419,20 +426,23 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) {
//}
TEST_F(ServerDescriptionTestFixture, ShouldStoreMinMaxWireVersion) {
- auto response = IsMasterOutcome("foo:1234", kBsonWireVersion, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonWireVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kBsonWireVersion["minWireVersion"].Int(), description.getMinWireVersion());
ASSERT_EQUALS(kBsonWireVersion["maxWireVersion"].Int(), description.getMaxWireVersion());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreTags) {
- auto response = IsMasterOutcome("foo:1234", kBsonTags, mongo::Milliseconds(40));
+ auto response =
+ IsMasterOutcome("foo:1234", kBsonTags, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(toStringMap(kBsonTags["tags"].Obj()), description.getTags());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) {
- auto response = IsMasterOutcome("foo:1234", kBsonSetVersionName, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonSetVersionName, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kBsonSetVersionName.getIntField("setVersion"), description.getSetVersion());
ASSERT_EQUALS(std::string(kBsonSetVersionName.getStringField("setName")),
@@ -440,27 +450,31 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) {
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreElectionId) {
- auto response = IsMasterOutcome("foo:1234", kBsonElectionId, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonElectionId, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kBsonElectionId.getField("electionId").OID(), description.getElectionId());
}
TEST_F(ServerDescriptionTestFixture, ShouldStorePrimary) {
- auto response = IsMasterOutcome("foo:1234", kBsonPrimary, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kBsonPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(std::string(kBsonPrimary.getStringField("primary")), description.getPrimary());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreLogicalSessionTimeout) {
- auto response =
- IsMasterOutcome("foo:1234", kBsonLogicalSessionTimeout, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome("foo:1234",
+ kBsonLogicalSessionTimeout,
+ duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(kBsonLogicalSessionTimeout.getIntField("logicalSessionTimeoutMinutes"),
description.getLogicalSessionTimeoutMinutes());
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) {
- auto response = IsMasterOutcome("foo:1234", kTopologyVersion, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kTopologyVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto topologyVersion =
TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
kTopologyVersion.getObjectField("topologyVersion"));
@@ -475,13 +489,15 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) {
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreStreamable) {
- auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(true, description.isStreamable());
}
TEST_F(ServerDescriptionTestFixture, ShouldStorePoolResetCounter) {
- auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40));
+ auto response = IsMasterOutcome(
+ "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(
clockSource, response, boost::none /*lastRtt*/, boost::none /*topologyVersion*/, 1);
ASSERT_EQUALS(1, description.getPoolResetCounter());
@@ -495,7 +511,8 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreServerAddressOnError) {
}
TEST_F(ServerDescriptionTestFixture, ShouldStoreCorrectDefaultValuesOnSuccess) {
- auto response = IsMasterOutcome("foo:1234", kBsonOk, mongo::Milliseconds(40));
+ auto response =
+ IsMasterOutcome("foo:1234", kBsonOk, duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
auto description = ServerDescription(clockSource, response);
ASSERT_EQUALS(boost::none, description.getError());
ASSERT_EQUALS(boost::none, description.getLastWriteDate());
diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h
index 01152fb13b2..b05ad6be90f 100644
--- a/src/mongo/client/sdam/server_selector.h
+++ b/src/mongo/client/sdam/server_selector.h
@@ -177,10 +177,12 @@ private:
// This is used to filter out servers based on their current latency measurements.
struct LatencyWindow {
const IsMasterRTT lower;
- const IsMasterRTT upper;
+ IsMasterRTT upper;
explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth)
- : lower(lowerBound), upper(lowerBound + windowWidth) {}
+ : lower(lowerBound) {
+ upper = (lowerBound == IsMasterRTT::max()) ? lowerBound : lowerBound + windowWidth;
+ }
bool isWithinWindow(IsMasterRTT latency);
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
index 147fa7dcb26..34d8ef63563 100644
--- a/src/mongo/client/sdam/topology_listener.cpp
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -26,12 +26,8 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
#include "mongo/client/sdam/topology_listener.h"
-#include "mongo/util/log.h"
-
namespace mongo::sdam {
void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) {
@@ -120,10 +116,30 @@ void TopologyEventsPublisher::_scheduleNextDelivery() {
}
void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort,
- const Status& status) {}
+ const Status& status) {
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::PING_FAILURE;
+ event->hostAndPort = hostAndPort;
+ event->status = status;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS,
- const ServerAddress& hostAndPort) {}
+ const ServerAddress& hostAndPort) {
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::PING_SUCCESS;
+ event->duration = duration_cast<IsMasterRTT>(durationMS);
+ event->hostAndPort = hostAndPort;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
// note that this could be done in batches if it is a bottleneck.
void TopologyEventsPublisher::_nextDelivery() {
@@ -178,6 +194,13 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve
event.hostAndPort,
event.reply);
break;
+ case EventType::PING_SUCCESS:
+ listener->onServerPingSucceededEvent(duration_cast<IsMasterRTT>(event.duration),
+ event.hostAndPort);
+ break;
+ case EventType::PING_FAILURE:
+ listener->onServerPingFailedEvent(event.hostAndPort, event.status);
+ break;
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/client/sdam/topology_listener_mock.cpp b/src/mongo/client/sdam/topology_listener_mock.cpp
index 5b8bcd4a6b0..7dddbaefea6 100644
--- a/src/mongo/client/sdam/topology_listener_mock.cpp
+++ b/src/mongo/client/sdam/topology_listener_mock.cpp
@@ -34,20 +34,29 @@
namespace mongo::sdam {
-void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort,
- const Status& status) {
+void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT latency,
+ const ServerAddress& hostAndPort) {
stdx::lock_guard lk(_mutex);
- invariant(!_hasPingResponse_inlock(hostAndPort));
- _serverPingRTTs.emplace(hostAndPort, status);
+ auto it = _serverPingRTTs.find(hostAndPort);
+ if (it != _serverPingRTTs.end()) {
+ it->second.emplace_back(latency);
+ } else {
+ _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{latency});
+ }
}
-void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT durationMS,
- const ServerAddress& hostAndPort) {
+void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort,
+ const Status& errorStatus) {
stdx::lock_guard lk(_mutex);
- invariant(!_hasPingResponse_inlock(hostAndPort));
- _serverPingRTTs.emplace(hostAndPort, durationMS);
+ // If the map already contains an element for hostAndPort, append to its already existing
+ // vector. Otherwise, create a new vector.
+ auto it = _serverPingRTTs.find(hostAndPort);
+ if (it != _serverPingRTTs.end()) {
+ it->second.emplace_back(errorStatus);
+ } else {
+ _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{errorStatus});
+ }
}
-
bool TopologyListenerMock::hasPingResponse(const ServerAddress& hostAndPort) {
stdx::lock_guard lk(_mutex);
return _hasPingResponse_inlock(hostAndPort);
@@ -57,7 +66,8 @@ bool TopologyListenerMock::_hasPingResponse_inlock(const ServerAddress& hostAndP
return _serverPingRTTs.find(hostAndPort) != _serverPingRTTs.end();
}
-StatusWith<IsMasterRTT> TopologyListenerMock::getPingResponse(const ServerAddress& hostAndPort) {
+std::vector<StatusWith<IsMasterRTT>> TopologyListenerMock::getPingResponse(
+ const ServerAddress& hostAndPort) {
stdx::lock_guard lk(_mutex);
invariant(_hasPingResponse_inlock(hostAndPort));
auto it = _serverPingRTTs.find(hostAndPort);
diff --git a/src/mongo/client/sdam/topology_listener_mock.h b/src/mongo/client/sdam/topology_listener_mock.h
index 0184d1ce81e..05207512ea1 100644
--- a/src/mongo/client/sdam/topology_listener_mock.h
+++ b/src/mongo/client/sdam/topology_listener_mock.h
@@ -39,9 +39,10 @@ class TopologyListenerMock : public TopologyListener {
public:
TopologyListenerMock() = default;
virtual ~TopologyListenerMock() = default;
+
+ void onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) override;
+
void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override;
- void onServerPingSucceededEvent(IsMasterRTT durationMS,
- const ServerAddress& hostAndPort) override;
/**
* Acquires _mutex before calling _hasPingResponse_inlock().
@@ -58,11 +59,11 @@ public:
* Returns the response for the most recent onServerPing event. MUST be called after a ping has
* been sent and proccessed in order to remove it from the map and make room for the next.
*/
- StatusWith<IsMasterRTT> getPingResponse(const ServerAddress& hostAndPort);
+ std::vector<StatusWith<IsMasterRTT>> getPingResponse(const ServerAddress& hostAndPort);
private:
Mutex _mutex;
- stdx::unordered_map<ServerAddress, StatusWith<IsMasterRTT>> _serverPingRTTs;
+ stdx::unordered_map<ServerAddress, std::vector<StatusWith<IsMasterRTT>>> _serverPingRTTs;
};
} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp
index a4847391cf6..75242baa796 100644
--- a/src/mongo/client/sdam/topology_manager.cpp
+++ b/src/mongo/client/sdam/topology_manager.cpp
@@ -129,21 +129,22 @@ const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescripti
}
void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) {
- stdx::lock_guard<mongo::Mutex> lock(_mutex);
+ {
+ stdx::lock_guard<mongo::Mutex> lock(_mutex);
- auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort);
- if (oldServerDescription) {
- auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt);
+ auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort);
+ if (oldServerDescription) {
+ auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt);
- auto oldTopologyDescription = _topologyDescription;
- _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
- _topologyDescription->installServerDescription(newServerDescription);
+ auto oldTopologyDescription = _topologyDescription;
+ _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
+ _topologyDescription->installServerDescription(newServerDescription);
- _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
+ _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
- return;
+ return;
+ }
}
-
// otherwise, the server was removed from the topology. Nothing to do.
LOGV2(4333201,
"Not updating RTT. Server {server} does not exist in {setName}",
diff --git a/src/mongo/client/sdam/topology_manager_test.cpp b/src/mongo/client/sdam/topology_manager_test.cpp
index bc9e9256f1a..fb78bfbbf3f 100644
--- a/src/mongo/client/sdam/topology_manager_test.cpp
+++ b/src/mongo/client/sdam/topology_manager_test.cpp
@@ -74,8 +74,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
ASSERT(serverDescription->getTopologyVersion() == boost::none);
// If previous topologyVersion is boost::none, should update to new topologyVersion
- auto isMasterOutcome = IsMasterOutcome(
- serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40));
+ auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(),
+ kBsonTopologyVersionLow,
+ duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
auto newServerDescription = topologyDescription->getServers()[0];
@@ -83,8 +84,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
kBsonTopologyVersionLow.getObjectField("topologyVersion"));
// If previous topologyVersion is <= new topologyVersion, should update to new topologyVersion
- isMasterOutcome = IsMasterOutcome(
- serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40));
+ isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(),
+ kBsonTopologyVersionHigh,
+ duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
newServerDescription = topologyDescription->getServers()[0];
@@ -102,8 +104,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnErrorIfSent) {
ASSERT(serverDescription->getTopologyVersion() == boost::none);
// If previous topologyVersion is boost::none, should update to new topologyVersion
- auto isMasterOutcome = IsMasterOutcome(
- serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40));
+ auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(),
+ kBsonTopologyVersionLow,
+ duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
auto newServerDescription = topologyDescription->getServers()[0];
@@ -130,8 +133,9 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology
ASSERT(serverDescription->getTopologyVersion() == boost::none);
// If previous topologyVersion is boost::none, should update to new topologyVersion
- auto isMasterOutcome = IsMasterOutcome(
- serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40));
+ auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(),
+ kBsonTopologyVersionHigh,
+ duration_cast<IsMasterRTT>(mongo::Milliseconds(40)));
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
auto newServerDescription = topologyDescription->getServers()[0];
@@ -139,8 +143,7 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology
kBsonTopologyVersionHigh.getObjectField("topologyVersion"));
// If isMasterOutcome is not successful, should preserve old topologyVersion
- isMasterOutcome = IsMasterOutcome(
- serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40));
+ isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), kBsonTopologyVersionLow);
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
newServerDescription = topologyDescription->getServers()[0];
@@ -158,8 +161,7 @@ TEST_F(TopologyManagerTestFixture, ShouldNowIncrementPoolResetCounterOnSuccess)
ASSERT_EQUALS(serverDescription->getPoolResetCounter(), 0);
// If isMasterOutcome is successful, poolResetCounter should remain the same
- IsMasterOutcome isMasterOutcome(
- serverDescription->getAddress(), kBsonOk, mongo::Milliseconds(40));
+ IsMasterOutcome isMasterOutcome(serverDescription->getAddress(), kBsonOk);
topologyManager.onServerDescription(isMasterOutcome);
topologyDescription = topologyManager.getTopologyDescription();
auto newServerDescription = topologyDescription->getServers()[0];
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp
index 3cb7f793bf2..c748d5d0498 100644
--- a/src/mongo/client/server_ping_monitor.cpp
+++ b/src/mongo/client/server_ping_monitor.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/server_ping_monitor.h"
+#include "mongo/client/sdam/sdam.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -50,7 +51,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle;
SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort,
sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
+ Milliseconds pingFrequency,
std::shared_ptr<TaskExecutor> executor)
: _hostAndPort(hostAndPort),
_rttListener(rttListener),
@@ -99,18 +100,19 @@ void SingleServerPingMonitor::_scheduleServerPing() {
if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) {
LOGV2_DEBUG(23727,
- 1,
+ kLogLevel,
"Can't schedule ping for {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!schedulePingHandle.isOK()) {
- LOGV2_FATAL(31434,
+ LOGV2_FATAL(23732,
"Can't continue scheduling pings to {hostAndPort} due to "
"{schedulePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus()));
+ fassertFailed(31434);
}
_pingHandle = std::move(schedulePingHandle.getValue());
@@ -154,17 +156,18 @@ void SingleServerPingMonitor::_doServerPing() {
if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) {
LOGV2_DEBUG(23728,
- 1,
+ kLogLevel,
"Can't ping {hostAndPort}. Executor shutdown in progress",
"hostAndPort"_attr = _hostAndPort);
return;
}
if (!remotePingHandle.isOK()) {
- LOGV2_FATAL(31435,
+ LOGV2_FATAL(23733,
"Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}",
"hostAndPort"_attr = _hostAndPort,
"remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus()));
+ fassertFailed(31435);
}
// Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets
@@ -173,11 +176,9 @@ void SingleServerPingMonitor::_doServerPing() {
}
ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
- boost::optional<std::shared_ptr<TaskExecutor>> executor)
- : _rttListener(rttListener),
- _pingFrequency(pingFrequency),
- _executor(executor.get_value_or({})) {}
+ Milliseconds pingFrequency,
+ std::shared_ptr<TaskExecutor> executor)
+ : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {}
ServerPingMonitor::~ServerPingMonitor() {
shutdown();
@@ -201,25 +202,6 @@ void ServerPingMonitor::shutdown() {
for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) {
singleMonitor->drop();
}
-
- if (executor) {
- executor->shutdown();
- executor->join();
- }
-}
-
-void ServerPingMonitor::_setupTaskExecutor_inlock() {
- if (_isShutdown || _executor) {
- // Do not restart the _executor if it is in shutdown or already provided from a test.
- return;
- } else {
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- auto net = executor::makeNetworkInterface(
- "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList));
- auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
- _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
- _executor->startup();
- }
}
void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
@@ -231,33 +213,48 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio
<< "' due to shutdown",
!_isShutdown);
- _setupTaskExecutor_inlock();
- invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end());
+ if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) {
+ LOGV2_DEBUG(466811,
+ kLogLevel + 1,
+ "ServerPingMonitor already monitoring {address}",
+ "address"_attr = address);
+ return;
+ }
auto newSingleMonitor =
std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor);
_serverPingMonitorMap[address] = newSingleMonitor;
newSingleMonitor->init();
- LOGV2_DEBUG(
- 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address);
+ LOGV2_DEBUG(23729,
+ kLogLevel,
+ "ServerPingMonitor is now monitoring {address}",
+ "address"_attr = address);
}
-void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) {
+void ServerPingMonitor::onTopologyDescriptionChangedEvent(
+ UUID topologyId,
+ sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) {
stdx::lock_guard lk(_mutex);
if (_isShutdown) {
- LOGV2_DEBUG(23730,
- 1,
- "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has "
- "not already done so.",
- "address"_attr = address);
return;
}
- auto it = _serverPingMonitorMap.find(address);
- invariant(it != _serverPingMonitorMap.end());
- it->second->drop();
- _serverPingMonitorMap.erase(it);
- LOGV2_DEBUG(
- 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address);
-}
+ // Drop and erase every monitor whose server is absent from newDescription.
+ auto it = _serverPingMonitorMap.begin();
+ while (it != _serverPingMonitorMap.end()) {
+ const auto& serverAddress = it->first;
+ if (newDescription->findServerByAddress(serverAddress) == boost::none) {
+ auto& singleMonitor = it->second;
+ singleMonitor->drop();
+ LOGV2_DEBUG(462899,
+ kLogLevel,
+ "ServerPingMonitor for host {addr} was removed from being monitored.",
+ "addr"_attr = serverAddress);
+ _serverPingMonitorMap.erase(it++);  // 'it' is advanced before the old entry is erased.
+ } else {
+ ++it;
+ }
+ }
+}
} // namespace mongo
diff --git a/src/mongo/client/server_ping_monitor.h b/src/mongo/client/server_ping_monitor.h
index a7289a79d99..80ebc8be95d 100644
--- a/src/mongo/client/server_ping_monitor.h
+++ b/src/mongo/client/server_ping_monitor.h
@@ -44,7 +44,7 @@ class SingleServerPingMonitor : public std::enable_shared_from_this<SingleServer
public:
explicit SingleServerPingMonitor(sdam::ServerAddress hostAndPort,
sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
+ Milliseconds pingFrequency,
std::shared_ptr<executor::TaskExecutor> executor);
/**
@@ -97,7 +97,7 @@ private:
/**
* The frequency at which ping requests should be sent to measure the round trip time.
*/
- Seconds _pingFrequency;
+ Milliseconds _pingFrequency;
std::shared_ptr<executor::TaskExecutor> _executor;
@@ -107,6 +107,8 @@ private:
*/
Date_t _nextPingStartDate{};
+ static constexpr auto kLogLevel = 0;
+
/**
* Must be held to access any of the member variables below.
*/
@@ -132,14 +134,9 @@ class ServerPingMonitor : public sdam::TopologyListener {
ServerPingMonitor& operator=(const ServerPingMonitor&) = delete;
public:
- /**
- * Note: The ServerPingMonitor creates its own executor by default. It takes in an executor for
- * testing only.
- */
- ServerPingMonitor(
- sdam::TopologyListener* rttListener,
- Seconds pingFrequency,
- boost::optional<std::shared_ptr<executor::TaskExecutor>> executor = boost::none);
+ ServerPingMonitor(sdam::TopologyListener* rttListener,
+ Milliseconds pingFrequency,
+ std::shared_ptr<executor::TaskExecutor> executor);
~ServerPingMonitor();
/**
@@ -148,7 +145,7 @@ public:
void shutdown();
/**
- * The first isMaster exchange for a server succeeded. Creates a new
+ * The first isMaster exchange for a connection to the server succeeded. Creates a new
* SingleServerPingMonitor to monitor the new replica set member.
*/
void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
@@ -156,16 +153,15 @@ public:
const BSONObj reply = BSONObj());
/**
- * The connection to the server was closed. Removes the server from the ServerPingMonitorList.
+ * Drop corresponding SingleServerPingMonitors if the server is not included in the
+ * newDescription.
*/
- void onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId);
+ void onTopologyDescriptionChangedEvent(UUID topologyId,
+ sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription);
private:
/**
- * Sets up and starts up the _executor if it did not already exist.
- */
- void _setupTaskExecutor_inlock();
- /**
* Listens for when new RTT (Round Trip Time) values are published.
*/
sdam::TopologyListener* _rttListener;
@@ -173,13 +169,15 @@ private:
/**
* The interval at which ping requests should be sent to measure the RTT (Round Trip Time).
*/
- Seconds _pingFrequency;
+ Milliseconds _pingFrequency;
/**
* Executor for performing server monitoring pings for all of the replica set members.
*/
std::shared_ptr<executor::TaskExecutor> _executor;
+ static constexpr auto kLogLevel = 0;
+
mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerPingMonitor::mutex");
/**
diff --git a/src/mongo/client/server_ping_monitor_test.cpp b/src/mongo/client/server_ping_monitor_test.cpp
index 05a7f0603d0..0eba77e7941 100644
--- a/src/mongo/client/server_ping_monitor_test.cpp
+++ b/src/mongo/client/server_ping_monitor_test.cpp
@@ -33,7 +33,7 @@
#include <memory>
-#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/client/sdam/sdam.h"
#include "mongo/client/sdam/topology_listener_mock.h"
#include "mongo/client/server_ping_monitor.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
@@ -161,8 +161,10 @@ protected:
ASSERT_TRUE(_topologyListener->hasPingResponse(hostAndPort));
ASSERT_LT(elapsed(), deadline);
auto pingResponse = _topologyListener->getPingResponse(hostAndPort);
- ASSERT(pingResponse.isOK());
+ // There should only be one isMaster response queued up.
+ ASSERT_EQ(pingResponse.size(), 1);
+ ASSERT(pingResponse[0].isOK());
checkNoActivityBefore(deadline, hostAndPort);
}
@@ -178,6 +180,40 @@ protected:
}
}
+ /**
+ * Since the SingleServerPingMonitor is removed upon an onTopologyDescriptionChangedEvent,
+ * prompt the event with a new TopologyDescription that does not include hostToDrop.
+ */
+ void closeMonitor(MockReplicaSet* replSet,
+ sdam::ServerAddress hostToDrop,
+ ServerPingMonitor* pingMonitor) {
+ auto hostAndPorts = replSet->getHosts();
+ std::vector<sdam::ServerAddress> hosts;
+ std::transform(hostAndPorts.begin(),
+ hostAndPorts.end(),
+ std::back_inserter(hosts),
+ [](const auto& hostAndPort) { return hostAndPort.toString(); });
+
+ auto sdamConfigOld = sdam::SdamConfiguration(hosts);
+ auto topologyDescriptionOld = std::make_shared<sdam::TopologyDescription>(sdamConfigOld);
+
+
+ std::vector<sdam::ServerAddress> hostsNew(hosts.begin(), hosts.end());
+ hostsNew.erase(std::remove_if(hostsNew.begin(),
+ hostsNew.end(),
+ [&](auto host) { return host == hostToDrop; }),
+ hostsNew.end());
+ // Since the seedlist cannot be empty, the new TopologyDescription contains an empty
+ // HostAndPort.
+ if (hostsNew.size() == 0) {
+ hostsNew.emplace_back(HostAndPort().toString());
+ }
+ auto sdamConfigNew = sdam::SdamConfiguration(hostsNew);
+ auto topologyDescriptionNew = std::make_shared<sdam::TopologyDescription>(sdamConfigNew);
+ pingMonitor->onTopologyDescriptionChangedEvent(
+ UUID::gen(), topologyDescriptionOld, topologyDescriptionNew);
+ }
+
private:
Date_t _startDate;
std::unique_ptr<sdam::TopologyListenerMock> _topologyListener;
@@ -277,7 +313,8 @@ TEST_F(SingleServerPingMonitorTest, pingDeadServer) {
ASSERT_TRUE(topologyListener->hasPingResponse(hostAndPort));
auto pingResponse = topologyListener->getPingResponse(hostAndPort);
- ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse.getStatus());
+ ASSERT_EQ(pingResponse.size(), 1);
+ ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse[0].getStatus());
checkNoActivityBefore(deadline);
};
@@ -310,14 +347,13 @@ TEST_F(SingleServerPingMonitorTest, noPingAfterSingleServerPingMonitorClosed) {
class ServerPingMonitorTest : public ServerPingMonitorTestFixture {
protected:
std::unique_ptr<ServerPingMonitor> makeServerPingMonitor(Seconds pingFrequency) {
- auto executor = boost::optional<std::shared_ptr<executor::TaskExecutor>>(getExecutor());
- return std::make_unique<ServerPingMonitor>(getTopologyListener(), pingFrequency, executor);
+ return std::make_unique<ServerPingMonitor>(
+ getTopologyListener(), pingFrequency, getExecutor());
}
};
/**
- * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor via
- * onServerHandshakeCompleteEvent and onServerClosedEvent.
+ * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor.
*/
TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) {
auto pingFrequency = Seconds(10);
@@ -326,7 +362,6 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) {
"test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false);
auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString();
- auto oid = OID::gen();
// Add a SingleServerPingMonitor to the ServerPingMonitor. Confirm pings are sent to the server
// at pingFrequency.
@@ -336,7 +371,7 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) {
// Close the SingleServerMonitor before the third ping and confirm ping activity to the server
// is stopped.
- serverPingMonitor->onServerClosedEvent(hostAndPort, oid);
+ closeMonitor(replSet.get(), hostAndPort, serverPingMonitor.get());
checkNoActivityBefore(elapsed() + pingFrequency * 2, hostAndPort);
}
@@ -353,7 +388,6 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) {
auto hosts = replSet->getHosts();
auto host0 = hosts[0].toString();
auto host1 = hosts[1].toString();
- auto oid0 = OID::gen();
// Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds
// after host0.
@@ -364,7 +398,7 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) {
serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1);
checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get());
- serverPingMonitor->onServerClosedEvent(host0, oid0);
+ closeMonitor(replSet.get(), host0, serverPingMonitor.get());
checkNoActivityBefore(pingFrequency + host1Delay, host0);
// Confirm that host1's SingleServerPingMonitor continues ping activity.
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 65e24727c0c..2c286de0368 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -172,11 +172,16 @@ void StreamableReplicaSetMonitor::init() {
_topologyManager = std::make_unique<TopologyManager>(
_sdamConfig, getGlobalServiceContext()->getPreciseClockSource(), _eventsPublisher);
+ _eventsPublisher->registerListener(shared_from_this());
+
+ _pingMonitor = std::make_unique<ServerPingMonitor>(
+ _eventsPublisher.get(), sdam::SdamConfiguration::kDefaultHeartbeatFrequencyMs, _executor);
+ _eventsPublisher->registerListener(_pingMonitor);
+
_isMasterMonitor = std::make_unique<ServerIsMasterMonitor>(
_uri, _sdamConfig, _eventsPublisher, _topologyManager->getTopologyDescription(), _executor);
-
- _eventsPublisher->registerListener(shared_from_this());
_eventsPublisher->registerListener(_isMasterMonitor);
+
_isDropped.store(false);
ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName());
@@ -190,6 +195,7 @@ void StreamableReplicaSetMonitor::drop() {
LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName());
_eventsPublisher->close();
_queryProcessor->shutdown();
+ _pingMonitor->shutdown();
_isMasterMonitor->shutdown();
_failOutstandingWithStatus(
lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"});
@@ -562,7 +568,8 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
const ServerAddress& hostAndPort,
const BSONObj reply) {
- IsMasterOutcome outcome(hostAndPort, reply, durationMs);
+ // After the initial handshake, isMasterResponses should not update the RTT with durationMs.
+ IsMasterOutcome outcome(hostAndPort, reply, boost::none);
_topologyManager->onServerDescription(outcome);
}
@@ -576,11 +583,23 @@ void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT dura
void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort,
const Status& status) {
+ LOGV2_DEBUG(4668133,
+ 0,
+ " StreamableReplicaSetMonitor::onServerPingFailedEvent, ServerPingMonitor for "
+ "{addr} got status {status}",
+ "addr"_attr = hostAndPort,
+ "status"_attr = status);
failedHost(HostAndPort(hostAndPort), status);
}
void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
const ServerAddress& hostAndPort) {
+ LOGV2_DEBUG(4668132,
+ 1,
+ " StreamableReplicaSetMonitor::onServerPingSucceededEvent, ServerPingMonitor for "
+ "{addr} with {rtt}",
+ "addr"_attr = hostAndPort,
+ "rtt"_attr = durationMS);
_topologyManager->onServerRTTUpdated(hostAndPort, durationMS);
}
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index ebf77642738..5ec56495684 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -40,6 +40,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/sdam/sdam.h"
#include "mongo/client/server_is_master_monitor.h"
+#include "mongo/client/server_ping_monitor.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logger/log_component.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -201,6 +202,7 @@ private:
sdam::ServerSelectorPtr _serverSelector;
sdam::TopologyEventsPublisherPtr _eventsPublisher;
ServerIsMasterMonitorPtr _isMasterMonitor;
+ std::shared_ptr<ServerPingMonitor> _pingMonitor;
// This object will be registered as a TopologyListener if there are
// any outstanding queries for this RSM instance.