diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-04-06 16:40:58 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-22 17:48:38 +0000 |
commit | 9c442e31cc9d7fda1ee51f142e61ccac21de7c7e (patch) | |
tree | c0aca6806b4dd9866f52acac7e18e1d96ca0afd4 /src/mongo/client | |
parent | 1563ac389bdcb08aadeb31705b2cc123742b739b (diff) | |
download | mongo-9c442e31cc9d7fda1ee51f142e61ccac21de7c7e.tar.gz |
SERVER-45799 Add unit tests for ServerIsMasterMonitor
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.cpp | 54 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener_mock.h | 31 | ||||
-rw-r--r-- | src/mongo/client/server_is_master_monitor.cpp | 5 | ||||
-rw-r--r-- | src/mongo/client/server_is_master_monitor.h | 2 | ||||
-rw-r--r-- | src/mongo/client/server_is_master_monitor_test.cpp | 561 |
6 files changed, 640 insertions, 14 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index d2f6a3ad102..deecf85f78c 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -335,6 +335,7 @@ env.CppUnitTest( 'scanning_replica_set_monitor_test_concurrent.cpp', 'scanning_replica_set_monitor_test_fixture.cpp', 'server_is_master_monitor_expedited_test.cpp', + 'server_is_master_monitor_test.cpp', 'server_ping_monitor_test.cpp', 'streamable_replica_set_monitor_error_handler_test.cpp', ], diff --git a/src/mongo/client/sdam/topology_listener_mock.cpp b/src/mongo/client/sdam/topology_listener_mock.cpp index 7dddbaefea6..64255fdbf20 100644 --- a/src/mongo/client/sdam/topology_listener_mock.cpp +++ b/src/mongo/client/sdam/topology_listener_mock.cpp @@ -34,6 +34,49 @@ namespace mongo::sdam { +void TopologyListenerMock::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, + const BSONObj reply) { + stdx::lock_guard lk(_mutex); + auto it = _serverIsMasterReplies.find(hostAndPort); + if (it != _serverIsMasterReplies.end()) { + it->second.emplace_back(Status::OK()); + } else { + _serverIsMasterReplies.emplace(hostAndPort, std::vector<Status>{Status::OK()}); + } +} + +void TopologyListenerMock::onServerHeartbeatFailureEvent(Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) { + stdx::lock_guard lk(_mutex); + // If the map already contains an element for hostAndPort, append to its already existing + // vector. Otherwise, create a new vector. + auto it = _serverIsMasterReplies.find(hostAndPort); + if (it != _serverIsMasterReplies.end()) { + it->second.emplace_back(errorStatus); + } else { + _serverIsMasterReplies.emplace(hostAndPort, std::vector<Status>{errorStatus}); + } +} + +bool TopologyListenerMock::hasIsMasterResponse(const ServerAddress& hostAndPort) { + stdx::lock_guard lock(_mutex); + return _hasIsMasterResponse(lock, hostAndPort); +} + +bool TopologyListenerMock::_hasIsMasterResponse(WithLock, const ServerAddress& hostAndPort) { + return _serverIsMasterReplies.find(hostAndPort) != _serverIsMasterReplies.end(); +} + +std::vector<Status> TopologyListenerMock::getIsMasterResponse(const ServerAddress& hostAndPort) { + stdx::lock_guard lock(_mutex); + invariant(_hasIsMasterResponse(lock, hostAndPort)); + auto it = _serverIsMasterReplies.find(hostAndPort); + auto statusWithIsMasterResponse = it->second; + _serverIsMasterReplies.erase(it); + return statusWithIsMasterResponse; +} + void TopologyListenerMock::onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) { stdx::lock_guard lk(_mutex); @@ -57,19 +100,20 @@ void TopologyListenerMock::onServerPingFailedEvent(const ServerAddress& hostAndP _serverPingRTTs.emplace(hostAndPort, std::vector<StatusWith<IsMasterRTT>>{errorStatus}); } } + bool TopologyListenerMock::hasPingResponse(const ServerAddress& hostAndPort) { - stdx::lock_guard lk(_mutex); - return _hasPingResponse_inlock(hostAndPort); + stdx::lock_guard lock(_mutex); + return _hasPingResponse(lock, hostAndPort); } -bool TopologyListenerMock::_hasPingResponse_inlock(const ServerAddress& hostAndPort) { +bool TopologyListenerMock::_hasPingResponse(WithLock, const ServerAddress& hostAndPort) { return _serverPingRTTs.find(hostAndPort) != _serverPingRTTs.end(); } std::vector<StatusWith<IsMasterRTT>> TopologyListenerMock::getPingResponse( const ServerAddress& hostAndPort) { - stdx::lock_guard lk(_mutex); - invariant(_hasPingResponse_inlock(hostAndPort)); + stdx::lock_guard lock(_mutex); + invariant(_hasPingResponse(lock, hostAndPort)); auto it = _serverPingRTTs.find(hostAndPort); auto statusWithRTT = it->second; _serverPingRTTs.erase(it); diff --git a/src/mongo/client/sdam/topology_listener_mock.h b/src/mongo/client/sdam/topology_listener_mock.h index 05207512ea1..836354e52c2 100644 --- a/src/mongo/client/sdam/topology_listener_mock.h +++ b/src/mongo/client/sdam/topology_listener_mock.h @@ -40,29 +40,42 @@ public: TopologyListenerMock() = default; virtual ~TopologyListenerMock() = default; - void onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) override; + void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, + const BSONObj reply) override; - void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; + void onServerHeartbeatFailureEvent(Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) override; /** - * Acquires _mutex before calling _hasPingResponse_inlock(). + * Returns true if _serverIsMasterReplies contains an element corresponding to hostAndPort. */ - bool hasPingResponse(const ServerAddress& hostAndPort); + bool hasIsMasterResponse(const ServerAddress& hostAndPort); + bool _hasIsMasterResponse(WithLock, const ServerAddress& hostAndPort); /** - * Should only be called while holding the _mutex. Returns true if _serverPingRTTs contains an - * element corresponding to hostAndPort. + * Returns the responses for the most recent onServerHeartbeat events. */ - bool _hasPingResponse_inlock(const ServerAddress& hostAndPort); + std::vector<Status> getIsMasterResponse(const ServerAddress& hostAndPort); + + void onServerPingSucceededEvent(IsMasterRTT latency, const ServerAddress& hostAndPort) override; + + void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; + + /** + * Returns true if _serverPingRTTs contains an element corresponding to hostAndPort. + */ + bool hasPingResponse(const ServerAddress& hostAndPort); + bool _hasPingResponse(WithLock, const ServerAddress& hostAndPort); /** - * Returns the response for the most recent onServerPing event. MUST be called after a ping has - * been sent and proccessed in order to remove it from the map and make room for the next. + * Returns the responses for the most recent onServerPing events. */ std::vector<StatusWith<IsMasterRTT>> getPingResponse(const ServerAddress& hostAndPort); private: Mutex _mutex; + stdx::unordered_map<ServerAddress, std::vector<Status>> _serverIsMasterReplies; stdx::unordered_map<ServerAddress, std::vector<StatusWith<IsMasterRTT>>> _serverPingRTTs; }; diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp index eab24eddb7e..5f1637b14ba 100644 --- a/src/mongo/client/server_is_master_monitor.cpp +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -544,6 +544,11 @@ void ServerIsMasterMonitor::requestImmediateCheck() { } } +void ServerIsMasterMonitor::disableExpeditedChecking() { + stdx::lock_guard lock(_mutex); + _disableExpeditedChecking(lock); +} + void ServerIsMasterMonitor::_disableExpeditedChecking(WithLock) { for (auto& addressAndMonitor : _singleMonitors) { addressAndMonitor.second->disableExpeditedChecking(); diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h index e788cc3c95a..aff3795c654 100644 --- a/src/mongo/client/server_is_master_monitor.h +++ b/src/mongo/client/server_is_master_monitor.h @@ -139,6 +139,8 @@ public: TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription) override; + void disableExpeditedChecking(); + private: /** * If the provided executor exists, use that one (for testing). Otherwise create a new one. diff --git a/src/mongo/client/server_is_master_monitor_test.cpp b/src/mongo/client/server_is_master_monitor_test.cpp new file mode 100644 index 00000000000..a0eb42744ce --- /dev/null +++ b/src/mongo/client/server_is_master_monitor_test.cpp @@ -0,0 +1,561 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include <memory> + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/replica_set_monitor.h" +#include "mongo/client/replica_set_monitor_protocol_test_util.h" +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_listener_mock.h" +#include "mongo/client/server_is_master_monitor.h" +#include "mongo/dbtests/mock/mock_replica_set.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/duration.h" + +namespace mongo { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandResponse; +using executor::ThreadPoolExecutorTest; +using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard; + +class ServerIsMasterMonitorTestFixture : public unittest::Test { +protected: + /** + * Sets up the task executor as well as a TopologyListenerMock for each unit test. + */ + void setUp() override { + auto serviceContext = ServiceContext::make(); + setGlobalServiceContext(std::move(serviceContext)); + ReplicaSetMonitorProtocolTestUtil::setRSMProtocol(ReplicaSetMonitorProtocol::kSdam); + ReplicaSetMonitor::cleanup(); + + auto network = std::make_unique<executor::NetworkInterfaceMock>(); + _net = network.get(); + _executor = makeSharedThreadPoolTestExecutor(std::move(network)); + _executor->startup(); + _startDate = _net->now(); + _eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(_executor); + _topologyListener.reset(new sdam::TopologyListenerMock()); + _eventsPublisher->registerListener(_topologyListener); + } + + void tearDown() override { + _eventsPublisher.reset(); + _topologyListener.reset(); + _executor->shutdown(); + _executor->join(); + _executor.reset(); + ReplicaSetMonitor::cleanup(); + ReplicaSetMonitorProtocolTestUtil::resetRSMProtocol(); + } + + sdam::TopologyListenerMock* getTopologyListener() { + return _topologyListener.get(); + } + + std::shared_ptr<sdam::TopologyEventsPublisher> getEventsPublisher() { + return _eventsPublisher; + } + + executor::NetworkInterfaceMock* getNet() { + return _net; + } + + std::shared_ptr<executor::ThreadPoolTaskExecutor> getExecutor() { + return _executor; + } + + Date_t getStartDate() { + return _startDate; + } + + Milliseconds getTimeoutMS() { + return _timeoutMS; + } + + Milliseconds getHeartbeatFrequency() { + return _heartbeatFrequency; + } + + bool hasReadyRequests() { + NetworkInterfaceMock::InNetworkGuard ing(_net); + return _net->hasReadyRequests(); + } + + Milliseconds elapsed() { + return _net->now() - _startDate; + } + + /** + * Sets up a SingleServerIsMasterMonitor that starts sending isMasters to the server. + */ + std::shared_ptr<SingleServerIsMasterMonitor> initSingleServerIsMasterMonitor( + const sdam::ServerAddress& hostAndPort, MockReplicaSet* replSet) { + auto ssIsMasterMonitor = std::make_shared<SingleServerIsMasterMonitor>(replSet->getURI(), + hostAndPort, + boost::none, + _heartbeatFrequency, + _eventsPublisher, + _executor); + ssIsMasterMonitor->init(); + + // Ensure that the clock has not advanced since setUp() and _startDate is representative + // of when the first isMaster request was sent. + ASSERT_EQ(getStartDate(), getNet()->now()); + return ssIsMasterMonitor; + } + + std::shared_ptr<ServerIsMasterMonitor> initServerIsMasterMonitor( + const MongoURI& setUri, + const sdam::SdamConfiguration& sdamConfiguration, + const sdam::TopologyDescriptionPtr topologyDescription) { + auto serverIsMasterMonitor = std::make_shared<ServerIsMasterMonitor>( + setUri, sdamConfiguration, _eventsPublisher, topologyDescription, _executor); + + // Ensure that the clock has not advanced since setUp() and _startDate is representative + // of when the first isMaster request was sent. + ASSERT_EQ(getStartDate(), getNet()->now()); + return serverIsMasterMonitor; + } + + /** + * Checks that an isMaster request has been sent to some server and schedules a response. If + * assertHostCheck is true, asserts that the isMaster was sent to the server at hostAndPort. + */ + void processIsMasterRequest(MockReplicaSet* replSet, + boost::optional<sdam::ServerAddress> hostAndPort = boost::none) { + ASSERT(hasReadyRequests()); + InNetworkGuard guard(_net); + _net->runReadyNetworkOperations(); + auto noi = _net->getNextReadyRequest(); + auto request = noi->getRequest(); + + executor::TaskExecutorTest::assertRemoteCommandNameEquals("isMaster", request); + auto requestHost = request.target.toString(); + if (hostAndPort) { + ASSERT_EQ(request.target.toString(), hostAndPort); + } + + LOGV2(457991, + "got mock network operation", + "elapsed"_attr = elapsed(), + "request"_attr = request.toString()); + + const auto node = replSet->getNode(requestHost); + if (node->isRunning()) { + const auto opmsg = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto reply = node->runCommand(request.id, opmsg)->getCommandReply(); + _net->scheduleSuccessfulResponse(noi, RemoteCommandResponse(reply, Milliseconds(0))); + } else { + _net->scheduleErrorResponse(noi, Status(ErrorCodes::HostUnreachable, "")); + } + } + + template <typename Duration> + void advanceTime(Duration d) { + InNetworkGuard guard(_net); + // Operations can happen inline with advanceTime(), so log before and after the call. + LOGV2_DEBUG(457992, + 1, + "Advancing time", + "from_time_elapsed"_attr = elapsed(), + "to_time_elapsed"_attr = (elapsed() + d)); + _net->advanceTime(_net->now() + d); + LOGV2_DEBUG(457993, 1, "Advanced time", "time_elapsed"_attr = elapsed()); + } + + /** + * Checks that exactly one successful isMaster occurs within a time interval of + * heartbeatFrequency. + */ + void checkSingleIsMaster(Milliseconds heartbeatFrequency, + const sdam::ServerAddress& hostAndPort, + MockReplicaSet* replSet) { + auto deadline = elapsed() + heartbeatFrequency; + processIsMasterRequest(replSet, hostAndPort); + + while (elapsed() < deadline && !_topologyListener->hasIsMasterResponse(hostAndPort)) { + advanceTime(Milliseconds(1)); + } + validateIsMasterResponse(hostAndPort, deadline); + checkNoActivityBefore(deadline, hostAndPort); + } + + void validateIsMasterResponse(const sdam::ServerAddress& hostAndPort, Milliseconds deadline) { + ASSERT_TRUE(_topologyListener->hasIsMasterResponse(hostAndPort)); + ASSERT_LT(elapsed(), deadline); + auto isMasterResponse = _topologyListener->getIsMasterResponse(hostAndPort); + + // There should only be one isMaster response queued up. + ASSERT_EQ(isMasterResponse.size(), 1); + ASSERT(isMasterResponse[0].isOK()); + } + + /** + * Confirms no more isMaster requests are sent between elapsed() and deadline. Confirms no more + * isMaster responses are received between elapsed() and deadline when hostAndPort is specified. + */ + void checkNoActivityBefore(Milliseconds deadline, + boost::optional<sdam::ServerAddress> hostAndPort = boost::none) { + while (elapsed() < deadline) { + if (hasReadyRequests()) { + { + InNetworkGuard guard(_net); + _net->runReadyNetworkOperations(); + auto noi = _net->getNextReadyRequest(); + auto request = noi->getRequest(); + LOGV2_DEBUG(4579931, + 0, + "mynameisrae about to fail because was activity", + "request"_attr = request.toString(), + "elapsed"_attr = elapsed()); + } + ASSERT_FALSE(true); + } + ASSERT_FALSE(hasReadyRequests()); + if (hostAndPort) { + ASSERT_FALSE(_topologyListener->hasIsMasterResponse(hostAndPort.get())); + } + advanceTime(Milliseconds(1)); + } + } + + /** + * Waits up to timeoutMS for the next isMaster request to go out. + * Causes the test to fail if timeoutMS time passes and no request is ready. + * + * NOTE: The time between each isMaster request is the heartbeatFrequency compounded by response + * time. + */ + void waitForNextIsMaster(Milliseconds timeoutMS) { + auto deadline = elapsed() + timeoutMS; + while (!hasReadyRequests() && elapsed() < deadline) { + advanceTime(Milliseconds(1)); + } + + ASSERT_LT(elapsed(), deadline); + } + +private: + Date_t _startDate; + std::shared_ptr<sdam::TopologyEventsPublisher> _eventsPublisher; + std::shared_ptr<sdam::TopologyListenerMock> _topologyListener; + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; + executor::NetworkInterfaceMock* _net; + + Milliseconds _heartbeatFrequency = sdam::SdamConfiguration::kDefaultHeartbeatFrequencyMs; + + // The ServerIsMasterMonitor hard codes this to be it's _timeoutMS. + Milliseconds _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; +}; + +/** + * Checks that a SingleServerIsMasterMonitor sends isMaster requests at least heartbeatFrequency + * apart. + */ +TEST_F(ServerIsMasterMonitorTestFixture, heartbeatFrequencyCheck) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString(); + + auto ssIsMasterMonitor = initSingleServerIsMasterMonitor(hostAndPort, replSet.get()); + ssIsMasterMonitor->disableExpeditedChecking(); + + // An isMaster command fails if it takes as long or longer than timeoutMS. + auto timeoutMS = getTimeoutMS(); + auto heartbeatFrequency = getHeartbeatFrequency(); + + checkSingleIsMaster(heartbeatFrequency, hostAndPort, replSet.get()); + waitForNextIsMaster(timeoutMS); + + checkSingleIsMaster(heartbeatFrequency, hostAndPort, replSet.get()); + waitForNextIsMaster(timeoutMS); + + checkSingleIsMaster(heartbeatFrequency, hostAndPort, replSet.get()); + waitForNextIsMaster(timeoutMS); + + checkSingleIsMaster(heartbeatFrequency, hostAndPort, replSet.get()); + waitForNextIsMaster(timeoutMS); +} + +/** + * Confirms that a SingleServerIsMasterMonitor reports to the TopologyListener when an isMaster + * command generates an error. + */ +TEST_F(ServerIsMasterMonitorTestFixture, singleServerIsMasterMonitorReportsFailure) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + // Kill the server before starting up the SingleServerIsMasterMonitor. + auto hostAndPort = HostAndPort(replSet->getSecondaries()[0]).toString(); + { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + replSet->kill(hostAndPort); + } + + auto ssIsMasterMonitor = initSingleServerIsMasterMonitor(hostAndPort, replSet.get()); + ssIsMasterMonitor->disableExpeditedChecking(); + + processIsMasterRequest(replSet.get(), hostAndPort); + auto topologyListener = getTopologyListener(); + auto timeoutMS = getTimeoutMS(); + while (elapsed() < timeoutMS && !topologyListener->hasIsMasterResponse(hostAndPort)) { + // Advance time in small increments to ensure we stop before another isMaster is sent. + advanceTime(Milliseconds(1)); + } + ASSERT_TRUE(topologyListener->hasIsMasterResponse(hostAndPort)); + auto response = topologyListener->getIsMasterResponse(hostAndPort); + ASSERT_EQ(response.size(), 1); + ASSERT_EQ(response[0], ErrorCodes::HostUnreachable); +} + +TEST_F(ServerIsMasterMonitorTestFixture, serverIsMasterMonitorOnTopologyDescriptionChangeAddHost) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 2, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + auto hostAndPortList = replSet->getHosts(); + auto host0 = hostAndPortList[0].toString(); + std::vector<sdam::ServerAddress> host0Vec{host0}; + + // Start up the ServerIsMasterMonitor to monitor host0 only. + auto sdamConfig0 = sdam::SdamConfiguration(host0Vec); + auto topologyDescription0 = std::make_shared<sdam::TopologyDescription>(sdamConfig0); + auto uri = replSet->getURI(); + auto isMasterMonitor = initServerIsMasterMonitor(uri, sdamConfig0, topologyDescription0); + isMasterMonitor->disableExpeditedChecking(); + + auto host1Delay = Milliseconds(100); + checkSingleIsMaster(host1Delay, host0, replSet.get()); + ASSERT_FALSE(hasReadyRequests()); + + // Start monitoring host1. + auto host1 = hostAndPortList[1].toString(); + std::vector<sdam::ServerAddress> allHostsVec{host0, host1}; + auto sdamConfigAllHosts = sdam::SdamConfiguration(allHostsVec); + auto topologyDescriptionAllHosts = + std::make_shared<sdam::TopologyDescription>(sdamConfigAllHosts); + isMasterMonitor->onTopologyDescriptionChangedEvent( + UUID::gen(), topologyDescription0, topologyDescriptionAllHosts); + // Ensure expedited checking is disabled for the SingleServerIsMasterMonitor corresponding to + // host1 as well. + isMasterMonitor->disableExpeditedChecking(); + + // Confirm host0 and host1 are monitored. + auto heartbeatFrequency = getHeartbeatFrequency(); + checkSingleIsMaster(heartbeatFrequency - host1Delay, host1, replSet.get()); + waitForNextIsMaster(getTimeoutMS()); + checkSingleIsMaster(host1Delay, host0, replSet.get()); +} + +TEST_F(ServerIsMasterMonitorTestFixture, + serverIsMasterMonitorOnTopologyDescriptionChangeRemoveHost) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 2, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + auto hostAndPortList = replSet->getHosts(); + auto host0 = hostAndPortList[0].toString(); + auto host1 = hostAndPortList[1].toString(); + std::vector<sdam::ServerAddress> allHostsVec{host0, host1}; + + // Start up the ServerIsMasterMonitor to monitor both hosts. + auto sdamConfigAllHosts = sdam::SdamConfiguration(allHostsVec); + auto topologyDescriptionAllHosts = + std::make_shared<sdam::TopologyDescription>(sdamConfigAllHosts); + auto uri = replSet->getURI(); + auto isMasterMonitor = + initServerIsMasterMonitor(uri, sdamConfigAllHosts, topologyDescriptionAllHosts); + isMasterMonitor->disableExpeditedChecking(); + + // Confirm that both hosts are monitored. + auto heartbeatFrequency = getHeartbeatFrequency(); + while (hasReadyRequests()) { + processIsMasterRequest(replSet.get()); + } + auto deadline = elapsed() + heartbeatFrequency; + auto topologyListener = getTopologyListener(); + auto hasResponses = [&]() { + return topologyListener->hasIsMasterResponse(host0) && + topologyListener->hasIsMasterResponse(host1); + }; + while (elapsed() < heartbeatFrequency && !hasResponses()) { + advanceTime(Milliseconds(1)); + } + validateIsMasterResponse(host0, deadline); + validateIsMasterResponse(host1, deadline); + + // Remove host1 from the TopologyDescription to stop monitoring it. + std::vector<sdam::ServerAddress> host0Vec{host0}; + auto sdamConfig0 = sdam::SdamConfiguration(host0Vec); + auto topologyDescription0 = std::make_shared<sdam::TopologyDescription>(sdamConfig0); + isMasterMonitor->onTopologyDescriptionChangedEvent( + UUID::gen(), topologyDescriptionAllHosts, topologyDescription0); + + checkNoActivityBefore(deadline); + waitForNextIsMaster(getTimeoutMS()); + + checkSingleIsMaster(heartbeatFrequency, host0, replSet.get()); + waitForNextIsMaster(getTimeoutMS()); + + // Confirm the next isMaster request is sent to host0 and not host1. + checkSingleIsMaster(heartbeatFrequency, host0, replSet.get()); +} + +TEST_F(ServerIsMasterMonitorTestFixture, serverIsMasterMonitorShutdownStopsIsMasterRequests) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + std::vector<sdam::ServerAddress> hostVec{replSet->getHosts()[0].toString()}; + auto sdamConfig = sdam::SdamConfiguration(hostVec); + auto topologyDescription = std::make_shared<sdam::TopologyDescription>(sdamConfig); + auto uri = replSet->getURI(); + auto isMasterMonitor = initServerIsMasterMonitor(uri, sdamConfig, topologyDescription); + isMasterMonitor->disableExpeditedChecking(); + + auto heartbeatFrequency = getHeartbeatFrequency(); + checkSingleIsMaster(heartbeatFrequency - Milliseconds(200), hostVec[0], replSet.get()); + + isMasterMonitor->shutdown(); + + // After the ServerIsMasterMonitor shuts down, the TopologyListener may have responses until + // heartbeatFrequency has passed, but none of them should indicate Status::OK. + auto deadline = elapsed() + heartbeatFrequency; + auto topologyListener = getTopologyListener(); + + // Drain any requests already scheduled. + while (elapsed() < deadline) { + while (hasReadyRequests()) { + processIsMasterRequest(replSet.get(), hostVec[0]); + } + if (topologyListener->hasIsMasterResponse(hostVec[0])) { + auto isMasterResponses = topologyListener->getIsMasterResponse(hostVec[0]); + for (auto& response : isMasterResponses) { + ASSERT_FALSE(response.isOK()); + } + } + advanceTime(Milliseconds(1)); + } + + ASSERT_FALSE(topologyListener->hasIsMasterResponse(hostVec[0])); +} + +/** + * Tests that the ServerIsMasterMonitor waits until SdamConfiguration::kMinHeartbeatFrequencyMS has + * passed since the last isMaster was received if requestImmediateCheck() is called before enough + * time has passed. + */ +TEST_F(ServerIsMasterMonitorTestFixture, + serverIsMasterMonitorRequestImmediateCheckWaitMinHeartbeat) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + std::vector<sdam::ServerAddress> hostVec{replSet->getHosts()[0].toString()}; + + // Start up the ServerIsMasterMonitor to monitor host0 only. + auto sdamConfig0 = sdam::SdamConfiguration(hostVec); + auto topologyDescription0 = std::make_shared<sdam::TopologyDescription>(sdamConfig0); + auto uri = replSet->getURI(); + auto isMasterMonitor = initServerIsMasterMonitor(uri, sdamConfig0, topologyDescription0); + + // Ensure the server is not in expedited mode *before* requestImmediateCheck(). + isMasterMonitor->disableExpeditedChecking(); + + // Check that there is only one isMaster request at time t=0 up until + // timeAdvanceFromFirstIsMaster. + auto minHeartbeatFrequency = SdamConfiguration::kMinHeartbeatFrequencyMS; + auto timeAdvanceFromFirstIsMaster = Milliseconds(10); + ASSERT_LT(timeAdvanceFromFirstIsMaster, minHeartbeatFrequency); + checkSingleIsMaster(timeAdvanceFromFirstIsMaster, hostVec[0], replSet.get()); + + // It's been less than SdamConfiguration::kMinHeartbeatFrequencyMS since the last isMaster was + // received. The next isMaster should be sent SdamConfiguration::kMinHeartbeatFrequencyMS since + // the last isMaster was recieved rather than immediately. + auto timeRequestImmediateSent = elapsed(); + isMasterMonitor->requestImmediateCheck(); + waitForNextIsMaster(minHeartbeatFrequency); + + auto timeIsMasterSent = elapsed(); + ASSERT_LT(timeRequestImmediateSent, timeIsMasterSent); + ASSERT_LT(timeIsMasterSent, timeRequestImmediateSent + minHeartbeatFrequency); + checkSingleIsMaster(minHeartbeatFrequency, hostVec[0], replSet.get()); + + // Confirm expedited requests continue since there is no primary. + waitForNextIsMaster(getTimeoutMS()); + checkSingleIsMaster(minHeartbeatFrequency, hostVec[0], replSet.get()); +} + +/** + * Tests that if more than SdamConfiguration::kMinHeartbeatFrequencyMS has passed since the last + * isMaster response was received, the ServerIsMasterMonitor sends an isMaster immediately after + * requestImmediateCheck() is called. + */ +TEST_F(ServerIsMasterMonitorTestFixture, serverIsMasterMonitorRequestImmediateCheckNoWait) { + auto replSet = std::make_unique<MockReplicaSet>( + "test", 1, /* hasPrimary = */ false, /* dollarPrefixHosts = */ false); + + std::vector<sdam::ServerAddress> hostVec{replSet->getHosts()[0].toString()}; + + // Start up the ServerIsMasterMonitor to monitor host0 only. + auto sdamConfig0 = sdam::SdamConfiguration(hostVec); + auto topologyDescription0 = std::make_shared<sdam::TopologyDescription>(sdamConfig0); + auto uri = replSet->getURI(); + auto isMasterMonitor = initServerIsMasterMonitor(uri, sdamConfig0, topologyDescription0); + + // Ensure the server is not in expedited mode *before* requestImmediateCheck(). + isMasterMonitor->disableExpeditedChecking(); + + // No less than SdamConfiguration::kMinHeartbeatFrequencyMS must pass before + // requestImmediateCheck() is called in order to ensure the server reschedules for an immediate + // check. + auto minHeartbeatFrequency = SdamConfiguration::kMinHeartbeatFrequencyMS; + checkSingleIsMaster(minHeartbeatFrequency + Milliseconds(10), hostVec[0], replSet.get()); + + isMasterMonitor->requestImmediateCheck(); + checkSingleIsMaster(minHeartbeatFrequency, hostVec[0], replSet.get()); + + // Confirm expedited requests continue since there is no primary. + waitForNextIsMaster(getTimeoutMS()); + checkSingleIsMaster(minHeartbeatFrequency, hostVec[0], replSet.get()); +} + +} // namespace +} // namespace mongo |