diff options
author | jannaerin <golden.janna@gmail.com> | 2020-02-10 18:16:55 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-04 18:00:52 +0000 |
commit | b74fd5ae11cb95a42cf18e996e81136546cfddda (patch) | |
tree | c79a66e1158207996440aae11d3beb0d2eb64a61 /src/mongo/client | |
parent | fdb12065e79cd46bd0dae0f93ccd682dfeff120c (diff) | |
download | mongo-b74fd5ae11cb95a42cf18e996e81136546cfddda.tar.gz |
SERVER-45230 Rtt and isMaster from initial handshake should populate initial server description
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 51 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.h | 19 | ||||
-rw-r--r-- | src/mongo/client/sdam/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.cpp | 24 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.h | 19 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor.h | 4 | ||||
-rw-r--r-- | src/mongo/client/server_ping_monitor_test.cpp | 22 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 6 |
10 files changed, 141 insertions, 23 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index f83669496f1..3f0e8ff65b7 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -71,6 +71,41 @@ const auto getGlobalRSMMonitorManager = ServiceContext::declareDecoration<ReplicaSetMonitorManager>(); } // namespace +Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost( + const HostAndPort& remoteHost, + const BSONObj& isMasterRequest, + const executor::RemoteCommandResponse& isMasterReply) { + if (gReplicaSetMonitorProtocol != ReplicaSetMonitorProtocol::kScanning) { + auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost); + if (!monitor) { + return Status::OK(); + } + + if (std::shared_ptr<StreamableReplicaSetMonitor> streamableMonitor = + std::dynamic_pointer_cast<StreamableReplicaSetMonitor>( + ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) { + + auto publisher = streamableMonitor->getEventsPublisher(); + if (publisher) { + publisher->onServerHandshakeCompleteEvent( + isMasterReply.elapsedMillis.get(), remoteHost.toString(), isMasterReply.data); + } + } + } + + return Status::OK(); +} + +StatusWith<boost::optional<executor::RemoteCommandRequest>> +ReplicaSetMonitorManagerNetworkConnectionHook::makeRequest(const HostAndPort& remoteHost) { + return {boost::none}; +} + +Status ReplicaSetMonitorManagerNetworkConnectionHook::handleReply( + const HostAndPort& remoteHost, executor::RemoteCommandResponse&& response) { + MONGO_UNREACHABLE; +} + ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { shutdown(); } @@ -97,8 +132,9 @@ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { // construct task executor auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + auto networkConnectionHook = std::make_unique<ReplicaSetMonitorManagerNetworkConnectionHook>(); auto net = executor::makeNetworkInterface( - "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); + "ReplicaSetMonitor-TaskExecutor", std::move(networkConnectionHook), std::move(hookList)); auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); _taskExecutor->startup(); @@ -143,6 +179,19 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const return newMonitor; } +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitorForHost(const HostAndPort& host) { + stdx::lock_guard<Latch> lk(_mutex); + + for (auto entry : _monitors) { + auto monitor = entry.second.lock(); + if (monitor->contains(host)) { + return monitor; + } + } + + return shared_ptr<ReplicaSetMonitor>(); +} + vector<string> ReplicaSetMonitorManager::getAllSetNames() { vector<string> allNames; diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index dc6913fa905..de628867b6e 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -33,6 +33,7 @@ #include <vector> #include "mongo/client/replica_set_change_notifier.h" +#include "mongo/executor/network_connection_hook.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/util/hierarchical_acquisition.h" @@ -45,6 +46,22 @@ class ConnectionString; class ReplicaSetMonitor; class MongoURI; +class ReplicaSetMonitorManagerNetworkConnectionHook final : public executor::NetworkConnectionHook { +public: + ReplicaSetMonitorManagerNetworkConnectionHook() = default; + virtual ~ReplicaSetMonitorManagerNetworkConnectionHook() = default; + + Status validateHost(const HostAndPort& remoteHost, + const BSONObj& isMasterRequest, + const executor::RemoteCommandResponse& isMasterReply) override; + + StatusWith<boost::optional<executor::RemoteCommandRequest>> makeRequest( + const HostAndPort& remoteHost) override; + + Status handleReply(const HostAndPort& remoteHost, + executor::RemoteCommandResponse&& response) override; +}; + /** * Manages the lifetime of a set of replica set monitors. */ @@ -78,6 +95,8 @@ public: */ void removeMonitor(StringData setName); + std::shared_ptr<ReplicaSetMonitor> getMonitorForHost(const HostAndPort& host); + /** * Removes and destroys all replica set monitors. Should be used for unit tests only. */ diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript index 6cc5c642ad6..4d7c01ac4b9 100644 --- a/src/mongo/client/sdam/SConscript +++ b/src/mongo/client/sdam/SConscript @@ -21,7 +21,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/util/clock_sources', '$BUILD_DIR/mongo/client/read_preference', - '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/rpc/metadata', ], diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index f4b2dc66b3b..147fa7dcb26 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -26,13 +26,14 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/client/sdam/topology_listener.h" -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/util/log.h" namespace mongo::sdam { + void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) { stdx::lock_guard lock(_mutex); _listeners.push_back(listener); @@ -65,6 +66,21 @@ void TopologyEventsPublisher::onTopologyDescriptionChangedEvent( _scheduleNextDelivery(); } +void TopologyEventsPublisher::onServerHandshakeCompleteEvent(IsMasterRTT durationMs, + const sdam::ServerAddress& address, + const BSONObj reply) { + { + stdx::lock_guard<Mutex> lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::HANDSHAKE_COMPLETE; + event->duration = duration_cast<IsMasterRTT>(durationMs); + event->hostAndPort = address; + event->reply = reply; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) { @@ -156,6 +172,12 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve listener->onTopologyDescriptionChangedEvent( UUID::gen(), event.previousDescription, event.newDescription); break; + case EventType::HANDSHAKE_COMPLETE: + listener->onServerHandshakeCompleteEvent( + sdam::IsMasterRTT(duration_cast<Milliseconds>(event.duration)), + event.hostAndPort, + event.reply); + break; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 86f46b3848e..4adbb2329d2 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -57,6 +57,14 @@ public: const ServerAddress& hostAndPort, const BSONObj reply){}; /** + * Called when a ServerHandshakeCompleteEvent is published - The initial handshake to the server + * at hostAndPort was successful. durationMS is the measured RTT (Round Trip Time). + */ + virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs, + const sdam::ServerAddress& address, + const BSONObj reply = BSONObj()){}; + + /** * Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at * hostAndPort succeeded. durationMS is the execution time of the event, including the time it * took to send the message and recieve the reply from the server. @@ -85,8 +93,8 @@ public: * To publish an event to all registered listeners call the corresponding event function on the * TopologyEventsPublisher instance. */ -class TopologyEventsPublisher final : public TopologyListener, - public std::enable_shared_from_this<TopologyEventsPublisher> { +class TopologyEventsPublisher : public TopologyListener, + public std::enable_shared_from_this<TopologyEventsPublisher> { public: TopologyEventsPublisher(std::shared_ptr<executor::TaskExecutor> executor) : _executor(executor){}; @@ -97,6 +105,9 @@ public: void onTopologyDescriptionChangedEvent(UUID topologyId, TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription) override; + virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs, + const sdam::ServerAddress& address, + const BSONObj reply = BSONObj()) override; void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) override; @@ -108,14 +119,14 @@ public: void onServerPingSucceededEvent(IsMasterRTT durationMS, const ServerAddress& hostAndPort) override; - private: enum class EventType { HEARTBEAT_SUCCESS, HEARTBEAT_FAILURE, PING_SUCCESS, PING_FAILURE, - TOPOLOGY_DESCRIPTION_CHANGED + TOPOLOGY_DESCRIPTION_CHANGED, + HANDSHAKE_COMPLETE }; struct Event { EventType type; diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp index 7359b07e503..bc448888bb8 100644 --- a/src/mongo/client/server_ping_monitor.cpp +++ b/src/mongo/client/server_ping_monitor.cpp @@ -225,8 +225,9 @@ void ServerPingMonitor::_setupTaskExecutor_inlock() { } } -void ServerPingMonitor::onServerHandshakeCompleteEvent(const sdam::ServerAddress& address, - OID topologyId) { +void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, + const sdam::ServerAddress& address, + const BSONObj reply) { stdx::lock_guard lk(_mutex); uassert(ErrorCodes::ShutdownInProgress, str::stream() << "ServerPingMonitor is unable to start monitoring '" << address diff --git a/src/mongo/client/server_ping_monitor.h b/src/mongo/client/server_ping_monitor.h index d473c776c5f..a7289a79d99 100644 --- a/src/mongo/client/server_ping_monitor.h +++ b/src/mongo/client/server_ping_monitor.h @@ -151,7 +151,9 @@ public: * The first isMaster exchange for a server succeeded. Creates a new * SingleServerPingMonitor to monitor the new replica set member. */ - void onServerHandshakeCompleteEvent(const sdam::ServerAddress& address, OID topologyId); + void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, + const sdam::ServerAddress& address, + const BSONObj reply = BSONObj()); /** * The connection to the server was closed. Removes the server from the ServerPingMonitorList. diff --git a/src/mongo/client/server_ping_monitor_test.cpp b/src/mongo/client/server_ping_monitor_test.cpp index b4a3074ce0b..f9abd8050de 100644 --- a/src/mongo/client/server_ping_monitor_test.cpp +++ b/src/mongo/client/server_ping_monitor_test.cpp @@ -48,6 +48,7 @@ namespace mongo { namespace { +const sdam::IsMasterRTT initialRTT = duration_cast<Milliseconds>(Milliseconds(100)); using executor::NetworkInterfaceMock; using executor::RemoteCommandResponse; using executor::ThreadPoolExecutorTest; @@ -308,12 +309,12 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) { auto replSet = std::make_unique<MockReplicaSet>( "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); - auto oid = OID::gen(); auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString(); + auto oid = OID::gen(); // Add a SingleServerPingMonitor to the ServerPingMonitor. Confirm pings are sent to the server // at pingFrequency. - serverPingMonitor->onServerHandshakeCompleteEvent(hostAndPort, oid); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, hostAndPort); checkSinglePing(pingFrequency, hostAndPort, replSet.get()); checkSinglePing(pingFrequency * 2 - Seconds(2), hostAndPort, replSet.get()); @@ -337,15 +338,14 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) { auto host0 = hosts[0].toString(); auto host1 = hosts[1].toString(); auto oid0 = OID::gen(); - auto oid1 = OID::gen(); // Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds // after host0. auto host1Delay = Seconds(2); - serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0); checkSinglePing(host1Delay, host0, replSet.get()); ASSERT_EQ(elapsed(), host1Delay); - serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1); checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get()); serverPingMonitor->onServerClosedEvent(host0, oid0); @@ -367,8 +367,6 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneDead) { auto hosts = replSet->getHosts(); auto host0 = hosts[0].toString(); auto host1 = hosts[1].toString(); - auto oid0 = OID::gen(); - auto oid1 = OID::gen(); { NetworkInterfaceMock::InNetworkGuard ing(getNet()); @@ -376,12 +374,12 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneDead) { } auto host1Delay = Seconds(2); - serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0); checkSinglePing(host1Delay, host0, replSet.get()); // Add host1 host1Delay after host2. ASSERT_EQ(elapsed(), host1Delay); - serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1); // Confirm host1 reported HostUnreachable to the TopologyLisener. processPingRequest(host1, replSet.get()); @@ -416,16 +414,14 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorMutlipleShutdown) { auto hosts = replSet->getHosts(); auto host0 = hosts[0].toString(); auto host1 = hosts[1].toString(); - auto oid0 = OID::gen(); - auto oid1 = OID::gen(); // Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds // after host0. auto host1Delay = Seconds(2); - serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0); checkSinglePing(host1Delay, host0, replSet.get()); ASSERT_EQ(elapsed(), host1Delay); - serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1); + serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1); checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get()); serverPingMonitor->shutdown(); diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 0621cbc8793..c8548369363 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -331,6 +331,11 @@ HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() { return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); } +sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() { + return _eventsPublisher; +} + + void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { failedHost(host, BSONObj(), status); } @@ -535,6 +540,13 @@ void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT d _topologyManager->onServerRTTUpdated(hostAndPort, durationMS); } +void StreamableReplicaSetMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) { + IsMasterOutcome outcome(hostAndPort, reply, durationMs); + _topologyManager->onServerDescription(outcome); +} + std::string StreamableReplicaSetMonitor::_logPrefix() { return str::stream() << kLogPrefix << " [" << getName() << "] "; } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 59006e0221c..93e235677bb 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -109,6 +109,8 @@ public: const MongoURI& getOriginalUri() const; + sdam::TopologyEventsPublisherPtr getEventsPublisher(); + bool contains(const HostAndPort& server) const; void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const; @@ -159,6 +161,10 @@ private: void onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, const sdam::ServerAddress& hostAndPort) override; + void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + // Get a pointer to the current primary's ServerDescription // To ensure a consistent view of the Topology either _currentPrimary or _currentTopology should // be called (not both) since the topology can change between the function invocations. |