diff options
Diffstat (limited to 'src/mongo/db/repl')
14 files changed, 992 insertions, 403 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 1621bb65651..bf1821ce75d 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -732,23 +732,13 @@ env.Library('topology_coordinator', 'topology_coordinator.cpp', ], LIBDEPS=[ - 'repl_coordinator_interface', - 'replica_set_messages', - 'rslog', - '$BUILD_DIR/mongo/rpc/metadata', - ]) - -env.Library('topology_coordinator_impl', - [ - 'topology_coordinator_impl.cpp', - ], - LIBDEPS=[ '$BUILD_DIR/mongo/db/audit', + '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/util/fail_point', 'replica_set_messages', 'repl_settings', 'rslog', - 'topology_coordinator', - '$BUILD_DIR/mongo/util/fail_point', + 'repl_coordinator_interface', ]) env.CppUnitTest('repl_set_heartbeat_response_test', @@ -756,12 +746,12 @@ env.CppUnitTest('repl_set_heartbeat_response_test', LIBDEPS=['replica_set_messages']) env.CppUnitTest( - target='topology_coordinator_impl_test', + target='topology_coordinator_test', source=[ - 'topology_coordinator_impl_test.cpp', + 'topology_coordinator_test.cpp', ], LIBDEPS=[ - 'topology_coordinator_impl', + 'topology_coordinator', 'replica_set_messages', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', @@ -769,12 +759,12 @@ env.CppUnitTest( ) env.CppUnitTest( - target='topology_coordinator_impl_v1_test', + target='topology_coordinator_v1_test', source=[ - 'topology_coordinator_impl_v1_test.cpp', + 'topology_coordinator_v1_test.cpp', ], LIBDEPS=[ - 'topology_coordinator_impl', + 'topology_coordinator', 'replica_set_messages', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', @@ -828,7 +818,7 @@ env.Library( 'repl_coordinator_impl', 'replmocks', 'service_context_repl_mock_init', - 'topology_coordinator_impl', + 'topology_coordinator', '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/executor/network_interface_mock', diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp index 727d652630a..ac6e07ce3ee 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp @@ -34,7 +34,7 @@ #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/freshness_checker.h" #include "mongo/db/repl/replication_coordinator_impl.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index e449665a483..0025f70464b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -39,7 +39,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp index 75b419765b8..dac78fdd56e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp @@ -34,7 +34,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/repl/replication_coordinator_impl.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/mutex.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 6aff08b85dc..0fd15751fec 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -39,7 +39,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp index c83657a154e..55140bb9a23 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp @@ -39,7 +39,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 6e77d46c084..2637deed13a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -39,7 +39,7 @@ #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 24975726a36..a7201c03d32 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -55,7 +55,7 @@ #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/repl/storage_interface_mock.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context_noop.h" diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 6d1555e2ed0..c2278a72e9a 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -44,7 +44,7 @@ #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/replication_recovery_mock.h" #include "mongo/db/repl/storage_interface_mock.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_mock.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -138,8 +138,8 @@ void ReplCoordTest::init() { auto logicalClock = stdx::make_unique<LogicalClock>(service); LogicalClock::set(service, std::move(logicalClock)); - TopologyCoordinatorImpl::Options settings; - auto topo = stdx::make_unique<TopologyCoordinatorImpl>(settings); + TopologyCoordinator::Options settings; + auto topo = stdx::make_unique<TopologyCoordinator>(settings); _topo = topo.get(); auto net = stdx::make_unique<NetworkInterfaceMock>(); _net = net.get(); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 92eeea79f63..15b0027b32f 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -48,7 +48,7 @@ class ReplSetConfig; class ReplicationCoordinatorExternalStateMock; class ReplicationCoordinatorImpl; class StorageInterfaceMock; -class TopologyCoordinatorImpl; +class TopologyCoordinator; using executor::NetworkInterfaceMock; @@ -120,7 +120,7 @@ protected: /** * Gets the topology coordinator used by the replication coordinator under test. */ - TopologyCoordinatorImpl& getTopoCoord() { + TopologyCoordinator& getTopoCoord() { return *_topo; } @@ -288,7 +288,7 @@ protected: private: std::unique_ptr<ReplicationCoordinatorImpl> _repl; // Owned by ReplicationCoordinatorImpl - TopologyCoordinatorImpl* _topo = nullptr; + TopologyCoordinator* _topo = nullptr; // Owned by executor executor::NetworkInterfaceMock* _net = nullptr; // Owned by ReplicationCoordinatorImpl diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 84f4ccbedfe..ffca19a4949 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -30,9 +30,10 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator.h" #include <limits> +#include <string> #include "mongo/db/audit.h" #include "mongo/db/client.h" @@ -50,6 +51,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" @@ -59,19 +61,61 @@ namespace mongo { namespace repl { using std::vector; -const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30); +const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30); // Controls how caught up in replication a secondary with higher priority than the current primary // must be before it will call for a priority takeover election. MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2); -// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore -// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be +// If this fail point is enabled, TopologyCoordinator::shouldChangeSyncSource() will ignore +// the option TopologyCoordinator::Options::maxSyncSourceLagSecs. The sync source will not be // re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds. MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs); +constexpr Milliseconds TopologyCoordinator::PingStats::UninitializedPing; + namespace { +constexpr int kLeaderValue = 0; +constexpr int kFollowerValue = 1; +constexpr int kCandidateValue = 2; +} // namespace + +const TopologyCoordinator::Role TopologyCoordinator::Role::leader(kLeaderValue); +const TopologyCoordinator::Role TopologyCoordinator::Role::follower(kFollowerValue); +const TopologyCoordinator::Role TopologyCoordinator::Role::candidate(kCandidateValue); + +TopologyCoordinator::Role::Role(int value) : _value(value) {} + +std::string TopologyCoordinator::Role::toString() const { + switch (_value) { + case kLeaderValue: + return "leader"; + case kFollowerValue: + return "follower"; + case kCandidateValue: + return "candidate"; + } + invariant(false); +} + +TopologyCoordinator::~TopologyCoordinator() {} +std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) { + return os << role.toString(); +} + +std::ostream& operator<<(std::ostream& os, + TopologyCoordinator::PrepareFreezeResponseResult result) { + switch (result) { + case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction: + return os << "no action"; + case TopologyCoordinator::PrepareFreezeResponseResult::kElectSelf: + return os << "elect self"; + } + MONGO_UNREACHABLE; +} + +namespace { template <typename T> int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) { return static_cast<int>(it - vec.begin()); @@ -114,23 +158,23 @@ void appendOpTime(BSONObjBuilder* bob, } } // namespace -void PingStats::start(Date_t now) { +void TopologyCoordinator::PingStats::start(Date_t now) { _lastHeartbeatStartDate = now; _numFailuresSinceLastStart = 0; } -void PingStats::hit(Milliseconds millis) { +void TopologyCoordinator::PingStats::hit(Milliseconds millis) { _numFailuresSinceLastStart = std::numeric_limits<int>::max(); ++count; value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5); } -void PingStats::miss() { +void TopologyCoordinator::PingStats::miss() { ++_numFailuresSinceLastStart; } -TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options) +TopologyCoordinator::TopologyCoordinator(Options options) : _role(Role::follower), _term(OpTime::kUninitializedTerm), _currentPrimaryIndex(-1), @@ -145,22 +189,22 @@ TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options) _memberData.back().setIsSelf(true); } -TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const { +TopologyCoordinator::Role TopologyCoordinator::getRole() const { return _role; } -void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) { +void TopologyCoordinator::setForceSyncSourceIndex(int index) { invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers()); _forceSyncSourceIndex = index; } -HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { +HostAndPort TopologyCoordinator::getSyncSourceAddress() const { return _syncSource; } -HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, - const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) { +HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ChainingPreference chainingPreference) { // If we are not a member of the current replica set configuration, no sync source is valid. if (_selfIndex == -1) { LOG(1) << "Cannot sync from any members because we are not in the replica set config"; @@ -361,8 +405,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, return _syncSource; } -bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig, - Date_t now) const { +bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const { std::map<HostAndPort, Date_t>::const_iterator blacklisted = _syncSourceBlacklist.find(memberConfig.getHostAndPort()); if (blacklisted != _syncSourceBlacklist.end()) { @@ -373,12 +416,12 @@ bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberCon return false; } -void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { +void TopologyCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) { LOG(2) << "blacklisting " << host << " until " << until.toString(); _syncSourceBlacklist[host] = until; } -void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) { +void TopologyCoordinator::unblacklistSyncSource(const HostAndPort& host, Date_t now) { std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host); if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) { LOG(2) << "unblacklisting " << host; @@ -386,13 +429,13 @@ void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Dat } } -void TopologyCoordinatorImpl::clearSyncSourceBlacklist() { +void TopologyCoordinator::clearSyncSourceBlacklist() { _syncSourceBlacklist.clear(); } -void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, - BSONObjBuilder* response, - Status* result) { +void TopologyCoordinator::prepareSyncFromResponse(const HostAndPort& target, + BSONObjBuilder* response, + Status* result) { response->append("syncFromRequested", target.toString()); if (_selfIndex == -1) { @@ -483,11 +526,10 @@ void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target, *result = Status::OK(); } -void TopologyCoordinatorImpl::prepareFreshResponse( - const ReplicationCoordinator::ReplSetFreshArgs& args, - const Date_t now, - BSONObjBuilder* response, - Status* result) { +void TopologyCoordinator::prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, + const Date_t now, + BSONObjBuilder* response, + Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " @@ -543,10 +585,9 @@ void TopologyCoordinatorImpl::prepareFreshResponse( *result = Status::OK(); } -bool TopologyCoordinatorImpl::_shouldVetoMember( - const ReplicationCoordinator::ReplSetFreshArgs& args, - const Date_t& now, - std::string* errmsg) const { +bool TopologyCoordinator::_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args, + const Date_t& now, + std::string* errmsg) const { if (_rsConfig.getConfigVersion() < args.cfgver) { // We are stale; do not veto. return false; @@ -611,11 +652,10 @@ bool TopologyCoordinatorImpl::_shouldVetoMember( } // produce a reply to a received electCmd -void TopologyCoordinatorImpl::prepareElectResponse( - const ReplicationCoordinator::ReplSetElectArgs& args, - const Date_t now, - BSONObjBuilder* response, - Status* result) { +void TopologyCoordinator::prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, + const Date_t now, + BSONObjBuilder* response, + Status* result) { if (_rsConfig.getProtocolVersion() != 0) { *result = Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " @@ -688,10 +728,10 @@ void TopologyCoordinatorImpl::prepareElectResponse( } // produce a reply to a heartbeat -Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, - const ReplSetHeartbeatArgs& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response) { +Status TopologyCoordinator::prepareHeartbeatResponse(Date_t now, + const ReplSetHeartbeatArgs& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response) { if (args.getProtocolVersion() != 1) { return Status(ErrorCodes::BadValue, str::stream() << "replset: incompatible replset protocol version: " @@ -784,10 +824,10 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now, return Status::OK(); } -Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, - const ReplSetHeartbeatArgsV1& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response) { +Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now, + const ReplSetHeartbeatArgsV1& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response) { // Verify that replica set names match const std::string rshb = args.getSetName(); if (ourSetName != rshb) { @@ -864,7 +904,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now, return Status::OK(); } -int TopologyCoordinatorImpl::_getMemberIndex(int id) const { +int TopologyCoordinator::_getMemberIndex(int id) const { int index = 0; for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd(); ++it, ++index) { @@ -875,7 +915,7 @@ int TopologyCoordinatorImpl::_getMemberIndex(int id) const { return -1; } -std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest( +std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinator::prepareHeartbeatRequest( Date_t now, const std::string& ourSetName, const HostAndPort& target) { PingStats& hbStats = _pings[target]; Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate(); @@ -914,7 +954,7 @@ std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHe return std::make_pair(hbArgs, timeout); } -std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1( +std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHeartbeatRequestV1( Date_t now, const std::string& ourSetName, const HostAndPort& target) { PingStats& hbStats = _pings[target]; Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64()); @@ -954,7 +994,7 @@ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepare return std::make_pair(hbArgs, timeout); } -HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse( +HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, @@ -1092,9 +1132,9 @@ HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse( return nextAction; } -bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime, - int numNodes, - bool durablyWritten) { +bool TopologyCoordinator::haveNumNodesReachedOpTime(const OpTime& targetOpTime, + int numNodes, + bool durablyWritten) { // Replication progress that is for some reason ahead of us should not allow us to // satisfy a write concern if we aren't caught up ourselves. OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime(); @@ -1116,9 +1156,9 @@ bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTi return false; } -bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime, - const ReplSetTagPattern& tagPattern, - bool durablyWritten) { +bool TopologyCoordinator::haveTaggedNodesReachedOpTime(const OpTime& opTime, + const ReplSetTagPattern& tagPattern, + bool durablyWritten) { ReplSetTagMatch matcher(tagPattern); for (auto&& memberData : _memberData) { const OpTime& memberOpTime = @@ -1141,7 +1181,7 @@ bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime, return false; } -HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) { +HeartbeatResponseAction TopologyCoordinator::checkMemberTimeouts(Date_t now) { bool stepdown = false; for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) { auto& memberData = _memberData[memberIndex]; @@ -1160,9 +1200,9 @@ HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) return HeartbeatResponseAction::makeNoAction(); } -std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op, - bool durablyWritten, - bool skipSelf) { +std::vector<HostAndPort> TopologyCoordinator::getHostsWrittenTo(const OpTime& op, + bool durablyWritten, + bool skipSelf) { std::vector<HostAndPort> hosts; for (const auto& memberData : _memberData) { if (skipSelf && memberData.isSelf()) { @@ -1182,7 +1222,7 @@ std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime return hosts; } -bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) { +bool TopologyCoordinator::setMemberAsDown(Date_t now, const int memberIndex) { invariant(memberIndex != _selfIndex); invariant(memberIndex != -1); invariant(_currentPrimaryIndex == _selfIndex); @@ -1196,7 +1236,7 @@ bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) return false; } -std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const { +std::pair<int, Date_t> TopologyCoordinator::getStalestLiveMember() const { Date_t earliestDate = Date_t::max(); int earliestMemberId = -1; for (const auto& memberData : _memberData) { @@ -1217,39 +1257,39 @@ std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const { return std::make_pair(earliestMemberId, earliestDate); } -void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) { +void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) { for (auto&& memberData : _memberData) memberData.updateLiveness(now); } -void TopologyCoordinatorImpl::resetMemberTimeouts( - Date_t now, const stdx::unordered_set<HostAndPort>& member_set) { +void TopologyCoordinator::resetMemberTimeouts(Date_t now, + const stdx::unordered_set<HostAndPort>& member_set) { for (auto&& memberData : _memberData) { if (member_set.count(memberData.getHostAndPort())) memberData.updateLiveness(now); } } -OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const { +OpTime TopologyCoordinator::getMyLastAppliedOpTime() const { return _selfMemberData().getLastAppliedOpTime(); } -OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const { +OpTime TopologyCoordinator::getMyLastDurableOpTime() const { return _selfMemberData().getLastDurableOpTime(); } -MemberData* TopologyCoordinatorImpl::getMyMemberData() { +MemberData* TopologyCoordinator::getMyMemberData() { return &_memberData[_selfMemberDataIndex()]; } -MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) { +MemberData* TopologyCoordinator::findMemberDataByMemberId(const int memberId) { const int memberIndex = _getMemberIndex(memberId); if (memberIndex >= 0) return &_memberData[memberIndex]; return nullptr; } -MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) { +MemberData* TopologyCoordinator::findMemberDataByRid(const OID rid) { for (auto& memberData : _memberData) { if (memberData.getRid() == rid) return &memberData; @@ -1257,7 +1297,7 @@ MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) { return nullptr; } -MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) { +MemberData* TopologyCoordinator::addSlaveMemberData(const OID rid) { invariant(!_memberData.empty()); // Must always have our own entry first. invariant(!_rsConfig.isInitialized()); // Used only for master-slave. _memberData.emplace_back(); @@ -1266,7 +1306,7 @@ MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) { return result; } -HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( +HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1( int updatedConfigIndex, const MemberState& originalState, Date_t now) { // // Updates the local notion of which remote node, if any is primary. @@ -1339,7 +1379,7 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1( return HeartbeatResponseAction::makeNoAction(); } -HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData( +HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBData( int updatedConfigIndex, const MemberState& originalState, Date_t now) { // This method has two interrelated responsibilities, performed in two phases. // @@ -1519,14 +1559,14 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData( const auto status = checkShouldStandForElection(now); if (!status.isOK()) { // NOTE: This log line is checked in unit test(s). - LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason(); + LOG(2) << "TopologyCoordinator::_updatePrimaryFromHBData - " << status.reason(); return HeartbeatResponseAction::makeNoAction(); } fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout)); return HeartbeatResponseAction::makeElectAction(); } -Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const { +Status TopologyCoordinator::checkShouldStandForElection(Date_t now) const { if (_currentPrimaryIndex != -1) { return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"}; } @@ -1570,7 +1610,7 @@ Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const { return Status::OK(); } -bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const { +bool TopologyCoordinator::_aMajoritySeemsToBeUp() const { int vUp = 0; for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) { @@ -1583,7 +1623,7 @@ bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const { return vUp * 2 > _rsConfig.getTotalVotingMembers(); } -int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority( +int TopologyCoordinator::_findHealthyPrimaryOfEqualOrGreaterPriority( const int candidateIndex) const { const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority(); for (auto it = _memberData.begin(); it != _memberData.end(); ++it) { @@ -1600,13 +1640,13 @@ int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority( return -1; } -bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const { +bool TopologyCoordinator::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const { const OpTime latestKnownOpTime = _latestKnownOpTime(); // Use addition instead of subtraction to avoid overflow. return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs()); } -bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const { +bool TopologyCoordinator::_amIFreshEnoughForPriorityTakeover() const { const OpTime latestKnownOpTime = _latestKnownOpTime(); // Rules are: @@ -1634,7 +1674,7 @@ bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const { } } -bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const { +bool TopologyCoordinator::_amIFreshEnoughForCatchupTakeover() const { const OpTime latestKnownOpTime = _latestKnownOpTime(); @@ -1668,7 +1708,7 @@ bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const { return ourLastOpApplied.getTerm() < _term; } -bool TopologyCoordinatorImpl::_iAmPrimary() const { +bool TopologyCoordinator::_iAmPrimary() const { if (_role == Role::leader) { invariant(_currentPrimaryIndex == _selfIndex); invariant(_leaderMode != LeaderMode::kNotLeader); @@ -1677,7 +1717,7 @@ bool TopologyCoordinatorImpl::_iAmPrimary() const { return false; } -OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const { +OpTime TopologyCoordinator::_latestKnownOpTime() const { OpTime latest = getMyLastAppliedOpTime(); for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) { @@ -1705,8 +1745,7 @@ OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const { return latest; } -bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex, - int memberTwoIndex) const { +bool TopologyCoordinator::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const { if (memberOneIndex == -1) return false; @@ -1717,7 +1756,7 @@ bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex, _rsConfig.getMemberAt(memberTwoIndex).getPriority(); } -int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const { +int TopologyCoordinator::_getHighestPriorityElectableIndex(Date_t now) const { int maxIndex = -1; for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) { UnelectableReasonMask reason = currentIndex == _selfIndex @@ -1731,7 +1770,7 @@ int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const return maxIndex; } -bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() { +bool TopologyCoordinator::prepareForUnconditionalStepDown() { if (_leaderMode == LeaderMode::kSteppingDown) { // Can only be processing one required stepdown at a time. return false; @@ -1742,7 +1781,7 @@ bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() { return true; } -Status TopologyCoordinatorImpl::prepareForStepDownAttempt() { +Status TopologyCoordinator::prepareForStepDownAttempt() { if (_leaderMode == LeaderMode::kSteppingDown || _leaderMode == LeaderMode::kAttemptingStepDown) { return Status{ErrorCodes::ConflictingOperationInProgress, @@ -1752,14 +1791,14 @@ Status TopologyCoordinatorImpl::prepareForStepDownAttempt() { return Status::OK(); } -void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() { +void TopologyCoordinator::abortAttemptedStepDownIfNeeded() { if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) { _setLeaderMode(TopologyCoordinator::LeaderMode::kMaster); } } -void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState, - const Timestamp& electionTime) { +void TopologyCoordinator::changeMemberState_forTest(const MemberState& newMemberState, + const Timestamp& electionTime) { invariant(_selfIndex != -1); if (newMemberState == getMemberState()) return; @@ -1795,7 +1834,7 @@ void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMe log() << newMemberState; } -void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { +void TopologyCoordinator::_setCurrentPrimaryForTest(int primaryIndex) { if (primaryIndex == _selfIndex) { changeMemberState_forTest(MemberState::RS_PRIMARY); } else { @@ -1817,16 +1856,16 @@ void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) { } } -const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const { +const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const { if (_currentPrimaryIndex == -1) return NULL; return &(_rsConfig.getMemberAt(_currentPrimaryIndex)); } -void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, - BSONObjBuilder* response, - Status* result) { +void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, + BSONObjBuilder* response, + Status* result) { // output for each member vector<BSONObj> membersOut; const MemberState myState = getMemberState(); @@ -1933,12 +1972,10 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsS bb.appendDate("lastHeartbeat", it->getLastHeartbeat()); bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv()); Milliseconds ping = _getPing(itConfig.getHostAndPort()); - if (ping != UninitializedPing) { - bb.append("pingMs", durationCount<Milliseconds>(ping)); - std::string s = it->getLastHeartbeatMsg(); - if (!s.empty()) - bb.append("lastHeartbeatMessage", s); - } + bb.append("pingMs", durationCount<Milliseconds>(ping)); + std::string s = it->getLastHeartbeatMsg(); + if (!s.empty()) + bb.append("lastHeartbeatMessage", s); if (it->hasAuthIssue()) { bb.append("authenticated", false); } @@ -1997,7 +2034,7 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsS *result = Status::OK(); } -StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand( +StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, OpTime currentCommittedSnapshotOpTime) const { BSONObjBuilder cmdBuilder; @@ -2051,7 +2088,7 @@ StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand return cmdBuilder.obj(); } -void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) { +void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) { BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); { for (const auto& memberData : _memberData) { @@ -2076,7 +2113,7 @@ void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) { } } -void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { +void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* response) { const MemberState myState = getMemberState(); if (!_rsConfig.isInitialized()) { response->markAsNoConfig(); @@ -2146,8 +2183,8 @@ void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) } } -StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult> -TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) { +StatusWith<TopologyCoordinator::PrepareFreezeResponseResult> +TopologyCoordinator::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) { if (_role != TopologyCoordinator::Role::follower) { std::string msg = str::stream() << "cannot freeze node when primary or running for election. state: " @@ -2180,7 +2217,7 @@ TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuil return PrepareFreezeResponseResult::kNoAction; } -bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) { +bool TopologyCoordinator::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) { if (_stepDownUntil > now) { return false; } @@ -2195,31 +2232,31 @@ bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSe return false; } -void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) { +void TopologyCoordinator::setElectionSleepUntil(Date_t newTime) { if (_electionSleepUntil < newTime) { _electionSleepUntil = newTime; } } -Timestamp TopologyCoordinatorImpl::getElectionTime() const { +Timestamp TopologyCoordinator::getElectionTime() const { return _electionTime; } -OID TopologyCoordinatorImpl::getElectionId() const { +OID TopologyCoordinator::getElectionId() const { return _electionId; } -int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const { +int TopologyCoordinator::getCurrentPrimaryIndex() const { return _currentPrimaryIndex; } -Date_t TopologyCoordinatorImpl::getStepDownTime() const { +Date_t TopologyCoordinator::getStepDownTime() const { return _stepDownUntil; } -void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, - int selfIndex, - Date_t now) { +void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig, + int selfIndex, + Date_t now) { std::vector<MemberData> oldHeartbeats; _memberData.swap(oldHeartbeats); @@ -2265,9 +2302,7 @@ void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfi // This function installs a new config object and recreates MemberData objects // that reflect the new config. -void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig, - int selfIndex, - Date_t now) { +void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) { invariant(_role != Role::candidate); invariant(selfIndex < newConfig.getNumMembers()); @@ -2312,7 +2347,7 @@ void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig, _role = Role::candidate; } } -std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const { +std::string TopologyCoordinator::_getHbmsg(Date_t now) const { // ignore messages over 2 minutes old if ((now - _hbmsgTime) > Seconds{120}) { return ""; @@ -2320,20 +2355,20 @@ std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const { return _hbmsg; } -void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) { +void TopologyCoordinator::setMyHeartbeatMessage(const Date_t now, const std::string& message) { _hbmsgTime = now; _hbmsg = message; } -const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const { +const MemberConfig& TopologyCoordinator::_selfConfig() const { return _rsConfig.getMemberAt(_selfIndex); } -const MemberData& TopologyCoordinatorImpl::_selfMemberData() const { +const MemberData& TopologyCoordinator::_selfMemberData() const { return _memberData[_selfMemberDataIndex()]; } -const int TopologyCoordinatorImpl::_selfMemberDataIndex() const { +const int TopologyCoordinator::_selfMemberDataIndex() const { invariant(!_memberData.empty()); if (_selfIndex >= 0) return _selfIndex; @@ -2342,7 +2377,7 @@ const int TopologyCoordinatorImpl::_selfMemberDataIndex() const { return 0; } -TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason( +TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getUnelectableReason( int index) const { invariant(index != _selfIndex); const MemberConfig& memberConfig = _rsConfig.getMemberAt(index); @@ -2368,7 +2403,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnel return result; } -TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason( +TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectableReason( const Date_t now, StartElectionReason reason) const { UnelectableReasonMask result = None; const OpTime lastApplied = getMyLastAppliedOpTime(); @@ -2423,8 +2458,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUn return result; } -std::string TopologyCoordinatorImpl::_getUnelectableReasonString( - const UnelectableReasonMask ur) const { +std::string TopologyCoordinator::_getUnelectableReasonString(const UnelectableReasonMask ur) const { invariant(ur); str::stream ss; bool hasWrittenToStream = false; @@ -2521,15 +2555,15 @@ std::string TopologyCoordinatorImpl::_getUnelectableReasonString( return ss; } -Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) { +Milliseconds TopologyCoordinator::_getPing(const HostAndPort& host) { return _pings[host].getMillis(); } -void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) { +void TopologyCoordinator::_setElectionTime(const Timestamp& newElectionTime) { _electionTime = newElectionTime; } -int TopologyCoordinatorImpl::_getTotalPings() { +int TopologyCoordinator::_getTotalPings() { PingMap::iterator it = _pings.begin(); PingMap::iterator end = _pings.end(); int totalPings = 0; @@ -2540,7 +2574,7 @@ int TopologyCoordinatorImpl::_getTotalPings() { return totalPings; } -std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const { +std::vector<HostAndPort> TopologyCoordinator::getMaybeUpHostAndPorts() const { std::vector<HostAndPort> upHosts; for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end(); ++it) { @@ -2557,7 +2591,7 @@ std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const return upHosts; } -bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { +bool TopologyCoordinator::voteForMyself(Date_t now) { if (_role != Role::candidate) { return false; } @@ -2574,12 +2608,12 @@ bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { return true; } -bool TopologyCoordinatorImpl::isSteppingDown() const { +bool TopologyCoordinator::isSteppingDown() const { return _leaderMode == LeaderMode::kAttemptingStepDown || _leaderMode == LeaderMode::kSteppingDown; } -void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) { +void TopologyCoordinator::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) { // Invariants for valid state transitions. switch (_leaderMode) { case LeaderMode::kNotLeader: @@ -2607,7 +2641,7 @@ void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode new _leaderMode = std::move(newMode); } -MemberState TopologyCoordinatorImpl::getMemberState() const { +MemberState TopologyCoordinator::getMemberState() const { if (_selfIndex == -1) { if (_rsConfig.isInitialized()) { return MemberState::RS_REMOVED; @@ -2646,17 +2680,17 @@ MemberState TopologyCoordinatorImpl::getMemberState() const { return _followerMode; } -bool TopologyCoordinatorImpl::canAcceptWrites() const { +bool TopologyCoordinator::canAcceptWrites() const { return _leaderMode == LeaderMode::kMaster; } -void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) { +void TopologyCoordinator::setElectionInfo(OID electionId, Timestamp electionOpTime) { invariant(_role == Role::leader); _electionTime = electionOpTime; _electionId = electionId; } -void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) { +void TopologyCoordinator::processWinElection(OID electionId, Timestamp electionOpTime) { invariant(_role == Role::candidate); invariant(_leaderMode == LeaderMode::kNotLeader); _role = Role::leader; @@ -2670,7 +2704,7 @@ void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp elect OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max()); } -void TopologyCoordinatorImpl::processLoseElection() { +void TopologyCoordinator::processLoseElection() { invariant(_role == Role::candidate); invariant(_leaderMode == LeaderMode::kNotLeader); const HostAndPort syncSourceAddress = getSyncSourceAddress(); @@ -2685,7 +2719,7 @@ void TopologyCoordinatorImpl::processLoseElection() { } } -bool TopologyCoordinatorImpl::attemptStepDown( +bool TopologyCoordinator::attemptStepDown( long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) { if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) { @@ -2724,9 +2758,7 @@ bool TopologyCoordinatorImpl::attemptStepDown( return true; } -bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now, - Date_t waitUntil, - bool force) { +bool TopologyCoordinator::_canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force) { const bool forceNow = force && (now >= waitUntil); if (forceNow) { return true; @@ -2735,7 +2767,7 @@ bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now, return isSafeToStepDown(); } -bool TopologyCoordinatorImpl::isSafeToStepDown() { +bool TopologyCoordinator::isSafeToStepDown() { if (!_rsConfig.isInitialized() || _selfIndex < 0) { return false; } @@ -2767,7 +2799,7 @@ bool TopologyCoordinatorImpl::isSafeToStepDown() { return false; } -void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) { +void TopologyCoordinator::setFollowerMode(MemberState::MS newMode) { invariant(_role == Role::follower); switch (newMode) { case MemberState::RS_RECOVERING: @@ -2793,13 +2825,13 @@ void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) { } } -bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const { +bool TopologyCoordinator::_isElectableNodeInSingleNodeReplicaSet() const { return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 && _selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() && _maintenanceModeCalls == 0; } -void TopologyCoordinatorImpl::finishUnconditionalStepDown() { +void TopologyCoordinator::finishUnconditionalStepDown() { invariant(_leaderMode == LeaderMode::kSteppingDown); int remotePrimaryIndex = -1; @@ -2824,7 +2856,7 @@ void TopologyCoordinatorImpl::finishUnconditionalStepDown() { _stepDownSelfAndReplaceWith(remotePrimaryIndex); } -void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) { +void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) { invariant(_role == Role::leader); invariant(_selfIndex != -1); invariant(_selfIndex != newPrimary); @@ -2834,7 +2866,7 @@ void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) { _setLeaderMode(LeaderMode::kNotLeader); } -bool TopologyCoordinatorImpl::updateLastCommittedOpTime() { +bool TopologyCoordinator::updateLastCommittedOpTime() { // If we're not primary or we're stepping down due to learning of a new term then we must not // advance the commit point. If we are stepping down due to a user request, however, then it // is safe to advance the commit point, and in fact we must since the stepdown request may be @@ -2870,7 +2902,7 @@ bool TopologyCoordinatorImpl::updateLastCommittedOpTime() { return advanceLastCommittedOpTime(committedOpTime); } -bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) { +bool TopologyCoordinator::advanceLastCommittedOpTime(const OpTime& committedOpTime) { if (committedOpTime == _lastCommittedOpTime) { return false; // Hasn't changed, so ignore it. } else if (committedOpTime < _lastCommittedOpTime) { @@ -2891,12 +2923,11 @@ bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committed return true; } -OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const { +OpTime TopologyCoordinator::getLastCommittedOpTime() const { return _lastCommittedOpTime; } -bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary( - long long termWhenDrainCompleted) const { +bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const { if (termWhenDrainCompleted != _term) { return false; @@ -2910,7 +2941,7 @@ bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary( return true; } -Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) { +Status TopologyCoordinator::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) { if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) { return Status(ErrorCodes::PrimarySteppedDown, "By the time this node was ready to complete its transition to PRIMARY it " @@ -2923,18 +2954,17 @@ Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstO return Status::OK(); } -void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) { +void TopologyCoordinator::adjustMaintenanceCountBy(int inc) { invariant(_role == Role::follower); _maintenanceModeCalls += inc; invariant(_maintenanceModeCalls >= 0); } -int TopologyCoordinatorImpl::getMaintenanceCount() const { +int TopologyCoordinator::getMaintenanceCount() const { return _maintenanceModeCalls; } -TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term, - Date_t now) { +TopologyCoordinator::UpdateTermResult TopologyCoordinator::updateTerm(long long term, Date_t now) { if (term <= _term) { return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate; } @@ -2952,13 +2982,13 @@ TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long l } -long long TopologyCoordinatorImpl::getTerm() { +long long TopologyCoordinator::getTerm() const { return _term; } // TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the // replset. Passing metadata is unnecessary. -bool TopologyCoordinatorImpl::shouldChangeSyncSource( +bool TopologyCoordinator::shouldChangeSyncSource( const HostAndPort& currentSource, const rpc::ReplSetMetadata& replMetadata, boost::optional<rpc::OplogQueryMetadata> oqMetadata, @@ -3079,7 +3109,7 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource( return false; } -rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata( +rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata( const OpTime& lastVisibleOpTime) const { return rpc::ReplSetMetadata(_term, _lastCommittedOpTime, @@ -3090,7 +3120,7 @@ rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata( _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); } -rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const { +rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const { return rpc::OplogQueryMetadata(_lastCommittedOpTime, getMyLastAppliedOpTime(), rbid, @@ -3098,7 +3128,7 @@ rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int r _rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress())); } -void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { +void TopologyCoordinator::summarizeAsHtml(ReplSetHtmlSummary* output) { output->setConfig(_rsConfig); output->setHBData(_memberData); output->setSelfIndex(_selfIndex); @@ -3107,8 +3137,8 @@ void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { output->setSelfHeartbeatMessage(_hbmsg); } -void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) { +void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) { response->setTerm(_term); if (args.getTerm() < _term) { @@ -3164,21 +3194,21 @@ void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVot } } -void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) { +void TopologyCoordinator::loadLastVote(const LastVote& lastVote) { _lastVote = lastVote; } -void TopologyCoordinatorImpl::voteForMyselfV1() { +void TopologyCoordinator::voteForMyselfV1() { _lastVote.setTerm(_term); _lastVote.setCandidateIndex(_selfIndex); } -void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) { +void TopologyCoordinator::setPrimaryIndex(long long primaryIndex) { _currentPrimaryIndex = primaryIndex; } -Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now, - StartElectionReason reason) { +Status TopologyCoordinator::becomeCandidateIfElectable(const Date_t now, + StartElectionReason reason) { if (_role == Role::leader) { return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"}; } @@ -3200,18 +3230,18 @@ Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now, return Status::OK(); } -void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) { +void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) { _storageEngineSupportsReadCommitted = supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo; } -void TopologyCoordinatorImpl::restartHeartbeats() { +void TopologyCoordinator::restartHeartbeats() { for (auto& hb : _memberData) { hb.restart(); } } -boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const { +boost::optional<OpTime> TopologyCoordinator::latestKnownOpTimeSinceHeartbeatRestart() const { // The smallest OpTime in PV1. OpTime latest(Timestamp(0, 0), 0); for (size_t i = 0; i < _memberData.size(); i++) { diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 046ff2217a6..eca071e590a 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -28,241 +28,735 @@ #pragma once +#include <iosfwd> #include <string> -#include <vector> -#include "mongo/bson/timestamp.h" +#include "mongo/base/disallow_copying.h" #include "mongo/db/repl/last_vote.h" -#include "mongo/db/repl/member_data.h" -#include "mongo/db/repl/member_state.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/server_options.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { - -class OperationContext; - -namespace rpc { -class ReplSetMetadata; -} // namespace rpc +class Timestamp; namespace repl { - -static const Milliseconds UninitializedPing{-1}; +class HeartbeatResponseAction; +class MemberData; +class OpTime; +class ReplSetHeartbeatArgs; +class ReplSetConfig; +class TagSubgroup; +struct MemberState; /** - * Represents a latency measurement for each replica set member based on heartbeat requests. - * The measurement is an average weighted 80% to the old value, and 20% to the new value. + * Replication Topology Coordinator * - * Also stores information about heartbeat progress and retries. + * This object is responsible for managing the topology of the cluster. + * Tasks include consensus and leader election, chaining, and configuration management. + * Methods of this class should be non-blocking. */ -class PingStats { +class TopologyCoordinator { + MONGO_DISALLOW_COPYING(TopologyCoordinator); + public: /** - * Records that a new heartbeat request started at "now". + * Type that denotes the role of a node in the replication protocol. * - * This resets the failure count used in determining whether the next request to a target - * should be a retry or a regularly scheduled heartbeat message. + * The role is distinct from MemberState, in that it only deals with the + * roles a node plays in the basic protocol -- leader, follower and candidate. + * The mapping between MemberState and Role is complex -- several MemberStates + * map to the follower role, and MemberState::RS_SECONDARY maps to either + * follower or candidate roles, e.g. */ - void start(Date_t now); + class Role { + public: + /** + * Constant indicating leader role. + */ + static const Role leader; + + /** + * Constant indicating follower role. + */ + static const Role follower; + + /** + * Constant indicating candidate role + */ + static const Role candidate; + + Role() {} + + bool operator==(Role other) const { + return _value == other._value; + } + bool operator!=(Role other) const { + return _value != other._value; + } + + std::string toString() const; + + private: + explicit Role(int value); + + int _value; + }; + + struct Options { + // A sync source is re-evaluated after it lags behind further than this amount. + Seconds maxSyncSourceLagSecs{0}; + + // Whether or not this node is running as a config server. + ClusterRole clusterRole{ClusterRole::None}; + }; /** - * Records that a heartbeat request completed successfully, and that "millis" milliseconds - * were spent for a single network roundtrip plus remote processing time. + * Constructs a Topology Coordinator object. + **/ + TopologyCoordinator(Options options); + + + ~TopologyCoordinator(); + + /** + * Different modes a node can be in while still reporting itself as in state PRIMARY. + * + * Valid transitions: + * + * kNotLeader <---------------------------------- + * | | + * | | + * | | + * v | + * kLeaderElect----- | + * | | | + * | | | + * v | | + * kMaster ------------------------- | + * | ^ | | | + * | | ------------------- | | + * | | | | | | + * v | v v v | + * kAttemptingStepDown----------->kSteppingDown | + * | | | + * | | | + * | | | + * --------------------------------------------- + * */ - void hit(Milliseconds millis); + enum class LeaderMode { + kNotLeader, // This node is not currently a leader. + kLeaderElect, // This node has been elected leader, but can't yet accept writes. + kMaster, // This node reports ismaster:true and can accept writes. + kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete. + kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail. + }; + + //////////////////////////////////////////////////////////// + // + // State inspection methods. + // + //////////////////////////////////////////////////////////// /** - * Records that a heartbeat request failed. + * Gets the role of this member in the replication protocol. */ - void miss(); + Role getRole() const; /** - * Gets the number of hit() calls. + * Gets the MemberState of this member in the replica set. */ - unsigned int getCount() const { - return count; - } + MemberState getMemberState() const; /** - * Gets the weighted average round trip time for heartbeat messages to the target. - * Returns 0 if there have been no pings recorded yet. + * Returns whether this node should be allowed to accept writes. */ - Milliseconds getMillis() const { - return value == UninitializedPing ? Milliseconds(0) : value; - } + bool canAcceptWrites() const; /** - * Gets the date at which start() was last called, which is used to determine if - * a heartbeat should be retried or if the time limit has expired. + * Returns true if this node is in the process of stepping down. Note that this can be + * due to an unconditional stepdown that must succeed (for instance from learning about a new + * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that + * could fail if not enough nodes are caught up). */ - Date_t getLastHeartbeatStartDate() const { - return _lastHeartbeatStartDate; - } + bool isSteppingDown() const; /** - * Gets the number of failures since start() was last called. + * Returns the address of the current sync source, or an empty HostAndPort if there is no + * current sync source. + */ + HostAndPort getSyncSourceAddress() const; + + /** + * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor + * ourself. + */ + std::vector<HostAndPort> getMaybeUpHostAndPorts() const; + + /** + * Gets the earliest time the current node will stand for election. + */ + Date_t getStepDownTime() const; + + /** + * Gets the current value of the maintenance mode counter. + */ + int getMaintenanceCount() const; + + /** + * Gets the latest term this member is aware of. If this member is the primary, + * it's the current term of the replica set. + */ + long long getTerm() const; + + enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm }; + + //////////////////////////////////////////////////////////// + // + // Basic state manipulation methods. + // + //////////////////////////////////////////////////////////// + + /** + * Sets the latest term this member is aware of to the higher of its current value and + * the value passed in as "term". + * Returns the result of setting the term value, or if a stepdown should be triggered. + */ + UpdateTermResult updateTerm(long long term, Date_t now); + + /** + * Sets the index into the config used when we next choose a sync source + */ + void setForceSyncSourceIndex(int index); + + enum class ChainingPreference { kAllowChaining, kUseConfiguration }; + + /** + * Chooses and sets a new sync source, based on our current knowledge of the world. + */ + HostAndPort chooseNewSyncSource(Date_t now, + const OpTime& lastOpTimeFetched, + ChainingPreference chainingPreference); + + /** + * Suppresses selecting "host" as sync source until "until". + */ + void blacklistSyncSource(const HostAndPort& host, Date_t until); + + /** + * Removes a single entry "host" from the list of potential sync sources which we + * have blacklisted, if it is supposed to be unblacklisted by "now". + */ + void unblacklistSyncSource(const HostAndPort& host, Date_t now); + + /** + * Clears the list of potential sync sources we have blacklisted. + */ + void clearSyncSourceBlacklist(); + + /** + * Determines if a new sync source should be chosen, if a better candidate sync source is + * available. If the current sync source's last optime ("syncSourceLastOpTime" under + * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than + * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are + * running in ProtocolVersion 1, our current sync source is not primary, has no sync source + * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true. * - * This value is incremented by calls to miss(), cleared by calls to start() and - * set to the maximum possible value by calls to hit(). + * "now" is used to skip over currently blacklisted sync sources. + * + * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8. */ - int getNumFailuresSinceLastStart() const { - return _numFailuresSinceLastStart; - } + bool shouldChangeSyncSource(const HostAndPort& currentSource, + const rpc::ReplSetMetadata& replMetadata, + boost::optional<rpc::OplogQueryMetadata> oqMetadata, + Date_t now) const; -private: - unsigned int count = 0; - Milliseconds value = UninitializedPing; - Date_t _lastHeartbeatStartDate; - int _numFailuresSinceLastStart = std::numeric_limits<int>::max(); -}; + /** + * Checks whether we are a single node set and we are not in a stepdown period. If so, + * puts us into candidate mode, otherwise does nothing. This is used to ensure that + * nodes in a single node replset become primary again when their stepdown period ends. + */ + bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); -class TopologyCoordinatorImpl : public TopologyCoordinator { -public: - struct Options { - // A sync source is re-evaluated after it lags behind further than this amount. - Seconds maxSyncSourceLagSecs{0}; + /** + * Sets the earliest time the current node will stand for election to "newTime". + * + * Until this time, while the node may report itself as electable, it will not stand + * for election. + */ + void setElectionSleepUntil(Date_t newTime); - // Whether or not this node is running as a config server. - ClusterRole clusterRole{ClusterRole::None}; - }; + /** + * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or + * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the + * applier changes the reported member state of the current node, and enables or suppresses + * electability of the current node. All modes but RS_SECONDARY indicate an unelectable + * follower state (one that cannot transition to candidate). + */ + void setFollowerMode(MemberState::MS newMode); /** - * Constructs a Topology Coordinator object. - **/ - TopologyCoordinatorImpl(Options options); + * Scan the memberData and determine the highest last applied or last + * durable optime present on a majority of servers; set _lastCommittedOpTime to this + * new entry. + * Whether the last applied or last durable op time is used depends on whether + * the config getWriteConcernMajorityShouldJournal is set. + * Returns true if the _lastCommittedOpTime was changed. + */ + bool updateLastCommittedOpTime(); + + /** + * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the + * current last committed OpTime. Returns true if _lastCommittedOpTime is changed. + */ + bool advanceLastCommittedOpTime(const OpTime& committedOpTime); + + /** + * Returns the OpTime of the latest majority-committed op known to this server. + */ + OpTime getLastCommittedOpTime() const; + + /** + * Returns true if it's safe to transition to LeaderMode::kMaster. + */ + bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const; + + /** + * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes + * and are ready to fully become primary and start accepting writes. + * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed + * for this tenure as primary. This prevents entries from before our election from counting as + * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed. + * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes. + */ + Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm); + + /** + * Adjusts the maintenance mode count by "inc". + * + * It is an error to call this method if getRole() does not return Role::follower. + * It is an error to allow the maintenance count to go negative. + */ + void adjustMaintenanceCountBy(int inc); //////////////////////////////////////////////////////////// // - // Implementation of TopologyCoordinator interface + // Methods that prepare responses to command requests. // //////////////////////////////////////////////////////////// - virtual Role getRole() const; - virtual MemberState getMemberState() const; - virtual bool canAcceptWrites() const override; - virtual bool isSteppingDown() const override; - virtual HostAndPort getSyncSourceAddress() const; - virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; - virtual int getMaintenanceCount() const; - virtual long long getTerm(); - virtual UpdateTermResult updateTerm(long long term, Date_t now); - virtual void setForceSyncSourceIndex(int index); - virtual HostAndPort chooseNewSyncSource(Date_t now, - const OpTime& lastOpTimeFetched, - ChainingPreference chainingPreference) override; - virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); - virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now); - virtual void clearSyncSourceBlacklist(); - virtual bool shouldChangeSyncSource(const HostAndPort& currentSource, - const rpc::ReplSetMetadata& replMetadata, - boost::optional<rpc::OplogQueryMetadata> oqMetadata, - Date_t now) const; - virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now); - virtual void setElectionSleepUntil(Date_t newTime); - virtual void setFollowerMode(MemberState::MS newMode); - virtual bool updateLastCommittedOpTime(); - virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime); - virtual OpTime getLastCommittedOpTime() const; - virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override; - virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override; - virtual void adjustMaintenanceCountBy(int inc); - virtual void prepareSyncFromResponse(const HostAndPort& target, - BSONObjBuilder* response, - Status* result); - virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, - Date_t now, - BSONObjBuilder* response, - Status* result); - virtual Status prepareHeartbeatResponse(Date_t now, - const ReplSetHeartbeatArgs& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual Status prepareHeartbeatResponseV1(Date_t now, - const ReplSetHeartbeatArgsV1& args, - const std::string& ourSetName, - ReplSetHeartbeatResponse* response); - virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, - BSONObjBuilder* response, - Status* result); - virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( + // produces a reply to a replSetSyncFrom command + void prepareSyncFromResponse(const HostAndPort& target, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a replSetFresh command + void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args, + Date_t now, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a received electCmd + void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args, + Date_t now, + BSONObjBuilder* response, + Status* result); + + // produce a reply to a heartbeat + Status prepareHeartbeatResponse(Date_t now, + const ReplSetHeartbeatArgs& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response); + + // produce a reply to a V1 heartbeat + Status prepareHeartbeatResponseV1(Date_t now, + const ReplSetHeartbeatArgsV1& args, + const std::string& ourSetName, + ReplSetHeartbeatResponse* response); + + struct ReplSetStatusArgs { + Date_t now; + unsigned selfUptime; + const OpTime& readConcernMajorityOpTime; + const BSONObj& initialSyncStatus; + }; + + // produce a reply to a status request + void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs, + BSONObjBuilder* response, + Status* result); + + // Produce a replSetUpdatePosition command to be sent to the node's sync source. + StatusWith<BSONObj> prepareReplSetUpdatePositionCommand( ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle, OpTime currentCommittedSnapshotOpTime) const; - virtual void fillIsMasterForReplSet(IsMasterResponse* response); - virtual void fillMemberData(BSONObjBuilder* result); - virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, - int secs, - BSONObjBuilder* response); - virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); - virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( + // produce a reply to an ismaster request. It is only valid to call this if we are a + // replset. + void fillIsMasterForReplSet(IsMasterResponse* response); + + // Produce member data for the serverStatus command and diagnostic logging. + void fillMemberData(BSONObjBuilder* result); + + enum class PrepareFreezeResponseResult { kNoAction, kElectSelf }; + + /** + * Produce a reply to a freeze request. Returns a PostMemberStateUpdateAction on success that + * may trigger state changes in the caller. + */ + StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now, + int secs, + BSONObjBuilder* response); + + //////////////////////////////////////////////////////////// + // + // Methods for sending and receiving heartbeats, + // reconfiguring and handling the results of standing for + // election. + // + //////////////////////////////////////////////////////////// + + /** + * Updates the topology coordinator's notion of the replica set configuration. + * + * "newConfig" is the new configuration, and "selfIndex" is the index of this + * node's configuration information in "newConfig", or "selfIndex" is -1 to + * indicate that this node is not a member of "newConfig". + * + * newConfig.isInitialized() should be true, though implementations may accept + * configurations where this is not true, for testing purposes. + */ + void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now); + + /** + * Prepares a heartbeat request appropriate for sending to "target", assuming the + * current time is "now". "ourSetName" is used as the name for our replica set if + * the topology coordinator does not have a valid configuration installed. + * + * The returned pair contains proper arguments for a replSetHeartbeat command, and + * an amount of time to wait for the response. + * + * This call should be paired (with intervening network communication) with a call to + * processHeartbeatResponse for the same "target". + */ + std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest( Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( + std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1( Date_t now, const std::string& ourSetName, const HostAndPort& target); - virtual HeartbeatResponseAction processHeartbeatResponse( + + /** + * Processes a heartbeat response from "target" that arrived around "now", having + * spent "networkRoundTripTime" millis on the network. + * + * Updates internal topology coordinator state, and returns instructions about what action + * to take next. + * + * If the next action indicates StartElection, the topology coordinator has transitioned to + * the "candidate" role, and will remain there until processWinElection or + * processLoseElection are called. + * + * If the next action indicates "StepDownSelf", the topology coordinator has transitioned + * to the "follower" role from "leader", and the caller should take any necessary actions + * to become a follower. + * + * If the next action indicates "StepDownRemotePrimary", the caller should take steps to + * cause the specified remote host to step down from primary to secondary. + * + * If the next action indicates "Reconfig", the caller should verify the configuration in + * hbResponse is acceptable, perform any other reconfiguration actions it must, and call + * updateConfig with the new configuration and the appropriate value for "selfIndex". It + * must also wrap up any outstanding elections (by calling processLoseElection or + * processWinElection) before calling updateConfig. + * + * This call should be paired (with intervening network communication) with a call to + * prepareHeartbeatRequest for the same "target". + */ + HeartbeatResponseAction processHeartbeatResponse( Date_t now, Milliseconds networkRoundTripTime, const HostAndPort& target, const StatusWith<ReplSetHeartbeatResponse>& hbResponse); - virtual bool voteForMyself(Date_t now); - virtual void setElectionInfo(OID electionId, Timestamp electionOpTime); - virtual void processWinElection(OID electionId, Timestamp electionOpTime); - virtual void processLoseElection(); - virtual Status checkShouldStandForElection(Date_t now) const; - virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message); - virtual bool attemptStepDown(long long termAtStart, - Date_t now, - Date_t waitUntil, - Date_t stepDownUntil, - bool force) override; - virtual bool isSafeToStepDown() override; - virtual bool prepareForUnconditionalStepDown() override; - virtual Status prepareForStepDownAttempt() override; - virtual void abortAttemptedStepDownIfNeeded() override; - virtual void finishUnconditionalStepDown() override; - virtual Date_t getStepDownTime() const; - virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const; - virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const; - virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response); - virtual void summarizeAsHtml(ReplSetHtmlSummary* output); - virtual void loadLastVote(const LastVote& lastVote); - virtual void voteForMyselfV1(); - virtual void setPrimaryIndex(long long primaryIndex); - virtual int getCurrentPrimaryIndex() const; - virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); - virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime, - const ReplSetTagPattern& tagPattern, - bool durablyWritten); - virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, - bool durablyWritten, - bool skipSelf); - virtual bool setMemberAsDown(Date_t now, const int memberIndex) override; - virtual std::pair<int, Date_t> getStalestLiveMember() const; - virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now); - virtual void resetAllMemberTimeouts(Date_t now); - virtual void resetMemberTimeouts(Date_t now, - const stdx::unordered_set<HostAndPort>& member_set); - virtual OpTime getMyLastAppliedOpTime() const; - virtual OpTime getMyLastDurableOpTime() const; - virtual MemberData* getMyMemberData(); - virtual MemberData* findMemberDataByMemberId(const int memberId); - virtual MemberData* findMemberDataByRid(const OID rid); - virtual MemberData* addSlaveMemberData(const OID rid); - virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason); - virtual void setStorageEngineSupportsReadCommitted(bool supported); - - virtual void restartHeartbeats(); - - virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; + + /** + * Returns whether or not at least 'numNodes' have reached the given opTime. + * "durablyWritten" indicates whether the operation has to be durably applied. + */ + bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten); + + /** + * Returns whether or not at least one node matching the tagPattern has reached + * the given opTime. + * "durablyWritten" indicates whether the operation has to be durably applied. + */ + bool haveTaggedNodesReachedOpTime(const OpTime& opTime, + const ReplSetTagPattern& tagPattern, + bool durablyWritten); + + /** + * Returns a vector of members that have applied the operation with OpTime 'op'. + * "durablyWritten" indicates whether the operation has to be durably applied. + * "skipSelf" means to exclude this node whether or not the op has been applied. + */ + std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op, + bool durablyWritten, + bool skipSelf); + + /** + * Marks a member as down from our perspective and returns a bool which indicates if we can no + * longer see a majority of the nodes and thus should step down. + */ + bool setMemberAsDown(Date_t now, const int memberIndex); + + /** + * Goes through the memberData and determines which member that is currently live + * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are + * no other members. + */ + std::pair<int, Date_t> getStalestLiveMember() const; + + /** + * Go through the memberData, and mark nodes which haven't been updated + * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which + * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction. + */ + HeartbeatResponseAction checkMemberTimeouts(Date_t now); + + /** + * Set all nodes in memberData to not stale with a lastUpdate of "now". + */ + void resetAllMemberTimeouts(Date_t now); + + /** + * Set all nodes in memberData that are present in member_set + * to not stale with a lastUpdate of "now". + */ + void resetMemberTimeouts(Date_t now, const stdx::unordered_set<HostAndPort>& member_set); + + /* + * Returns the last optime that this node has applied, whether or not it has been journaled. + */ + OpTime getMyLastAppliedOpTime() const; + + /* + * Returns the last optime that this node has applied and journaled. + */ + OpTime getMyLastDurableOpTime() const; + + /* + * Returns information we have on the state of this node. + */ + MemberData* getMyMemberData(); + + /* + * Returns information we have on the state of the node identified by memberId. Returns + * nullptr if memberId is not found in the configuration. + */ + MemberData* findMemberDataByMemberId(const int memberId); + + /* + * Returns information we have on the state of the node identified by rid. Returns + * nullptr if rid is not found in the heartbeat data. This method is used only for + * master/slave replication. + */ + MemberData* findMemberDataByRid(const OID rid); + + /* + * Adds and returns a memberData entry for the given RID. + * Used only in master/slave mode. + */ + MemberData* addSlaveMemberData(const OID rid); + + /** + * If getRole() == Role::candidate and this node has not voted too recently, updates the + * lastVote tracker and returns true. Otherwise, returns false. + */ + bool voteForMyself(Date_t now); + + /** + * Sets lastVote to be for ourself in this term. + */ + void voteForMyselfV1(); + + /** + * Sets election id and election optime. + */ + void setElectionInfo(OID electionId, Timestamp electionOpTime); + + /** + * Performs state updates associated with winning an election. + * + * It is an error to call this if the topology coordinator is not in candidate mode. + * + * Exactly one of either processWinElection or processLoseElection must be called if + * processHeartbeatResponse returns StartElection, to exit candidate mode. + */ + void processWinElection(OID electionId, Timestamp electionOpTime); + + /** + * Performs state updates associated with losing an election. + * + * It is an error to call this if the topology coordinator is not in candidate mode. + * + * Exactly one of either processWinElection or processLoseElection must be called if + * processHeartbeatResponse returns StartElection, to exit candidate mode. + */ + void processLoseElection(); + + /** + * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used + * when we receive a stepdown command (which can fail if not enough secondaries are caught up) + * to ensure that we never process more than one stepdown request at a time. + * Returns OK if it is safe to continue with the stepdown attempt, or returns + * ConflictingOperationInProgess if this node is already processing a stepdown request of any + * kind. + */ + Status prepareForStepDownAttempt(); + + /** + * If this node is still attempting to process a stepdown attempt, aborts the attempt and + * returns this node to normal primary/master state. If this node has already completed + * stepping down or is now in the process of handling an unconditional stepdown, then this + * method does nothing. + */ + void abortAttemptedStepDownIfNeeded(); + + /** + * Tries to transition the coordinator from the leader role to the follower role. + * + * A step down succeeds based on the following conditions: + * + * C1. 'force' is true and now > waitUntil + * + * C2. A majority set of nodes, M, in the replica set have optimes greater than or + * equal to the last applied optime of the primary. + * + * C3. There exists at least one electable secondary node in the majority set M. + * + * + * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method + * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more + * time to pass could make it succeed, returns false. If the whole stepdown attempt should be + * abandoned (for example because the time limit expired or because we've already stepped down), + * throws an exception. + * TODO(spencer): Unify with the finishUnconditionalStepDown() method. + */ + bool attemptStepDown( + long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force); + + /** + * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument. + * This is essentially checking conditions C2 and C3 as described in the comment to + * attemptStepDown(). + */ + bool isSafeToStepDown(); + + /** + * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process + * of an unconditional step down. If we are in the middle of a stepdown command attempt when + * this is called then this unconditional stepdown will supersede the stepdown attempt, which + * will cause the stepdown to fail. When this returns true it must be followed by a call to + * finishUnconditionalStepDown() that is called when holding the global X lock. + */ + bool prepareForUnconditionalStepDown(); + + /** + * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the + * global exclusive lock so we can't actually stepdown at that moment. When that happens + * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown()) + * and schedule work to stepdown in the global X lock. This method is called after holding the + * global lock to perform the actual stepdown. + * TODO(spencer): Unify with the finishAttemptedStepDown() method. + */ + void finishUnconditionalStepDown(); + + /** + * Considers whether or not this node should stand for election, and returns true + * if the node has transitioned to candidate role as a result of the call. + */ + Status checkShouldStandForElection(Date_t now) const; + + /** + * Set the outgoing heartbeat message from self + */ + void setMyHeartbeatMessage(const Date_t now, const std::string& s); + + /** + * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp + * information. + */ + rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const; + + /** + * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary, + * lastOpApplied, and lastOpCommitted. + */ + rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const; + + /** + * Writes into 'output' all the information needed to generate a summary of the current + * replication state for use by the web interface. + */ + void summarizeAsHtml(ReplSetHtmlSummary* output); + + /** + * Prepares a ReplSetRequestVotesResponse. + */ + void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response); + + /** + * Loads an initial LastVote document, which was read from local storage. + * + * Called only during replication startup. All other updates are done internally. + */ + void loadLastVote(const LastVote& lastVote); + + /** + * Updates the current primary index. + */ + void setPrimaryIndex(long long primaryIndex); + + /** + * Returns the current primary index. + */ + int getCurrentPrimaryIndex() const; + + enum StartElectionReason { + kElectionTimeout, + kPriorityTakeover, + kStepUpRequest, + kCatchupTakeover + }; + + /** + * Transitions to the candidate role if the node is electable. + */ + Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason); + + /** + * Updates the storage engine read committed support in the TopologyCoordinator options after + * creation. + */ + void setStorageEngineSupportsReadCommitted(bool supported); + + /** + * Reset the booleans to record the last heartbeat restart. + */ + void restartHeartbeats(); + + /** + * Scans through all members that are 'up' and return the latest known optime, if we have + * received (successful or failed) heartbeats from all nodes since heartbeat restart. + * + * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted + * heartbeats. + * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down. + */ + boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const; //////////////////////////////////////////////////////////// // @@ -289,6 +783,9 @@ public: OID getElectionId() const; private: + typedef int UnelectableReasonMask; + class PingStats; + enum UnelectableReason { None = 0, CannotSeeMajority = 1 << 0, @@ -304,8 +801,6 @@ private: NotCloseEnoughToLatestForPriorityTakeover = 1 << 10, NotFreshEnoughForCatchupTakeover = 1 << 11, }; - typedef int UnelectableReasonMask; - // Set what type of PRIMARY this node currently is. void _setLeaderMode(LeaderMode mode); @@ -525,5 +1020,81 @@ private: ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown}; }; +/** + * Represents a latency measurement for each replica set member based on heartbeat requests. + * The measurement is an average weighted 80% to the old value, and 20% to the new value. + * + * Also stores information about heartbeat progress and retries. + */ +class TopologyCoordinator::PingStats { +public: + /** + * Records that a new heartbeat request started at "now". + * + * This resets the failure count used in determining whether the next request to a target + * should be a retry or a regularly scheduled heartbeat message. + */ + void start(Date_t now); + + /** + * Records that a heartbeat request completed successfully, and that "millis" milliseconds + * were spent for a single network roundtrip plus remote processing time. + */ + void hit(Milliseconds millis); + + /** + * Records that a heartbeat request failed. + */ + void miss(); + + /** + * Gets the number of hit() calls. + */ + unsigned int getCount() const { + return count; + } + + /** + * Gets the weighted average round trip time for heartbeat messages to the target. + * Returns 0 if there have been no pings recorded yet. + */ + Milliseconds getMillis() const { + return value == UninitializedPing ? Milliseconds(0) : value; + } + + /** + * Gets the date at which start() was last called, which is used to determine if + * a heartbeat should be retried or if the time limit has expired. + */ + Date_t getLastHeartbeatStartDate() const { + return _lastHeartbeatStartDate; + } + + /** + * Gets the number of failures since start() was last called. + * + * This value is incremented by calls to miss(), cleared by calls to start() and + * set to the maximum possible value by calls to hit(). + */ + int getNumFailuresSinceLastStart() const { + return _numFailuresSinceLastStart; + } + +private: + static constexpr Milliseconds UninitializedPing{-1}; + + unsigned int count = 0; + Milliseconds value = UninitializedPing; + Date_t _lastHeartbeatStartDate; + int _numFailuresSinceLastStart = std::numeric_limits<int>::max(); +}; + +// +// Convenience method for unittest code. Please use accessors otherwise. +// + +std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role); +std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_test.cpp b/src/mongo/db/repl/topology_coordinator_test.cpp index a22aec9f72f..034aaabda89 100644 --- a/src/mongo/db/repl/topology_coordinator_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_test.cpp @@ -37,7 +37,6 @@ #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/topology_coordinator.h" -#include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/logger/logger.h" @@ -73,9 +72,9 @@ bool stringContains(const std::string& haystack, const std::string& needle) { class TopoCoordTest : public mongo::unittest::Test { public: virtual void setUp() { - _options = TopologyCoordinatorImpl::Options{}; + _options = TopologyCoordinator::Options{}; _options.maxSyncSourceLagSecs = Seconds{100}; - _topo.reset(new TopologyCoordinatorImpl(_options)); + _topo.reset(new TopologyCoordinator(_options)); _now = Date_t(); _selfIndex = -1; _cbData.reset(new executor::TaskExecutor::CallbackArgs( @@ -88,7 +87,7 @@ public: } protected: - TopologyCoordinatorImpl& getTopoCoord() { + TopologyCoordinator& getTopoCoord() { return *_topo; } executor::TaskExecutor::CallbackArgs cbData() { @@ -98,9 +97,9 @@ protected: return _now; } - void setOptions(const TopologyCoordinatorImpl::Options& options) { + void setOptions(const TopologyCoordinator::Options& options) { _options = options; - _topo.reset(new TopologyCoordinatorImpl(_options)); + _topo.reset(new TopologyCoordinator(_options)); } int64_t countLogLinesContaining(const std::string& needle) { @@ -239,12 +238,12 @@ private: } private: - unique_ptr<TopologyCoordinatorImpl> _topo; + unique_ptr<TopologyCoordinator> _topo; unique_ptr<executor::TaskExecutor::CallbackArgs> _cbData; ReplSetConfig _currentConfig; Date_t _now; int _selfIndex; - TopologyCoordinatorImpl::Options _options; + TopologyCoordinator::Options _options; }; TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { @@ -6263,7 +6262,7 @@ TEST_F(TopoCoordTest, DoNotGrantDryRunVoteWhenOpTimeIsStale) { TEST_F(TopoCoordTest, CSRSConfigServerRejectsPV0Config) { ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; }); serverGlobalParams.clusterRole = ClusterRole::ConfigServer; - TopologyCoordinatorImpl::Options options; + TopologyCoordinator::Options options; options.clusterRole = ClusterRole::ConfigServer; setOptions(options); getTopoCoord().setStorageEngineSupportsReadCommitted(false); diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 048bb17b573..c43c0b53066 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -38,7 +38,6 @@ #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/topology_coordinator.h" -#include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/logger/logger.h" @@ -74,9 +73,9 @@ bool stringContains(const std::string& haystack, const std::string& needle) { class TopoCoordTest : public mongo::unittest::Test { public: virtual void setUp() { - _options = TopologyCoordinatorImpl::Options{}; + _options = TopologyCoordinator::Options{}; _options.maxSyncSourceLagSecs = Seconds{100}; - _topo.reset(new TopologyCoordinatorImpl(_options)); + _topo.reset(new TopologyCoordinator(_options)); _now = Date_t(); _selfIndex = -1; _cbData.reset(new executor::TaskExecutor::CallbackArgs( @@ -89,7 +88,7 @@ public: } protected: - TopologyCoordinatorImpl& getTopoCoord() { + TopologyCoordinator& getTopoCoord() { return *_topo; } executor::TaskExecutor::CallbackArgs cbData() { @@ -99,9 +98,9 @@ protected: return _now; } - void setOptions(const TopologyCoordinatorImpl::Options& options) { + void setOptions(const TopologyCoordinator::Options& options) { _options = options; - _topo.reset(new TopologyCoordinatorImpl(_options)); + _topo.reset(new TopologyCoordinator(_options)); } int64_t countLogLinesContaining(const std::string& needle) { @@ -252,12 +251,12 @@ private: } private: - unique_ptr<TopologyCoordinatorImpl> _topo; + unique_ptr<TopologyCoordinator> _topo; unique_ptr<executor::TaskExecutor::CallbackArgs> _cbData; ReplSetConfig _currentConfig; Date_t _now; int _selfIndex; - TopologyCoordinatorImpl::Options _options; + TopologyCoordinator::Options _options; }; TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) { @@ -3179,7 +3178,7 @@ TEST_F(TopoCoordTest, DoNotGrantDryRunVoteWhenOpTimeIsStale) { TEST_F(TopoCoordTest, NodeTransitionsToRemovedIfCSRSButHaveNoReadCommittedSupport) { ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; }); serverGlobalParams.clusterRole = ClusterRole::ConfigServer; - TopologyCoordinatorImpl::Options options; + TopologyCoordinator::Options options; options.clusterRole = ClusterRole::ConfigServer; setOptions(options); getTopoCoord().setStorageEngineSupportsReadCommitted(false); @@ -3206,7 +3205,7 @@ TEST_F(TopoCoordTest, NodeTransitionsToRemovedIfCSRSButHaveNoReadCommittedSuppor TEST_F(TopoCoordTest, NodeBecomesSecondaryAsNormalWhenReadCommittedSupportedAndCSRS) { ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; }); serverGlobalParams.clusterRole = ClusterRole::ConfigServer; - TopologyCoordinatorImpl::Options options; + TopologyCoordinator::Options options; options.clusterRole = ClusterRole::ConfigServer; setOptions(options); getTopoCoord().setStorageEngineSupportsReadCommitted(true); |