summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-02-10 18:16:55 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-04 18:00:52 +0000
commitb74fd5ae11cb95a42cf18e996e81136546cfddda (patch)
treec79a66e1158207996440aae11d3beb0d2eb64a61 /src/mongo/client
parentfdb12065e79cd46bd0dae0f93ccd682dfeff120c (diff)
downloadmongo-b74fd5ae11cb95a42cf18e996e81136546cfddda.tar.gz
SERVER-45230 Rtt and isMaster from initial handshake should populate initial server description
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp51
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h19
-rw-r--r--src/mongo/client/sdam/SConscript2
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp24
-rw-r--r--src/mongo/client/sdam/topology_listener.h19
-rw-r--r--src/mongo/client/server_ping_monitor.cpp5
-rw-r--r--src/mongo/client/server_ping_monitor.h4
-rw-r--r--src/mongo/client/server_ping_monitor_test.cpp22
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp12
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h6
10 files changed, 141 insertions, 23 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index f83669496f1..3f0e8ff65b7 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -71,6 +71,41 @@ const auto getGlobalRSMMonitorManager =
ServiceContext::declareDecoration<ReplicaSetMonitorManager>();
} // namespace
+Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost(
+ const HostAndPort& remoteHost,
+ const BSONObj& isMasterRequest,
+ const executor::RemoteCommandResponse& isMasterReply) {
+ if (gReplicaSetMonitorProtocol != ReplicaSetMonitorProtocol::kScanning) {
+ auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost);
+ if (!monitor) {
+ return Status::OK();
+ }
+
+ if (std::shared_ptr<StreamableReplicaSetMonitor> streamableMonitor =
+ std::dynamic_pointer_cast<StreamableReplicaSetMonitor>(
+ ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) {
+
+ auto publisher = streamableMonitor->getEventsPublisher();
+ if (publisher) {
+ publisher->onServerHandshakeCompleteEvent(
+ isMasterReply.elapsedMillis.get(), remoteHost.toString(), isMasterReply.data);
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+StatusWith<boost::optional<executor::RemoteCommandRequest>>
+ReplicaSetMonitorManagerNetworkConnectionHook::makeRequest(const HostAndPort& remoteHost) {
+ return {boost::none};
+}
+
+Status ReplicaSetMonitorManagerNetworkConnectionHook::handleReply(
+ const HostAndPort& remoteHost, executor::RemoteCommandResponse&& response) {
+ MONGO_UNREACHABLE;
+}
+
ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
shutdown();
}
@@ -97,8 +132,9 @@ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
// construct task executor
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ auto networkConnectionHook = std::make_unique<ReplicaSetMonitorManagerNetworkConnectionHook>();
auto net = executor::makeNetworkInterface(
- "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
+ "ReplicaSetMonitor-TaskExecutor", std::move(networkConnectionHook), std::move(hookList));
auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
_taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
_taskExecutor->startup();
@@ -143,6 +179,19 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const
return newMonitor;
}
+shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitorForHost(const HostAndPort& host) {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ for (auto entry : _monitors) {
+ auto monitor = entry.second.lock();
+ if (monitor->contains(host)) {
+ return monitor;
+ }
+ }
+
+ return shared_ptr<ReplicaSetMonitor>();
+}
+
vector<string> ReplicaSetMonitorManager::getAllSetNames() {
vector<string> allNames;
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index dc6913fa905..de628867b6e 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -33,6 +33,7 @@
#include <vector>
#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/hierarchical_acquisition.h"
@@ -45,6 +46,22 @@ class ConnectionString;
class ReplicaSetMonitor;
class MongoURI;
+class ReplicaSetMonitorManagerNetworkConnectionHook final : public executor::NetworkConnectionHook {
+public:
+ ReplicaSetMonitorManagerNetworkConnectionHook() = default;
+ virtual ~ReplicaSetMonitorManagerNetworkConnectionHook() = default;
+
+ Status validateHost(const HostAndPort& remoteHost,
+ const BSONObj& isMasterRequest,
+ const executor::RemoteCommandResponse& isMasterReply) override;
+
+ StatusWith<boost::optional<executor::RemoteCommandRequest>> makeRequest(
+ const HostAndPort& remoteHost) override;
+
+ Status handleReply(const HostAndPort& remoteHost,
+ executor::RemoteCommandResponse&& response) override;
+};
+
/**
* Manages the lifetime of a set of replica set monitors.
*/
@@ -78,6 +95,8 @@ public:
*/
void removeMonitor(StringData setName);
+ std::shared_ptr<ReplicaSetMonitor> getMonitorForHost(const HostAndPort& host);
+
/**
* Removes and destroys all replica set monitors. Should be used for unit tests only.
*/
diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript
index 6cc5c642ad6..4d7c01ac4b9 100644
--- a/src/mongo/client/sdam/SConscript
+++ b/src/mongo/client/sdam/SConscript
@@ -21,7 +21,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/util/clock_sources',
'$BUILD_DIR/mongo/client/read_preference',
- '$BUILD_DIR/mongo/executor/task_executor_interface',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/rpc/metadata',
],
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
index f4b2dc66b3b..147fa7dcb26 100644
--- a/src/mongo/client/sdam/topology_listener.cpp
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -26,13 +26,14 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/client/sdam/topology_listener.h"
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/util/log.h"
namespace mongo::sdam {
+
void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) {
stdx::lock_guard lock(_mutex);
_listeners.push_back(listener);
@@ -65,6 +66,21 @@ void TopologyEventsPublisher::onTopologyDescriptionChangedEvent(
_scheduleNextDelivery();
}
+void TopologyEventsPublisher::onServerHandshakeCompleteEvent(IsMasterRTT durationMs,
+ const sdam::ServerAddress& address,
+ const BSONObj reply) {
+ {
+ stdx::lock_guard<Mutex> lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::HANDSHAKE_COMPLETE;
+ event->duration = duration_cast<IsMasterRTT>(durationMs);
+ event->hostAndPort = address;
+ event->reply = reply;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
+
void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
const ServerAddress& hostAndPort,
const BSONObj reply) {
@@ -156,6 +172,12 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve
listener->onTopologyDescriptionChangedEvent(
UUID::gen(), event.previousDescription, event.newDescription);
break;
+ case EventType::HANDSHAKE_COMPLETE:
+ listener->onServerHandshakeCompleteEvent(
+ sdam::IsMasterRTT(duration_cast<Milliseconds>(event.duration)),
+ event.hostAndPort,
+ event.reply);
+ break;
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 86f46b3848e..4adbb2329d2 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -57,6 +57,14 @@ public:
const ServerAddress& hostAndPort,
const BSONObj reply){};
/**
+ * Called when a ServerHandshakeCompleteEvent is published - The initial handshake to the server
+ * at hostAndPort was successful. durationMS is the measured RTT (Round Trip Time).
+ */
+ virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs,
+ const sdam::ServerAddress& address,
+ const BSONObj reply = BSONObj()){};
+
+ /**
* Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
* took to send the message and recieve the reply from the server.
@@ -85,8 +93,8 @@ public:
* To publish an event to all registered listeners call the corresponding event function on the
* TopologyEventsPublisher instance.
*/
-class TopologyEventsPublisher final : public TopologyListener,
- public std::enable_shared_from_this<TopologyEventsPublisher> {
+class TopologyEventsPublisher : public TopologyListener,
+ public std::enable_shared_from_this<TopologyEventsPublisher> {
public:
TopologyEventsPublisher(std::shared_ptr<executor::TaskExecutor> executor)
: _executor(executor){};
@@ -97,6 +105,9 @@ public:
void onTopologyDescriptionChangedEvent(UUID topologyId,
TopologyDescriptionPtr previousDescription,
TopologyDescriptionPtr newDescription) override;
+ virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs,
+ const sdam::ServerAddress& address,
+ const BSONObj reply = BSONObj()) override;
void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
const ServerAddress& hostAndPort,
const BSONObj reply) override;
@@ -108,14 +119,14 @@ public:
void onServerPingSucceededEvent(IsMasterRTT durationMS,
const ServerAddress& hostAndPort) override;
-
private:
enum class EventType {
HEARTBEAT_SUCCESS,
HEARTBEAT_FAILURE,
PING_SUCCESS,
PING_FAILURE,
- TOPOLOGY_DESCRIPTION_CHANGED
+ TOPOLOGY_DESCRIPTION_CHANGED,
+ HANDSHAKE_COMPLETE
};
struct Event {
EventType type;
diff --git a/src/mongo/client/server_ping_monitor.cpp b/src/mongo/client/server_ping_monitor.cpp
index 7359b07e503..bc448888bb8 100644
--- a/src/mongo/client/server_ping_monitor.cpp
+++ b/src/mongo/client/server_ping_monitor.cpp
@@ -225,8 +225,9 @@ void ServerPingMonitor::_setupTaskExecutor_inlock() {
}
}
-void ServerPingMonitor::onServerHandshakeCompleteEvent(const sdam::ServerAddress& address,
- OID topologyId) {
+void ServerPingMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
+ const sdam::ServerAddress& address,
+ const BSONObj reply) {
stdx::lock_guard lk(_mutex);
uassert(ErrorCodes::ShutdownInProgress,
str::stream() << "ServerPingMonitor is unable to start monitoring '" << address
diff --git a/src/mongo/client/server_ping_monitor.h b/src/mongo/client/server_ping_monitor.h
index d473c776c5f..a7289a79d99 100644
--- a/src/mongo/client/server_ping_monitor.h
+++ b/src/mongo/client/server_ping_monitor.h
@@ -151,7 +151,9 @@ public:
* The first isMaster exchange for a server succeeded. Creates a new
* SingleServerPingMonitor to monitor the new replica set member.
*/
- void onServerHandshakeCompleteEvent(const sdam::ServerAddress& address, OID topologyId);
+ void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
+ const sdam::ServerAddress& address,
+ const BSONObj reply = BSONObj());
/**
* The connection to the server was closed. Removes the server from the ServerPingMonitorList.
diff --git a/src/mongo/client/server_ping_monitor_test.cpp b/src/mongo/client/server_ping_monitor_test.cpp
index b4a3074ce0b..f9abd8050de 100644
--- a/src/mongo/client/server_ping_monitor_test.cpp
+++ b/src/mongo/client/server_ping_monitor_test.cpp
@@ -48,6 +48,7 @@
namespace mongo {
namespace {
+const sdam::IsMasterRTT initialRTT = duration_cast<Milliseconds>(Milliseconds(100));
using executor::NetworkInterfaceMock;
using executor::RemoteCommandResponse;
using executor::ThreadPoolExecutorTest;
@@ -308,12 +309,12 @@ TEST_F(ServerPingMonitorTest, singleNodeServerPingMonitorCycle) {
auto replSet = std::make_unique<MockReplicaSet>(
"test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false);
- auto oid = OID::gen();
auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString();
+ auto oid = OID::gen();
// Add a SingleServerPingMonitor to the ServerPingMonitor. Confirm pings are sent to the server
// at pingFrequency.
- serverPingMonitor->onServerHandshakeCompleteEvent(hostAndPort, oid);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, hostAndPort);
checkSinglePing(pingFrequency, hostAndPort, replSet.get());
checkSinglePing(pingFrequency * 2 - Seconds(2), hostAndPort, replSet.get());
@@ -337,15 +338,14 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneClosed) {
auto host0 = hosts[0].toString();
auto host1 = hosts[1].toString();
auto oid0 = OID::gen();
- auto oid1 = OID::gen();
// Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds
// after host0.
auto host1Delay = Seconds(2);
- serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0);
checkSinglePing(host1Delay, host0, replSet.get());
ASSERT_EQ(elapsed(), host1Delay);
- serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1);
checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get());
serverPingMonitor->onServerClosedEvent(host0, oid0);
@@ -367,8 +367,6 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneDead) {
auto hosts = replSet->getHosts();
auto host0 = hosts[0].toString();
auto host1 = hosts[1].toString();
- auto oid0 = OID::gen();
- auto oid1 = OID::gen();
{
NetworkInterfaceMock::InNetworkGuard ing(getNet());
@@ -376,12 +374,12 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorOneDead) {
}
auto host1Delay = Seconds(2);
- serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0);
checkSinglePing(host1Delay, host0, replSet.get());
// Add host1 host1Delay after host2.
ASSERT_EQ(elapsed(), host1Delay);
- serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1);
// Confirm host1 reported HostUnreachable to the TopologyLisener.
processPingRequest(host1, replSet.get());
@@ -416,16 +414,14 @@ TEST_F(ServerPingMonitorTest, twoNodeServerPingMonitorMutlipleShutdown) {
auto hosts = replSet->getHosts();
auto host0 = hosts[0].toString();
auto host1 = hosts[1].toString();
- auto oid0 = OID::gen();
- auto oid1 = OID::gen();
// Add SingleServerPingMonitors for host0 and host1 where host1 is added host1Delay seconds
// after host0.
auto host1Delay = Seconds(2);
- serverPingMonitor->onServerHandshakeCompleteEvent(host0, oid0);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host0);
checkSinglePing(host1Delay, host0, replSet.get());
ASSERT_EQ(elapsed(), host1Delay);
- serverPingMonitor->onServerHandshakeCompleteEvent(host1, oid1);
+ serverPingMonitor->onServerHandshakeCompleteEvent(initialRTT, host1);
checkSinglePing(pingFrequency - Seconds(2), host1, replSet.get());
serverPingMonitor->shutdown();
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 0621cbc8793..c8548369363 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -331,6 +331,11 @@ HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() {
return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
}
+sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher() {
+ return _eventsPublisher;
+}
+
+
void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
failedHost(host, BSONObj(), status);
}
@@ -535,6 +540,13 @@ void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT d
_topologyManager->onServerRTTUpdated(hostAndPort, durationMS);
}
+void StreamableReplicaSetMonitor::onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) {
+ IsMasterOutcome outcome(hostAndPort, reply, durationMs);
+ _topologyManager->onServerDescription(outcome);
+}
+
std::string StreamableReplicaSetMonitor::_logPrefix() {
return str::stream() << kLogPrefix << " [" << getName() << "] ";
}
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 59006e0221c..93e235677bb 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -109,6 +109,8 @@ public:
const MongoURI& getOriginalUri() const;
+ sdam::TopologyEventsPublisherPtr getEventsPublisher();
+
bool contains(const HostAndPort& server) const;
void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const;
@@ -159,6 +161,10 @@ private:
void onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
const sdam::ServerAddress& hostAndPort) override;
+ void onServerHandshakeCompleteEvent(sdam::IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+
// Get a pointer to the current primary's ServerDescription
// To ensure a consistent view of the Topology either _currentPrimary or _currentTopology should
// be called (not both) since the topology can change between the function invocations.