summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl.cpp
diff options
context:
space:
mode:
authorXuerui Fa <xuerui.fa@mongodb.com>2020-05-04 20:36:27 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-08 14:24:43 +0000
commita7ff5838b5a4bcb4881a08c5dafce671c3b54940 (patch)
tree8cd080ae35227546cead1104b958ce8b3996e9f4 /src/mongo/db/repl/replication_coordinator_impl.cpp
parent307b547e672ffebe30461eae1ed5d6bc36e919ca (diff)
downloadmongo-a7ff5838b5a4bcb4881a08c5dafce671c3b54940.tar.gz
SERVER-47449: Re-evaluate sync source after fetching each batch of oplog entries
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp45
1 files changed, 31 insertions, 14 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 3f41b3d9870..92db9fccae8 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -84,7 +84,6 @@
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/storage_interface.h"
-#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
@@ -4603,16 +4602,12 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}
-HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
- stdx::lock_guard<Latch> lk(_mutex);
-
- HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
+const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) {
// Always allow chaining while in catchup and drain mode.
auto memberState = _getMemberState_inlock();
- auto chainingPreference = memberState.primary()
- ? TopologyCoordinator::ChainingPreference::kAllowChaining
- : TopologyCoordinator::ChainingPreference::kUseConfiguration;
ReadPreference readPreference = ReadPreference::Nearest;
+
+ bool parsedSyncSourceFromInitialSync = false;
// Handle special case of initial sync source read preference.
// This sync source will be cleared when we go to secondary mode, because we will perform
// a postMemberState action of kOnFollowerModeStateChange which calls chooseNewSyncSource().
@@ -4622,11 +4617,10 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
readPreference =
ReadPreference_parse(IDLParserErrorContext("initialSyncSourceReadPreference"),
initialSyncSourceReadPreference);
+ parsedSyncSourceFromInitialSync = true;
} catch (const DBException& e) {
fassertFailedWithStatus(3873100, e.toStatus());
}
- // If read preference is explictly set, it takes precedence over chaining: false.
- chainingPreference = TopologyCoordinator::ChainingPreference::kAllowChaining;
} else if (_rsConfig.getMemberAt(_selfIndex).getNumVotes() > 0) {
// Voting nodes prefer to sync from the primary. A voting node that is initial syncing
// may have acknowledged writes which are part of the set's write majority; if it then
@@ -4637,8 +4631,24 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
readPreference = ReadPreference::PrimaryPreferred;
}
}
- HostAndPort newSyncSource = _topCoord->chooseNewSyncSource(
- _replExecutor->now(), lastOpTimeFetched, chainingPreference, readPreference);
+ if (!parsedSyncSourceFromInitialSync && !memberState.primary() &&
+ !_rsConfig.isChainingAllowed()) {
+ // If we are not the primary and chaining is disabled in the config, we should only be
+ // syncing from the primary.
+ readPreference = ReadPreference::PrimaryOnly;
+ }
+ return readPreference;
+}
+
+HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
+
+ const auto readPreference = _getSyncSourceReadPreference(lk);
+
+ HostAndPort newSyncSource =
+ _topCoord->chooseNewSyncSource(_replExecutor->now(), lastOpTimeFetched, readPreference);
auto primary = _topCoord->getCurrentPrimaryMember();
// If read preference is SecondaryOnly, we should never choose the primary.
invariant(readPreference != ReadPreference::SecondaryOnly || !primary ||
@@ -4703,8 +4713,15 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& lastOpTimeFetched) {
stdx::lock_guard<Latch> lock(_mutex);
- return _topCoord->shouldChangeSyncSource(
- currentSource, replMetadata, oqMetadata, lastOpTimeFetched, _replExecutor->now());
+ const auto now = _replExecutor->now();
+ if (_topCoord->shouldChangeSyncSource(
+ currentSource, replMetadata, oqMetadata, lastOpTimeFetched, now)) {
+ return true;
+ }
+
+ const auto readPreference = _getSyncSourceReadPreference(lock);
+ return _topCoord->shouldChangeSyncSourceDueToPingTime(
+ currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference);
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) {