diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-03-26 13:15:04 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-27 15:01:20 +0000 |
commit | 6694a0434f37db0d6671d05e37a54e78eb1f156b (patch) | |
tree | 7d51a47f1f03f2aa7168ae6463b0065066b3be84 | |
parent | aec50d8b634b3af53807f9c98a6afd371cb2fa89 (diff) | |
download | mongo-6694a0434f37db0d6671d05e37a54e78eb1f156b.tar.gz |
SERVER-46681 Integrate the ServerPingMonitor into the StreamableReplicaSetMonitor
-rw-r--r-- | src/mongo/client/sdam/sdam_datatypes.h | 21 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam_json_test_runner.cpp | 3 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.cpp | 31 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.h | 3 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description_test.cpp | 57 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_selector.h | 6 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.cpp | 35 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.cpp | 30 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.h | 9 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager.cpp | 21 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.cpp | 89 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.h | 34 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor_test.cpp | 56 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 25 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 2 |
16 files changed, 288 insertions, 160 deletions
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h index 520d517da87..0da61e14d09 100644 --- a/src/mongo/client/sdam/sdam_datatypes.h +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -79,8 +79,10 @@ class IsMasterOutcome { IsMasterOutcome() = delete; public: - // success constructor - IsMasterOutcome(ServerAddress server, BSONObj response, IsMasterRTT rtt) + // Success constructor. + IsMasterOutcome(ServerAddress server, + BSONObj response, + boost::optional<IsMasterRTT> rtt = boost::none) : _server(std::move(server)), _success(true), _response(response), _rtt(rtt) { const auto topologyVersionField = response.getField("topologyVersion"); if (topologyVersionField) { @@ -89,7 +91,7 @@ public: } } - // failure constructor + // Failure constructor. IsMasterOutcome(ServerAddress server, BSONObj response, std::string errorMsg) : _server(std::move(server)), _success(false), _errorMsg(errorMsg) { const auto topologyVersionField = response.getField("topologyVersion"); @@ -108,16 +110,17 @@ public: private: ServerAddress _server; - // indicating the success or failure of the attempt + // Indicates the success or failure of the attempt. bool _success; - // an error message in case of failure + // An error message in case of failure. std::string _errorMsg; - // a document containing the command response (or boost::none if it failed) + // A document containing the command response (or boost::none if it failed). boost::optional<BSONObj> _response; - // the round trip time to execute the command (or null if it failed) + // The round trip time to execute the command (or boost::none if it failed or is not the outcome + // from an initial handshake exchange). boost::optional<IsMasterRTT> _rtt; - // indicates how fresh the topology information in this reponse is (or boost::none if it failed - // or the response did not include this) + // Indicates how fresh the topology information in this reponse is (or boost::none if it failed + // or the response did not include this). boost::optional<TopologyVersion> _topologyVersion; }; diff --git a/src/mongo/client/sdam/sdam_json_test_runner.cpp b/src/mongo/client/sdam/sdam_json_test_runner.cpp index 9c750ad977b..b2a6cd0b011 100644 --- a/src/mongo/client/sdam/sdam_json_test_runner.cpp +++ b/src/mongo/client/sdam/sdam_json_test_runner.cpp @@ -192,7 +192,8 @@ public: if (bsonIsMaster.nFields() == 0) { _isMasterResponses.push_back(IsMasterOutcome(address, BSONObj(), "network error")); } else { - _isMasterResponses.push_back(IsMasterOutcome(address, bsonIsMaster, kLatency)); + _isMasterResponses.push_back( + IsMasterOutcome(address, bsonIsMaster, duration_cast<IsMasterRTT>(kLatency))); } } _topologyOutcome = phase["outcome"].Obj(); diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp index 62092a0de6c..79b9cafdecf 100644 --- a/src/mongo/client/sdam/server_description.cpp +++ b/src/mongo/client/sdam/server_description.cpp @@ -62,7 +62,7 @@ ServerDescription::ServerDescription(ClockSource* clockSource, // type must be parsed before RTT is calculated. parseTypeFromIsMaster(response); - calculateRtt(*isMasterOutcome.getRtt(), lastRtt); + calculateRtt(isMasterOutcome.getRtt(), lastRtt); _lastUpdateTime = clockSource->now(); _minWireVersion = response["minWireVersion"].numberInt(); @@ -160,7 +160,7 @@ void ServerDescription::saveStreamable(BSONElement streamableField) { _streamable = streamableField && streamableField.Bool(); } -void ServerDescription::calculateRtt(const IsMasterRTT currentRtt, +void ServerDescription::calculateRtt(const boost::optional<IsMasterRTT> currentRtt, const boost::optional<IsMasterRTT> lastRtt) { if (getType() == ServerType::kUnknown) { // if a server's type is Unknown, it's RTT is null @@ -169,12 +169,28 @@ void ServerDescription::calculateRtt(const IsMasterRTT currentRtt, return; } - if (lastRtt) { + if (currentRtt == boost::none) { + // An onServerHeartbeatSucceededEvent occured. Note: This should not be reached by an + // onServerHeartbeatFailedEvent. Upon the failed event, the type is set to + // ServerType::Unknown. + + // The ServerType is no longer ServerType::Unknown, but the ServerPingMonitor has not + // updated the RTT yet. Set the _rtt to max() until the ServerPingMonitor provides the + // accurate RTT measurement. + if (lastRtt == boost::none) { + _rtt = IsMasterRTT::max(); + return; + } + + // Do not update the RTT upon an onServerHeartbeatSucceededEvent. + _rtt = lastRtt; + } else if (lastRtt == boost::none || lastRtt == IsMasterRTT::max()) { + // The lastRtt either does not exist or is not accurate. Discard it and use the currentRtt. + _rtt = currentRtt; + } else { // new_rtt = alpha * x + (1 - alpha) * old_rtt - _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.count() + + _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.get().count() + (1 - kRttAlpha) * lastRtt.get().count())); - } else { - _rtt = currentRtt; } } @@ -429,7 +445,8 @@ std::string ServerDescription::toString() const { ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) { auto newServerDescription = std::make_shared<ServerDescription>(*this); - newServerDescription->_rtt = rtt; + auto lastRtt = newServerDescription->getRtt(); + newServerDescription->calculateRtt(rtt, lastRtt); return newServerDescription; } diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h index eedc5d37599..159d7948738 100644 --- a/src/mongo/client/sdam/server_description.h +++ b/src/mongo/client/sdam/server_description.h @@ -117,7 +117,8 @@ private: void parseTypeFromIsMaster(const BSONObj isMaster); - void calculateRtt(const IsMasterRTT currentRtt, const boost::optional<IsMasterRTT> lastRtt); + void calculateRtt(const boost::optional<IsMasterRTT> currentRtt, + const boost::optional<IsMasterRTT> lastRtt); void saveLastWriteInfo(BSONObj lastWriteBson); void storeHostListIfPresent(const std::string key, diff --git a/src/mongo/client/sdam/server_description_test.cpp b/src/mongo/client/sdam/server_description_test.cpp index 1c9e1156707..8ed40911927 100644 --- a/src/mongo/client/sdam/server_description_test.cpp +++ b/src/mongo/client/sdam/server_description_test.cpp @@ -364,35 +364,41 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreRTTNullWhenServerTypeIsUnknown) } TEST_F(ServerDescriptionTestFixture, - ShouldStoreMovingAverageRTTWhenChangingFromOneKnownServerTypeToAnother) { - auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); + ShouldStoreConstantRTTWhenChangingFromOneKnownServerTypeToAnother) { + // Simulate a non-ping monitoring response. + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary); auto lastServerDescription = ServerDescriptionBuilder() .withType(ServerType::kRSSecondary) .withRtt(mongo::Milliseconds(20)) .instance(); + + // Check the RTT is unchanged since the IsMasterOutcome does not contain an RTT. auto description = ServerDescription(clockSource, response, lastServerDescription->getRtt()); - ASSERT_EQUALS(24, durationCount<mongo::Milliseconds>(*description.getRtt())); + ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description.getRtt())); - auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(30)); + auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary); auto description2 = ServerDescription(clockSource, response2, description.getRtt()); - ASSERT_EQUALS(25, durationCount<mongo::Milliseconds>(*description2.getRtt())); + ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description2.getRtt())); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLastWriteDate) { - auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kLastWriteDate, description.getLastWriteDate()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreOpTime) { - auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kOpTime, description.getOpTime()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { auto testStart = clockSource->now(); - auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonRsPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_GREATER_THAN_OR_EQUALS(description.getLastUpdateTime(), testStart); } @@ -400,7 +406,8 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { // Disabling these tests since this causes jstest failures when // running on a host with a mixed case hostname. // TEST_F(ServerDescriptionTestFixture, ShouldStoreHostNamesAsLowercase) { -// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, mongo::Milliseconds(40)); +// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, +// duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); // auto description = ServerDescription(clockSource, response); // // ASSERT_EQUALS("foo:1234", description.getAddress()); @@ -419,20 +426,23 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { //} TEST_F(ServerDescriptionTestFixture, ShouldStoreMinMaxWireVersion) { - auto response = IsMasterOutcome("foo:1234", kBsonWireVersion, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonWireVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonWireVersion["minWireVersion"].Int(), description.getMinWireVersion()); ASSERT_EQUALS(kBsonWireVersion["maxWireVersion"].Int(), description.getMaxWireVersion()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreTags) { - auto response = IsMasterOutcome("foo:1234", kBsonTags, mongo::Milliseconds(40)); + auto response = + IsMasterOutcome("foo:1234", kBsonTags, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(toStringMap(kBsonTags["tags"].Obj()), description.getTags()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) { - auto response = IsMasterOutcome("foo:1234", kBsonSetVersionName, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonSetVersionName, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonSetVersionName.getIntField("setVersion"), description.getSetVersion()); ASSERT_EQUALS(std::string(kBsonSetVersionName.getStringField("setName")), @@ -440,27 +450,31 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreElectionId) { - auto response = IsMasterOutcome("foo:1234", kBsonElectionId, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonElectionId, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonElectionId.getField("electionId").OID(), description.getElectionId()); } TEST_F(ServerDescriptionTestFixture, ShouldStorePrimary) { - auto response = IsMasterOutcome("foo:1234", kBsonPrimary, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kBsonPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(std::string(kBsonPrimary.getStringField("primary")), description.getPrimary()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLogicalSessionTimeout) { - auto response = - IsMasterOutcome("foo:1234", kBsonLogicalSessionTimeout, mongo::Milliseconds(40)); + auto response = IsMasterOutcome("foo:1234", + kBsonLogicalSessionTimeout, + duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonLogicalSessionTimeout.getIntField("logicalSessionTimeoutMinutes"), description.getLogicalSessionTimeoutMinutes()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) { - auto response = IsMasterOutcome("foo:1234", kTopologyVersion, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kTopologyVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), kTopologyVersion.getObjectField("topologyVersion")); @@ -475,13 +489,15 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreStreamable) { - auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(true, description.isStreamable()); } TEST_F(ServerDescriptionTestFixture, ShouldStorePoolResetCounter) { - auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40)); + auto response = IsMasterOutcome( + "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription( clockSource, response, boost::none /*lastRtt*/, boost::none /*topologyVersion*/, 1); ASSERT_EQUALS(1, description.getPoolResetCounter()); @@ -495,7 +511,8 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreServerAddressOnError) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreCorrectDefaultValuesOnSuccess) { - auto response = IsMasterOutcome("foo:1234", kBsonOk, mongo::Milliseconds(40)); + auto response = + IsMasterOutcome("foo:1234", kBsonOk, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(boost::none, description.getError()); ASSERT_EQUALS(boost::none, description.getLastWriteDate()); diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h index 01152fb13b2..b05ad6be90f 100644 --- a/src/mongo/client/sdam/server_selector.h +++ b/src/mongo/client/sdam/server_selector.h @@ -177,10 +177,12 @@ private: // This is used to filter out servers based on their current latency measurements. struct LatencyWindow { const IsMasterRTT lower; - const IsMasterRTT upper; + IsMasterRTT upper; explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth) - : lower(lowerBound), upper(lowerBound + windowWidth) {} + : lower(lowerBound) { + upper = (lowerBound == IsMasterRTT::max()) ? lowerBound : lowerBound + windowWidth; + } bool isWithinWindow(IsMasterRTT latency); diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index 147fa7dcb26..34d8ef63563 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -26,12 +26,8 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - #include "mongo/client/sdam/topology_listener.h" -#include "mongo/util/log.h" - namespace mongo::sdam { void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) { @@ -120,10 +116,30 @@ void TopologyEventsPublisher::_scheduleNextDelivery() { } void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort, - const Status& status) {} + const Status& status) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::PING_FAILURE; + event->hostAndPort = hostAndPort; + event->status = status; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS, - const ServerAddress& hostAndPort) {} + const ServerAddress& hostAndPort) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::PING_SUCCESS; + event->duration = duration_cast<IsMasterRTT>(durationMS); + event->hostAndPort = hostAndPort; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} // note that this could be done in batches if it is a bottleneck. void TopologyEventsPublisher::_nextDelivery() { @@ -178,6 +194,13 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve event.hostAndPort, event.reply); break; + case EventType::PING_SUCCESS: + listener->onServerPingSucceededEvent(duration_cast<IsMasterRTT>(event.duration), + event.hostAndPort); + break; + case EventType::PING_FAILURE: + listener->onServerPingFailedEvent(event.hostAndPort, event.status); + break; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/client/sdam/topology_listener_mock.cpp b/src/mongo/client/sdam/topology_listener_mock.cpp index 5b8bcd4a6b0..7dddbaefea6 100644 --- a/src/mongo/client/sdam/topology_listener_mock.cpp +++ b/src/mongo/client/sdam/topology_listener_mock.cpp @@ -34,20 +34,29 @@ namespace mongo::sdam { -void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort, - const Status& status) { +void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT latency, + const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); - invariant(!_hasPingResponse_inlock(hostAndPort)); - _serverPingRTTs.emplace(hostAndPort, status); + auto it = _serverPingRTTs.find(hostAndPort); + if (it != _serverPingRTTs.end()) { + it->second.emplace_back(latency); + } else { + _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{latency}); + } } -void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT durationMS, - const ServerAddress& hostAndPort) { +void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort, + const Status& errorStatus) { stdx::lock_guard lk(_mutex); - invariant(!_hasPingResponse_inlock(hostAndPort)); - _serverPingRTTs.emplace(hostAndPort, durationMS); + // If the map already contains an element for hostAndPort, append to its already existing + // vector. Otherwise, create a new vector. + auto it = _serverPingRTTs.find(hostAndPort); + if (it != _serverPingRTTs.end()) { + it->second.emplace_back(errorStatus); + } else { + _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{errorStatus}); + } } - bool TopologyListenerMock::hasPingResponse(const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); return _hasPingResponse_inlock(hostAndPort); @@ -57,7 +66,8 @@ bool TopologyListenerMock::_hasPingResponse_inlock(const ServerAddress& hostAndP return _serverPingRTTs.find(hostAndPort) != _serverPingRTTs.end(); } -StatusWith<IsMasterRTT> TopologyListenerMock::getPingResponse(const ServerAddress& hostAndPort) { +std::vector<StatusWith<IsMasterRTT>> TopologyListenerMock::getPingResponse( + const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); invariant(_hasPingResponse_inlock(hostAndPort)); auto it = _serverPingRTTs.find(hostAndPort); diff --git a/src/mongo/client/sdam/topology_listener_mock.h b/src/mongo/client/sdam/topology_listener_mock.h index 0184d1ce81e..05207512ea1 100644 --- a/src/mongo/client/sdam/topology_listener_mock.h +++ b/src/mongo/client/sdam/topology_listener_mock.h @@ -39,9 +39,10 @@ class TopologyListenerMock : public TopologyListener { public: TopologyListenerMock() = default; virtual ~TopologyListenerMock() = default; + + void onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) override; + void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; - void onServerPingSucceededEvent(IsMasterRTT durationMS, - const ServerAddress& hostAndPort) override; /** * Acquires _mutex before calling _hasPingResponse_inlock(). @@ -58,11 +59,11 @@ public: * Returns the response for the most recent onServerPing event. MUST be called after a ping has * been sent and proccessed in order to remove it from the map and make room for the next. */ - StatusWith<IsMasterRTT> getPingResponse(const ServerAddress& hostAndPort); + std::vector<StatusWith<IsMasterRTT>> getPingResponse(const ServerAddress& hostAndPort); private: Mutex _mutex; - stdx::unordered_map<ServerAddress, StatusWith<IsMasterRTT>> _serverPingRTTs; + stdx::unordered_map<ServerAddress, std::vector<StatusWith<IsMasterRTT>>> _serverPingRTTs; }; } // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index a4847391cf6..75242baa796 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -129,21 +129,22 @@ const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescripti } void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) { - stdx::lock_guard<mongo::Mutex> lock(_mutex); + { + stdx::lock_guard<mongo::Mutex> lock(_mutex); - auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); - if (oldServerDescription) { - auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); + auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); + if (oldServerDescription) { + auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); - auto oldTopologyDescription = _topologyDescription; - _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); - _topologyDescription->installServerDescription(newServerDescription); + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); + _topologyDescription->installServerDescription(newServerDescription); - _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); - return; + return; + } } - // otherwise, the server was removed from the topology. Nothing to do. LOGV2(4333201, "Not updating RTT. Server {server} does not exist in {setName}", diff --git a/src/mongo/client/sdam/topology_manager_test.cpp b/src/mongo/client/sdam/topology_manager_test.cpp index bc9e9256f1a..fb78bfbbf3f 100644 --- a/src/mongo/client/sdam/topology_manager_test.cpp +++ b/src/mongo/client/sdam/topology_manager_test.cpp @@ -74,8 +74,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) { ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome( - serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); + auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), + kBsonTopologyVersionLow, + duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -83,8 +84,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) { kBsonTopologyVersionLow.getObjectField("topologyVersion")); // If previous topologyVersion is <= new topologyVersion, should update to new topologyVersion - isMasterOutcome = IsMasterOutcome( - serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40)); + isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), + kBsonTopologyVersionHigh, + duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); newServerDescription = topologyDescription->getServers()[0]; @@ -102,8 +104,9 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnErrorIfSent) { ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome( - serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); + auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), + kBsonTopologyVersionLow, + duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -130,8 +133,9 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome( - serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40)); + auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), + kBsonTopologyVersionHigh, + duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -139,8 +143,7 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology kBsonTopologyVersionHigh.getObjectField("topologyVersion")); // If isMasterOutcome is not successful, should preserve old topologyVersion - isMasterOutcome = IsMasterOutcome( - serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); + isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), kBsonTopologyVersionLow); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); newServerDescription = topologyDescription->getServers()[0]; @@ -158,8 +161,7 @@ TEST_F(TopologyManagerTestFixture, ShouldNowIncrementPoolResetCounterOnSuccess) ASSERT_EQUALS(serverDescription->getPoolResetCounter(), 0); // If isMasterOutcome is successful, poolResetCounter should remain the same - IsMasterOutcome isMasterOutcome( - serverDescription->getAddress(), kBsonOk, mongo::Milliseconds(40)); + IsMasterOutcome isMasterOutcome(serverDescription->getAddress(), kBsonOk); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp index 3cb7f793bf2..c748d5d0498 100644 --- a/src/mongo/client/server_ping_monitor.cpp +++ b/src/mongo/client/server_ping_monitor.cpp @@ -33,6 +33,7 @@ #include "mongo/client/server_ping_monitor.h" +#include "mongo/client/sdam/sdam.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -50,7 +51,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle; SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Seconds pingFrequency, + Milliseconds pingFrequency, std::shared_ptr<TaskExecutor> executor) : _hostAndPort(hostAndPort), _rttListener(rttListener), @@ -99,18 +100,19 @@ void SingleServerPingMonitor::_scheduleServerPing() { if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) { LOGV2_DEBUG(23727, - 1, + kLogLevel, "Can't schedule ping for {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!schedulePingHandle.isOK()) { - LOGV2_FATAL(31434, + LOGV2_FATAL(23732, "Can't continue scheduling pings to {hostAndPort} due to " "{schedulePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus())); + fassertFailed(31434); } _pingHandle = std::move(schedulePingHandle.getValue()); @@ -154,17 +156,18 @@ void SingleServerPingMonitor::_doServerPing() { if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) { LOGV2_DEBUG(23728, - 1, + kLogLevel, "Can't ping {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!remotePingHandle.isOK()) { - LOGV2_FATAL(31435, + LOGV2_FATAL(23733, "Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus())); + fassertFailed(31435); } // Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets @@ -173,11 +176,9 @@ void SingleServerPingMonitor::_doServerPing() { } ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener, - Seconds pingFrequency, - boost::optional<std::shared_ptr<TaskExecutor>> executor) - : _rttListener(rttListener), - _pingFrequency(pingFrequency), - _executor(executor.get_value_or({})) {} + Milliseconds pingFrequency, + std::shared_ptr<TaskExecutor> executor) + : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {} ServerPingMonitor::~ServerPingMonitor() { shutdown(); @@ -201,25 +202,6 @@ void ServerPingMonitor::shutdown() { for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) { singleMonitor->drop(); } - - if (executor) { - executor->shutdown(); - executor->join(); - } -} - -void ServerPingMonitor::_setupTaskExecutor_inlock() { - if (_isShutdown || _executor) { - // Do not restart the _executor if it is in shutdown or already provided from a test. - return; - } else { - auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); - auto net = executor::makeNetworkInterface( - "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); - _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); - _executor->startup(); - } } void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -231,33 +213,48 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio << "' due to shutdown", !_isShutdown); - _setupTaskExecutor_inlock(); - invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end()); + if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) { + LOGV2_DEBUG(466811, + kLogLevel + 1, + "ServerPingMonitor already monitoring {address}", + "address"_attr = address); + return; + } auto newSingleMonitor = std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor); _serverPingMonitorMap[address] = newSingleMonitor; newSingleMonitor->init(); - LOGV2_DEBUG( - 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address); + LOGV2_DEBUG(23729, + kLogLevel, + "ServerPingMonitor is now monitoring {address}", + "address"_attr = address); } -void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) { +void ServerPingMonitor::onTopologyDescriptionChangedEvent( + UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) { stdx::lock_guard lk(_mutex); if (_isShutdown) { - LOGV2_DEBUG(23730, - 1, - "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has " - "not already done so.", - "address"_attr = address); return; } - auto it = _serverPingMonitorMap.find(address); - invariant(it != _serverPingMonitorMap.end()); - it->second->drop(); - _serverPingMonitorMap.erase(it); - LOGV2_DEBUG( - 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address); -} + // Remove monitors that are missing from the topology. + auto it = _serverPingMonitorMap.begin(); + while (it != _serverPingMonitorMap.end()) { + const auto& serverAddress = it->first; + if (newDescription->findServerByAddress(serverAddress) == boost::none) { + auto& singleMonitor = _serverPingMonitorMap[serverAddress]; + singleMonitor->drop(); + LOGV2_DEBUG(462899, + kLogLevel, + "ServerPingMonitor for host {addr} was removed from being monitored.", + "addr"_attr = serverAddress); + it = _serverPingMonitorMap.erase(it, ++it); + } else { + ++it; + } + } +} } // namespace mongo diff --git a/src/mongo/client/server_ping_monitor.h b/src/mongo/client/server_ping_monitor.h index a7289a79d99..80ebc8be95d 100644 --- a/src/mongo/client/server_ping_monitor.h +++ b/src/mongo/client/server_ping_monitor.h @@ -44,7 +44,7 @@ class SingleServerPingMonitor : public std::enable_shared_from_this<SingleServer public: explicit SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Seconds pingFrequency, + Milliseconds pingFrequency, std::shared_ptr<executor::TaskExecutor> executor); /** @@ -97,7 +97,7 @@ private: /** * The frequency at which ping requests should be sent to measure the round trip time. */ - Seconds _pingFrequency; + Milliseconds _pingFrequency; std::shared_ptr<executor::TaskExecutor> _executor; @@ -107,6 +107,8 @@ private: */ Date_t _nextPingStartDate{}; + static constexpr auto kLogLevel = 0; + /** * Must be held to access any of the member variables below. */ @@ -132,14 +134,9 @@ class ServerPingMonitor : public sdam::TopologyListener { ServerPingMonitor& operator=(const ServerPingMonitor&) = delete; public: - /** - * Note: The ServerPingMonitor creates its own executor by default. It takes in an executor for - * testing only. - */ - ServerPingMonitor( - sdam::TopologyListener* rttListener, - Seconds pingFrequency, - boost::optional<std::shared_ptr<executor::TaskExecutor>> executor = boost::none); + ServerPingMonitor(sdam::TopologyListener* rttListener, + Milliseconds pingFrequency, + std::shared_ptr<executor::TaskExecutor> executor); ~ServerPingMonitor(); /** @@ -148,7 +145,7 @@ public: void shutdown(); /** - * The first isMaster exchange for a server succeeded. Creates a new + * The first isMaster exchange for a connection to the server succeeded. Creates a new * SingleServerPingMonitor to monitor the new replica set member. */ void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -156,16 +153,15 @@ public: const BSONObj reply = BSONObj()); /** - * The connection to the server was closed. Removes the server from the ServerPingMonitorList. + * Drop corresponding SingleServerPingMonitors if the server is not included in the + * newDescritpion. */ - void onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId); + void onTopologyDescriptionChangedEvent(UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription); private: /** - * Sets up and starts up the _executor if it did not already exist. - */ - void _setupTaskExecutor_inlock(); - /** * Listens for when new RTT (Round Trip Time) values are published. */ sdam::TopologyListener* _rttListener; @@ -173,13 +169,15 @@ private: /** * The interval at which ping requests should be sent to measure the RTT (Round Trip Time). */ - Seconds _pingFrequency; + Milliseconds _pingFrequency; /** * Executor for performing server monitoring pings for all of the replica set members. */ std::shared_ptr<executor::TaskExecutor> _executor; + static constexpr auto kLogLevel = 0; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerPingMonitor::mutex"); /** diff --git a/src/mongo/client/server_ping_monitor_test.cpp b/src/mongo/client/server_ping_monitor_test.cpp index 05a7f0603d0..0eba77e7941 100644 --- a/src/mongo/client/server_ping_monitor_test.cpp +++ b/src/mongo/client/server_ping_monitor_test.cpp @@ -33,7 +33,7 @@ #include <memory> -#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/sdam.h" #include "mongo/client/sdam/topology_listener_mock.h" #include "mongo/client/server_ping_monitor.h" #include "mongo/dbtests/mock/mock_replica_set.h" @@ -161,8 +161,10 @@ protected: ASSERT_TRUE(_topologyListener->hasPingResponse(hostAndPort)); ASSERT_LT(elapsed(), deadline); auto pingResponse = _topologyListener->getPingResponse(hostAndPort); - ASSERT(pingResponse.isOK()); + // There should only be one isMaster response queued up. + ASSERT_EQ(pingResponse.size(), 1); + ASSERT(pingResponse[0].isOK()); checkNoActivityBefore(deadline, hostAndPort); } @@ -178,6 +180,40 @@ protected: } } + /** + * Since the SingleServerPingMonitor is removed upon an onTopologyDescriptionChangedEvent, + * prompt the event with a new TopologyDescription that does not include hostToDrop. + */ + void closeMonitor(MockReplicaSet* replSet, + sdam::ServerAddress hostToDrop, + ServerPingMonitor* pingMonitor) { + auto hostAndPorts = replSet->getHosts(); + std::vector<sdam::ServerAddress> hosts; + std::transform(hostAndPorts.begin(), + hostAndPorts.end(), + std::back_inserter(hosts), + [](const auto& hostAndPort) { return hostAndPort.toString(); }); + + auto sdamConfigOld = sdam::SdamConfiguration(hosts); + auto topologyDescriptionOld = std::make_shared<sdam::TopologyDescription>(sdamConfigOld); + + + std::vector<sdam::ServerAddress> hostsNew(hosts.begin(), hosts.end()); + hostsNew.erase(std::remove_if(hostsNew.begin(), + hostsNew.end(), + [&](auto host) { return host == hostToDrop; }), + hostsNew.end()); + // Since the seedlist cannot be empty, the new TopologyDescription contains an empty + // HostAndPort. + if (hostsNew.size() == 0) { + hostsNew.emplace_back(HostAndPort().toString()); + } + auto sdamConfigNew = sdam::SdamConfiguration(hostsNew); + auto topologyDescriptionNew = std::make_shared<sdam::TopologyDescription>(sdamConfigNew); + pingMonitor->onTopologyDescriptionChangedEvent( + UUID::gen(), topologyDescriptionOld, topologyDescriptionNew); + } + private: Date_t _startDate; std::unique_ptr<sdam::TopologyListenerMock> _topologyListener; @@ -277,7 +313,8 @@ TEST_F(SingleServerPingMonitorTest, pingDeadServer) { ASSERT_TRUE(topologyListener->hasPingResponse(hostAndPort)); auto pingResponse = topologyListener->getPingResponse(hostAndPort); - ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse.getStatus()); + ASSERT_EQ(pingResponse.size(), 1); + ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse[0].getStatus()); checkNoActivityBefore(deadline); }; @@ -310,14 +347,13 @@ TEST_F(SingleServerPingMonitorTest, noPingAfterSingleServerPingMonitorClosed) { class ServerPingMonitorTest : public ServerPingMonitorTestFixture { protected: std::unique_ptr<ServerPingMonitor> makeServerPingMonitor(Seconds pingFrequency) { - auto executor = boost::optional<std::shared_ptr<executor::TaskExecutor>>(getExecutor()); - return std::make_unique<ServerPingMonitor>(getTopologyListener(), pingFrequency, executor); + return std::make_unique<ServerPingMonitor>( + getTopologyListener(), pingFrequency, getExecutor()); } }; /** - * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor via - * onServerHandshakeCompleteEvent and onServerClosedEvent. + * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor. */ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { auto pingFrequency = Seconds(10); @@ -326,7 +362,6 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString(); - auto oid = OID::gen(); // Add a SingleServerPingMonitor to the ServerPingMonitor. Confirm pings are sent to the server // at pingFrequency. @@ -336,7 +371,7 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { // Close the SingleServerMonitor before the third ping and confirm ping activity to the server // is stopped. - serverPingMonitor->onServerClosedEvent(hostAndPort, oid); + closeMonitor(replSet.get(), hostAndPort, serverPingMonitor.get()); checkNoActivityBefore(elapsed() + pingFrequency * 2, hostAndPort); } @@ -353,7 +388,6 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) { auto hosts = replSet->getHosts(); auto host0 = hosts[0].toString(); auto host1 = hosts[1].toString(); - auto oid0 = OID::gen(); // Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds // after host0. @@ -364,7 +398,7 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) { serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1); checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get()); - serverPingMonitor->onServerClosedEvent(host0, oid0); + closeMonitor(replSet.get(), host0, serverPingMonitor.get()); checkNoActivityBefore(pingFrequency + host1Delay, host0); // Confirm that host1's SingleServerPingMonitor continues ping activity. diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 65e24727c0c..2c286de0368 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -172,11 +172,16 @@ void StreamableReplicaSetMonitor::init() { _topologyManager = std::make_unique<TopologyManager>( _sdamConfig, getGlobalServiceContext()->getPreciseClockSource(), _eventsPublisher); + _eventsPublisher->registerListener(shared_from_this()); + + _pingMonitor = std::make_unique<ServerPingMonitor>( + _eventsPublisher.get(), sdam::SdamConfiguration::kDefaultHeartbeatFrequencyMs, _executor); + _eventsPublisher->registerListener(_pingMonitor); + _isMasterMonitor = std::make_unique<ServerIsMasterMonitor>( _uri, _sdamConfig, _eventsPublisher, _topologyManager->getTopologyDescription(), _executor); - - _eventsPublisher->registerListener(shared_from_this()); _eventsPublisher->registerListener(_isMasterMonitor); + _isDropped.store(false); ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName()); @@ -190,6 +195,7 @@ void StreamableReplicaSetMonitor::drop() { LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName()); _eventsPublisher->close(); _queryProcessor->shutdown(); + _pingMonitor->shutdown(); _isMasterMonitor->shutdown(); _failOutstandingWithStatus( lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); @@ -562,7 +568,8 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) { - IsMasterOutcome outcome(hostAndPort, reply, durationMs); + // After the inital handshake, isMasterResponses should not update the RTT with durationMs. + IsMasterOutcome outcome(hostAndPort, reply, boost::none); _topologyManager->onServerDescription(outcome); } @@ -576,11 +583,23 @@ void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT dura void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) { + LOGV2_DEBUG(4668133, + 0, + " StreamableReplicaSetMonitor::onServerPingFailedEvent, ServerPingMonitor got " + "status{status} ", + "addr"_attr = hostAndPort, + "status"_attr = status); failedHost(HostAndPort(hostAndPort), status); } void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, const ServerAddress& hostAndPort) { + LOGV2_DEBUG(4668132, + 1, + " StreamableReplicaSetMonitor::onServerPingSucceededEvent, ServerPingMonitor for " + "{addr} with {rtt}", + "addr"_attr = hostAndPort, + "rtt"_attr = durationMS); _topologyManager->onServerRTTUpdated(hostAndPort, durationMS); } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index ebf77642738..5ec56495684 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -40,6 +40,7 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/client/sdam/sdam.h" #include "mongo/client/server_is_master_monitor.h" +#include "mongo/client/server_ping_monitor.h" #include "mongo/executor/task_executor.h" #include "mongo/logger/log_component.h" #include "mongo/util/concurrency/with_lock.h" @@ -201,6 +202,7 @@ private: sdam::ServerSelectorPtr _serverSelector; sdam::TopologyEventsPublisherPtr _eventsPublisher; ServerIsMasterMonitorPtr _isMasterMonitor; + std::shared_ptr<ServerPingMonitor> _pingMonitor; // This object will be registered as a TopologyListener if there are // any outstanding queries for this RSM instance. |