diff options
author | Xuerui Fa <xuerui.fa@mongodb.com> | 2020-05-04 20:36:27 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-08 14:24:43 +0000 |
commit | a7ff5838b5a4bcb4881a08c5dafce671c3b54940 (patch) | |
tree | 8cd080ae35227546cead1104b958ce8b3996e9f4 /src/mongo/db/repl/replication_coordinator_impl.cpp | |
parent | 307b547e672ffebe30461eae1ed5d6bc36e919ca (diff) | |
download | mongo-a7ff5838b5a4bcb4881a08c5dafce671c3b54940.tar.gz |
SERVER-47449: Re-evaluate sync source after fetching each batch of oplog entries
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 3f41b3d9870..92db9fccae8 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -84,7 +84,6 @@ #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/repl/vote_requester.h" @@ -4603,16 +4602,12 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const { return getReplicationMode() != modeNone; } -HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { - stdx::lock_guard<Latch> lk(_mutex); - - HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); +const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) { // Always allow chaining while in catchup and drain mode. auto memberState = _getMemberState_inlock(); - auto chainingPreference = memberState.primary() - ? TopologyCoordinator::ChainingPreference::kAllowChaining - : TopologyCoordinator::ChainingPreference::kUseConfiguration; ReadPreference readPreference = ReadPreference::Nearest; + + bool parsedSyncSourceFromInitialSync = false; // Handle special case of initial sync source read preference. // This sync source will be cleared when we go to secondary mode, because we will perform // a postMemberState action of kOnFollowerModeStateChange which calls chooseNewSyncSource(). @@ -4622,11 +4617,10 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp readPreference = ReadPreference_parse(IDLParserErrorContext("initialSyncSourceReadPreference"), initialSyncSourceReadPreference); + parsedSyncSourceFromInitialSync = true; } catch (const DBException& e) { fassertFailedWithStatus(3873100, e.toStatus()); } - // If read preference is explictly set, it takes precedence over chaining: false. - chainingPreference = TopologyCoordinator::ChainingPreference::kAllowChaining; } else if (_rsConfig.getMemberAt(_selfIndex).getNumVotes() > 0) { // Voting nodes prefer to sync from the primary. A voting node that is initial syncing // may have acknowledged writes which are part of the set's write majority; if it then @@ -4637,8 +4631,24 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp readPreference = ReadPreference::PrimaryPreferred; } } - HostAndPort newSyncSource = _topCoord->chooseNewSyncSource( - _replExecutor->now(), lastOpTimeFetched, chainingPreference, readPreference); + if (!parsedSyncSourceFromInitialSync && !memberState.primary() && + !_rsConfig.isChainingAllowed()) { + // If we are not the primary and chaining is disabled in the config, we should only be + // syncing from the primary. + readPreference = ReadPreference::PrimaryOnly; + } + return readPreference; +} + +HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) { + stdx::lock_guard<Latch> lk(_mutex); + + HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress(); + + const auto readPreference = _getSyncSourceReadPreference(lk); + + HostAndPort newSyncSource = + _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, readPreference); auto primary = _topCoord->getCurrentPrimaryMember(); // If read preference is SecondaryOnly, we should never choose the primary. invariant(readPreference != ReadPreference::SecondaryOnly || !primary || @@ -4703,8 +4713,15 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre const rpc::OplogQueryMetadata& oqMetadata, const OpTime& lastOpTimeFetched) { stdx::lock_guard<Latch> lock(_mutex); - return _topCoord->shouldChangeSyncSource( - currentSource, replMetadata, oqMetadata, lastOpTimeFetched, _replExecutor->now()); + const auto now = _replExecutor->now(); + if (_topCoord->shouldChangeSyncSource( + currentSource, replMetadata, oqMetadata, lastOpTimeFetched, now)) { + return true; + } + + const auto readPreference = _getSyncSourceReadPreference(lock); + return _topCoord->shouldChangeSyncSourceDueToPingTime( + currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference); } void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) { |