summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamy Lanka <samy.lanka@mongodb.com>2020-04-28 03:20:48 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-12 02:35:39 +0000
commitab9233d37298a13752ebed493c2c2f973b087adf (patch)
treefb4097522044abbd52ad3e3e2990f78c9f5a38bd
parent4639620984ecf63486a8c3a2534151d9bc619b9a (diff)
downloadmongo-ab9233d37298a13752ebed493c2c2f973b087adf.tar.gz
SERVER-47453 Maintain a rolling list of timestamps of when the recent sync source changes occurred
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/repl_server_parameters.idl12
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp55
-rw-r--r--src/mongo/db/repl/topology_coordinator.h41
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp176
5 files changed, 273 insertions, 12 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 429558dae15..d8acb4fdf42 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -670,6 +670,7 @@ env.Library('topology_coordinator',
'repl_coordinator_interface',
],
LIBDEPS_PRIVATE=[
+ 'repl_server_parameters',
'$BUILD_DIR/mongo/db/catalog/commit_quorum_options',
'$BUILD_DIR/mongo/idl/server_parameter',
'repl_server_parameters',
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 62e8475e9dc..8e345c8fc92 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -313,3 +313,15 @@ server_parameters:
default: 5
validator:
gte: 0
+
+ maxNumSyncSourceChangesPerHour:
+ description: >-
+ The number of sync source changes that can happen per hour before the node temporarily
+ turns off reevaluating its sync source. This will only affect sync source changes while
+ a node has a valid sync source.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: maxNumSyncSourceChangesPerHour
+ default: 3
+ validator:
+ gt: 0
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 8db549071ad..e38cd18f126 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -173,6 +173,38 @@ void TopologyCoordinator::PingStats::miss() {
}
}
+bool TopologyCoordinator::RecentSyncSourceChanges::changedTooOftenRecently(Date_t now) {
+ size_t maxSize = maxNumSyncSourceChangesPerHour.load();  // Read the limit once per call.
+
+ // Return false if we have fewer than maxNumSyncSourceChangesPerHour entries (limit not reached).
+ if (_recentChanges.empty() || _recentChanges.size() < maxSize) {
+ return false;
+ }
+
+ // Drop the oldest entries in case maxNumSyncSourceChangesPerHour was lowered after they were added.
+ while (_recentChanges.size() > maxSize) {
+ _recentChanges.pop();
+ }
+
+ // The queue is ordered oldest-first, so if its front entry is newer than one hour ago then
+ // every entry is.
+ auto hourBefore = now - Hours(1);
+ return _recentChanges.front() > hourBefore;  // Strict: exactly one hour ago does not count.
+}
+
+void TopologyCoordinator::RecentSyncSourceChanges::addNewEntry(Date_t now) {
+ // Remove the oldest entries if the queue already has maxNumSyncSourceChangesPerHour (or more) entries.
+ while (_recentChanges.size() >= static_cast<size_t>(maxNumSyncSourceChangesPerHour.load())) {
+ _recentChanges.pop();
+ }
+ _recentChanges.push(now);  // The newest entry goes to the back of the queue.
+ return;
+}
+
+std::queue<Date_t> TopologyCoordinator::RecentSyncSourceChanges::getChanges_forTest() {
+ return _recentChanges;  // Returned by value: the caller gets a copy of the queue.
+}
+
TopologyCoordinator::TopologyCoordinator(Options options)
: _role(Role::kFollower),
_topologyVersion(instanceId, 0),
@@ -209,6 +241,12 @@ HostAndPort TopologyCoordinator::getSyncSourceAddress() const {
HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now,
const OpTime& lastOpTimeFetched,
ReadPreference readPreference) {
+ ON_BLOCK_EXIT([&]() {
+ // If we chose another sync source, update the recent sync source changes.
+ if (!_syncSource.empty()) {
+ _recentSyncSourceChanges.addNewEntry(now);
+ }
+ });
// Check to make sure we can choose a sync source, and choose a forced one if
// set.
auto maybeSyncSource = _chooseSyncSourceInitialStep(now);
@@ -307,14 +345,14 @@ HostAndPort TopologyCoordinator::_chooseNearbySyncSource(Date_t now,
}
setMyHeartbeatMessage(now, message);
- _syncSource = HostAndPort();
- return _syncSource;
+ return HostAndPort();
}
- _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
- LOGV2(21799, "Sync source candidate chosen", "syncSource"_attr = _syncSource);
- std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0);
+
+ auto syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
+ LOGV2(21799, "Sync source candidate chosen", "syncSource"_attr = syncSource);
+ std::string msg(str::stream() << "syncing from: " << syncSource.toString(), 0);
setMyHeartbeatMessage(now, msg);
- return _syncSource;
+ return syncSource;
}
const OpTime TopologyCoordinator::_getOldestSyncOpTime() const {
@@ -1685,6 +1723,11 @@ void TopologyCoordinator::populateAllMembersConfigVersionAndTerm_forTest() {
}
}
+TopologyCoordinator::RecentSyncSourceChanges*
+TopologyCoordinator::getRecentSyncSourceChanges_forTest() {
+ return &_recentSyncSourceChanges;  // Non-owning pointer to a member of this coordinator.
+}
+
std::string TopologyCoordinator::_getReplSetStatusString() {
// Construct a ReplSetStatusArgs using default parameters. Missing parameters will not be
// included in the status string.
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index d61ef4502bf..797f1beecb5 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -31,6 +31,7 @@
#include <functional>
#include <iosfwd>
+#include <queue>
#include <string>
#include "mongo/client/read_preference.h"
@@ -76,6 +77,37 @@ class TopologyCoordinator {
public:
/**
+ * RecentSyncSourceChanges stores the times that recent sync source changes happened. It will
+ * maintain a max size of maxNumSyncSourceChangesPerHour. If any additional entries are added,
+ * older entries will be removed. It is used to restrict the number of sync source changes that
+ * happen per hour when the node already has a valid sync source.
+ */
+ class RecentSyncSourceChanges {
+ public:
+ /**
+ * Returns true if the last maxNumSyncSourceChangesPerHour changes all occurred within the hour
+ * before 'now'. It will remove the oldest entries if it sees that there are more than
+ * maxNumSyncSourceChangesPerHour entries. If there are fewer entries than that, it returns false.
+ */
+ bool changedTooOftenRecently(Date_t now);
+
+ /**
+ * Adds a new entry. It will first remove the oldest entries while there are already
+ * maxNumSyncSourceChangesPerHour or more entries. This should only be called if the sync source
+ * was changed to another node, not if the sync source was cleared.
+ */
+ void addNewEntry(Date_t now);
+
+ /**
+ * Return a copy of the underlying queue. Used for testing purposes only.
+ */
+ std::queue<Date_t> getChanges_forTest();
+
+ private:
+ std::queue<Date_t> _recentChanges;  // Times of recent changes, oldest at the front.
+ };
+
+ /**
* Type that denotes the role of a node in the replication protocol.
*
* The role is distinct from MemberState, in that it only deals with the
@@ -784,6 +816,13 @@ public:
const Timestamp& electionTime = Timestamp(0, 0));
/**
+ * Get a raw pointer to the list of recent sync source changes. It is the caller's
+ * responsibility to not use this pointer beyond the lifetime of the object. Used for testing
+ * only.
+ */
+ RecentSyncSourceChanges* getRecentSyncSourceChanges_forTest();
+
+ /**
* Change config (version, term) of each member in the initial test config so that
* it will be majority replicated without having to mock heartbeats.
*/
@@ -1076,6 +1115,8 @@ private:
// Whether or not the storage engine supports read committed.
ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown};
+
+ RecentSyncSourceChanges _recentSyncSourceChanges;
};
/**
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 1224f1331fc..3ec624eb95f 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -636,13 +636,23 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) {
heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot1, hbRTT300);
// Should choose h3 as it is a voter
- auto newSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::Nearest);
+ auto first = now()++;
+ auto newSource = getTopoCoord().chooseNewSyncSource(first, OpTime(), ReadPreference::Nearest);
ASSERT_EQUALS(h3, newSource);
+ // Since a new sync source was chosen, recentSyncSourceChanges should be updated
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({first}) == recentSyncSourceChanges->getChanges_forTest());
+
// Can't choose h2 as it is not a voter
- newSource = getTopoCoord().chooseNewSyncSource(now()++, ot10, ReadPreference::Nearest);
+ auto second = now()++;
+ newSource = getTopoCoord().chooseNewSyncSource(second, ot10, ReadPreference::Nearest);
ASSERT_EQUALS(HostAndPort(), newSource);
+ // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated.
+ recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({first}) == recentSyncSourceChanges->getChanges_forTest());
+
// Should choose h3 as it is a voter, and ahead
heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300);
newSource = getTopoCoord().chooseNewSyncSource(now()++, ot1, ReadPreference::Nearest);
@@ -893,6 +903,10 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn
getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::PrimaryOnly));
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
+ // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated.
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(recentSyncSourceChanges->getChanges_forTest().empty());
+
// Add primary
ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
heartbeatFromMember(HostAndPort("h3"),
@@ -909,6 +923,10 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn
now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly));
ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress());
+ // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated.
+ recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(recentSyncSourceChanges->getChanges_forTest().empty());
+
// Update the primary's position.
heartbeatFromMember(HostAndPort("h3"),
"rs0",
@@ -918,11 +936,16 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn
// h3 is primary and should be chosen as the sync source, despite being further away than h2
// and the primary (h3) being at our most recently applied optime.
+ auto changeTime = now()++;
ASSERT_EQUALS(HostAndPort("h3"),
getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly));
+ changeTime, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+ // Since a new sync source was chosen, recentSyncSourceChanges should be updated.
+ recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({changeTime}) == recentSyncSourceChanges->getChanges_forTest());
+
// Become primary: should not choose self as sync source.
heartbeatFromMember(HostAndPort("h3"),
"rs0",
@@ -1058,11 +1081,17 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer
Milliseconds(300));
// No primary situation: should choose h2.
+ auto first = now()++;
ASSERT_EQUALS(
HostAndPort("h2"),
- getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::PrimaryPreferred));
+ getTopoCoord().chooseNewSyncSource(first, OpTime(), ReadPreference::PrimaryPreferred));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+ // Make sure that recentSyncSourceChanges is updated even though the primary is not chosen as
+ // the next sync source.
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({first}) == recentSyncSourceChanges->getChanges_forTest());
+
// Add primary
ASSERT_EQUALS(-1, getCurrentPrimaryIndex());
heartbeatFromMember(HostAndPort("h3"),
@@ -1074,11 +1103,17 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer
// h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen
// as the sync source.
+ auto second = now()++;
ASSERT_EQUALS(HostAndPort("h2"),
getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred));
+ second, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred));
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
+ // Make sure that recentSyncSourceChanges is updated even though the primary is not chosen as
+ // the next sync source.
+ recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({first, second}) == recentSyncSourceChanges->getChanges_forTest());
+
// Update the primary's position.
heartbeatFromMember(HostAndPort("h3"),
"rs0",
@@ -1088,11 +1123,18 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer
// h3 is primary and should be chosen as the sync source, despite being further away than h2
// and the primary (h3) being at our most recently applied optime.
+ auto third = now()++;
ASSERT_EQUALS(HostAndPort("h3"),
getTopoCoord().chooseNewSyncSource(
- now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred));
+ third, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred));
ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress());
+ // Make sure that recentSyncSourceChanges is updated when the primary is chosen as the next
+ // sync source.
+ recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ ASSERT(std::queue<Date_t>({first, second, third}) ==
+ recentSyncSourceChanges->getChanges_forTest());
+
// Sanity check: the same test as above should return the secondary "h2" if primary is not
// preferred.
ASSERT_EQUALS(HostAndPort("h2"),
@@ -2119,6 +2161,128 @@ TEST_F(TopoCoordTest, PrepareStepDownAttemptFailsIfNotLeader) {
ASSERT_EQUALS(expectedStatus, getTopoCoord().prepareForStepDownAttempt().getStatus());
}
+/**
+ * Checks that the queue contains exactly the expected entries, in the same front-to-back order.
+ */
+void assertTimesEqual(const std::initializer_list<Date_t>& expectedTimes,
+ std::queue<Date_t> actualQueue) {
+ std::queue<Date_t> expectedQueue(expectedTimes);
+ ASSERT(expectedQueue == actualQueue);
+}
+
+TEST_F(TopoCoordTest, RecordSyncSourceChangeMaintainsSizeWhenAtCapacity) {
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ auto first = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(first);
+
+ auto second = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(second);
+
+ auto third = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(third);  // Now at the default limit of 3 entries.
+
+ auto fourth = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(fourth);  // Should evict 'first'.
+
+ // NOTE(review): consecutive Date_t::now() values may tie; distinct times would be stricter.
+ assertTimesEqual({second, third, fourth}, recentSyncSourceChanges->getChanges_forTest());
+}
+
+TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsTrue) {
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ auto first = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(first);
+
+ auto second = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(second);
+
+ auto third = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(third);  // The queue is now at the default limit of 3.
+
+ assertTimesEqual({first, second, third}, recentSyncSourceChanges->getChanges_forTest());
+ ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));  // All within the hour.
+}
+
+TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalse) {
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ // Make the oldest entry two hours old, so not every entry is within the last hour.
+ auto first = Date_t::now() - Hours(2);
+ recentSyncSourceChanges->addNewEntry(first);
+
+ auto second = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(second);
+
+ auto third = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(third);
+
+ assertTimesEqual({first, second, third}, recentSyncSourceChanges->getChanges_forTest());
+ ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));
+}
+
+TEST_F(TopoCoordTest, AddNewEntryReducesSizeIfMaxNumSyncSourceChangesPerHourChanged) {
+ // Make sure to restore the default value at the end of this test.
+ ON_BLOCK_EXIT([]() { maxNumSyncSourceChangesPerHour.store(3); });
+
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ auto first = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(first);
+
+ auto second = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(second);
+
+ auto third = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(third);
+
+ maxNumSyncSourceChangesPerHour.store(2);  // Lower the limit below the current size of 3.
+
+ auto fourth = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(fourth);  // Should evict 'first' and 'second'.
+
+ assertTimesEqual({third, fourth}, recentSyncSourceChanges->getChanges_forTest());
+ ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));
+}
+
+TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReducesSizeIfMaxNumSyncSourceChangesPerHourChanged) {
+ // Make sure to restore the default value at the end of this test.
+ ON_BLOCK_EXIT([]() { maxNumSyncSourceChangesPerHour.store(3); });
+
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ auto first = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(first);
+
+ auto second = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(second);
+
+ auto third = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(third);
+
+ maxNumSyncSourceChangesPerHour.store(1);  // Lower the limit; no entry is added afterwards.
+
+ ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));  // Also trims the queue.
+
+ assertTimesEqual({third}, recentSyncSourceChanges->getChanges_forTest());
+}
+
+TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalseWhenEmpty) {
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+ assertTimesEqual({}, recentSyncSourceChanges->getChanges_forTest());  // Nothing recorded yet.
+ ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));
+}
+
+TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalseWhenNotFilled) {
+ auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest();
+
+ auto first = Date_t::now();
+ recentSyncSourceChanges->addNewEntry(first);  // One entry is below the default limit of 3.
+
+ assertTimesEqual({first}, recentSyncSourceChanges->getChanges_forTest());
+ ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now()));
+}
+
class PrepareHeartbeatResponseV1Test : public TopoCoordTest {
public:
virtual void setUp() {