summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@mongodb.com>2022-03-15 10:02:18 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-16 15:16:25 +0000
commitd173689149305c83f6c5e45878e54698694f4106 (patch)
treee3591c36c4f8862c1e761260297a1a8e88dbeaaa
parent3818135a1b201fc2fbb9286c14bafd88f159a08f (diff)
downloadmongo-d173689149305c83f6c5e45878e54698694f4106.tar.gz
SERVER-63417 Oplog fetcher should not retry when a node is known to be down
-rw-r--r--src/mongo/db/repl/bgsync.cpp16
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h9
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp14
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp8
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.h5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp8
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h11
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp7
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp13
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp84
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp24
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h16
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp7
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h5
-rw-r--r--src/mongo/db/repl/sync_source_selector.h21
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.cpp7
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.h5
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp8
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp26
-rw-r--r--src/mongo/db/repl/topology_coordinator.h8
-rw-r--r--src/mongo/db/repl/topology_coordinator_v1_test.cpp138
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.cpp7
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.h5
26 files changed, 427 insertions, 42 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index dc8ad23ed71..8217e38090a 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -114,7 +114,10 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ const OpTime& lastOpTimeFetched) const final;
+
+ ChangeSyncSourceAction shouldStopFetchingOnError(const HostAndPort& source,
+ const OpTime& lastOpTimeFetched) const final;
private:
BackgroundSync* _bgsync;
@@ -132,7 +135,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetc
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
if (_bgsync->shouldStopFetching()) {
return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch;
}
@@ -141,6 +144,15 @@ ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetc
source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
}
+ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const {
+ if (_bgsync->shouldStopFetching()) {
+ return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
+ }
+
+ return DataReplicatorExternalStateImpl::shouldStopFetchingOnError(source, lastOpTimeFetched);
+}
+
size_t getSize(const BSONObj& o) {
// SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
return static_cast<size_t>(o.objsize());
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index 5649195b807..87826b0f199 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -101,7 +101,14 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) = 0;
+ const OpTime& lastOpTimeFetched) const = 0;
+
+ /**
+ * Evaluates quality of sync source. This is intended to be called on error when no
+ * current metadata is available.
+ */
+ virtual ChangeSyncSourceAction shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const = 0;
/**
* This function creates an oplog buffer of the type specified at server startup.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 2621917a862..00c924ff1ea 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -100,7 +100,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching(
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
// Re-evaluate quality of sync target.
auto changeSyncSourceAction = _replicationCoordinator->shouldChangeSyncSource(
source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
@@ -118,6 +118,18 @@ ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching(
return changeSyncSourceAction;
}
+ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const {
+ auto changeSyncSourceAction =
+ _replicationCoordinator->shouldChangeSyncSourceOnError(source, lastOpTimeFetched);
+ if (changeSyncSourceAction != ChangeSyncSourceAction::kContinueSyncing) {
+ LOGV2(6341701,
+ "Canceling oplog query on fetch error. We have to choose a new sync source",
+ "syncSource"_attr = source);
+ }
+ return changeSyncSourceAction;
+}
+
std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer(
OperationContext* opCtx) const {
if (initialSyncOplogBuffer == kCollectionOplogBufferName) {
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 080483955b4..c408c484dc9 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -59,7 +59,10 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ const OpTime& lastOpTimeFetched) const override;
+
+ ChangeSyncSourceAction shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
index f26f2a1dd0a..abb8f3bebd8 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
@@ -45,7 +45,7 @@ ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchin
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata&,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
// Since initial sync does not allow for sync source changes, it should not check if there are
// better sync sources. If there is a problem on the sync source, it will manifest itself in the
@@ -54,5 +54,11 @@ ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchin
return ChangeSyncSourceAction::kContinueSyncing;
}
+ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetchingOnError(
+ const HostAndPort&, const OpTime& lastOpTimeFetched) const {
+
+ return ChangeSyncSourceAction::kContinueSyncing;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
index c3c8195928c..55316e3da16 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
@@ -48,7 +48,10 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ const OpTime& lastOpTimeFetched) const override;
+
+ ChangeSyncSourceAction shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const override;
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index dddc2e4f9ba..ddcfc701ca6 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -98,13 +98,19 @@ ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetching(
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
lastSyncSourceChecked = source;
syncSourceLastOpTime = oqMetadata.getLastOpApplied();
syncSourceHasSyncSource = oqMetadata.getSyncSourceIndex() != -1;
return shouldStopFetchingResult;
}
+ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const {
+ lastSyncSourceChecked = source;
+ return shouldStopFetchingResult;
+}
+
std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer(
OperationContext* opCtx) const {
return std::make_unique<OplogBufferBlockingQueue>();
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 789e6a63ed3..535ee513102 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -56,7 +56,10 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ const OpTime& lastOpTimeFetched) const override;
+
+ ChangeSyncSourceAction shouldStopFetchingOnError(
+ const HostAndPort& source, const OpTime& lastOpTimeFetched) const override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override;
@@ -89,9 +92,9 @@ public:
bool metadataWasProcessed = false;
// Set by shouldStopFetching.
- HostAndPort lastSyncSourceChecked;
- OpTime syncSourceLastOpTime;
- bool syncSourceHasSyncSource = false;
+ mutable HostAndPort lastSyncSourceChecked;
+ mutable OpTime syncSourceLastOpTime;
+ mutable bool syncSourceHasSyncSource = false;
// Returned by shouldStopFetching.
ChangeSyncSourceAction shouldStopFetchingResult = ChangeSyncSourceAction::kContinueSyncing;
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 9420e749159..7956564d23a 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -178,11 +178,16 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override {
+ const OpTime& lastOpTimeFetched) const override {
return _syncSourceSelector->shouldChangeSyncSource(
currentSource, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
}
+ ChangeSyncSourceAction shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override {
+ return _syncSourceSelector->shouldChangeSyncSourceOnError(currentSource, lastOpTimeFetched);
+ }
+
void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
NetworkInterfaceMock* net = getNet();
if (!net->hasReadyRequests()) {
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index e05f286fb05..816b5f19fa6 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -400,11 +400,17 @@ void OplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& callbac
auto batchResult = _getNextBatch();
if (!batchResult.isOK()) {
auto brStatus = batchResult.getStatus();
+ // Determine if we should stop syncing from our current sync source. If we're going
+ // to change sync sources anyway, do it immediately rather than checking if we can
+ // retry the error.
+ const bool stopFetching = _dataReplicatorExternalState->shouldStopFetchingOnError(
+ _config.source, _getLastOpTimeFetched()) !=
+ ChangeSyncSourceAction::kContinueSyncing;
// Recreate a cursor if we have enough retries left.
// If we are a TenantOplogFetcher, we never retry as we will always restart the
// TenantMigrationRecipient state machine on failure. So instead, we just fail and exit.
- if (_oplogFetcherRestartDecision->shouldContinue(this, brStatus) &&
+ if (!stopFetching && _oplogFetcherRestartDecision->shouldContinue(this, brStatus) &&
!_config.forTenantMigration) {
hangBeforeOplogFetcherRetries.pauseWhileSet();
_cursor.reset();
@@ -492,6 +498,9 @@ Status OplogFetcher::_connect() {
}
}();
} while (!connectStatus.isOK() &&
+ _dataReplicatorExternalState->shouldStopFetchingOnError(_config.source,
+ _getLastOpTimeFetched()) ==
+ ChangeSyncSourceAction::kContinueSyncing &&
_oplogFetcherRestartDecision->shouldContinue(this, connectStatus));
return connectStatus;
@@ -888,7 +897,7 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
errMsg << " previous batch last fetched optime: " << previousOpTimeFetched.toString();
errMsg << " current batch last fetched optime: " << lastDocOpTime.toString();
- if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatch) {
+ if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent) {
return Status(ErrorCodes::InvalidSyncSource, errMsg);
}
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index eca60c5d2a3..96003452793 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -1236,6 +1236,48 @@ TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchNoRetries) {
ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus());
}
+TEST_F(OplogFetcherTest, DontRecreateNewCursorAfterFailedBatchWhenSyncSourceChangeIsExpected) {
+ ShutdownState shutdownState;
+
+ // Create an oplog fetcher with one retry.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ CursorId cursorId = 22LL;
+ auto firstEntry = makeNoopOplogEntry(lastFetched);
+ auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
+ auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oqMetadata);
+ auto firstBatch = {firstEntry, secondEntry};
+
+ // Update lastFetched before it is updated by getting the next batch.
+ lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
+
+ // Creating the cursor will succeed.
+ auto m = processSingleRequestResponse(oplogFetcher->getDBClientConnection_forTest(),
+ makeFirstBatch(cursorId, firstBatch, metadataObj),
+ true);
+
+ validateFindCommand(
+ m, lastFetched, durationCount<Milliseconds>(oplogFetcher->getInitialFindMaxTime_forTest()));
+
+ // Check that the first batch was successfully processed.
+ validateLastBatch(
+ true /* skipFirstDoc */, firstBatch, oplogFetcher->getLastOpTimeFetched_forTest());
+
+ // Mock a result that tells us to stop syncing.
+ dataReplicatorExternalState->shouldStopFetchingResult =
+ ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
+
+ // This will cause the oplog fetcher to fail while getting the next batch. Since we're expecting
+ // a sync source change, the oplog fetcher will shut down.
+ processSingleRequestResponse(
+ oplogFetcher->getDBClientConnection_forTest(),
+ mongo::Status{mongo::ErrorCodes::NetworkTimeout, "Fake socket timeout"});
+
+ oplogFetcher->join();
+
+ ASSERT_EQUALS(ErrorCodes::NetworkTimeout, shutdownState.getStatus());
+}
+
TEST_F(OplogFetcherTest, FailCreateNewCursorAfterFailedBatchRetriesShutsDownOplogFetcher) {
ShutdownState shutdownState;
@@ -2012,10 +2054,10 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest, FailedSyncSourceCheckReturnsStopSyncingAndDropBatch) {
testSyncSourceChecking(
- replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatch);
+ replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent);
- // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatch, we should not enqueue
- // any documents.
+ // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatchIfPresent, we should
+ // not enqueue any documents.
ASSERT_TRUE(lastEnqueuedDocuments.empty());
}
@@ -2199,14 +2241,50 @@ TEST_F(OplogFetcherTest, OplogFetcherRetriesConnectionButFails) {
// Shutdown the mock remote server before the OplogFetcher tries to connect.
_mockServer->shutdown();
+ startCapturingLogMessages();
// Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was
// scheduled before returning.
auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
oplogFetcher->join();
+ stopCapturingLogMessages();
// This is the error code for connection failures.
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+
+ // We should see one retry
+ ASSERT_EQUALS(1, countBSONFormatLogLinesIsSubset(BSON("id" << 21274)));
+
+ // We should one failure to retry
+ ASSERT_EQUALS(1, countBSONFormatLogLinesIsSubset(BSON("id" << 21275)));
+}
+
+TEST_F(OplogFetcherTest, OplogFetcherDoesNotRetryConnectionWhenSyncSourceChangeIsExpected) {
+ // Test that OplogFetcher does not retry after failing the initial connection if we've gotten
+ // a better sync source in the meantime.
+ ShutdownState shutdownState;
+
+ // Mock a result that tells us to stop syncing.
+ dataReplicatorExternalState->shouldStopFetchingResult =
+ ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
+
+ // Shutdown the mock remote server before the OplogFetcher tries to connect.
+ _mockServer->shutdown();
+
+ startCapturingLogMessages();
+ // Create an OplogFetcher with 1 retry attempt. This will also ensure that _runQuery was
+ // scheduled before returning.
+ auto oplogFetcher = getOplogFetcherAfterConnectionCreated(std::ref(shutdownState), 1);
+
+ oplogFetcher->join();
+ stopCapturingLogMessages();
+
+ // This is the error code for connection failures.
+ ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+
+ // We should not see retry attempts.
+ ASSERT_EQUALS(0, countBSONFormatLogLinesIsSubset(BSON("id" << 21274)));
+ ASSERT_EQUALS(0, countBSONFormatLogLinesIsSubset(BSON("id" << 21275)));
}
TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeReconnect) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a17f698b8ab..da5fbd375e9 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -5066,7 +5066,7 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}
-const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) {
+const ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock) const {
// Always allow chaining while in catchup and drain mode.
auto memberState = _getMemberState_inlock();
ReadPreference readPreference = ReadPreference::Nearest;
@@ -5176,7 +5176,7 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource(
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
stdx::lock_guard<Latch> lock(_mutex);
const auto now = _replExecutor->now();
@@ -5191,7 +5191,25 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource(
// We should drop the last batch if we find a significantly closer node. This is to
// avoid advancing our 'lastFetched', which makes it more likely that we will be able to
// choose the closer node as our sync source.
- return ChangeSyncSourceAction::kStopSyncingAndDropLastBatch;
+ return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
+ }
+
+ return ChangeSyncSourceAction::kContinueSyncing;
+}
+
+ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const {
+ stdx::lock_guard<Latch> lock(_mutex);
+ const auto now = _replExecutor->now();
+
+ if (_topCoord->shouldChangeSyncSourceOnError(currentSource, lastOpTimeFetched, now)) {
+ return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
+ }
+
+ const auto readPreference = _getSyncSourceReadPreference(lock);
+ if (_topCoord->shouldChangeSyncSourceDueToPingTime(
+ currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference)) {
+ return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
}
return ChangeSyncSourceAction::kContinueSyncing;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index e213e4d18d2..8affe603f30 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -316,11 +316,15 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* opCtx) override;
- virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ virtual ChangeSyncSourceAction shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) const override;
+
+ virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override;
virtual OpTime getLastCommittedOpTime() const override;
virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override;
@@ -1543,7 +1547,7 @@ private:
/*
* Calculates and returns the read preference for the node.
*/
- const ReadPreference _getSyncSourceReadPreference(WithLock);
+ const ReadPreference _getSyncSourceReadPreference(WithLock) const;
/*
* Performs the replica set reconfig procedure. Certain consensus safety checks are omitted when
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index cc5b6a58388..0dfcd6e66e1 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -588,7 +588,12 @@ ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSource(
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
+ MONGO_UNREACHABLE;
+}
+
+ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 9e69b080931..a03227ce33e 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -287,7 +287,10 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched);
+ const OpTime& lastOpTimeFetched) const;
+
+ virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const;
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index e2d0994e89d..9ca8ffe1d9d 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -459,7 +459,12 @@ ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSource(
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata&,
const OpTime&,
- const OpTime&) {
+ const OpTime&) const {
+ MONGO_UNREACHABLE;
+}
+
+ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSourceOnError(
+ const HostAndPort&, const OpTime&) const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index c355ce73e37..635ac26d371 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -246,7 +246,10 @@ public:
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata&,
const OpTime&,
- const OpTime&) final;
+ const OpTime&) const final;
+
+ ChangeSyncSourceAction shouldChangeSyncSourceOnError(const HostAndPort&,
+ const OpTime&) const final;
OpTime getLastCommittedOpTime() const final;
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index bf96d335dfe..10590d6d5be 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -50,7 +50,7 @@ struct SyncSourceResolverResponse;
enum class ChangeSyncSourceAction {
kContinueSyncing,
- kStopSyncingAndDropLastBatch,
+ kStopSyncingAndDropLastBatchIfPresent,
kStopSyncingAndEnqueueLastBatch
};
@@ -90,11 +90,20 @@ public:
*
* "now" is used to skip over currently denylisted sync sources.
*/
- virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) = 0;
+ virtual ChangeSyncSourceAction shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) const = 0;
+
+ /*
+ * Determines if a new sync source should be chosen when an error occures during fetching,
+ * without attempting retries on the same sync source.
+ * Because metadata is not available, checks are a subset of those in shouldChangeSyncSource.
+ */
+ virtual ChangeSyncSourceAction shouldChangeSyncSourceOnError(
+ const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp
index 51e0b5a2c0f..2368064e4e8 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.cpp
+++ b/src/mongo/db/repl/sync_source_selector_mock.cpp
@@ -61,7 +61,12 @@ ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSource(
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) {
+ const OpTime& lastOpTimeFetched) const {
+ return ChangeSyncSourceAction::kContinueSyncing;
+}
+
+ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSourceOnError(
+ const HostAndPort&, const OpTime& lastOpTimeFetched) const {
return ChangeSyncSourceAction::kContinueSyncing;
}
diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h
index dde3ccaa457..8253831dd59 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.h
+++ b/src/mongo/db/repl/sync_source_selector_mock.h
@@ -57,7 +57,10 @@ public:
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) override;
+ const OpTime& lastOpTimeFetched) const override;
+
+ ChangeSyncSourceAction shouldChangeSyncSourceOnError(
+ const HostAndPort&, const OpTime& lastOpTimeFetched) const override;
/**
* Sets a function that will be run every time chooseNewSyncSource() is called.
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 4d983b57979..6cf88435d70 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -219,7 +219,13 @@ public:
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
const OpTime& previousOpTimeFetched,
- const OpTime& lastOpTimeFetched) final {
+ const OpTime& lastOpTimeFetched) const final {
+ return ChangeSyncSourceAction::kContinueSyncing;
+ }
+
+ // Tenant migration does not re-evaluate sync source on error.
+ ChangeSyncSourceAction shouldStopFetchingOnError(const HostAndPort& source,
+ const OpTime& lastOpTimeFetched) const final {
return ChangeSyncSourceAction::kContinueSyncing;
}
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index e873f0dd6c0..47a043e093e 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -3078,6 +3078,32 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
return false;
}
+bool TopologyCoordinator::shouldChangeSyncSourceOnError(const HostAndPort& currentSource,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const {
+ // We change sync source on error if
+ // 1) A forced sync source change has been requested.
+ // 2) Chaining is disabled and a new primary has been detected.
+ // 3) A more eligible node exists. Note this covers the case where our current sync source is
+ // down.
+
+ auto [initialDecision, currentSourceIndex] =
+ _shouldChangeSyncSourceInitialChecks(currentSource);
+ if (initialDecision != ChangeSyncSourceDecision::kMaybe) {
+ return initialDecision == ChangeSyncSourceDecision::kYes;
+ }
+
+ if (_shouldChangeSyncSourceDueToNewPrimary(currentSource, currentSourceIndex)) {
+ return true;
+ }
+
+ if (_shouldChangeSyncSourceDueToBetterEligibleSource(
+ currentSource, currentSourceIndex, lastOpTimeFetched, now))
+ return true;
+
+ return false;
+}
+
std::pair<TopologyCoordinator::ChangeSyncSourceDecision, int>
TopologyCoordinator::_shouldChangeSyncSourceInitialChecks(const HostAndPort& currentSource) const {
if (_selfIndex == -1) {
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 0bf275af1cc..3d3084ddf76 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -272,6 +272,14 @@ public:
Date_t now) const;
/**
+ * Determines if a new sync source should be chosen when an error occurs. In this case
+ * we do not have current metadata from the sync source and so can only do a subset of
+ * the checks we do when we get a response.
+ */
+ bool shouldChangeSyncSourceOnError(const HostAndPort& currentSource,
+ const OpTime& lastOpTimeFetched,
+ Date_t now) const;
+ /**
* Returns true if we find an eligible sync source that is significantly closer than our current
* sync source.
*/
diff --git a/src/mongo/db/repl/topology_coordinator_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
index 924fbf01768..3898035b08c 100644
--- a/src/mongo/db/repl/topology_coordinator_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_v1_test.cpp
@@ -4101,6 +4101,30 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenSyncSourceIsDown) {
ASSERT_EQUALS(1, countLogLinesWithId(5929000));
}
+TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceOnErrorWhenSyncSourceIsDown) {
+ // In this test, the TopologyCoordinator should tell us to change sync sources away from
+ // "host2" and to "host3" since "host2" is down.
+ OpTime election = OpTime();
+ OpTime oldSyncSourceOpTime = OpTime(Timestamp(4, 0), 0);
+ // ahead by less than maxSyncSourceLagSecs (30)
+ OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
+ setSelfMemberState(MemberState::RS_SECONDARY);
+
+ HeartbeatResponseAction nextAction = receiveDownHeartbeat(HostAndPort("host2"), "rs0");
+ ASSERT_NO_ACTION(nextAction.getAction());
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host3"), "rs0", MemberState::RS_SECONDARY, election, freshOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ // set up complete, time for actual check
+ startCapturingLogMessages();
+ ASSERT_TRUE(getTopoCoord().shouldChangeSyncSourceOnError(
+ HostAndPort("host2"), oldSyncSourceOpTime, now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(5929000));
+}
+
TEST_F(HeartbeatResponseTestV1, ShouldNotChangeSyncSourceFromStalePrimary) {
// In this test, the TopologyCoordinator should still sync to the primary, "host2", although
// "host3" is fresher.
@@ -4153,6 +4177,19 @@ TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceWhenMemberNotInConfig) {
ASSERT_EQUALS(1, countLogLinesWithId(21831));
}
+TEST_F(HeartbeatResponseTestV1, ShouldChangeSyncSourceOnErrorWhenMemberNotInConfig) {
+ // In this test, the TopologyCoordinator should tell us to change sync sources away from
+ // "host4" since "host4" is absent from the config of version 10.
+ ReplSetMetadata replMetadata(0, {OpTime(), Date_t()}, OpTime(), 10, 0, OID(), -1, false);
+
+ startCapturingLogMessages();
+ ASSERT_TRUE(
+ getTopoCoord().shouldChangeSyncSourceOnError(HostAndPort("host4"), OpTime(), now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(21831));
+}
+
TEST_F(HeartbeatResponseTestV1,
ShouldntChangeSyncSourceWhenNotSyncingFromPrimaryAndChainingDisabledButNoNewPrimary) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
@@ -4232,6 +4269,80 @@ TEST_F(HeartbeatResponseTestV1,
}
TEST_F(HeartbeatResponseTestV1,
+ ShouldntChangeSyncSourceOnErrorWhenNotSyncingFromPrimaryAndChainingDisabledButNoNewPrimary) {
+ // In this test, the TopologyCoordinator should not tell us to change sync sources away from
+ // "host2" since we are not aware of who the new primary is.
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 5 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion" << 1 << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)),
+ 0);
+
+ OpTime staleOpTime = OpTime(Timestamp(4, 0), 0);
+ OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSourceOnError(
+ HostAndPort("host2"),
+ staleOpTime, // lastOpTimeFetched so that we are behind host2
+ now()));
+}
+
+TEST_F(HeartbeatResponseTestV1,
+ ShouldChangeSyncSourceOnErrorWhenNotSyncingFromPrimaryChainingDisabledAndFoundNewPrimary) {
+ // In this test, the TopologyCoordinator should tell us to change sync sources away from
+ // "host2" since "host3" is the new primary and chaining is disabled.
+
+ setSelfMemberState(MemberState::RS_SECONDARY);
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 5 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion" << 1 << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)),
+ 0);
+
+ OpTime oldElection = OpTime(Timestamp(1, 1), 1);
+ OpTime curElection = OpTime(Timestamp(4, 2), 2);
+ OpTime staleOpTime = OpTime(Timestamp(4, 1), 1);
+ OpTime freshOpTime = OpTime(Timestamp(5, 1), 2);
+
+ // Old host should still be up; this is a stale heartbeat.
+ HeartbeatResponseAction nextAction = receiveUpHeartbeat(
+ HostAndPort("host2"), "rs0", MemberState::RS_PRIMARY, oldElection, staleOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ getTopoCoord().updateTerm(2, now());
+
+ // Set that host3 is the new primary.
+ nextAction = receiveUpHeartbeat(
+ HostAndPort("host3"), "rs0", MemberState::RS_PRIMARY, curElection, freshOpTime);
+ ASSERT_NO_ACTION(nextAction.getAction());
+
+ startCapturingLogMessages();
+ ASSERT(getTopoCoord().shouldChangeSyncSourceOnError(
+ HostAndPort("host2"),
+ staleOpTime, // lastOpTimeFetched so that we are behind host2 and host3
+ now()));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Choosing new sync source"));
+ ASSERT_EQUALS(1, countLogLinesWithId(3962100));
+}
+
+TEST_F(HeartbeatResponseTestV1,
ShouldNotChangeSyncSourceWhenNotSyncingFromPrimaryChainingDisabledAndTwoPrimaries) {
// In this test, the TopologyCoordinator should not tell us to change sync sources away from
// "host2" since, though "host3" is the new primary, "host2" also thinks it is primary.
@@ -4304,6 +4415,33 @@ TEST_F(HeartbeatResponseTestV1, ShouldntChangeSyncSourceWhenChainingDisabledAndW
now()));
}
+TEST_F(HeartbeatResponseTestV1,
+ ShouldntChangeSyncSourceOnErrorWhenChainingDisabledAndWeArePrimary) {
+ updateConfig(BSON("_id"
+ << "rs0"
+ << "version" << 5 << "term" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "host1:27017")
+ << BSON("_id" << 1 << "host"
+ << "host2:27017")
+ << BSON("_id" << 2 << "host"
+ << "host3:27017"))
+ << "protocolVersion" << 1 << "settings"
+ << BSON("heartbeatTimeoutSecs" << 5 << "chainingAllowed" << false)),
+ 0);
+
+ OpTime staleOpTime = OpTime(Timestamp(4, 0), 0);
+ OpTime freshOpTime = OpTime(Timestamp(5, 0), 0);
+
+ // Set that we are primary.
+ makeSelfPrimary();
+
+ ASSERT_FALSE(getTopoCoord().shouldChangeSyncSourceOnError(
+ HostAndPort("host2"),
+ staleOpTime, // lastOpTimeFetched so that we are behind host2
+ now()));
+}
+
class ReevalSyncSourceTest : public TopoCoordTest {
public:
virtual void setUp() {
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index b178e9d1972..53e95a0739c 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -485,7 +485,12 @@ ChangeSyncSourceAction ReplicationCoordinatorEmbedded::shouldChangeSyncSource(
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata&,
const OpTime&,
- const OpTime&) {
+ const OpTime&) const {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
+ChangeSyncSourceAction ReplicationCoordinatorEmbedded::shouldChangeSyncSourceOnError(
+ const HostAndPort&, const OpTime&) const {
UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 9a6bad99f28..b6c5bfe1c62 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -255,7 +255,10 @@ public:
const rpc::ReplSetMetadata&,
const rpc::OplogQueryMetadata&,
const repl::OpTime&,
- const repl::OpTime&) override;
+ const repl::OpTime&) const override;
+
+ repl::ChangeSyncSourceAction shouldChangeSyncSourceOnError(const HostAndPort&,
+ const repl::OpTime&) const override;
repl::OpTime getLastCommittedOpTime() const override;