From ab9233d37298a13752ebed493c2c2f973b087adf Mon Sep 17 00:00:00 2001 From: Samy Lanka Date: Tue, 28 Apr 2020 03:20:48 -0400 Subject: SERVER-47453 Maintain a rolling list of timestamps of the when the recent sync source changes occurred --- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/repl_server_parameters.idl | 12 ++ src/mongo/db/repl/topology_coordinator.cpp | 55 ++++++- src/mongo/db/repl/topology_coordinator.h | 41 +++++ src/mongo/db/repl/topology_coordinator_v1_test.cpp | 176 ++++++++++++++++++++- 5 files changed, 273 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 429558dae15..d8acb4fdf42 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -670,6 +670,7 @@ env.Library('topology_coordinator', 'repl_coordinator_interface', ], LIBDEPS_PRIVATE=[ + 'repl_server_parameters', '$BUILD_DIR/mongo/db/catalog/commit_quorum_options', '$BUILD_DIR/mongo/idl/server_parameter', 'repl_server_parameters', diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 62e8475e9dc..8e345c8fc92 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -313,3 +313,15 @@ server_parameters: default: 5 validator: gte: 0 + + maxNumSyncSourceChangesPerHour: + description: >- + The number of sync source changes that can happen per hour before the node temporarily + turns off reevaluating its sync source. This will only affect sync source changes while + a node has a valid sync source. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord + cpp_varname: maxNumSyncSourceChangesPerHour + default: 3 + validator: + gt: 0 diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 8db549071ad..e38cd18f126 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -173,6 +173,38 @@ void TopologyCoordinator::PingStats::miss() { } } +bool TopologyCoordinator::RecentSyncSourceChanges::changedTooOftenRecently(Date_t now) { + size_t maxSize = maxNumSyncSourceChangesPerHour.load(); + + // Return false if we have fewer than maxNumSyncSourceChangesPerHour entries. + if (_recentChanges.empty() || _recentChanges.size() < maxSize) { + return false; + } + + // Remove additional entries in case maxNumSyncSourceChangesPerHour was changed. + while (_recentChanges.size() > maxSize) { + _recentChanges.pop(); + } + + // Return whether all entries in the queue happened within the last hour by checking the oldest + // entry. + auto hourBefore = now - Hours(1); + return _recentChanges.front() > hourBefore; +} + +void TopologyCoordinator::RecentSyncSourceChanges::addNewEntry(Date_t now) { + // Remove additional entries if the queue already has maxNumSyncSourceChangerPerHour entries. + while (_recentChanges.size() >= static_cast(maxNumSyncSourceChangesPerHour.load())) { + _recentChanges.pop(); + } + _recentChanges.push(now); + return; +} + +std::queue TopologyCoordinator::RecentSyncSourceChanges::getChanges_forTest() { + return _recentChanges; +} + TopologyCoordinator::TopologyCoordinator(Options options) : _role(Role::kFollower), _topologyVersion(instanceId, 0), @@ -209,6 +241,12 @@ HostAndPort TopologyCoordinator::getSyncSourceAddress() const { HostAndPort TopologyCoordinator::chooseNewSyncSource(Date_t now, const OpTime& lastOpTimeFetched, ReadPreference readPreference) { + ON_BLOCK_EXIT([&]() { + // If we chose another sync source, update the recent sync source changes. + if (!_syncSource.empty()) { + _recentSyncSourceChanges.addNewEntry(now); + } + }); // Check to make sure we can choose a sync source, and choose a forced one if // set. auto maybeSyncSource = _chooseSyncSourceInitialStep(now); @@ -307,14 +345,14 @@ HostAndPort TopologyCoordinator::_chooseNearbySyncSource(Date_t now, } setMyHeartbeatMessage(now, message); - _syncSource = HostAndPort(); - return _syncSource; + return HostAndPort(); } - _syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); - LOGV2(21799, "Sync source candidate chosen", "syncSource"_attr = _syncSource); - std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0); + + auto syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort(); + LOGV2(21799, "Sync source candidate chosen", "syncSource"_attr = syncSource); + std::string msg(str::stream() << "syncing from: " << syncSource.toString(), 0); setMyHeartbeatMessage(now, msg); - return _syncSource; + return syncSource; } const OpTime TopologyCoordinator::_getOldestSyncOpTime() const { @@ -1685,6 +1723,11 @@ void TopologyCoordinator::populateAllMembersConfigVersionAndTerm_forTest() { } } +TopologyCoordinator::RecentSyncSourceChanges* +TopologyCoordinator::getRecentSyncSourceChanges_forTest() { + return &_recentSyncSourceChanges; +} + std::string TopologyCoordinator::_getReplSetStatusString() { // Construct a ReplSetStatusArgs using default parameters. Missing parameters will not be // included in the status string. diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index d61ef4502bf..797f1beecb5 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -31,6 +31,7 @@ #include #include +#include #include #include "mongo/client/read_preference.h" @@ -75,6 +76,37 @@ class TopologyCoordinator { TopologyCoordinator& operator=(const TopologyCoordinator&) = delete; public: + /** + * RecentSyncSourceChanges stores the times that recent sync source changes happened. It will + * maintain a max size of maxSyncSourceChangesPerHour. If any additional entries are added, + * older entries will be removed. It is used to restrict the number of sync source changes that + * happen per hour when the node already has a valid sync source. + */ + class RecentSyncSourceChanges { + public: + /** + * Checks if all the entries occurred within the last hour or not. It will remove additional + * entries if it sees that there are more than maxSyncSourceChangesPerHour entries. If there + * are fewer than maxSyncSourceChangesPerHour entries, it returns false. + */ + bool changedTooOftenRecently(Date_t now); + + /** + * Adds a new entry. It will remove additional entries if it sees that there are more than + * maxSyncSourceChangesPerHour entries. This should only be called if the sync source was + * changed to another node, not if the sync source was cleared. + */ + void addNewEntry(Date_t now); + + /** + * Return the underlying queue. Used for testing purposes only. + */ + std::queue getChanges_forTest(); + + private: + std::queue _recentChanges; + }; + /** * Type that denotes the role of a node in the replication protocol. * @@ -783,6 +815,13 @@ public: void setCurrentPrimary_forTest(int primaryIndex, const Timestamp& electionTime = Timestamp(0, 0)); + /** + * Get a raw pointer to the list of recent sync source changes. It is the caller's + * responsibility to not use this pointer beyond the lifetime of the object. Used for testing + * only. + */ + RecentSyncSourceChanges* getRecentSyncSourceChanges_forTest(); + /** * Change config (version, term) of each member in the initial test config so that * it will be majority replicated without having to mock heartbeats. @@ -1076,6 +1115,8 @@ private: // Whether or not the storage engine supports read committed. ReadCommittedSupport _storageEngineSupportsReadCommitted{ReadCommittedSupport::kUnknown}; + + RecentSyncSourceChanges _recentSyncSourceChanges; }; /** diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp index 1224f1331fc..3ec624eb95f 100644 --- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp @@ -636,13 +636,23 @@ TEST_F(TopoCoordTest, ChooseOnlyVotersAsSyncSourceWhenNodeIsAVoter) { heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot1, hbRTT300); // Should choose h3 as it is a voter - auto newSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::Nearest); + auto first = now()++; + auto newSource = getTopoCoord().chooseNewSyncSource(first, OpTime(), ReadPreference::Nearest); ASSERT_EQUALS(h3, newSource); + // Since a new sync source was chosen, recentSyncSourceChanges should be updated + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({first}) == recentSyncSourceChanges->getChanges_forTest()); + // Can't choose h2 as it is not a voter - newSource = getTopoCoord().chooseNewSyncSource(now()++, ot10, ReadPreference::Nearest); + auto second = now()++; + newSource = getTopoCoord().chooseNewSyncSource(second, ot10, ReadPreference::Nearest); ASSERT_EQUALS(HostAndPort(), newSource); + // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated. + recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({first}) == recentSyncSourceChanges->getChanges_forTest()); + // Should choose h3 as it is a voter, and ahead heartbeatFromMember(h3, "rs0", MemberState::RS_SECONDARY, ot5, hbRTT300); newSource = getTopoCoord().chooseNewSyncSource(now()++, ot1, ReadPreference::Nearest); @@ -893,6 +903,10 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::PrimaryOnly)); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); + // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated. + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(recentSyncSourceChanges->getChanges_forTest().empty()); + // Add primary ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); heartbeatFromMember(HostAndPort("h3"), @@ -909,6 +923,10 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly)); ASSERT_EQUALS(HostAndPort(), getTopoCoord().getSyncSourceAddress()); + // Since no new sync source was chosen, recentSyncSourceChanges shouldn't be updated. + recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(recentSyncSourceChanges->getChanges_forTest().empty()); + // Update the primary's position. heartbeatFromMember(HostAndPort("h3"), "rs0", @@ -918,11 +936,16 @@ TEST_F(TopoCoordTest, ChooseOnlyPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryOn // h3 is primary and should be chosen as the sync source, despite being further away than h2 // and the primary (h3) being at our most recently applied optime. + auto changeTime = now()++; ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource( - now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly)); + changeTime, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryOnly)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + // Since a new sync source was chosen, recentSyncSourceChanges should be updated. + recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({changeTime}) == recentSyncSourceChanges->getChanges_forTest()); + // Become primary: should not choose self as sync source. heartbeatFromMember(HostAndPort("h3"), "rs0", @@ -1058,11 +1081,17 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer Milliseconds(300)); // No primary situation: should choose h2. + auto first = now()++; ASSERT_EQUALS( HostAndPort("h2"), - getTopoCoord().chooseNewSyncSource(now()++, OpTime(), ReadPreference::PrimaryPreferred)); + getTopoCoord().chooseNewSyncSource(first, OpTime(), ReadPreference::PrimaryPreferred)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + // Make sure that recentSyncSourceChanges is updated even though the primary is not chosen as + // the next sync source. + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({first}) == recentSyncSourceChanges->getChanges_forTest()); + // Add primary ASSERT_EQUALS(-1, getCurrentPrimaryIndex()); heartbeatFromMember(HostAndPort("h3"), @@ -1074,11 +1103,17 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer // h3 is primary, but its last applied isn't as up-to-date as ours, so it cannot be chosen // as the sync source. + auto second = now()++; ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().chooseNewSyncSource( - now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred)); + second, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred)); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); + // Make sure that recentSyncSourceChanges is updated even though the primary is not chosen as + // the next sync source. + recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({first, second}) == recentSyncSourceChanges->getChanges_forTest()); + // Update the primary's position. heartbeatFromMember(HostAndPort("h3"), "rs0", @@ -1088,11 +1123,18 @@ TEST_F(TopoCoordTest, PreferPrimaryAsSyncSourceWhenReadPreferenceIsPrimaryPrefer // h3 is primary and should be chosen as the sync source, despite being further away than h2 // and the primary (h3) being at our most recently applied optime. + auto third = now()++; ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().chooseNewSyncSource( - now()++, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred)); + third, OpTime(Timestamp(10, 0), 0), ReadPreference::PrimaryPreferred)); ASSERT_EQUALS(HostAndPort("h3"), getTopoCoord().getSyncSourceAddress()); + // Make sure that recentSyncSourceChanges is updated when the primary is chosen as the next + // sync source. + recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + ASSERT(std::queue({first, second, third}) == + recentSyncSourceChanges->getChanges_forTest()); + // Sanity check: the same test as above should return the secondary "h2" if primary is not // preferred. ASSERT_EQUALS(HostAndPort("h2"), @@ -2119,6 +2161,128 @@ TEST_F(TopoCoordTest, PrepareStepDownAttemptFailsIfNotLeader) { ASSERT_EQUALS(expectedStatus, getTopoCoord().prepareForStepDownAttempt().getStatus()); } +/** + * Checks that the queue contains the expected entries in the correct order. + */ +void assertTimesEqual(const std::initializer_list& expectedTimes, + std::queue actualQueue) { + std::queue expectedQueue(expectedTimes); + ASSERT(expectedQueue == actualQueue); +} + +TEST_F(TopoCoordTest, RecordSyncSourceChangeMaintainsSizeWhenAtCapacity) { + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + auto first = Date_t::now(); + recentSyncSourceChanges->addNewEntry(first); + + auto second = Date_t::now(); + recentSyncSourceChanges->addNewEntry(second); + + auto third = Date_t::now(); + recentSyncSourceChanges->addNewEntry(third); + + auto fourth = Date_t::now(); + recentSyncSourceChanges->addNewEntry(fourth); + + assertTimesEqual({second, third, fourth}, recentSyncSourceChanges->getChanges_forTest()); +} + +TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsTrue) { + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + auto first = Date_t::now(); + recentSyncSourceChanges->addNewEntry(first); + + auto second = Date_t::now(); + recentSyncSourceChanges->addNewEntry(second); + + auto third = Date_t::now(); + recentSyncSourceChanges->addNewEntry(third); + + assertTimesEqual({first, second, third}, recentSyncSourceChanges->getChanges_forTest()); + ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); +} + +TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalse) { + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + // Make this two hours before now. + auto first = Date_t::now() - Hours(2); + recentSyncSourceChanges->addNewEntry(first); + + auto second = Date_t::now(); + recentSyncSourceChanges->addNewEntry(second); + + auto third = Date_t::now(); + recentSyncSourceChanges->addNewEntry(third); + + assertTimesEqual({first, second, third}, recentSyncSourceChanges->getChanges_forTest()); + ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); +} + +TEST_F(TopoCoordTest, AddNewEntryReducesSizeIfMaxNumSyncSourceChangesPerHourChanged) { + // Make sure to restore the default value at the end of this test. + ON_BLOCK_EXIT([]() { maxNumSyncSourceChangesPerHour.store(3); }); + + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + auto first = Date_t::now(); + recentSyncSourceChanges->addNewEntry(first); + + auto second = Date_t::now(); + recentSyncSourceChanges->addNewEntry(second); + + auto third = Date_t::now(); + recentSyncSourceChanges->addNewEntry(third); + + maxNumSyncSourceChangesPerHour.store(2); + + auto fourth = Date_t::now(); + recentSyncSourceChanges->addNewEntry(fourth); + + assertTimesEqual({third, fourth}, recentSyncSourceChanges->getChanges_forTest()); + ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); +} + +TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReducesSizeIfMaxNumSyncSourceChangesPerHourChanged) { + // Make sure to restore the default value at the end of this test. + ON_BLOCK_EXIT([]() { maxNumSyncSourceChangesPerHour.store(3); }); + + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + auto first = Date_t::now(); + recentSyncSourceChanges->addNewEntry(first); + + auto second = Date_t::now(); + recentSyncSourceChanges->addNewEntry(second); + + auto third = Date_t::now(); + recentSyncSourceChanges->addNewEntry(third); + + maxNumSyncSourceChangesPerHour.store(1); + + ASSERT(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); + + assertTimesEqual({third}, recentSyncSourceChanges->getChanges_forTest()); +} + +TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalseWhenEmpty) { + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + assertTimesEqual({}, recentSyncSourceChanges->getChanges_forTest()); + ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); +} + +TEST_F(TopoCoordTest, ChangedTooOftenRecentlyReturnsFalseWhenNotFilled) { + auto recentSyncSourceChanges = getTopoCoord().getRecentSyncSourceChanges_forTest(); + + auto first = Date_t::now(); + recentSyncSourceChanges->addNewEntry(first); + + assertTimesEqual({first}, recentSyncSourceChanges->getChanges_forTest()); + ASSERT_FALSE(recentSyncSourceChanges->changedTooOftenRecently(Date_t::now())); +} + class PrepareHeartbeatResponseV1Test : public TopoCoordTest { public: virtual void setUp() { -- cgit v1.2.1