diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-03-27 13:47:34 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-27 18:02:58 +0000 |
commit | 4aa3ad525c1012fe9e3dbcb6e0d79745a5446cf5 (patch) | |
tree | c1b973f50c9aa9b734a1d643b692515413a7cd98 | |
parent | fb09428156a33be70d30452fb3bb91f73be94ee7 (diff) | |
download | mongo-4aa3ad525c1012fe9e3dbcb6e0d79745a5446cf5.tar.gz |
Revert "SERVER-46681 Integrate the ServerPingMonitor into the StreamableReplicaSetMonitor"
This reverts commit 6694a0434f37db0d6671d05e37a54e78eb1f156b.
-rw-r--r-- | src/mongo/client/sdam/sdam_datatypes.h | 21 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam_json_test_runner.cpp | 3 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.cpp | 31 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.h | 3 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description_test.cpp | 57 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_selector.h | 6 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.cpp | 35 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.cpp | 30 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.h | 9 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager.cpp | 21 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.cpp | 89 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.h | 34 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor_test.cpp | 56 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 25 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 2 |
16 files changed, 160 insertions, 288 deletions
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h index 0da61e14d09..520d517da87 100644 --- a/src/mongo/client/sdam/sdam_datatypes.h +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -79,10 +79,8 @@ class IsMasterOutcome { IsMasterOutcome() = delete; public: - // Success constructor. - IsMasterOutcome(ServerAddress server, - BSONObj response, - boost::optional<IsMasterRTT> rtt = boost::none) + // success constructor + IsMasterOutcome(ServerAddress server, BSONObj response, IsMasterRTT rtt) : _server(std::move(server)), _success(true), _response(response), _rtt(rtt) { const auto topologyVersionField = response.getField("topologyVersion"); if (topologyVersionField) { @@ -91,7 +89,7 @@ public: } } - // Failure constructor. + // failure constructor IsMasterOutcome(ServerAddress server, BSONObj response, std::string errorMsg) : _server(std::move(server)), _success(false), _errorMsg(errorMsg) { const auto topologyVersionField = response.getField("topologyVersion"); @@ -110,17 +108,16 @@ public: private: ServerAddress _server; - // Indicates the success or failure of the attempt. + // indicating the success or failure of the attempt bool _success; - // An error message in case of failure. + // an error message in case of failure std::string _errorMsg; - // A document containing the command response (or boost::none if it failed). + // a document containing the command response (or boost::none if it failed) boost::optional<BSONObj> _response; - // The round trip time to execute the command (or boost::none if it failed or is not the outcome - // from an initial handshake exchange). + // the round trip time to execute the command (or null if it failed) boost::optional<IsMasterRTT> _rtt; - // Indicates how fresh the topology information in this reponse is (or boost::none if it failed - // or the response did not include this). + // indicates how fresh the topology information in this reponse is (or boost::none if it failed + // or the response did not include this) boost::optional<TopologyVersion> _topologyVersion; }; diff --git a/src/mongo/client/sdam/sdam_json_test_runner.cpp b/src/mongo/client/sdam/sdam_json_test_runner.cpp index b2a6cd0b011..9c750ad977b 100644 --- a/src/mongo/client/sdam/sdam_json_test_runner.cpp +++ b/src/mongo/client/sdam/sdam_json_test_runner.cpp @@ -192,8 +192,7 @@ public: if (bsonIsMaster.nFields() == 0) { _isMasterResponses.push_back(IsMasterOutcome(address, BSONObj(), "network error")); } else { - _isMasterResponses.push_back( - IsMasterOutcome(address, bsonIsMaster, duration_cast<IsMasterRTT>(kLatency))); + _isMasterResponses.push_back(IsMasterOutcome(address, bsonIsMaster, kLatency)); } } _topologyOutcome = phase["outcome"].Obj(); diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp index 79b9cafdecf..62092a0de6c 100644 --- a/src/mongo/client/sdam/server_description.cpp +++ b/src/mongo/client/sdam/server_description.cpp @@ -62,7 +62,7 @@ ServerDescription::ServerDescription(ClockSource* clockSource, // type must be parsed before RTT is calculated. parseTypeFromIsMaster(response); - calculateRtt(isMasterOutcome.getRtt(), lastRtt); + calculateRtt(*isMasterOutcome.getRtt(), lastRtt); _lastUpdateTime = clockSource->now(); _minWireVersion = response["minWireVersion"].numberInt(); @@ -160,7 +160,7 @@ void ServerDescription::saveStreamable(BSONElement streamableField) { _streamable = streamableField && streamableField.Bool(); } -void ServerDescription::calculateRtt(const boost::optional<IsMasterRTT> currentRtt, +void ServerDescription::calculateRtt(const IsMasterRTT currentRtt, const boost::optional<IsMasterRTT> lastRtt) { if (getType() == ServerType::kUnknown) { // if a server's type is Unknown, it's RTT is null @@ -169,28 +169,12 @@ void ServerDescription::calculateRtt(const boost::optional<IsMasterRTT> currentR return; } - if (currentRtt == boost::none) { - // An onServerHeartbeatSucceededEvent occured. Note: This should not be reached by an - // onServerHeartbeatFailedEvent. Upon the failed event, the type is set to - // ServerType::Unknown. - - // The ServerType is no longer ServerType::Unknown, but the ServerPingMonitor has not - // updated the RTT yet. Set the _rtt to max() until the ServerPingMonitor provides the - // accurate RTT measurement. - if (lastRtt == boost::none) { - _rtt = IsMasterRTT::max(); - return; - } - - // Do not update the RTT upon an onServerHeartbeatSucceededEvent. - _rtt = lastRtt; - } else if (lastRtt == boost::none || lastRtt == IsMasterRTT::max()) { - // The lastRtt either does not exist or is not accurate. Discard it and use the currentRtt. - _rtt = currentRtt; - } else { + if (lastRtt) { // new_rtt = alpha * x + (1 - alpha) * old_rtt - _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.get().count() + + _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.count() + (1 - kRttAlpha) * lastRtt.get().count())); + } else { + _rtt = currentRtt; } } @@ -445,8 +429,7 @@ std::string ServerDescription::toString() const { ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) { auto newServerDescription = std::make_shared<ServerDescription>(*this); - auto lastRtt = newServerDescription->getRtt(); - newServerDescription->calculateRtt(rtt, lastRtt); + newServerDescription->_rtt = rtt; return newServerDescription; } diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h index 159d7948738..eedc5d37599 100644 --- a/src/mongo/client/sdam/server_description.h +++ b/src/mongo/client/sdam/server_description.h @@ -117,8 +117,7 @@ private: void parseTypeFromIsMaster(const BSONObj isMaster); - void calculateRtt(const boost::optional<IsMasterRTT> currentRtt, - const boost::optional<IsMasterRTT> lastRtt); + void calculateRtt(const IsMasterRTT currentRtt, const boost::optional<IsMasterRTT> lastRtt); void saveLastWriteInfo(BSONObj lastWriteBson); void storeHostListIfPresent(const std::string key, diff --git a/src/mongo/client/sdam/server_description_test.cpp b/src/mongo/client/sdam/server_description_test.cpp index 8ed40911927..1c9e1156707 100644 --- a/src/mongo/client/sdam/server_description_test.cpp +++ b/src/mongo/client/sdam/server_description_test.cpp @@ -364,41 +364,35 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreRTTNullWhenServerTypeIsUnknown) } TEST_F(ServerDescriptionTestFixture, - ShouldStoreConstantRTTWhenChangingFromOneKnownServerTypeToAnother) { - // Simulate a non-ping monitoring response. - auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary); + ShouldStoreMovingAverageRTTWhenChangingFromOneKnownServerTypeToAnother) { + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); auto lastServerDescription = ServerDescriptionBuilder() .withType(ServerType::kRSSecondary) .withRtt(mongo::Milliseconds(20)) .instance(); - - // Check the RTT is unchanged since the IsMasterOutcome does not contain an RTT. auto description = ServerDescription(clockSource, response, lastServerDescription->getRtt()); - ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description.getRtt())); + ASSERT_EQUALS(24, durationCount<mongo::Milliseconds>(*description.getRtt())); - auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary); + auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(30)); auto description2 = ServerDescription(clockSource, response2, description.getRtt()); - ASSERT_EQUALS(20, durationCount<mongo::Milliseconds>(*description2.getRtt())); + ASSERT_EQUALS(25, durationCount<mongo::Milliseconds>(*description2.getRtt())); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLastWriteDate) { - auto response = IsMasterOutcome( - "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kLastWriteDate, description.getLastWriteDate()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreOpTime) { - auto response = IsMasterOutcome( - "foo:1234", kBsonLastWrite, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kOpTime, description.getOpTime()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { auto testStart = clockSource->now(); - auto response = IsMasterOutcome( - "foo:1234", kBsonRsPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_GREATER_THAN_OR_EQUALS(description.getLastUpdateTime(), testStart); } @@ -406,8 +400,7 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { // Disabling these tests since this causes jstest failures when // running on a host with a mixed case hostname. // TEST_F(ServerDescriptionTestFixture, ShouldStoreHostNamesAsLowercase) { -// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, -// duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); +// auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, mongo::Milliseconds(40)); // auto description = ServerDescription(clockSource, response); // // ASSERT_EQUALS("foo:1234", description.getAddress()); @@ -426,23 +419,20 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { //} TEST_F(ServerDescriptionTestFixture, ShouldStoreMinMaxWireVersion) { - auto response = IsMasterOutcome( - "foo:1234", kBsonWireVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonWireVersion, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonWireVersion["minWireVersion"].Int(), description.getMinWireVersion()); ASSERT_EQUALS(kBsonWireVersion["maxWireVersion"].Int(), description.getMaxWireVersion()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreTags) { - auto response = - IsMasterOutcome("foo:1234", kBsonTags, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonTags, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(toStringMap(kBsonTags["tags"].Obj()), description.getTags()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) { - auto response = IsMasterOutcome( - "foo:1234", kBsonSetVersionName, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonSetVersionName, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonSetVersionName.getIntField("setVersion"), description.getSetVersion()); ASSERT_EQUALS(std::string(kBsonSetVersionName.getStringField("setName")), @@ -450,31 +440,27 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreElectionId) { - auto response = IsMasterOutcome( - "foo:1234", kBsonElectionId, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonElectionId, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonElectionId.getField("electionId").OID(), description.getElectionId()); } TEST_F(ServerDescriptionTestFixture, ShouldStorePrimary) { - auto response = IsMasterOutcome( - "foo:1234", kBsonPrimary, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonPrimary, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(std::string(kBsonPrimary.getStringField("primary")), description.getPrimary()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreLogicalSessionTimeout) { - auto response = IsMasterOutcome("foo:1234", - kBsonLogicalSessionTimeout, - duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = + IsMasterOutcome("foo:1234", kBsonLogicalSessionTimeout, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(kBsonLogicalSessionTimeout.getIntField("logicalSessionTimeoutMinutes"), description.getLogicalSessionTimeoutMinutes()); } TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) { - auto response = IsMasterOutcome( - "foo:1234", kTopologyVersion, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kTopologyVersion, mongo::Milliseconds(40)); auto topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), kTopologyVersion.getObjectField("topologyVersion")); @@ -489,15 +475,13 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreTopologyVersion) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreStreamable) { - auto response = IsMasterOutcome( - "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(true, description.isStreamable()); } TEST_F(ServerDescriptionTestFixture, ShouldStorePoolResetCounter) { - auto response = IsMasterOutcome( - "foo:1234", kStreamable, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kStreamable, mongo::Milliseconds(40)); auto description = ServerDescription( clockSource, response, boost::none /*lastRtt*/, boost::none /*topologyVersion*/, 1); ASSERT_EQUALS(1, description.getPoolResetCounter()); @@ -511,8 +495,7 @@ TEST_F(ServerDescriptionTestFixture, ShouldStoreServerAddressOnError) { } TEST_F(ServerDescriptionTestFixture, ShouldStoreCorrectDefaultValuesOnSuccess) { - auto response = - IsMasterOutcome("foo:1234", kBsonOk, duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto response = IsMasterOutcome("foo:1234", kBsonOk, mongo::Milliseconds(40)); auto description = ServerDescription(clockSource, response); ASSERT_EQUALS(boost::none, description.getError()); ASSERT_EQUALS(boost::none, description.getLastWriteDate()); diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h index b05ad6be90f..01152fb13b2 100644 --- a/src/mongo/client/sdam/server_selector.h +++ b/src/mongo/client/sdam/server_selector.h @@ -177,12 +177,10 @@ private: // This is used to filter out servers based on their current latency measurements. struct LatencyWindow { const IsMasterRTT lower; - IsMasterRTT upper; + const IsMasterRTT upper; explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth) - : lower(lowerBound) { - upper = (lowerBound == IsMasterRTT::max()) ? lowerBound : lowerBound + windowWidth; - } + : lower(lowerBound), upper(lowerBound + windowWidth) {} bool isWithinWindow(IsMasterRTT latency); diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index 34d8ef63563..147fa7dcb26 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -26,8 +26,12 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + #include "mongo/client/sdam/topology_listener.h" +#include "mongo/util/log.h" + namespace mongo::sdam { void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) { @@ -116,30 +120,10 @@ void TopologyEventsPublisher::_scheduleNextDelivery() { } void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort, - const Status& status) { - { - stdx::lock_guard lock(_eventQueueMutex); - EventPtr event = std::make_unique<Event>(); - event->type = EventType::PING_FAILURE; - event->hostAndPort = hostAndPort; - event->status = status; - _eventQueue.push_back(std::move(event)); - } - _scheduleNextDelivery(); -} + const Status& status) {} void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS, - const ServerAddress& hostAndPort) { - { - stdx::lock_guard lock(_eventQueueMutex); - EventPtr event = std::make_unique<Event>(); - event->type = EventType::PING_SUCCESS; - event->duration = duration_cast<IsMasterRTT>(durationMS); - event->hostAndPort = hostAndPort; - _eventQueue.push_back(std::move(event)); - } - _scheduleNextDelivery(); -} + const ServerAddress& hostAndPort) {} // note that this could be done in batches if it is a bottleneck. void TopologyEventsPublisher::_nextDelivery() { @@ -194,13 +178,6 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve event.hostAndPort, event.reply); break; - case EventType::PING_SUCCESS: - listener->onServerPingSucceededEvent(duration_cast<IsMasterRTT>(event.duration), - event.hostAndPort); - break; - case EventType::PING_FAILURE: - listener->onServerPingFailedEvent(event.hostAndPort, event.status); - break; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/client/sdam/topology_listener_mock.cpp b/src/mongo/client/sdam/topology_listener_mock.cpp index 7dddbaefea6..5b8bcd4a6b0 100644 --- a/src/mongo/client/sdam/topology_listener_mock.cpp +++ b/src/mongo/client/sdam/topology_listener_mock.cpp @@ -34,29 +34,20 @@ namespace mongo::sdam { -void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT latency, - const ServerAddress& hostAndPort) { +void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort, + const Status& status) { stdx::lock_guard lk(_mutex); - auto it = _serverPingRTTs.find(hostAndPort); - if (it != _serverPingRTTs.end()) { - it->second.emplace_back(latency); - } else { - _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{latency}); - } + invariant(!_hasPingResponse_inlock(hostAndPort)); + _serverPingRTTs.emplace(hostAndPort, status); } -void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndPort, - const Status& errorStatus) { +void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); - // If the map already contains an element for hostAndPort, append to its already existing - // vector. Otherwise, create a new vector. - auto it = _serverPingRTTs.find(hostAndPort); - if (it != _serverPingRTTs.end()) { - it->second.emplace_back(errorStatus); - } else { - _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{errorStatus}); - } + invariant(!_hasPingResponse_inlock(hostAndPort)); + _serverPingRTTs.emplace(hostAndPort, durationMS); } + bool TopologyListenerMock::hasPingResponse(const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); return _hasPingResponse_inlock(hostAndPort); @@ -66,8 +57,7 @@ bool TopologyListenerMock::_hasPingResponse_inlock(const ServerAddress& hostAndP return _serverPingRTTs.find(hostAndPort) != _serverPingRTTs.end(); } -std::vector<StatusWith<IsMasterRTT>> TopologyListenerMock::getPingResponse( - const ServerAddress& hostAndPort) { +StatusWith<IsMasterRTT> TopologyListenerMock::getPingResponse(const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); invariant(_hasPingResponse_inlock(hostAndPort)); auto it = _serverPingRTTs.find(hostAndPort); diff --git a/src/mongo/client/sdam/topology_listener_mock.h b/src/mongo/client/sdam/topology_listener_mock.h index 05207512ea1..0184d1ce81e 100644 --- a/src/mongo/client/sdam/topology_listener_mock.h +++ b/src/mongo/client/sdam/topology_listener_mock.h @@ -39,10 +39,9 @@ class TopologyListenerMock : public TopologyListener { public: TopologyListenerMock() = default; virtual ~TopologyListenerMock() = default; - - void onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) override; - void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; + void onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) override; /** * Acquires _mutex before calling _hasPingResponse_inlock(). @@ -59,11 +58,11 @@ public: * Returns the response for the most recent onServerPing event. MUST be called after a ping has * been sent and proccessed in order to remove it from the map and make room for the next. */ - std::vector<StatusWith<IsMasterRTT>> getPingResponse(const ServerAddress& hostAndPort); + StatusWith<IsMasterRTT> getPingResponse(const ServerAddress& hostAndPort); private: Mutex _mutex; - stdx::unordered_map<ServerAddress, std::vector<StatusWith<IsMasterRTT>>> _serverPingRTTs; + stdx::unordered_map<ServerAddress, StatusWith<IsMasterRTT>> _serverPingRTTs; }; } // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index 75242baa796..a4847391cf6 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -129,22 +129,21 @@ const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescripti } void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) { - { - stdx::lock_guard<mongo::Mutex> lock(_mutex); + stdx::lock_guard<mongo::Mutex> lock(_mutex); - auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); - if (oldServerDescription) { - auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); + auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); + if (oldServerDescription) { + auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); - auto oldTopologyDescription = _topologyDescription; - _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); - _topologyDescription->installServerDescription(newServerDescription); + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); + _topologyDescription->installServerDescription(newServerDescription); - _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); - return; - } + return; } + // otherwise, the server was removed from the topology. Nothing to do. LOGV2(4333201, "Not updating RTT. Server {server} does not exist in {setName}", diff --git a/src/mongo/client/sdam/topology_manager_test.cpp b/src/mongo/client/sdam/topology_manager_test.cpp index fb78bfbbf3f..bc9e9256f1a 100644 --- a/src/mongo/client/sdam/topology_manager_test.cpp +++ b/src/mongo/client/sdam/topology_manager_test.cpp @@ -74,9 +74,8 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) { ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), - kBsonTopologyVersionLow, - duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto isMasterOutcome = IsMasterOutcome( + serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -84,9 +83,8 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnSuccess) { kBsonTopologyVersionLow.getObjectField("topologyVersion")); // If previous topologyVersion is <= new topologyVersion, should update to new topologyVersion - isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), - kBsonTopologyVersionHigh, - duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + isMasterOutcome = IsMasterOutcome( + serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); newServerDescription = topologyDescription->getServers()[0]; @@ -104,9 +102,8 @@ TEST_F(TopologyManagerTestFixture, ShouldUpdateTopologyVersionOnErrorIfSent) { ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), - kBsonTopologyVersionLow, - duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto isMasterOutcome = IsMasterOutcome( + serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -133,9 +130,8 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology ASSERT(serverDescription->getTopologyVersion() == boost::none); // If previous topologyVersion is boost::none, should update to new topologyVersion - auto isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), - kBsonTopologyVersionHigh, - duration_cast<IsMasterRTT>(mongo::Milliseconds(40))); + auto isMasterOutcome = IsMasterOutcome( + serverDescription->getAddress(), kBsonTopologyVersionHigh, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; @@ -143,7 +139,8 @@ TEST_F(TopologyManagerTestFixture, ShouldNotUpdateServerDescriptionIfNewTopology kBsonTopologyVersionHigh.getObjectField("topologyVersion")); // If isMasterOutcome is not successful, should preserve old topologyVersion - isMasterOutcome = IsMasterOutcome(serverDescription->getAddress(), kBsonTopologyVersionLow); + isMasterOutcome = IsMasterOutcome( + serverDescription->getAddress(), kBsonTopologyVersionLow, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); newServerDescription = topologyDescription->getServers()[0]; @@ -161,7 +158,8 @@ TEST_F(TopologyManagerTestFixture, ShouldNowIncrementPoolResetCounterOnSuccess) ASSERT_EQUALS(serverDescription->getPoolResetCounter(), 0); // If isMasterOutcome is successful, poolResetCounter should remain the same - IsMasterOutcome isMasterOutcome(serverDescription->getAddress(), kBsonOk); + IsMasterOutcome isMasterOutcome( + serverDescription->getAddress(), kBsonOk, mongo::Milliseconds(40)); topologyManager.onServerDescription(isMasterOutcome); topologyDescription = topologyManager.getTopologyDescription(); auto newServerDescription = topologyDescription->getServers()[0]; diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp index c748d5d0498..3cb7f793bf2 100644 --- a/src/mongo/client/server_ping_monitor.cpp +++ b/src/mongo/client/server_ping_monitor.cpp @@ -33,7 +33,6 @@ #include "mongo/client/server_ping_monitor.h" -#include "mongo/client/sdam/sdam.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -51,7 +50,7 @@ using CallbackHandle = TaskExecutor::CallbackHandle; SingleServerPingMonitor::SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, + Seconds pingFrequency, std::shared_ptr<TaskExecutor> executor) : _hostAndPort(hostAndPort), _rttListener(rttListener), @@ -100,19 +99,18 @@ void SingleServerPingMonitor::_scheduleServerPing() { if (ErrorCodes::isShutdownError(schedulePingHandle.getStatus().code())) { LOGV2_DEBUG(23727, - kLogLevel, + 1, "Can't schedule ping for {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!schedulePingHandle.isOK()) { - LOGV2_FATAL(23732, + LOGV2_FATAL(31434, "Can't continue scheduling pings to {hostAndPort} due to " "{schedulePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "schedulePingHandle_getStatus"_attr = redact(schedulePingHandle.getStatus())); - fassertFailed(31434); } _pingHandle = std::move(schedulePingHandle.getValue()); @@ -156,18 +154,17 @@ void SingleServerPingMonitor::_doServerPing() { if (ErrorCodes::isShutdownError(remotePingHandle.getStatus().code())) { LOGV2_DEBUG(23728, - kLogLevel, + 1, "Can't ping {hostAndPort}. Executor shutdown in progress", "hostAndPort"_attr = _hostAndPort); return; } if (!remotePingHandle.isOK()) { - LOGV2_FATAL(23733, + LOGV2_FATAL(31435, "Can't continue pinging {hostAndPort} due to {remotePingHandle_getStatus}", "hostAndPort"_attr = _hostAndPort, "remotePingHandle_getStatus"_attr = redact(remotePingHandle.getStatus())); - fassertFailed(31435); } // Update the _pingHandle so the ping can be canceled if the SingleServerPingMonitor gets @@ -176,9 +173,11 @@ void SingleServerPingMonitor::_doServerPing() { } ServerPingMonitor::ServerPingMonitor(sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, - std::shared_ptr<TaskExecutor> executor) - : _rttListener(rttListener), _pingFrequency(pingFrequency), _executor(executor) {} + Seconds pingFrequency, + boost::optional<std::shared_ptr<TaskExecutor>> executor) + : _rttListener(rttListener), + _pingFrequency(pingFrequency), + _executor(executor.get_value_or({})) {} ServerPingMonitor::~ServerPingMonitor() { shutdown(); @@ -202,6 +201,25 @@ void ServerPingMonitor::shutdown() { for (auto& [hostAndPort, singleMonitor] : serverPingMonitorMap) { singleMonitor->drop(); } + + if (executor) { + executor->shutdown(); + executor->join(); + } +} + +void ServerPingMonitor::_setupTaskExecutor_inlock() { + if (_isShutdown || _executor) { + // Do not restart the _executor if it is in shutdown or already provided from a test. + return; + } else { + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + auto net = executor::makeNetworkInterface( + "ServerPingMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); + _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _executor->startup(); + } } void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -213,48 +231,33 @@ void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT duratio << "' due to shutdown", !_isShutdown); - if (_serverPingMonitorMap.find(address) != _serverPingMonitorMap.end()) { - LOGV2_DEBUG(466811, - kLogLevel + 1, - "ServerPingMonitor already monitoring {address}", - "address"_attr = address); - return; - } + _setupTaskExecutor_inlock(); + invariant(_serverPingMonitorMap.find(address) == _serverPingMonitorMap.end()); auto newSingleMonitor = std::make_shared<SingleServerPingMonitor>(address, _rttListener, _pingFrequency, _executor); _serverPingMonitorMap[address] = newSingleMonitor; newSingleMonitor->init(); - LOGV2_DEBUG(23729, - kLogLevel, - "ServerPingMonitor is now monitoring {address}", - "address"_attr = address); + LOGV2_DEBUG( + 23729, 1, "ServerPingMonitor is now monitoring {address}", "address"_attr = address); } -void ServerPingMonitor::onTopologyDescriptionChangedEvent( - UUID topologyId, - sdam::TopologyDescriptionPtr previousDescription, - sdam::TopologyDescriptionPtr newDescription) { +void ServerPingMonitor::onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId) { stdx::lock_guard lk(_mutex); if (_isShutdown) { + LOGV2_DEBUG(23730, + 1, + "ServerPingMonitor is in shutdown and will stop monitoring {address} if it has " + "not already done so.", + "address"_attr = address); return; } - - // Remove monitors that are missing from the topology. - auto it = _serverPingMonitorMap.begin(); - while (it != _serverPingMonitorMap.end()) { - const auto& serverAddress = it->first; - if (newDescription->findServerByAddress(serverAddress) == boost::none) { - auto& singleMonitor = _serverPingMonitorMap[serverAddress]; - singleMonitor->drop(); - LOGV2_DEBUG(462899, - kLogLevel, - "ServerPingMonitor for host {addr} was removed from being monitored.", - "addr"_attr = serverAddress); - it = _serverPingMonitorMap.erase(it, ++it); - } else { - ++it; - } - } + auto it = _serverPingMonitorMap.find(address); + invariant(it != _serverPingMonitorMap.end()); + it->second->drop(); + _serverPingMonitorMap.erase(it); + LOGV2_DEBUG( + 23731, 1, "ServerPingMonitor stopped monitoring {address}", "address"_attr = address); } + } // namespace mongo diff --git a/src/mongo/client/server_ping_monitor.h b/src/mongo/client/server_ping_monitor.h index 80ebc8be95d..a7289a79d99 100644 --- a/src/mongo/client/server_ping_monitor.h +++ b/src/mongo/client/server_ping_monitor.h @@ -44,7 +44,7 @@ class SingleServerPingMonitor : public std::enable_shared_from_this<SingleServer public: explicit SingleServerPingMonitor(sdam::ServerAddress hostAndPort, sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, + Seconds pingFrequency, std::shared_ptr<executor::TaskExecutor> executor); /** @@ -97,7 +97,7 @@ private: /** * The frequency at which ping requests should be sent to measure the round trip time. */ - Milliseconds _pingFrequency; + Seconds _pingFrequency; std::shared_ptr<executor::TaskExecutor> _executor; @@ -107,8 +107,6 @@ private: */ Date_t _nextPingStartDate{}; - static constexpr auto kLogLevel = 0; - /** * Must be held to access any of the member variables below. */ @@ -134,9 +132,14 @@ class ServerPingMonitor : public sdam::TopologyListener { ServerPingMonitor& operator=(const ServerPingMonitor&) = delete; public: - ServerPingMonitor(sdam::TopologyListener* rttListener, - Milliseconds pingFrequency, - std::shared_ptr<executor::TaskExecutor> executor); + /** + * Note: The ServerPingMonitor creates its own executor by default. It takes in an executor for + * testing only. + */ + ServerPingMonitor( + sdam::TopologyListener* rttListener, + Seconds pingFrequency, + boost::optional<std::shared_ptr<executor::TaskExecutor>> executor = boost::none); ~ServerPingMonitor(); /** @@ -145,7 +148,7 @@ public: void shutdown(); /** - * The first isMaster exchange for a connection to the server succeeded. Creates a new + * The first isMaster exchange for a server succeeded. Creates a new * SingleServerPingMonitor to monitor the new replica set member. */ void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, @@ -153,15 +156,16 @@ public: const BSONObj reply = BSONObj()); /** - * Drop corresponding SingleServerPingMonitors if the server is not included in the - * newDescritpion. + * The connection to the server was closed. Removes the server from the ServerPingMonitorList. */ - void onTopologyDescriptionChangedEvent(UUID topologyId, - sdam::TopologyDescriptionPtr previousDescription, - sdam::TopologyDescriptionPtr newDescription); + void onServerClosedEvent(const sdam::ServerAddress& address, OID topologyId); private: /** + * Sets up and starts up the _executor if it did not already exist. + */ + void _setupTaskExecutor_inlock(); + /** * Listens for when new RTT (Round Trip Time) values are published. */ sdam::TopologyListener* _rttListener; @@ -169,15 +173,13 @@ private: /** * The interval at which ping requests should be sent to measure the RTT (Round Trip Time). */ - Milliseconds _pingFrequency; + Seconds _pingFrequency; /** * Executor for performing server monitoring pings for all of the replica set members. */ std::shared_ptr<executor::TaskExecutor> _executor; - static constexpr auto kLogLevel = 0; - mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerPingMonitor::mutex"); /** diff --git a/src/mongo/client/server_ping_monitor_test.cpp b/src/mongo/client/server_ping_monitor_test.cpp index 0eba77e7941..05a7f0603d0 100644 --- a/src/mongo/client/server_ping_monitor_test.cpp +++ b/src/mongo/client/server_ping_monitor_test.cpp @@ -33,7 +33,7 @@ #include <memory> -#include "mongo/client/sdam/sdam.h" +#include "mongo/client/sdam/sdam_datatypes.h" #include "mongo/client/sdam/topology_listener_mock.h" #include "mongo/client/server_ping_monitor.h" #include "mongo/dbtests/mock/mock_replica_set.h" @@ -161,10 +161,8 @@ protected: ASSERT_TRUE(_topologyListener->hasPingResponse(hostAndPort)); ASSERT_LT(elapsed(), deadline); auto pingResponse = _topologyListener->getPingResponse(hostAndPort); + ASSERT(pingResponse.isOK()); - // There should only be one isMaster response queued up. - ASSERT_EQ(pingResponse.size(), 1); - ASSERT(pingResponse[0].isOK()); checkNoActivityBefore(deadline, hostAndPort); } @@ -180,40 +178,6 @@ protected: } } - /** - * Since the SingleServerPingMonitor is removed upon an onTopologyDescriptionChangedEvent, - * prompt the event with a new TopologyDescription that does not include hostToDrop. - */ - void closeMonitor(MockReplicaSet* replSet, - sdam::ServerAddress hostToDrop, - ServerPingMonitor* pingMonitor) { - auto hostAndPorts = replSet->getHosts(); - std::vector<sdam::ServerAddress> hosts; - std::transform(hostAndPorts.begin(), - hostAndPorts.end(), - std::back_inserter(hosts), - [](const auto& hostAndPort) { return hostAndPort.toString(); }); - - auto sdamConfigOld = sdam::SdamConfiguration(hosts); - auto topologyDescriptionOld = std::make_shared<sdam::TopologyDescription>(sdamConfigOld); - - - std::vector<sdam::ServerAddress> hostsNew(hosts.begin(), hosts.end()); - hostsNew.erase(std::remove_if(hostsNew.begin(), - hostsNew.end(), - [&](auto host) { return host == hostToDrop; }), - hostsNew.end()); - // Since the seedlist cannot be empty, the new TopologyDescription contains an empty - // HostAndPort. - if (hostsNew.size() == 0) { - hostsNew.emplace_back(HostAndPort().toString()); - } - auto sdamConfigNew = sdam::SdamConfiguration(hostsNew); - auto topologyDescriptionNew = std::make_shared<sdam::TopologyDescription>(sdamConfigNew); - pingMonitor->onTopologyDescriptionChangedEvent( - UUID::gen(), topologyDescriptionOld, topologyDescriptionNew); - } - private: Date_t _startDate; std::unique_ptr<sdam::TopologyListenerMock> _topologyListener; @@ -313,8 +277,7 @@ TEST_F(SingleServerPingMonitorTest, pingDeadServer) { ASSERT_TRUE(topologyListener->hasPingResponse(hostAndPort)); auto pingResponse = topologyListener->getPingResponse(hostAndPort); - ASSERT_EQ(pingResponse.size(), 1); - ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse[0].getStatus()); + ASSERT_EQ(ErrorCodes::HostUnreachable, pingResponse.getStatus()); checkNoActivityBefore(deadline); }; @@ -347,13 +310,14 @@ TEST_F(SingleServerPingMonitorTest, noPingAfterSingleServerPingMonitorClosed) { class ServerPingMonitorTest : public ServerPingMonitorTestFixture { protected: std::unique_ptr<ServerPingMonitor> makeServerPingMonitor(Seconds pingFrequency) { - return std::make_unique<ServerPingMonitor>( - getTopologyListener(), pingFrequency, getExecutor()); + auto executor = boost::optional<std::shared_ptr<executor::TaskExecutor>>(getExecutor()); + return std::make_unique<ServerPingMonitor>(getTopologyListener(), pingFrequency, executor); } }; /** - * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor. + * Adds and removes a SingleServerPingMonitor from the ServerPingMonitor via + * onServerHandshakeCompleteEvent and onServerClosedEvent. */ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { auto pingFrequency = Seconds(10); @@ -362,6 +326,7 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString(); + auto oid = OID::gen(); // Add a SingleServerPingMonitor to the ServerPingMonitor. Confirm pings are sent to the server // at pingFrequency. @@ -371,7 +336,7 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { // Close the SingleServerMonitor before the third ping and confirm ping activity to the server // is stopped. - closeMonitor(replSet.get(), hostAndPort, serverPingMonitor.get()); + serverPingMonitor->onServerClosedEvent(hostAndPort, oid); checkNoActivityBefore(elapsed() + pingFrequency * 2, hostAndPort); } @@ -388,6 +353,7 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) { auto hosts = replSet->getHosts(); auto host0 = hosts[0].toString(); auto host1 = hosts[1].toString(); + auto oid0 = OID::gen(); // Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds // after host0. @@ -398,7 +364,7 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) { serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1); checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get()); - closeMonitor(replSet.get(), host0, serverPingMonitor.get()); + serverPingMonitor->onServerClosedEvent(host0, oid0); checkNoActivityBefore(pingFrequency + host1Delay, host0); // Confirm that host1's SingleServerPingMonitor continues ping activity. diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 2c286de0368..65e24727c0c 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -172,16 +172,11 @@ void StreamableReplicaSetMonitor::init() { _topologyManager = std::make_unique<TopologyManager>( _sdamConfig, getGlobalServiceContext()->getPreciseClockSource(), _eventsPublisher); - _eventsPublisher->registerListener(shared_from_this()); - - _pingMonitor = std::make_unique<ServerPingMonitor>( - _eventsPublisher.get(), sdam::SdamConfiguration::kDefaultHeartbeatFrequencyMs, _executor); - _eventsPublisher->registerListener(_pingMonitor); - _isMasterMonitor = std::make_unique<ServerIsMasterMonitor>( _uri, _sdamConfig, _eventsPublisher, _topologyManager->getTopologyDescription(), _executor); - _eventsPublisher->registerListener(_isMasterMonitor); + _eventsPublisher->registerListener(shared_from_this()); + _eventsPublisher->registerListener(_isMasterMonitor); _isDropped.store(false); ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName()); @@ -195,7 +190,6 @@ void StreamableReplicaSetMonitor::drop() { LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName()); _eventsPublisher->close(); _queryProcessor->shutdown(); - _pingMonitor->shutdown(); _isMasterMonitor->shutdown(); _failOutstandingWithStatus( lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); @@ -568,8 +562,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) { - // After the inital handshake, isMasterResponses should not update the RTT with durationMs. - IsMasterOutcome outcome(hostAndPort, reply, boost::none); + IsMasterOutcome outcome(hostAndPort, reply, durationMs); _topologyManager->onServerDescription(outcome); } @@ -583,23 +576,11 @@ void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT dura void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) { - LOGV2_DEBUG(4668133, - 0, - " StreamableReplicaSetMonitor::onServerPingFailedEvent, ServerPingMonitor got " - "status{status} ", - "addr"_attr = hostAndPort, - "status"_attr = status); failedHost(HostAndPort(hostAndPort), status); } void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, const ServerAddress& hostAndPort) { - LOGV2_DEBUG(4668132, - 1, - " StreamableReplicaSetMonitor::onServerPingSucceededEvent, ServerPingMonitor for " - "{addr} with {rtt}", - "addr"_attr = hostAndPort, - "rtt"_attr = durationMS); _topologyManager->onServerRTTUpdated(hostAndPort, durationMS); } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 5ec56495684..ebf77642738 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -40,7 +40,6 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/client/sdam/sdam.h" #include "mongo/client/server_is_master_monitor.h" -#include "mongo/client/server_ping_monitor.h" #include "mongo/executor/task_executor.h" #include "mongo/logger/log_component.h" #include "mongo/util/concurrency/with_lock.h" @@ -202,7 +201,6 @@ private: sdam::ServerSelectorPtr _serverSelector; sdam::TopologyEventsPublisherPtr _eventsPublisher; ServerIsMasterMonitorPtr _isMasterMonitor; - std::shared_ptr<ServerPingMonitor> _pingMonitor; // This object will be registered as a TopologyListener if there are // any outstanding queries for this RSM instance. |