summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-11-27 10:32:07 -0500
committerMatthew Russotto <matthew.russotto@10gen.com>2017-11-28 15:59:13 -0500
commit26b42901e302db19cfbdbf3726b7be6fb624f4f5 (patch)
tree3e1a3e8f57c584ea3131b6fda11d84531b3077eb /src/mongo/db
parente38cf83f058f2e29e41c4fe2f02b7fbc405ceb8c (diff)
downloadmongo-26b42901e302db19cfbdbf3726b7be6fb624f4f5.tar.gz
SERVER-30626 Remove TopologyCoordinator interface.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/db.cpp6
-rw-r--r--src/mongo/db/repl/SConscript30
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h6
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp372
-rw-r--r--src/mongo/db/repl/topology_coordinator.h931
-rw-r--r--src/mongo/db/repl/topology_coordinator_test.cpp17
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp19
16 files changed, 996 insertions, 407 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 904c31e9092..5d466a1777e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -840,7 +840,7 @@ env.Library(
"repl/serveronly",
"repl/storage_interface",
"repl/sync_tail",
- "repl/topology_coordinator_impl",
+ "repl/topology_coordinator",
"rw_concern_d",
"s/collection_metadata",
"s/commands",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index cf7acb988cf..fe0402f15e3 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -102,7 +102,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/s/balancer/balancer.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
@@ -1194,7 +1194,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
serviceContext, stdx::make_unique<repl::DropPendingCollectionReaper>(storageInterface));
auto dropPendingCollectionReaper = repl::DropPendingCollectionReaper::get(serviceContext);
- repl::TopologyCoordinatorImpl::Options topoCoordOptions;
+ repl::TopologyCoordinator::Options topoCoordOptions;
topoCoordOptions.maxSyncSourceLagSecs = Seconds(repl::maxSyncSourceLagSecs);
topoCoordOptions.clusterRole = serverGlobalParams.clusterRole;
@@ -1207,7 +1207,7 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
stdx::make_unique<repl::ReplicationCoordinatorExternalStateImpl>(
serviceContext, dropPendingCollectionReaper, storageInterface, replicationProcess),
makeReplicationExecutor(serviceContext),
- stdx::make_unique<repl::TopologyCoordinatorImpl>(topoCoordOptions),
+ stdx::make_unique<repl::TopologyCoordinator>(topoCoordOptions),
replicationProcess,
storageInterface,
static_cast<int64_t>(curTimeMillis64()));
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 1621bb65651..bf1821ce75d 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -732,23 +732,13 @@ env.Library('topology_coordinator',
'topology_coordinator.cpp',
],
LIBDEPS=[
- 'repl_coordinator_interface',
- 'replica_set_messages',
- 'rslog',
- '$BUILD_DIR/mongo/rpc/metadata',
- ])
-
-env.Library('topology_coordinator_impl',
- [
- 'topology_coordinator_impl.cpp',
- ],
- LIBDEPS=[
'$BUILD_DIR/mongo/db/audit',
+ '$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/util/fail_point',
'replica_set_messages',
'repl_settings',
'rslog',
- 'topology_coordinator',
- '$BUILD_DIR/mongo/util/fail_point',
+ 'repl_coordinator_interface',
])
env.CppUnitTest('repl_set_heartbeat_response_test',
@@ -756,12 +746,12 @@ env.CppUnitTest('repl_set_heartbeat_response_test',
LIBDEPS=['replica_set_messages'])
env.CppUnitTest(
- target='topology_coordinator_impl_test',
+ target='topology_coordinator_test',
source=[
- 'topology_coordinator_impl_test.cpp',
+ 'topology_coordinator_test.cpp',
],
LIBDEPS=[
- 'topology_coordinator_impl',
+ 'topology_coordinator',
'replica_set_messages',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
@@ -769,12 +759,12 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='topology_coordinator_impl_v1_test',
+ target='topology_coordinator_v1_test',
source=[
- 'topology_coordinator_impl_v1_test.cpp',
+ 'topology_coordinator_v1_test.cpp',
],
LIBDEPS=[
- 'topology_coordinator_impl',
+ 'topology_coordinator',
'replica_set_messages',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
@@ -828,7 +818,7 @@ env.Library(
'repl_coordinator_impl',
'replmocks',
'service_context_repl_mock_init',
- 'topology_coordinator_impl',
+ 'topology_coordinator',
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/executor/network_interface_mock',
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
index 727d652630a..ac6e07ce3ee 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index e449665a483..0025f70464b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
index 75b419765b8..dac78fdd56e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1.cpp
@@ -34,7 +34,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/mutex.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 6aff08b85dc..0fd15751fec 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
index c83657a154e..55140bb9a23 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/log.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 6e77d46c084..2637deed13a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 24975726a36..a7201c03d32 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -55,7 +55,7 @@
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/storage_interface_mock.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context_noop.h"
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 6d1555e2ed0..c2278a72e9a 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -44,7 +44,7 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/replication_recovery_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_mock.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -138,8 +138,8 @@ void ReplCoordTest::init() {
auto logicalClock = stdx::make_unique<LogicalClock>(service);
LogicalClock::set(service, std::move(logicalClock));
- TopologyCoordinatorImpl::Options settings;
- auto topo = stdx::make_unique<TopologyCoordinatorImpl>(settings);
+ TopologyCoordinator::Options settings;
+ auto topo = stdx::make_unique<TopologyCoordinator>(settings);
_topo = topo.get();
auto net = stdx::make_unique<NetworkInterfaceMock>();
_net = net.get();
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index 92eeea79f63..15b0027b32f 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -48,7 +48,7 @@ class ReplSetConfig;
class ReplicationCoordinatorExternalStateMock;
class ReplicationCoordinatorImpl;
class StorageInterfaceMock;
-class TopologyCoordinatorImpl;
+class TopologyCoordinator;
using executor::NetworkInterfaceMock;
@@ -120,7 +120,7 @@ protected:
/**
* Gets the topology coordinator used by the replication coordinator under test.
*/
- TopologyCoordinatorImpl& getTopoCoord() {
+ TopologyCoordinator& getTopoCoord() {
return *_topo;
}
@@ -288,7 +288,7 @@ protected:
private:
std::unique_ptr<ReplicationCoordinatorImpl> _repl;
// Owned by ReplicationCoordinatorImpl
- TopologyCoordinatorImpl* _topo = nullptr;
+ TopologyCoordinator* _topo = nullptr;
// Owned by executor
executor::NetworkInterfaceMock* _net = nullptr;
// Owned by ReplicationCoordinatorImpl
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 84f4ccbedfe..ffca19a4949 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -30,9 +30,10 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/db/repl/topology_coordinator.h"
#include <limits>
+#include <string>
#include "mongo/db/audit.h"
#include "mongo/db/client.h"
@@ -50,6 +51,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/hex.h"
#include "mongo/util/log.h"
@@ -59,19 +61,61 @@
namespace mongo {
namespace repl {
using std::vector;
-const Seconds TopologyCoordinatorImpl::VoteLease::leaseTime = Seconds(30);
+const Seconds TopologyCoordinator::VoteLease::leaseTime = Seconds(30);
// Controls how caught up in replication a secondary with higher priority than the current primary
// must be before it will call for a priority takeover election.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(priorityTakeoverFreshnessWindowSeconds, int, 2);
-// If this fail point is enabled, TopologyCoordinatorImpl::shouldChangeSyncSource() will ignore
-// the option TopologyCoordinatorImpl::Options::maxSyncSourceLagSecs. The sync source will not be
+// If this fail point is enabled, TopologyCoordinator::shouldChangeSyncSource() will ignore
+// the option TopologyCoordinator::Options::maxSyncSourceLagSecs. The sync source will not be
// re-evaluated if it lags behind another node by more than 'maxSyncSourceLagSecs' seconds.
MONGO_FP_DECLARE(disableMaxSyncSourceLagSecs);
+constexpr Milliseconds TopologyCoordinator::PingStats::UninitializedPing;
+
namespace {
+constexpr int kLeaderValue = 0;
+constexpr int kFollowerValue = 1;
+constexpr int kCandidateValue = 2;
+} // namespace
+
+const TopologyCoordinator::Role TopologyCoordinator::Role::leader(kLeaderValue);
+const TopologyCoordinator::Role TopologyCoordinator::Role::follower(kFollowerValue);
+const TopologyCoordinator::Role TopologyCoordinator::Role::candidate(kCandidateValue);
+
+TopologyCoordinator::Role::Role(int value) : _value(value) {}
+
+std::string TopologyCoordinator::Role::toString() const {
+ switch (_value) {
+ case kLeaderValue:
+ return "leader";
+ case kFollowerValue:
+ return "follower";
+ case kCandidateValue:
+ return "candidate";
+ }
+ invariant(false);
+}
+
+TopologyCoordinator::~TopologyCoordinator() {}
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role) {
+ return os << role.toString();
+}
+
+std::ostream& operator<<(std::ostream& os,
+ TopologyCoordinator::PrepareFreezeResponseResult result) {
+ switch (result) {
+ case TopologyCoordinator::PrepareFreezeResponseResult::kNoAction:
+ return os << "no action";
+ case TopologyCoordinator::PrepareFreezeResponseResult::kElectSelf:
+ return os << "elect self";
+ }
+ MONGO_UNREACHABLE;
+}
+
+namespace {
template <typename T>
int indexOfIterator(const std::vector<T>& vec, typename std::vector<T>::const_iterator& it) {
return static_cast<int>(it - vec.begin());
@@ -114,23 +158,23 @@ void appendOpTime(BSONObjBuilder* bob,
}
} // namespace
-void PingStats::start(Date_t now) {
+void TopologyCoordinator::PingStats::start(Date_t now) {
_lastHeartbeatStartDate = now;
_numFailuresSinceLastStart = 0;
}
-void PingStats::hit(Milliseconds millis) {
+void TopologyCoordinator::PingStats::hit(Milliseconds millis) {
_numFailuresSinceLastStart = std::numeric_limits<int>::max();
++count;
value = value == UninitializedPing ? millis : Milliseconds((value * 4 + millis) / 5);
}
-void PingStats::miss() {
+void TopologyCoordinator::PingStats::miss() {
++_numFailuresSinceLastStart;
}
-TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options)
+TopologyCoordinator::TopologyCoordinator(Options options)
: _role(Role::follower),
_term(OpTime::kUninitializedTerm),
_currentPrimaryIndex(-1),
@@ -145,22 +189,22 @@ TopologyCoordinatorImpl::TopologyCoordinatorImpl(Options options)
_memberData.back().setIsSelf(true);
}
-TopologyCoordinator::Role TopologyCoordinatorImpl::getRole() const {
+TopologyCoordinator::Role TopologyCoordinator::getRole() const {
return _role;
}
-void TopologyCoordinatorImpl::setForceSyncSourceIndex(int index) {
+void TopologyCoordinator::setForceSyncSourceIndex(int index) {
invariant(_forceSyncSourceIndex < _rsConfig.getNumMembers());
_forceSyncSourceIndex = index;
}
-HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
+HostAndPort TopologyCoordinator::getSyncSourceAddress() const {
return _syncSource;
}
-HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
- const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) {
+HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ChainingPreference chainingPreference) {
// If we are not a member of the current replica set configuration, no sync source is valid.
if (_selfIndex == -1) {
LOG(1) << "Cannot sync from any members because we are not in the replica set config";
@@ -361,8 +405,7 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
return _syncSource;
}
-bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberConfig,
- Date_t now) const {
+bool TopologyCoordinator::_memberIsBlacklisted(const MemberConfig& memberConfig, Date_t now) const {
std::map<HostAndPort, Date_t>::const_iterator blacklisted =
_syncSourceBlacklist.find(memberConfig.getHostAndPort());
if (blacklisted != _syncSourceBlacklist.end()) {
@@ -373,12 +416,12 @@ bool TopologyCoordinatorImpl::_memberIsBlacklisted(const MemberConfig& memberCon
return false;
}
-void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+void TopologyCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) {
LOG(2) << "blacklisting " << host << " until " << until.toString();
_syncSourceBlacklist[host] = until;
}
-void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
+void TopologyCoordinator::unblacklistSyncSource(const HostAndPort& host, Date_t now) {
std::map<HostAndPort, Date_t>::iterator hostItr = _syncSourceBlacklist.find(host);
if (hostItr != _syncSourceBlacklist.end() && now >= hostItr->second) {
LOG(2) << "unblacklisting " << host;
@@ -386,13 +429,13 @@ void TopologyCoordinatorImpl::unblacklistSyncSource(const HostAndPort& host, Dat
}
}
-void TopologyCoordinatorImpl::clearSyncSourceBlacklist() {
+void TopologyCoordinator::clearSyncSourceBlacklist() {
_syncSourceBlacklist.clear();
}
-void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
- BSONObjBuilder* response,
- Status* result) {
+void TopologyCoordinator::prepareSyncFromResponse(const HostAndPort& target,
+ BSONObjBuilder* response,
+ Status* result) {
response->append("syncFromRequested", target.toString());
if (_selfIndex == -1) {
@@ -483,11 +526,10 @@ void TopologyCoordinatorImpl::prepareSyncFromResponse(const HostAndPort& target,
*result = Status::OK();
}
-void TopologyCoordinatorImpl::prepareFreshResponse(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t now,
- BSONObjBuilder* response,
- Status* result) {
+void TopologyCoordinator::prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t now,
+ BSONObjBuilder* response,
+ Status* result) {
if (_rsConfig.getProtocolVersion() != 0) {
*result = Status(ErrorCodes::BadValue,
str::stream() << "replset: incompatible replset protocol version: "
@@ -543,10 +585,9 @@ void TopologyCoordinatorImpl::prepareFreshResponse(
*result = Status::OK();
}
-bool TopologyCoordinatorImpl::_shouldVetoMember(
- const ReplicationCoordinator::ReplSetFreshArgs& args,
- const Date_t& now,
- std::string* errmsg) const {
+bool TopologyCoordinator::_shouldVetoMember(const ReplicationCoordinator::ReplSetFreshArgs& args,
+ const Date_t& now,
+ std::string* errmsg) const {
if (_rsConfig.getConfigVersion() < args.cfgver) {
// We are stale; do not veto.
return false;
@@ -611,11 +652,10 @@ bool TopologyCoordinatorImpl::_shouldVetoMember(
}
// produce a reply to a received electCmd
-void TopologyCoordinatorImpl::prepareElectResponse(
- const ReplicationCoordinator::ReplSetElectArgs& args,
- const Date_t now,
- BSONObjBuilder* response,
- Status* result) {
+void TopologyCoordinator::prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
+ const Date_t now,
+ BSONObjBuilder* response,
+ Status* result) {
if (_rsConfig.getProtocolVersion() != 0) {
*result = Status(ErrorCodes::BadValue,
str::stream() << "replset: incompatible replset protocol version: "
@@ -688,10 +728,10 @@ void TopologyCoordinatorImpl::prepareElectResponse(
}
// produce a reply to a heartbeat
-Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) {
+Status TopologyCoordinator::prepareHeartbeatResponse(Date_t now,
+ const ReplSetHeartbeatArgs& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response) {
if (args.getProtocolVersion() != 1) {
return Status(ErrorCodes::BadValue,
str::stream() << "replset: incompatible replset protocol version: "
@@ -784,10 +824,10 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponse(Date_t now,
return Status::OK();
}
-Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response) {
+Status TopologyCoordinator::prepareHeartbeatResponseV1(Date_t now,
+ const ReplSetHeartbeatArgsV1& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response) {
// Verify that replica set names match
const std::string rshb = args.getSetName();
if (ourSetName != rshb) {
@@ -864,7 +904,7 @@ Status TopologyCoordinatorImpl::prepareHeartbeatResponseV1(Date_t now,
return Status::OK();
}
-int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
+int TopologyCoordinator::_getMemberIndex(int id) const {
int index = 0;
for (ReplSetConfig::MemberIterator it = _rsConfig.membersBegin(); it != _rsConfig.membersEnd();
++it, ++index) {
@@ -875,7 +915,7 @@ int TopologyCoordinatorImpl::_getMemberIndex(int id) const {
return -1;
}
-std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequest(
+std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinator::prepareHeartbeatRequest(
Date_t now, const std::string& ourSetName, const HostAndPort& target) {
PingStats& hbStats = _pings[target];
Milliseconds alreadyElapsed = now - hbStats.getLastHeartbeatStartDate();
@@ -914,7 +954,7 @@ std::pair<ReplSetHeartbeatArgs, Milliseconds> TopologyCoordinatorImpl::prepareHe
return std::make_pair(hbArgs, timeout);
}
-std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepareHeartbeatRequestV1(
+std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinator::prepareHeartbeatRequestV1(
Date_t now, const std::string& ourSetName, const HostAndPort& target) {
PingStats& hbStats = _pings[target];
Milliseconds alreadyElapsed(now.asInt64() - hbStats.getLastHeartbeatStartDate().asInt64());
@@ -954,7 +994,7 @@ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> TopologyCoordinatorImpl::prepare
return std::make_pair(hbArgs, timeout);
}
-HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
+HeartbeatResponseAction TopologyCoordinator::processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
@@ -1092,9 +1132,9 @@ HeartbeatResponseAction TopologyCoordinatorImpl::processHeartbeatResponse(
return nextAction;
}
-bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
- int numNodes,
- bool durablyWritten) {
+bool TopologyCoordinator::haveNumNodesReachedOpTime(const OpTime& targetOpTime,
+ int numNodes,
+ bool durablyWritten) {
// Replication progress that is for some reason ahead of us should not allow us to
// satisfy a write concern if we aren't caught up ourselves.
OpTime myOpTime = durablyWritten ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
@@ -1116,9 +1156,9 @@ bool TopologyCoordinatorImpl::haveNumNodesReachedOpTime(const OpTime& targetOpTi
return false;
}
-bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime,
- const ReplSetTagPattern& tagPattern,
- bool durablyWritten) {
+bool TopologyCoordinator::haveTaggedNodesReachedOpTime(const OpTime& opTime,
+ const ReplSetTagPattern& tagPattern,
+ bool durablyWritten) {
ReplSetTagMatch matcher(tagPattern);
for (auto&& memberData : _memberData) {
const OpTime& memberOpTime =
@@ -1141,7 +1181,7 @@ bool TopologyCoordinatorImpl::haveTaggedNodesReachedOpTime(const OpTime& opTime,
return false;
}
-HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now) {
+HeartbeatResponseAction TopologyCoordinator::checkMemberTimeouts(Date_t now) {
bool stepdown = false;
for (int memberIndex = 0; memberIndex < static_cast<int>(_memberData.size()); memberIndex++) {
auto& memberData = _memberData[memberIndex];
@@ -1160,9 +1200,9 @@ HeartbeatResponseAction TopologyCoordinatorImpl::checkMemberTimeouts(Date_t now)
return HeartbeatResponseAction::makeNoAction();
}
-std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime& op,
- bool durablyWritten,
- bool skipSelf) {
+std::vector<HostAndPort> TopologyCoordinator::getHostsWrittenTo(const OpTime& op,
+ bool durablyWritten,
+ bool skipSelf) {
std::vector<HostAndPort> hosts;
for (const auto& memberData : _memberData) {
if (skipSelf && memberData.isSelf()) {
@@ -1182,7 +1222,7 @@ std::vector<HostAndPort> TopologyCoordinatorImpl::getHostsWrittenTo(const OpTime
return hosts;
}
-bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex) {
+bool TopologyCoordinator::setMemberAsDown(Date_t now, const int memberIndex) {
invariant(memberIndex != _selfIndex);
invariant(memberIndex != -1);
invariant(_currentPrimaryIndex == _selfIndex);
@@ -1196,7 +1236,7 @@ bool TopologyCoordinatorImpl::setMemberAsDown(Date_t now, const int memberIndex)
return false;
}
-std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const {
+std::pair<int, Date_t> TopologyCoordinator::getStalestLiveMember() const {
Date_t earliestDate = Date_t::max();
int earliestMemberId = -1;
for (const auto& memberData : _memberData) {
@@ -1217,39 +1257,39 @@ std::pair<int, Date_t> TopologyCoordinatorImpl::getStalestLiveMember() const {
return std::make_pair(earliestMemberId, earliestDate);
}
-void TopologyCoordinatorImpl::resetAllMemberTimeouts(Date_t now) {
+void TopologyCoordinator::resetAllMemberTimeouts(Date_t now) {
for (auto&& memberData : _memberData)
memberData.updateLiveness(now);
}
-void TopologyCoordinatorImpl::resetMemberTimeouts(
- Date_t now, const stdx::unordered_set<HostAndPort>& member_set) {
+void TopologyCoordinator::resetMemberTimeouts(Date_t now,
+ const stdx::unordered_set<HostAndPort>& member_set) {
for (auto&& memberData : _memberData) {
if (member_set.count(memberData.getHostAndPort()))
memberData.updateLiveness(now);
}
}
-OpTime TopologyCoordinatorImpl::getMyLastAppliedOpTime() const {
+OpTime TopologyCoordinator::getMyLastAppliedOpTime() const {
return _selfMemberData().getLastAppliedOpTime();
}
-OpTime TopologyCoordinatorImpl::getMyLastDurableOpTime() const {
+OpTime TopologyCoordinator::getMyLastDurableOpTime() const {
return _selfMemberData().getLastDurableOpTime();
}
-MemberData* TopologyCoordinatorImpl::getMyMemberData() {
+MemberData* TopologyCoordinator::getMyMemberData() {
return &_memberData[_selfMemberDataIndex()];
}
-MemberData* TopologyCoordinatorImpl::findMemberDataByMemberId(const int memberId) {
+MemberData* TopologyCoordinator::findMemberDataByMemberId(const int memberId) {
const int memberIndex = _getMemberIndex(memberId);
if (memberIndex >= 0)
return &_memberData[memberIndex];
return nullptr;
}
-MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) {
+MemberData* TopologyCoordinator::findMemberDataByRid(const OID rid) {
for (auto& memberData : _memberData) {
if (memberData.getRid() == rid)
return &memberData;
@@ -1257,7 +1297,7 @@ MemberData* TopologyCoordinatorImpl::findMemberDataByRid(const OID rid) {
return nullptr;
}
-MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) {
+MemberData* TopologyCoordinator::addSlaveMemberData(const OID rid) {
invariant(!_memberData.empty()); // Must always have our own entry first.
invariant(!_rsConfig.isInitialized()); // Used only for master-slave.
_memberData.emplace_back();
@@ -1266,7 +1306,7 @@ MemberData* TopologyCoordinatorImpl::addSlaveMemberData(const OID rid) {
return result;
}
-HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
+HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBDataV1(
int updatedConfigIndex, const MemberState& originalState, Date_t now) {
//
// Updates the local notion of which remote node, if any is primary.
@@ -1339,7 +1379,7 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBDataV1(
return HeartbeatResponseAction::makeNoAction();
}
-HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
+HeartbeatResponseAction TopologyCoordinator::_updatePrimaryFromHBData(
int updatedConfigIndex, const MemberState& originalState, Date_t now) {
// This method has two interrelated responsibilities, performed in two phases.
//
@@ -1519,14 +1559,14 @@ HeartbeatResponseAction TopologyCoordinatorImpl::_updatePrimaryFromHBData(
const auto status = checkShouldStandForElection(now);
if (!status.isOK()) {
// NOTE: This log line is checked in unit test(s).
- LOG(2) << "TopologyCoordinatorImpl::_updatePrimaryFromHBData - " << status.reason();
+ LOG(2) << "TopologyCoordinator::_updatePrimaryFromHBData - " << status.reason();
return HeartbeatResponseAction::makeNoAction();
}
fassertStatusOK(28816, becomeCandidateIfElectable(now, StartElectionReason::kElectionTimeout));
return HeartbeatResponseAction::makeElectAction();
}
-Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const {
+Status TopologyCoordinator::checkShouldStandForElection(Date_t now) const {
if (_currentPrimaryIndex != -1) {
return {ErrorCodes::NodeNotElectable, "Not standing for election since there is a Primary"};
}
@@ -1570,7 +1610,7 @@ Status TopologyCoordinatorImpl::checkShouldStandForElection(Date_t now) const {
return Status::OK();
}
-bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
+bool TopologyCoordinator::_aMajoritySeemsToBeUp() const {
int vUp = 0;
for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
++it) {
@@ -1583,7 +1623,7 @@ bool TopologyCoordinatorImpl::_aMajoritySeemsToBeUp() const {
return vUp * 2 > _rsConfig.getTotalVotingMembers();
}
-int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority(
+int TopologyCoordinator::_findHealthyPrimaryOfEqualOrGreaterPriority(
const int candidateIndex) const {
const double candidatePriority = _rsConfig.getMemberAt(candidateIndex).getPriority();
for (auto it = _memberData.begin(); it != _memberData.end(); ++it) {
@@ -1600,13 +1640,13 @@ int TopologyCoordinatorImpl::_findHealthyPrimaryOfEqualOrGreaterPriority(
return -1;
}
-bool TopologyCoordinatorImpl::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
+bool TopologyCoordinator::_isOpTimeCloseEnoughToLatestToElect(const OpTime& otherOpTime) const {
const OpTime latestKnownOpTime = _latestKnownOpTime();
// Use addition instead of subtraction to avoid overflow.
return otherOpTime.getSecs() + 10 >= (latestKnownOpTime.getSecs());
}
-bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const {
+bool TopologyCoordinator::_amIFreshEnoughForPriorityTakeover() const {
const OpTime latestKnownOpTime = _latestKnownOpTime();
// Rules are:
@@ -1634,7 +1674,7 @@ bool TopologyCoordinatorImpl::_amIFreshEnoughForPriorityTakeover() const {
}
}
-bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const {
+bool TopologyCoordinator::_amIFreshEnoughForCatchupTakeover() const {
const OpTime latestKnownOpTime = _latestKnownOpTime();
@@ -1668,7 +1708,7 @@ bool TopologyCoordinatorImpl::_amIFreshEnoughForCatchupTakeover() const {
return ourLastOpApplied.getTerm() < _term;
}
-bool TopologyCoordinatorImpl::_iAmPrimary() const {
+bool TopologyCoordinator::_iAmPrimary() const {
if (_role == Role::leader) {
invariant(_currentPrimaryIndex == _selfIndex);
invariant(_leaderMode != LeaderMode::kNotLeader);
@@ -1677,7 +1717,7 @@ bool TopologyCoordinatorImpl::_iAmPrimary() const {
return false;
}
-OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const {
+OpTime TopologyCoordinator::_latestKnownOpTime() const {
OpTime latest = getMyLastAppliedOpTime();
for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
++it) {
@@ -1705,8 +1745,7 @@ OpTime TopologyCoordinatorImpl::_latestKnownOpTime() const {
return latest;
}
-bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
- int memberTwoIndex) const {
+bool TopologyCoordinator::_isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const {
if (memberOneIndex == -1)
return false;
@@ -1717,7 +1756,7 @@ bool TopologyCoordinatorImpl::_isMemberHigherPriority(int memberOneIndex,
_rsConfig.getMemberAt(memberTwoIndex).getPriority();
}
-int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const {
+int TopologyCoordinator::_getHighestPriorityElectableIndex(Date_t now) const {
int maxIndex = -1;
for (int currentIndex = 0; currentIndex < _rsConfig.getNumMembers(); currentIndex++) {
UnelectableReasonMask reason = currentIndex == _selfIndex
@@ -1731,7 +1770,7 @@ int TopologyCoordinatorImpl::_getHighestPriorityElectableIndex(Date_t now) const
return maxIndex;
}
-bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() {
+bool TopologyCoordinator::prepareForUnconditionalStepDown() {
if (_leaderMode == LeaderMode::kSteppingDown) {
// Can only be processing one required stepdown at a time.
return false;
@@ -1742,7 +1781,7 @@ bool TopologyCoordinatorImpl::prepareForUnconditionalStepDown() {
return true;
}
-Status TopologyCoordinatorImpl::prepareForStepDownAttempt() {
+Status TopologyCoordinator::prepareForStepDownAttempt() {
if (_leaderMode == LeaderMode::kSteppingDown ||
_leaderMode == LeaderMode::kAttemptingStepDown) {
return Status{ErrorCodes::ConflictingOperationInProgress,
@@ -1752,14 +1791,14 @@ Status TopologyCoordinatorImpl::prepareForStepDownAttempt() {
return Status::OK();
}
-void TopologyCoordinatorImpl::abortAttemptedStepDownIfNeeded() {
+void TopologyCoordinator::abortAttemptedStepDownIfNeeded() {
if (_leaderMode == TopologyCoordinator::LeaderMode::kAttemptingStepDown) {
_setLeaderMode(TopologyCoordinator::LeaderMode::kMaster);
}
}
-void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMemberState,
- const Timestamp& electionTime) {
+void TopologyCoordinator::changeMemberState_forTest(const MemberState& newMemberState,
+ const Timestamp& electionTime) {
invariant(_selfIndex != -1);
if (newMemberState == getMemberState())
return;
@@ -1795,7 +1834,7 @@ void TopologyCoordinatorImpl::changeMemberState_forTest(const MemberState& newMe
log() << newMemberState;
}
-void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
+void TopologyCoordinator::_setCurrentPrimaryForTest(int primaryIndex) {
if (primaryIndex == _selfIndex) {
changeMemberState_forTest(MemberState::RS_PRIMARY);
} else {
@@ -1817,16 +1856,16 @@ void TopologyCoordinatorImpl::_setCurrentPrimaryForTest(int primaryIndex) {
}
}
-const MemberConfig* TopologyCoordinatorImpl::_currentPrimaryMember() const {
+const MemberConfig* TopologyCoordinator::_currentPrimaryMember() const {
if (_currentPrimaryIndex == -1)
return NULL;
return &(_rsConfig.getMemberAt(_currentPrimaryIndex));
}
-void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
- BSONObjBuilder* response,
- Status* result) {
+void TopologyCoordinator::prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
+ BSONObjBuilder* response,
+ Status* result) {
// output for each member
vector<BSONObj> membersOut;
const MemberState myState = getMemberState();
@@ -1933,12 +1972,10 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsS
bb.appendDate("lastHeartbeat", it->getLastHeartbeat());
bb.appendDate("lastHeartbeatRecv", it->getLastHeartbeatRecv());
Milliseconds ping = _getPing(itConfig.getHostAndPort());
- if (ping != UninitializedPing) {
- bb.append("pingMs", durationCount<Milliseconds>(ping));
- std::string s = it->getLastHeartbeatMsg();
- if (!s.empty())
- bb.append("lastHeartbeatMessage", s);
- }
+ bb.append("pingMs", durationCount<Milliseconds>(ping));
+ std::string s = it->getLastHeartbeatMsg();
+ if (!s.empty())
+ bb.append("lastHeartbeatMessage", s);
if (it->hasAuthIssue()) {
bb.append("authenticated", false);
}
@@ -1997,7 +2034,7 @@ void TopologyCoordinatorImpl::prepareStatusResponse(const ReplSetStatusArgs& rsS
*result = Status::OK();
}
-StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand(
+StatusWith<BSONObj> TopologyCoordinator::prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
OpTime currentCommittedSnapshotOpTime) const {
BSONObjBuilder cmdBuilder;
@@ -2051,7 +2088,7 @@ StatusWith<BSONObj> TopologyCoordinatorImpl::prepareReplSetUpdatePositionCommand
return cmdBuilder.obj();
}
-void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) {
+void TopologyCoordinator::fillMemberData(BSONObjBuilder* result) {
BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
{
for (const auto& memberData : _memberData) {
@@ -2076,7 +2113,7 @@ void TopologyCoordinatorImpl::fillMemberData(BSONObjBuilder* result) {
}
}
-void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
+void TopologyCoordinator::fillIsMasterForReplSet(IsMasterResponse* response) {
const MemberState myState = getMemberState();
if (!_rsConfig.isInitialized()) {
response->markAsNoConfig();
@@ -2146,8 +2183,8 @@ void TopologyCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response)
}
}
-StatusWith<TopologyCoordinatorImpl::PrepareFreezeResponseResult>
-TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
+StatusWith<TopologyCoordinator::PrepareFreezeResponseResult>
+TopologyCoordinator::prepareFreezeResponse(Date_t now, int secs, BSONObjBuilder* response) {
if (_role != TopologyCoordinator::Role::follower) {
std::string msg = str::stream()
<< "cannot freeze node when primary or running for election. state: "
@@ -2180,7 +2217,7 @@ TopologyCoordinatorImpl::prepareFreezeResponse(Date_t now, int secs, BSONObjBuil
return PrepareFreezeResponseResult::kNoAction;
}
-bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
+bool TopologyCoordinator::becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now) {
if (_stepDownUntil > now) {
return false;
}
@@ -2195,31 +2232,31 @@ bool TopologyCoordinatorImpl::becomeCandidateIfStepdownPeriodOverAndSingleNodeSe
return false;
}
-void TopologyCoordinatorImpl::setElectionSleepUntil(Date_t newTime) {
+void TopologyCoordinator::setElectionSleepUntil(Date_t newTime) {
if (_electionSleepUntil < newTime) {
_electionSleepUntil = newTime;
}
}
-Timestamp TopologyCoordinatorImpl::getElectionTime() const {
+Timestamp TopologyCoordinator::getElectionTime() const {
return _electionTime;
}
-OID TopologyCoordinatorImpl::getElectionId() const {
+OID TopologyCoordinator::getElectionId() const {
return _electionId;
}
-int TopologyCoordinatorImpl::getCurrentPrimaryIndex() const {
+int TopologyCoordinator::getCurrentPrimaryIndex() const {
return _currentPrimaryIndex;
}
-Date_t TopologyCoordinatorImpl::getStepDownTime() const {
+Date_t TopologyCoordinator::getStepDownTime() const {
return _stepDownUntil;
}
-void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
- int selfIndex,
- Date_t now) {
+void TopologyCoordinator::_updateHeartbeatDataForReconfig(const ReplSetConfig& newConfig,
+ int selfIndex,
+ Date_t now) {
std::vector<MemberData> oldHeartbeats;
_memberData.swap(oldHeartbeats);
@@ -2265,9 +2302,7 @@ void TopologyCoordinatorImpl::_updateHeartbeatDataForReconfig(const ReplSetConfi
// This function installs a new config object and recreates MemberData objects
// that reflect the new config.
-void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig,
- int selfIndex,
- Date_t now) {
+void TopologyCoordinator::updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now) {
invariant(_role != Role::candidate);
invariant(selfIndex < newConfig.getNumMembers());
@@ -2312,7 +2347,7 @@ void TopologyCoordinatorImpl::updateConfig(const ReplSetConfig& newConfig,
_role = Role::candidate;
}
}
-std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
+std::string TopologyCoordinator::_getHbmsg(Date_t now) const {
// ignore messages over 2 minutes old
if ((now - _hbmsgTime) > Seconds{120}) {
return "";
@@ -2320,20 +2355,20 @@ std::string TopologyCoordinatorImpl::_getHbmsg(Date_t now) const {
return _hbmsg;
}
-void TopologyCoordinatorImpl::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
+void TopologyCoordinator::setMyHeartbeatMessage(const Date_t now, const std::string& message) {
_hbmsgTime = now;
_hbmsg = message;
}
-const MemberConfig& TopologyCoordinatorImpl::_selfConfig() const {
+const MemberConfig& TopologyCoordinator::_selfConfig() const {
return _rsConfig.getMemberAt(_selfIndex);
}
-const MemberData& TopologyCoordinatorImpl::_selfMemberData() const {
+const MemberData& TopologyCoordinator::_selfMemberData() const {
return _memberData[_selfMemberDataIndex()];
}
-const int TopologyCoordinatorImpl::_selfMemberDataIndex() const {
+const int TopologyCoordinator::_selfMemberDataIndex() const {
invariant(!_memberData.empty());
if (_selfIndex >= 0)
return _selfIndex;
@@ -2342,7 +2377,7 @@ const int TopologyCoordinatorImpl::_selfMemberDataIndex() const {
return 0;
}
-TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnelectableReason(
+TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getUnelectableReason(
int index) const {
invariant(index != _selfIndex);
const MemberConfig& memberConfig = _rsConfig.getMemberAt(index);
@@ -2368,7 +2403,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getUnel
return result;
}
-TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUnelectableReason(
+TopologyCoordinator::UnelectableReasonMask TopologyCoordinator::_getMyUnelectableReason(
const Date_t now, StartElectionReason reason) const {
UnelectableReasonMask result = None;
const OpTime lastApplied = getMyLastAppliedOpTime();
@@ -2423,8 +2458,7 @@ TopologyCoordinatorImpl::UnelectableReasonMask TopologyCoordinatorImpl::_getMyUn
return result;
}
-std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
- const UnelectableReasonMask ur) const {
+std::string TopologyCoordinator::_getUnelectableReasonString(const UnelectableReasonMask ur) const {
invariant(ur);
str::stream ss;
bool hasWrittenToStream = false;
@@ -2521,15 +2555,15 @@ std::string TopologyCoordinatorImpl::_getUnelectableReasonString(
return ss;
}
-Milliseconds TopologyCoordinatorImpl::_getPing(const HostAndPort& host) {
+Milliseconds TopologyCoordinator::_getPing(const HostAndPort& host) {
return _pings[host].getMillis();
}
-void TopologyCoordinatorImpl::_setElectionTime(const Timestamp& newElectionTime) {
+void TopologyCoordinator::_setElectionTime(const Timestamp& newElectionTime) {
_electionTime = newElectionTime;
}
-int TopologyCoordinatorImpl::_getTotalPings() {
+int TopologyCoordinator::_getTotalPings() {
PingMap::iterator it = _pings.begin();
PingMap::iterator end = _pings.end();
int totalPings = 0;
@@ -2540,7 +2574,7 @@ int TopologyCoordinatorImpl::_getTotalPings() {
return totalPings;
}
-std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const {
+std::vector<HostAndPort> TopologyCoordinator::getMaybeUpHostAndPorts() const {
std::vector<HostAndPort> upHosts;
for (std::vector<MemberData>::const_iterator it = _memberData.begin(); it != _memberData.end();
++it) {
@@ -2557,7 +2591,7 @@ std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const
return upHosts;
}
-bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
+bool TopologyCoordinator::voteForMyself(Date_t now) {
if (_role != Role::candidate) {
return false;
}
@@ -2574,12 +2608,12 @@ bool TopologyCoordinatorImpl::voteForMyself(Date_t now) {
return true;
}
-bool TopologyCoordinatorImpl::isSteppingDown() const {
+bool TopologyCoordinator::isSteppingDown() const {
return _leaderMode == LeaderMode::kAttemptingStepDown ||
_leaderMode == LeaderMode::kSteppingDown;
}
-void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
+void TopologyCoordinator::_setLeaderMode(TopologyCoordinator::LeaderMode newMode) {
// Invariants for valid state transitions.
switch (_leaderMode) {
case LeaderMode::kNotLeader:
@@ -2607,7 +2641,7 @@ void TopologyCoordinatorImpl::_setLeaderMode(TopologyCoordinator::LeaderMode new
_leaderMode = std::move(newMode);
}
-MemberState TopologyCoordinatorImpl::getMemberState() const {
+MemberState TopologyCoordinator::getMemberState() const {
if (_selfIndex == -1) {
if (_rsConfig.isInitialized()) {
return MemberState::RS_REMOVED;
@@ -2646,17 +2680,17 @@ MemberState TopologyCoordinatorImpl::getMemberState() const {
return _followerMode;
}
-bool TopologyCoordinatorImpl::canAcceptWrites() const {
+bool TopologyCoordinator::canAcceptWrites() const {
return _leaderMode == LeaderMode::kMaster;
}
-void TopologyCoordinatorImpl::setElectionInfo(OID electionId, Timestamp electionOpTime) {
+void TopologyCoordinator::setElectionInfo(OID electionId, Timestamp electionOpTime) {
invariant(_role == Role::leader);
_electionTime = electionOpTime;
_electionId = electionId;
}
-void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp electionOpTime) {
+void TopologyCoordinator::processWinElection(OID electionId, Timestamp electionOpTime) {
invariant(_role == Role::candidate);
invariant(_leaderMode == LeaderMode::kNotLeader);
_role = Role::leader;
@@ -2670,7 +2704,7 @@ void TopologyCoordinatorImpl::processWinElection(OID electionId, Timestamp elect
OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max());
}
-void TopologyCoordinatorImpl::processLoseElection() {
+void TopologyCoordinator::processLoseElection() {
invariant(_role == Role::candidate);
invariant(_leaderMode == LeaderMode::kNotLeader);
const HostAndPort syncSourceAddress = getSyncSourceAddress();
@@ -2685,7 +2719,7 @@ void TopologyCoordinatorImpl::processLoseElection() {
}
}
-bool TopologyCoordinatorImpl::attemptStepDown(
+bool TopologyCoordinator::attemptStepDown(
long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force) {
if (_role != Role::leader || _leaderMode == LeaderMode::kSteppingDown || _term != termAtStart) {
@@ -2724,9 +2758,7 @@ bool TopologyCoordinatorImpl::attemptStepDown(
return true;
}
-bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
- Date_t waitUntil,
- bool force) {
+bool TopologyCoordinator::_canCompleteStepDownAttempt(Date_t now, Date_t waitUntil, bool force) {
const bool forceNow = force && (now >= waitUntil);
if (forceNow) {
return true;
@@ -2735,7 +2767,7 @@ bool TopologyCoordinatorImpl::_canCompleteStepDownAttempt(Date_t now,
return isSafeToStepDown();
}
-bool TopologyCoordinatorImpl::isSafeToStepDown() {
+bool TopologyCoordinator::isSafeToStepDown() {
if (!_rsConfig.isInitialized() || _selfIndex < 0) {
return false;
}
@@ -2767,7 +2799,7 @@ bool TopologyCoordinatorImpl::isSafeToStepDown() {
return false;
}
-void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
+void TopologyCoordinator::setFollowerMode(MemberState::MS newMode) {
invariant(_role == Role::follower);
switch (newMode) {
case MemberState::RS_RECOVERING:
@@ -2793,13 +2825,13 @@ void TopologyCoordinatorImpl::setFollowerMode(MemberState::MS newMode) {
}
}
-bool TopologyCoordinatorImpl::_isElectableNodeInSingleNodeReplicaSet() const {
+bool TopologyCoordinator::_isElectableNodeInSingleNodeReplicaSet() const {
return _followerMode == MemberState::RS_SECONDARY && _rsConfig.getNumMembers() == 1 &&
_selfIndex == 0 && _rsConfig.getMemberAt(_selfIndex).isElectable() &&
_maintenanceModeCalls == 0;
}
-void TopologyCoordinatorImpl::finishUnconditionalStepDown() {
+void TopologyCoordinator::finishUnconditionalStepDown() {
invariant(_leaderMode == LeaderMode::kSteppingDown);
int remotePrimaryIndex = -1;
@@ -2824,7 +2856,7 @@ void TopologyCoordinatorImpl::finishUnconditionalStepDown() {
_stepDownSelfAndReplaceWith(remotePrimaryIndex);
}
-void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
+void TopologyCoordinator::_stepDownSelfAndReplaceWith(int newPrimary) {
invariant(_role == Role::leader);
invariant(_selfIndex != -1);
invariant(_selfIndex != newPrimary);
@@ -2834,7 +2866,7 @@ void TopologyCoordinatorImpl::_stepDownSelfAndReplaceWith(int newPrimary) {
_setLeaderMode(LeaderMode::kNotLeader);
}
-bool TopologyCoordinatorImpl::updateLastCommittedOpTime() {
+bool TopologyCoordinator::updateLastCommittedOpTime() {
// If we're not primary or we're stepping down due to learning of a new term then we must not
// advance the commit point. If we are stepping down due to a user request, however, then it
// is safe to advance the commit point, and in fact we must since the stepdown request may be
@@ -2870,7 +2902,7 @@ bool TopologyCoordinatorImpl::updateLastCommittedOpTime() {
return advanceLastCommittedOpTime(committedOpTime);
}
-bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committedOpTime) {
+bool TopologyCoordinator::advanceLastCommittedOpTime(const OpTime& committedOpTime) {
if (committedOpTime == _lastCommittedOpTime) {
return false; // Hasn't changed, so ignore it.
} else if (committedOpTime < _lastCommittedOpTime) {
@@ -2891,12 +2923,11 @@ bool TopologyCoordinatorImpl::advanceLastCommittedOpTime(const OpTime& committed
return true;
}
-OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const {
+OpTime TopologyCoordinator::getLastCommittedOpTime() const {
return _lastCommittedOpTime;
}
-bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary(
- long long termWhenDrainCompleted) const {
+bool TopologyCoordinator::canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const {
if (termWhenDrainCompleted != _term) {
return false;
@@ -2910,7 +2941,7 @@ bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary(
return true;
}
-Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
+Status TopologyCoordinator::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
if (!canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())) {
return Status(ErrorCodes::PrimarySteppedDown,
"By the time this node was ready to complete its transition to PRIMARY it "
@@ -2923,18 +2954,17 @@ Status TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstO
return Status::OK();
}
-void TopologyCoordinatorImpl::adjustMaintenanceCountBy(int inc) {
+void TopologyCoordinator::adjustMaintenanceCountBy(int inc) {
invariant(_role == Role::follower);
_maintenanceModeCalls += inc;
invariant(_maintenanceModeCalls >= 0);
}
-int TopologyCoordinatorImpl::getMaintenanceCount() const {
+int TopologyCoordinator::getMaintenanceCount() const {
return _maintenanceModeCalls;
}
-TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long long term,
- Date_t now) {
+TopologyCoordinator::UpdateTermResult TopologyCoordinator::updateTerm(long long term, Date_t now) {
if (term <= _term) {
return TopologyCoordinator::UpdateTermResult::kAlreadyUpToDate;
}
@@ -2952,13 +2982,13 @@ TopologyCoordinator::UpdateTermResult TopologyCoordinatorImpl::updateTerm(long l
}
-long long TopologyCoordinatorImpl::getTerm() {
+long long TopologyCoordinator::getTerm() const {
return _term;
}
// TODO(siyuan): Merge _hddata into _slaveInfo, so that we have a single view of the
// replset. Passing metadata is unnecessary.
-bool TopologyCoordinatorImpl::shouldChangeSyncSource(
+bool TopologyCoordinator::shouldChangeSyncSource(
const HostAndPort& currentSource,
const rpc::ReplSetMetadata& replMetadata,
boost::optional<rpc::OplogQueryMetadata> oqMetadata,
@@ -3079,7 +3109,7 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(
return false;
}
-rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata(
+rpc::ReplSetMetadata TopologyCoordinator::prepareReplSetMetadata(
const OpTime& lastVisibleOpTime) const {
return rpc::ReplSetMetadata(_term,
_lastCommittedOpTime,
@@ -3090,7 +3120,7 @@ rpc::ReplSetMetadata TopologyCoordinatorImpl::prepareReplSetMetadata(
_rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}
-rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int rbid) const {
+rpc::OplogQueryMetadata TopologyCoordinator::prepareOplogQueryMetadata(int rbid) const {
return rpc::OplogQueryMetadata(_lastCommittedOpTime,
getMyLastAppliedOpTime(),
rbid,
@@ -3098,7 +3128,7 @@ rpc::OplogQueryMetadata TopologyCoordinatorImpl::prepareOplogQueryMetadata(int r
_rsConfig.findMemberIndexByHostAndPort(getSyncSourceAddress()));
}
-void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
+void TopologyCoordinator::summarizeAsHtml(ReplSetHtmlSummary* output) {
output->setConfig(_rsConfig);
output->setHBData(_memberData);
output->setSelfIndex(_selfIndex);
@@ -3107,8 +3137,8 @@ void TopologyCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) {
output->setSelfHeartbeatMessage(_hbmsg);
}
-void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response) {
+void TopologyCoordinator::processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response) {
response->setTerm(_term);
if (args.getTerm() < _term) {
@@ -3164,21 +3194,21 @@ void TopologyCoordinatorImpl::processReplSetRequestVotes(const ReplSetRequestVot
}
}
-void TopologyCoordinatorImpl::loadLastVote(const LastVote& lastVote) {
+void TopologyCoordinator::loadLastVote(const LastVote& lastVote) {
_lastVote = lastVote;
}
-void TopologyCoordinatorImpl::voteForMyselfV1() {
+void TopologyCoordinator::voteForMyselfV1() {
_lastVote.setTerm(_term);
_lastVote.setCandidateIndex(_selfIndex);
}
-void TopologyCoordinatorImpl::setPrimaryIndex(long long primaryIndex) {
+void TopologyCoordinator::setPrimaryIndex(long long primaryIndex) {
_currentPrimaryIndex = primaryIndex;
}
-Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now,
- StartElectionReason reason) {
+Status TopologyCoordinator::becomeCandidateIfElectable(const Date_t now,
+ StartElectionReason reason) {
if (_role == Role::leader) {
return {ErrorCodes::NodeNotElectable, "Not standing for election again; already primary"};
}
@@ -3200,18 +3230,18 @@ Status TopologyCoordinatorImpl::becomeCandidateIfElectable(const Date_t now,
return Status::OK();
}
-void TopologyCoordinatorImpl::setStorageEngineSupportsReadCommitted(bool supported) {
+void TopologyCoordinator::setStorageEngineSupportsReadCommitted(bool supported) {
_storageEngineSupportsReadCommitted =
supported ? ReadCommittedSupport::kYes : ReadCommittedSupport::kNo;
}
-void TopologyCoordinatorImpl::restartHeartbeats() {
+void TopologyCoordinator::restartHeartbeats() {
for (auto& hb : _memberData) {
hb.restart();
}
}
-boost::optional<OpTime> TopologyCoordinatorImpl::latestKnownOpTimeSinceHeartbeatRestart() const {
+boost::optional<OpTime> TopologyCoordinator::latestKnownOpTimeSinceHeartbeatRestart() const {
// The smallest OpTime in PV1.
OpTime latest(Timestamp(0, 0), 0);
for (size_t i = 0; i < _memberData.size(); i++) {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 046ff2217a6..eca071e590a 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -28,241 +28,735 @@
#pragma once
+#include <iosfwd>
#include <string>
-#include <vector>
-#include "mongo/bson/timestamp.h"
+#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/last_vote.h"
-#include "mongo/db/repl/member_data.h"
-#include "mongo/db/repl/member_state.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/server_options.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
-
-class OperationContext;
-
-namespace rpc {
-class ReplSetMetadata;
-} // namespace rpc
+class Timestamp;
namespace repl {
-
-static const Milliseconds UninitializedPing{-1};
+class HeartbeatResponseAction;
+class MemberData;
+class OpTime;
+class ReplSetHeartbeatArgs;
+class ReplSetConfig;
+class TagSubgroup;
+struct MemberState;
/**
- * Represents a latency measurement for each replica set member based on heartbeat requests.
- * The measurement is an average weighted 80% to the old value, and 20% to the new value.
+ * Replication Topology Coordinator
*
- * Also stores information about heartbeat progress and retries.
+ * This object is responsible for managing the topology of the cluster.
+ * Tasks include consensus and leader election, chaining, and configuration management.
+ * Methods of this class should be non-blocking.
*/
-class PingStats {
+class TopologyCoordinator {
+ MONGO_DISALLOW_COPYING(TopologyCoordinator);
+
public:
/**
- * Records that a new heartbeat request started at "now".
+ * Type that denotes the role of a node in the replication protocol.
*
- * This resets the failure count used in determining whether the next request to a target
- * should be a retry or a regularly scheduled heartbeat message.
+ * The role is distinct from MemberState, in that it only deals with the
+ * roles a node plays in the basic protocol -- leader, follower and candidate.
+ * The mapping between MemberState and Role is complex -- several MemberStates
+ * map to the follower role, and MemberState::RS_SECONDARY maps to either
+ * follower or candidate roles, e.g.
*/
- void start(Date_t now);
+ class Role {
+ public:
+ /**
+ * Constant indicating leader role.
+ */
+ static const Role leader;
+
+ /**
+ * Constant indicating follower role.
+ */
+ static const Role follower;
+
+ /**
+ * Constant indicating candidate role
+ */
+ static const Role candidate;
+
+ Role() {}
+
+ bool operator==(Role other) const {
+ return _value == other._value;
+ }
+ bool operator!=(Role other) const {
+ return _value != other._value;
+ }
+
+ std::string toString() const;
+
+ private:
+ explicit Role(int value);
+
+ int _value;
+ };
+
+ struct Options {
+ // A sync source is re-evaluated after it lags behind further than this amount.
+ Seconds maxSyncSourceLagSecs{0};
+
+ // Whether or not this node is running as a config server.
+ ClusterRole clusterRole{ClusterRole::None};
+ };
/**
- * Records that a heartbeat request completed successfully, and that "millis" milliseconds
- * were spent for a single network roundtrip plus remote processing time.
+ * Constructs a Topology Coordinator object.
+ **/
+ TopologyCoordinator(Options options);
+
+
+ ~TopologyCoordinator();
+
+ /**
+ * Different modes a node can be in while still reporting itself as in state PRIMARY.
+ *
+ * Valid transitions:
+ *
+ * kNotLeader <----------------------------------
+ * | |
+ * | |
+ * | |
+ * v |
+ * kLeaderElect----- |
+ * | | |
+ * | | |
+ * v | |
+ * kMaster ------------------------- |
+ * | ^ | | |
+ * | | ------------------- | |
+ * | | | | | |
+ * v | v v v |
+ * kAttemptingStepDown----------->kSteppingDown |
+ * | | |
+ * | | |
+ * | | |
+ * ---------------------------------------------
+ *
*/
- void hit(Milliseconds millis);
+ enum class LeaderMode {
+ kNotLeader, // This node is not currently a leader.
+ kLeaderElect, // This node has been elected leader, but can't yet accept writes.
+ kMaster, // This node reports ismaster:true and can accept writes.
+ kSteppingDown, // This node is in the middle of a (hb) stepdown that must complete.
+ kAttemptingStepDown, // This node is in the middle of a stepdown (cmd) that might fail.
+ };
+
+ ////////////////////////////////////////////////////////////
+ //
+ // State inspection methods.
+ //
+ ////////////////////////////////////////////////////////////
/**
- * Records that a heartbeat request failed.
+ * Gets the role of this member in the replication protocol.
*/
- void miss();
+ Role getRole() const;
/**
- * Gets the number of hit() calls.
+ * Gets the MemberState of this member in the replica set.
*/
- unsigned int getCount() const {
- return count;
- }
+ MemberState getMemberState() const;
/**
- * Gets the weighted average round trip time for heartbeat messages to the target.
- * Returns 0 if there have been no pings recorded yet.
+ * Returns whether this node should be allowed to accept writes.
*/
- Milliseconds getMillis() const {
- return value == UninitializedPing ? Milliseconds(0) : value;
- }
+ bool canAcceptWrites() const;
/**
- * Gets the date at which start() was last called, which is used to determine if
- * a heartbeat should be retried or if the time limit has expired.
+ * Returns true if this node is in the process of stepping down. Note that this can be
+ * due to an unconditional stepdown that must succeed (for instance from learning about a new
+ * term) or due to a stepdown attempt that could fail (for instance from a stepdown cmd that
+ * could fail if not enough nodes are caught up).
*/
- Date_t getLastHeartbeatStartDate() const {
- return _lastHeartbeatStartDate;
- }
+ bool isSteppingDown() const;
/**
- * Gets the number of failures since start() was last called.
+ * Returns the address of the current sync source, or an empty HostAndPort if there is no
+ * current sync source.
+ */
+ HostAndPort getSyncSourceAddress() const;
+
+ /**
+ * Retrieves a vector of HostAndPorts containing all nodes that are neither DOWN nor
+ * ourself.
+ */
+ std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
+
+ /**
+ * Gets the earliest time the current node will stand for election.
+ */
+ Date_t getStepDownTime() const;
+
+ /**
+ * Gets the current value of the maintenance mode counter.
+ */
+ int getMaintenanceCount() const;
+
+ /**
+ * Gets the latest term this member is aware of. If this member is the primary,
+ * it's the current term of the replica set.
+ */
+ long long getTerm() const;
+
+ enum class UpdateTermResult { kAlreadyUpToDate, kTriggerStepDown, kUpdatedTerm };
+
+ ////////////////////////////////////////////////////////////
+ //
+ // Basic state manipulation methods.
+ //
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Sets the latest term this member is aware of to the higher of its current value and
+ * the value passed in as "term".
+ * Returns the result of setting the term value, or if a stepdown should be triggered.
+ */
+ UpdateTermResult updateTerm(long long term, Date_t now);
+
+ /**
+ * Sets the index into the config used when we next choose a sync source
+ */
+ void setForceSyncSourceIndex(int index);
+
+ enum class ChainingPreference { kAllowChaining, kUseConfiguration };
+
+ /**
+ * Chooses and sets a new sync source, based on our current knowledge of the world.
+ */
+ HostAndPort chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpTimeFetched,
+ ChainingPreference chainingPreference);
+
+ /**
+ * Suppresses selecting "host" as sync source until "until".
+ */
+ void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ /**
+ * Removes a single entry "host" from the list of potential sync sources which we
+ * have blacklisted, if it is supposed to be unblacklisted by "now".
+ */
+ void unblacklistSyncSource(const HostAndPort& host, Date_t now);
+
+ /**
+ * Clears the list of potential sync sources we have blacklisted.
+ */
+ void clearSyncSourceBlacklist();
+
+ /**
+ * Determines if a new sync source should be chosen, if a better candidate sync source is
+ * available. If the current sync source's last optime ("syncSourceLastOpTime" under
+ * protocolVersion 1, but pulled from the MemberData in protocolVersion 0) is more than
+ * _maxSyncSourceLagSecs behind any syncable source, this function returns true. If we are
+ * running in ProtocolVersion 1, our current sync source is not primary, has no sync source
+ * ("syncSourceHasSyncSource" is false), and only has data up to "myLastOpTime", returns true.
*
- * This value is incremented by calls to miss(), cleared by calls to start() and
- * set to the maximum possible value by calls to hit().
+ * "now" is used to skip over currently blacklisted sync sources.
+ *
+ * TODO (SERVER-27668): Make OplogQueryMetadata non-optional in mongodb 3.8.
*/
- int getNumFailuresSinceLastStart() const {
- return _numFailuresSinceLastStart;
- }
+ bool shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ boost::optional<rpc::OplogQueryMetadata> oqMetadata,
+ Date_t now) const;
-private:
- unsigned int count = 0;
- Milliseconds value = UninitializedPing;
- Date_t _lastHeartbeatStartDate;
- int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
-};
+ /**
+ * Checks whether we are a single node set and we are not in a stepdown period. If so,
+ * puts us into candidate mode, otherwise does nothing. This is used to ensure that
+ * nodes in a single node replset become primary again when their stepdown period ends.
+ */
+ bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
-class TopologyCoordinatorImpl : public TopologyCoordinator {
-public:
- struct Options {
- // A sync source is re-evaluated after it lags behind further than this amount.
- Seconds maxSyncSourceLagSecs{0};
+ /**
+ * Sets the earliest time the current node will stand for election to "newTime".
+ *
+ * Until this time, while the node may report itself as electable, it will not stand
+ * for election.
+ */
+ void setElectionSleepUntil(Date_t newTime);
- // Whether or not this node is running as a config server.
- ClusterRole clusterRole{ClusterRole::None};
- };
+ /**
+ * Sets the reported mode of this node to one of RS_SECONDARY, RS_STARTUP2, RS_ROLLBACK or
+ * RS_RECOVERING, when getRole() == Role::follower. This is the interface by which the
+ * applier changes the reported member state of the current node, and enables or suppresses
+ * electability of the current node. All modes but RS_SECONDARY indicate an unelectable
+ * follower state (one that cannot transition to candidate).
+ */
+ void setFollowerMode(MemberState::MS newMode);
/**
- * Constructs a Topology Coordinator object.
- **/
- TopologyCoordinatorImpl(Options options);
+ * Scan the memberData and determine the highest last applied or last
+ * durable optime present on a majority of servers; set _lastCommittedOpTime to this
+ * new entry.
+ * Whether the last applied or last durable op time is used depends on whether
+ * the config getWriteConcernMajorityShouldJournal is set.
+ * Returns true if the _lastCommittedOpTime was changed.
+ */
+ bool updateLastCommittedOpTime();
+
+ /**
+ * Updates _lastCommittedOpTime to be "committedOpTime" if it is more recent than the
+ * current last committed OpTime. Returns true if _lastCommittedOpTime is changed.
+ */
+ bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
+
+ /**
+ * Returns the OpTime of the latest majority-committed op known to this server.
+ */
+ OpTime getLastCommittedOpTime() const;
+
+ /**
+ * Returns true if it's safe to transition to LeaderMode::kMaster.
+ */
+ bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const;
+
+ /**
+ * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes
+ * and are ready to fully become primary and start accepting writes.
+ * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed
+ * for this tenure as primary. This prevents entries from before our election from counting as
+ * committed in our view, until our election (the "firstOpTimeOfTerm" op) has been committed.
+ * Returns PrimarySteppedDown if this node is no longer eligible to begin accepting writes.
+ */
+ Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm);
+
+ /**
+ * Adjusts the maintenance mode count by "inc".
+ *
+ * It is an error to call this method if getRole() does not return Role::follower.
+ * It is an error to allow the maintenance count to go negative.
+ */
+ void adjustMaintenanceCountBy(int inc);
////////////////////////////////////////////////////////////
//
- // Implementation of TopologyCoordinator interface
+ // Methods that prepare responses to command requests.
//
////////////////////////////////////////////////////////////
- virtual Role getRole() const;
- virtual MemberState getMemberState() const;
- virtual bool canAcceptWrites() const override;
- virtual bool isSteppingDown() const override;
- virtual HostAndPort getSyncSourceAddress() const;
- virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
- virtual int getMaintenanceCount() const;
- virtual long long getTerm();
- virtual UpdateTermResult updateTerm(long long term, Date_t now);
- virtual void setForceSyncSourceIndex(int index);
- virtual HostAndPort chooseNewSyncSource(Date_t now,
- const OpTime& lastOpTimeFetched,
- ChainingPreference chainingPreference) override;
- virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
- virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
- virtual void clearSyncSourceBlacklist();
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- boost::optional<rpc::OplogQueryMetadata> oqMetadata,
- Date_t now) const;
- virtual bool becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(Date_t now);
- virtual void setElectionSleepUntil(Date_t newTime);
- virtual void setFollowerMode(MemberState::MS newMode);
- virtual bool updateLastCommittedOpTime();
- virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
- virtual OpTime getLastCommittedOpTime() const;
- virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override;
- virtual Status completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) override;
- virtual void adjustMaintenanceCountBy(int inc);
- virtual void prepareSyncFromResponse(const HostAndPort& target,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
- Date_t now,
- BSONObjBuilder* response,
- Status* result);
- virtual Status prepareHeartbeatResponse(Date_t now,
- const ReplSetHeartbeatArgs& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual Status prepareHeartbeatResponseV1(Date_t now,
- const ReplSetHeartbeatArgsV1& args,
- const std::string& ourSetName,
- ReplSetHeartbeatResponse* response);
- virtual void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
- BSONObjBuilder* response,
- Status* result);
- virtual StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
+ // produces a reply to a replSetSyncFrom command
+ void prepareSyncFromResponse(const HostAndPort& target,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a replSetFresh command
+ void prepareFreshResponse(const ReplicationCoordinator::ReplSetFreshArgs& args,
+ Date_t now,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a received electCmd
+ void prepareElectResponse(const ReplicationCoordinator::ReplSetElectArgs& args,
+ Date_t now,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // produce a reply to a heartbeat
+ Status prepareHeartbeatResponse(Date_t now,
+ const ReplSetHeartbeatArgs& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response);
+
+ // produce a reply to a V1 heartbeat
+ Status prepareHeartbeatResponseV1(Date_t now,
+ const ReplSetHeartbeatArgsV1& args,
+ const std::string& ourSetName,
+ ReplSetHeartbeatResponse* response);
+
+ struct ReplSetStatusArgs {
+ Date_t now;
+ unsigned selfUptime;
+ const OpTime& readConcernMajorityOpTime;
+ const BSONObj& initialSyncStatus;
+ };
+
+ // produce a reply to a status request
+ void prepareStatusResponse(const ReplSetStatusArgs& rsStatusArgs,
+ BSONObjBuilder* response,
+ Status* result);
+
+ // Produce a replSetUpdatePosition command to be sent to the node's sync source.
+ StatusWith<BSONObj> prepareReplSetUpdatePositionCommand(
ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle,
OpTime currentCommittedSnapshotOpTime) const;
- virtual void fillIsMasterForReplSet(IsMasterResponse* response);
- virtual void fillMemberData(BSONObjBuilder* result);
- virtual StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
- int secs,
- BSONObjBuilder* response);
- virtual void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
- virtual std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
+ // produce a reply to an ismaster request. It is only valid to call this if we are a
+ // replset.
+ void fillIsMasterForReplSet(IsMasterResponse* response);
+
+ // Produce member data for the serverStatus command and diagnostic logging.
+ void fillMemberData(BSONObjBuilder* result);
+
+ enum class PrepareFreezeResponseResult { kNoAction, kElectSelf };
+
+ /**
+ * Produce a reply to a freeze request. Returns a PrepareFreezeResponseResult on success that
+ * may trigger state changes in the caller.
+ */
+ StatusWith<PrepareFreezeResponseResult> prepareFreezeResponse(Date_t now,
+ int secs,
+ BSONObjBuilder* response);
+
+ ////////////////////////////////////////////////////////////
+ //
+ // Methods for sending and receiving heartbeats,
+ // reconfiguring and handling the results of standing for
+ // election.
+ //
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Updates the topology coordinator's notion of the replica set configuration.
+ *
+ * "newConfig" is the new configuration, and "selfIndex" is the index of this
+ * node's configuration information in "newConfig", or "selfIndex" is -1 to
+ * indicate that this node is not a member of "newConfig".
+ *
+ * newConfig.isInitialized() should be true, though implementations may accept
+ * configurations where this is not true, for testing purposes.
+ */
+ void updateConfig(const ReplSetConfig& newConfig, int selfIndex, Date_t now);
+
+ /**
+ * Prepares a heartbeat request appropriate for sending to "target", assuming the
+ * current time is "now". "ourSetName" is used as the name for our replica set if
+ * the topology coordinator does not have a valid configuration installed.
+ *
+ * The returned pair contains proper arguments for a replSetHeartbeat command, and
+ * an amount of time to wait for the response.
+ *
+ * This call should be paired (with intervening network communication) with a call to
+ * processHeartbeatResponse for the same "target".
+ */
+ std::pair<ReplSetHeartbeatArgs, Milliseconds> prepareHeartbeatRequest(
Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
+ std::pair<ReplSetHeartbeatArgsV1, Milliseconds> prepareHeartbeatRequestV1(
Date_t now, const std::string& ourSetName, const HostAndPort& target);
- virtual HeartbeatResponseAction processHeartbeatResponse(
+
+ /**
+ * Processes a heartbeat response from "target" that arrived around "now", having
+ * spent "networkRoundTripTime" millis on the network.
+ *
+ * Updates internal topology coordinator state, and returns instructions about what action
+ * to take next.
+ *
+ * If the next action indicates StartElection, the topology coordinator has transitioned to
+ * the "candidate" role, and will remain there until processWinElection or
+ * processLoseElection are called.
+ *
+ * If the next action indicates "StepDownSelf", the topology coordinator has transitioned
+ * to the "follower" role from "leader", and the caller should take any necessary actions
+ * to become a follower.
+ *
+ * If the next action indicates "StepDownRemotePrimary", the caller should take steps to
+ * cause the specified remote host to step down from primary to secondary.
+ *
+ * If the next action indicates "Reconfig", the caller should verify the configuration in
+ * hbResponse is acceptable, perform any other reconfiguration actions it must, and call
+ * updateConfig with the new configuration and the appropriate value for "selfIndex". It
+ * must also wrap up any outstanding elections (by calling processLoseElection or
+ * processWinElection) before calling updateConfig.
+ *
+ * This call should be paired (with intervening network communication) with a call to
+ * prepareHeartbeatRequest for the same "target".
+ */
+ HeartbeatResponseAction processHeartbeatResponse(
Date_t now,
Milliseconds networkRoundTripTime,
const HostAndPort& target,
const StatusWith<ReplSetHeartbeatResponse>& hbResponse);
- virtual bool voteForMyself(Date_t now);
- virtual void setElectionInfo(OID electionId, Timestamp electionOpTime);
- virtual void processWinElection(OID electionId, Timestamp electionOpTime);
- virtual void processLoseElection();
- virtual Status checkShouldStandForElection(Date_t now) const;
- virtual void setMyHeartbeatMessage(const Date_t now, const std::string& message);
- virtual bool attemptStepDown(long long termAtStart,
- Date_t now,
- Date_t waitUntil,
- Date_t stepDownUntil,
- bool force) override;
- virtual bool isSafeToStepDown() override;
- virtual bool prepareForUnconditionalStepDown() override;
- virtual Status prepareForStepDownAttempt() override;
- virtual void abortAttemptedStepDownIfNeeded() override;
- virtual void finishUnconditionalStepDown() override;
- virtual Date_t getStepDownTime() const;
- virtual rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
- virtual rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
- virtual void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
- ReplSetRequestVotesResponse* response);
- virtual void summarizeAsHtml(ReplSetHtmlSummary* output);
- virtual void loadLastVote(const LastVote& lastVote);
- virtual void voteForMyselfV1();
- virtual void setPrimaryIndex(long long primaryIndex);
- virtual int getCurrentPrimaryIndex() const;
- virtual bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
- virtual bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
- const ReplSetTagPattern& tagPattern,
- bool durablyWritten);
- virtual std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
- bool durablyWritten,
- bool skipSelf);
- virtual bool setMemberAsDown(Date_t now, const int memberIndex) override;
- virtual std::pair<int, Date_t> getStalestLiveMember() const;
- virtual HeartbeatResponseAction checkMemberTimeouts(Date_t now);
- virtual void resetAllMemberTimeouts(Date_t now);
- virtual void resetMemberTimeouts(Date_t now,
- const stdx::unordered_set<HostAndPort>& member_set);
- virtual OpTime getMyLastAppliedOpTime() const;
- virtual OpTime getMyLastDurableOpTime() const;
- virtual MemberData* getMyMemberData();
- virtual MemberData* findMemberDataByMemberId(const int memberId);
- virtual MemberData* findMemberDataByRid(const OID rid);
- virtual MemberData* addSlaveMemberData(const OID rid);
- virtual Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
- virtual void setStorageEngineSupportsReadCommitted(bool supported);
-
- virtual void restartHeartbeats();
-
- virtual boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
+
+ /**
+ * Returns whether or not at least 'numNodes' have reached the given opTime.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ */
+ bool haveNumNodesReachedOpTime(const OpTime& opTime, int numNodes, bool durablyWritten);
+
+ /**
+ * Returns whether or not at least one node matching the tagPattern has reached
+ * the given opTime.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ */
+ bool haveTaggedNodesReachedOpTime(const OpTime& opTime,
+ const ReplSetTagPattern& tagPattern,
+ bool durablyWritten);
+
+ /**
+ * Returns a vector of members that have applied the operation with OpTime 'op'.
+ * "durablyWritten" indicates whether the operation has to be durably applied.
+ * "skipSelf" means to exclude this node whether or not the op has been applied.
+ */
+ std::vector<HostAndPort> getHostsWrittenTo(const OpTime& op,
+ bool durablyWritten,
+ bool skipSelf);
+
+ /**
+ * Marks a member as down from our perspective and returns a bool which indicates if we can no
+ * longer see a majority of the nodes and thus should step down.
+ */
+ bool setMemberAsDown(Date_t now, const int memberIndex);
+
+ /**
+ * Goes through the memberData and determines which member that is currently live
+ * has the stalest (earliest) last update time. Returns (-1, Date_t::max()) if there are
+ * no other members.
+ */
+ std::pair<int, Date_t> getStalestLiveMember() const;
+
+ /**
+ * Go through the memberData, and mark nodes which haven't been updated
+ * recently (within an election timeout) as "down". Returns a HeartbeatResponseAction, which
+ * will be StepDownSelf if we can no longer see a majority of the nodes, otherwise NoAction.
+ */
+ HeartbeatResponseAction checkMemberTimeouts(Date_t now);
+
+ /**
+ * Set all nodes in memberData to not stale with a lastUpdate of "now".
+ */
+ void resetAllMemberTimeouts(Date_t now);
+
+ /**
+ * Set all nodes in memberData that are present in member_set
+ * to not stale with a lastUpdate of "now".
+ */
+ void resetMemberTimeouts(Date_t now, const stdx::unordered_set<HostAndPort>& member_set);
+
+ /*
+ * Returns the last optime that this node has applied, whether or not it has been journaled.
+ */
+ OpTime getMyLastAppliedOpTime() const;
+
+ /*
+ * Returns the last optime that this node has applied and journaled.
+ */
+ OpTime getMyLastDurableOpTime() const;
+
+ /*
+ * Returns information we have on the state of this node.
+ */
+ MemberData* getMyMemberData();
+
+ /*
+ * Returns information we have on the state of the node identified by memberId. Returns
+ * nullptr if memberId is not found in the configuration.
+ */
+ MemberData* findMemberDataByMemberId(const int memberId);
+
+ /*
+ * Returns information we have on the state of the node identified by rid. Returns
+ * nullptr if rid is not found in the heartbeat data. This method is used only for
+ * master/slave replication.
+ */
+ MemberData* findMemberDataByRid(const OID rid);
+
+ /*
+ * Adds and returns a memberData entry for the given RID.
+ * Used only in master/slave mode.
+ */
+ MemberData* addSlaveMemberData(const OID rid);
+
+ /**
+ * If getRole() == Role::candidate and this node has not voted too recently, updates the
+ * lastVote tracker and returns true. Otherwise, returns false.
+ */
+ bool voteForMyself(Date_t now);
+
+ /**
+ * Sets lastVote to be for ourself in this term.
+ */
+ void voteForMyselfV1();
+
+ /**
+ * Sets election id and election optime.
+ */
+ void setElectionInfo(OID electionId, Timestamp electionOpTime);
+
+ /**
+ * Performs state updates associated with winning an election.
+ *
+ * It is an error to call this if the topology coordinator is not in candidate mode.
+ *
+ * Exactly one of either processWinElection or processLoseElection must be called if
+ * processHeartbeatResponse returns StartElection, to exit candidate mode.
+ */
+ void processWinElection(OID electionId, Timestamp electionOpTime);
+
+ /**
+ * Performs state updates associated with losing an election.
+ *
+ * It is an error to call this if the topology coordinator is not in candidate mode.
+ *
+ * Exactly one of either processWinElection or processLoseElection must be called if
+ * processHeartbeatResponse returns StartElection, to exit candidate mode.
+ */
+ void processLoseElection();
+
+ /**
+ * Readies the TopologyCoordinator for an attempt to stepdown that may fail. This is used
+ * when we receive a stepdown command (which can fail if not enough secondaries are caught up)
+ * to ensure that we never process more than one stepdown request at a time.
+ * Returns OK if it is safe to continue with the stepdown attempt, or returns
+ * ConflictingOperationInProgress if this node is already processing a stepdown request of any
+ * kind.
+ */
+ Status prepareForStepDownAttempt();
+
+ /**
+ * If this node is still attempting to process a stepdown attempt, aborts the attempt and
+ * returns this node to normal primary/master state. If this node has already completed
+ * stepping down or is now in the process of handling an unconditional stepdown, then this
+ * method does nothing.
+ */
+ void abortAttemptedStepDownIfNeeded();
+
+ /**
+ * Tries to transition the coordinator from the leader role to the follower role.
+ *
+ * A step down succeeds based on the following conditions:
+ *
+ * C1. 'force' is true and now > waitUntil
+ *
+ * C2. A majority set of nodes, M, in the replica set have optimes greater than or
+ * equal to the last applied optime of the primary.
+ *
+ * C3. There exists at least one electable secondary node in the majority set M.
+ *
+ *
+ * If C1 is true, or if both C2 and C3 are true, then the stepdown occurs and this method
+ * returns true. If the conditions for successful stepdown aren't met yet, but waiting for more
+ * time to pass could make it succeed, returns false. If the whole stepdown attempt should be
+ * abandoned (for example because the time limit expired or because we've already stepped down),
+ * throws an exception.
+ * TODO(spencer): Unify with the finishUnconditionalStepDown() method.
+ */
+ bool attemptStepDown(
+ long long termAtStart, Date_t now, Date_t waitUntil, Date_t stepDownUntil, bool force);
+
+ /**
+ * Returns whether it is safe for a stepdown attempt to complete, ignoring the 'force' argument.
+ * This is essentially checking conditions C2 and C3 as described in the comment to
+ * attemptStepDown().
+ */
+ bool isSafeToStepDown();
+
+ /**
+ * Readies the TopologyCoordinator for stepdown. Returns false if we're already in the process
+ * of an unconditional step down. If we are in the middle of a stepdown command attempt when
+ * this is called then this unconditional stepdown will supersede the stepdown attempt, which
+ * will cause the stepdown to fail. When this returns true it must be followed by a call to
+ * finishUnconditionalStepDown() that is called when holding the global X lock.
+ */
+ bool prepareForUnconditionalStepDown();
+
+ /**
+ * Sometimes a request to step down comes in (like via a heartbeat), but we don't have the
+ * global exclusive lock so we can't actually stepdown at that moment. When that happens
+ * we record that a stepdown request is pending (by calling prepareForUnconditionalStepDown())
+ * and schedule work to stepdown in the global X lock. This method is called after holding the
+ * global lock to perform the actual stepdown.
+ * TODO(spencer): Unify with the finishAttemptedStepDown() method.
+ */
+ void finishUnconditionalStepDown();
+
+ /**
+ * Considers whether or not this node should stand for election, and returns Status::OK()
+ * if the node is eligible to transition to candidate role as a result of the call.
+ */
+ Status checkShouldStandForElection(Date_t now) const;
+
+ /**
+ * Set the outgoing heartbeat message from self
+ */
+ void setMyHeartbeatMessage(const Date_t now, const std::string& s);
+
+ /**
+ * Prepares a ReplSetMetadata object describing the current term, primary, and lastOp
+ * information.
+ */
+ rpc::ReplSetMetadata prepareReplSetMetadata(const OpTime& lastVisibleOpTime) const;
+
+ /**
+ * Prepares an OplogQueryMetadata object describing the current sync source, rbid, primary,
+ * lastOpApplied, and lastOpCommitted.
+ */
+ rpc::OplogQueryMetadata prepareOplogQueryMetadata(int rbid) const;
+
+ /**
+ * Writes into 'output' all the information needed to generate a summary of the current
+ * replication state for use by the web interface.
+ */
+ void summarizeAsHtml(ReplSetHtmlSummary* output);
+
+ /**
+ * Prepares a ReplSetRequestVotesResponse.
+ */
+ void processReplSetRequestVotes(const ReplSetRequestVotesArgs& args,
+ ReplSetRequestVotesResponse* response);
+
+ /**
+ * Loads an initial LastVote document, which was read from local storage.
+ *
+ * Called only during replication startup. All other updates are done internally.
+ */
+ void loadLastVote(const LastVote& lastVote);
+
+ /**
+ * Updates the current primary index.
+ */
+ void setPrimaryIndex(long long primaryIndex);
+
+ /**
+ * Returns the current primary index.
+ */
+ int getCurrentPrimaryIndex() const;
+
+ enum StartElectionReason {
+ kElectionTimeout,
+ kPriorityTakeover,
+ kStepUpRequest,
+ kCatchupTakeover
+ };
+
+ /**
+ * Transitions to the candidate role if the node is electable.
+ */
+ Status becomeCandidateIfElectable(const Date_t now, StartElectionReason reason);
+
+ /**
+ * Updates the storage engine read committed support in the TopologyCoordinator options after
+ * creation.
+ */
+ void setStorageEngineSupportsReadCommitted(bool supported);
+
+ /**
+ * Reset the booleans to record the last heartbeat restart.
+ */
+ void restartHeartbeats();
+
+ /**
+ * Scans through all members that are 'up' and return the latest known optime, if we have
+ * received (successful or failed) heartbeats from all nodes since heartbeat restart.
+ *
+ * Returns boost::none if any node hasn't responded to a heartbeat since we last restarted
+ * heartbeats.
+ * Returns OpTime(Timestamp(0, 0), 0), the smallest OpTime in PV1, if other nodes are all down.
+ */
+ boost::optional<OpTime> latestKnownOpTimeSinceHeartbeatRestart() const;
////////////////////////////////////////////////////////////
//
@@ -289,6 +783,9 @@ public:
OID getElectionId() const;
private:
+ typedef int UnelectableReasonMask;
+ class PingStats;
+
enum UnelectableReason {
None = 0,
CannotSeeMajority = 1 << 0,
@@ -304,8 +801,6 @@ private:
NotCloseEnoughToLatestForPriorityTakeover = 1 << 10,
NotFreshEnoughForCatchupTakeover = 1 << 11,
};
- typedef int UnelectableReasonMask;
-
// Set what type of PRIMARY this node currently is.
void _setLeaderMode(LeaderMode mode);
@@ -525,5 +1020,81 @@ private:
ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
};
+/**
+ * Represents a latency measurement for each replica set member based on heartbeat requests.
+ * The measurement is an average weighted 80% to the old value, and 20% to the new value.
+ *
+ * Also stores information about heartbeat progress and retries.
+ */
+class TopologyCoordinator::PingStats {
+public:
+ /**
+ * Records that a new heartbeat request started at "now".
+ *
+ * This resets the failure count used in determining whether the next request to a target
+ * should be a retry or a regularly scheduled heartbeat message.
+ */
+ void start(Date_t now);
+
+ /**
+ * Records that a heartbeat request completed successfully, and that "millis" milliseconds
+ * were spent for a single network roundtrip plus remote processing time.
+ */
+ void hit(Milliseconds millis);
+
+ /**
+ * Records that a heartbeat request failed.
+ */
+ void miss();
+
+ /**
+ * Gets the number of hit() calls.
+ */
+ unsigned int getCount() const {
+ return count;
+ }
+
+ /**
+ * Gets the weighted average round trip time for heartbeat messages to the target.
+ * Returns 0 if there have been no pings recorded yet.
+ */
+ Milliseconds getMillis() const {
+ return value == UninitializedPing ? Milliseconds(0) : value;
+ }
+
+ /**
+ * Gets the date at which start() was last called, which is used to determine if
+ * a heartbeat should be retried or if the time limit has expired.
+ */
+ Date_t getLastHeartbeatStartDate() const {
+ return _lastHeartbeatStartDate;
+ }
+
+ /**
+ * Gets the number of failures since start() was last called.
+ *
+ * This value is incremented by calls to miss(), cleared by calls to start() and
+ * set to the maximum possible value by calls to hit().
+ */
+ int getNumFailuresSinceLastStart() const {
+ return _numFailuresSinceLastStart;
+ }
+
+private:
+ static constexpr Milliseconds UninitializedPing{-1};
+
+ unsigned int count = 0;
+ Milliseconds value = UninitializedPing;
+ Date_t _lastHeartbeatStartDate;
+ int _numFailuresSinceLastStart = std::numeric_limits<int>::max();
+};
+
+//
+// Convenience method for unittest code. Please use accessors otherwise.
+//
+
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::Role role);
+std::ostream& operator<<(std::ostream& os, TopologyCoordinator::PrepareFreezeResponseResult result);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_test.cpp b/src/mongo/db/repl/topology_coordinator_test.cpp
index a22aec9f72f..034aaabda89 100644
--- a/src/mongo/db/repl/topology_coordinator_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_test.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/topology_coordinator.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logger/logger.h"
@@ -73,9 +72,9 @@ bool stringContains(const std::string& haystack, const std::string& needle) {
class TopoCoordTest : public mongo::unittest::Test {
public:
virtual void setUp() {
- _options = TopologyCoordinatorImpl::Options{};
+ _options = TopologyCoordinator::Options{};
_options.maxSyncSourceLagSecs = Seconds{100};
- _topo.reset(new TopologyCoordinatorImpl(_options));
+ _topo.reset(new TopologyCoordinator(_options));
_now = Date_t();
_selfIndex = -1;
_cbData.reset(new executor::TaskExecutor::CallbackArgs(
@@ -88,7 +87,7 @@ public:
}
protected:
- TopologyCoordinatorImpl& getTopoCoord() {
+ TopologyCoordinator& getTopoCoord() {
return *_topo;
}
executor::TaskExecutor::CallbackArgs cbData() {
@@ -98,9 +97,9 @@ protected:
return _now;
}
- void setOptions(const TopologyCoordinatorImpl::Options& options) {
+ void setOptions(const TopologyCoordinator::Options& options) {
_options = options;
- _topo.reset(new TopologyCoordinatorImpl(_options));
+ _topo.reset(new TopologyCoordinator(_options));
}
int64_t countLogLinesContaining(const std::string& needle) {
@@ -239,12 +238,12 @@ private:
}
private:
- unique_ptr<TopologyCoordinatorImpl> _topo;
+ unique_ptr<TopologyCoordinator> _topo;
unique_ptr<executor::TaskExecutor::CallbackArgs> _cbData;
ReplSetConfig _currentConfig;
Date_t _now;
int _selfIndex;
- TopologyCoordinatorImpl::Options _options;
+ TopologyCoordinator::Options _options;
};
TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
@@ -6263,7 +6262,7 @@ TEST_F(TopoCoordTest, DoNotGrantDryRunVoteWhenOpTimeIsStale) {
TEST_F(TopoCoordTest, CSRSConfigServerRejectsPV0Config) {
ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; });
serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
- TopologyCoordinatorImpl::Options options;
+ TopologyCoordinator::Options options;
options.clusterRole = ClusterRole::ConfigServer;
setOptions(options);
getTopoCoord().setStorageEngineSupportsReadCommitted(false);
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 048bb17b573..c43c0b53066 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/topology_coordinator.h"
-#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/server_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logger/logger.h"
@@ -74,9 +73,9 @@ bool stringContains(const std::string& haystack, const std::string& needle) {
class TopoCoordTest : public mongo::unittest::Test {
public:
virtual void setUp() {
- _options = TopologyCoordinatorImpl::Options{};
+ _options = TopologyCoordinator::Options{};
_options.maxSyncSourceLagSecs = Seconds{100};
- _topo.reset(new TopologyCoordinatorImpl(_options));
+ _topo.reset(new TopologyCoordinator(_options));
_now = Date_t();
_selfIndex = -1;
_cbData.reset(new executor::TaskExecutor::CallbackArgs(
@@ -89,7 +88,7 @@ public:
}
protected:
- TopologyCoordinatorImpl& getTopoCoord() {
+ TopologyCoordinator& getTopoCoord() {
return *_topo;
}
executor::TaskExecutor::CallbackArgs cbData() {
@@ -99,9 +98,9 @@ protected:
return _now;
}
- void setOptions(const TopologyCoordinatorImpl::Options& options) {
+ void setOptions(const TopologyCoordinator::Options& options) {
_options = options;
- _topo.reset(new TopologyCoordinatorImpl(_options));
+ _topo.reset(new TopologyCoordinator(_options));
}
int64_t countLogLinesContaining(const std::string& needle) {
@@ -252,12 +251,12 @@ private:
}
private:
- unique_ptr<TopologyCoordinatorImpl> _topo;
+ unique_ptr<TopologyCoordinator> _topo;
unique_ptr<executor::TaskExecutor::CallbackArgs> _cbData;
ReplSetConfig _currentConfig;
Date_t _now;
int _selfIndex;
- TopologyCoordinatorImpl::Options _options;
+ TopologyCoordinator::Options _options;
};
TEST_F(TopoCoordTest, NodeReturnsSecondaryWithMostRecentDataAsSyncSource) {
@@ -3179,7 +3178,7 @@ TEST_F(TopoCoordTest, DoNotGrantDryRunVoteWhenOpTimeIsStale) {
TEST_F(TopoCoordTest, NodeTransitionsToRemovedIfCSRSButHaveNoReadCommittedSupport) {
ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; });
serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
- TopologyCoordinatorImpl::Options options;
+ TopologyCoordinator::Options options;
options.clusterRole = ClusterRole::ConfigServer;
setOptions(options);
getTopoCoord().setStorageEngineSupportsReadCommitted(false);
@@ -3206,7 +3205,7 @@ TEST_F(TopoCoordTest, NodeTransitionsToRemovedIfCSRSButHaveNoReadCommittedSuppor
TEST_F(TopoCoordTest, NodeBecomesSecondaryAsNormalWhenReadCommittedSupportedAndCSRS) {
ON_BLOCK_EXIT([]() { serverGlobalParams.clusterRole = ClusterRole::None; });
serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
- TopologyCoordinatorImpl::Options options;
+ TopologyCoordinator::Options options;
options.clusterRole = ClusterRole::ConfigServer;
setOptions(options);
getTopoCoord().setStorageEngineSupportsReadCommitted(true);