summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/bgsync.cpp16
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h10
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp19
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h9
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp12
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_initial_sync.h9
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp10
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h11
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp11
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp55
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp26
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp24
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h9
-rw-r--r--src/mongo/db/repl/sync_source_selector.h15
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.cpp12
-rw-r--r--src/mongo/db/repl/sync_source_selector_mock.h9
-rw-r--r--src/mongo/db/repl/topology_coordinator.cpp11
-rw-r--r--src/mongo/db/repl/topology_coordinator.h2
22 files changed, 187 insertions, 121 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index b1bdb3eff66..77daa595256 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -110,10 +110,11 @@ public:
ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync);
- bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
private:
BackgroundSync* _bgsync;
@@ -126,17 +127,18 @@ DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackground
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
_bgsync(bgsync) {}
-bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
+ChangeSyncSourceAction DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
const HostAndPort& source,
const rpc::ReplSetMetadata& replMetadata,
const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
const OpTime& lastOpTimeFetched) {
if (_bgsync->shouldStopFetching()) {
- return true;
+ return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch;
}
return DataReplicatorExternalStateImpl::shouldStopFetching(
- source, replMetadata, oqMetadata, lastOpTimeFetched);
+ source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
}
size_t getSize(const BSONObj& o) {
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index f1906c50b3c..2d2e56df3d1 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -37,6 +37,7 @@
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/replication_consistency_markers.h"
+#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -95,10 +96,11 @@ public:
* sync source (from metadata); and whether this sync source has a sync source (also from
* metadata).
*/
- virtual bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) = 0;
+ virtual ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) = 0;
/**
* This function creates an oplog buffer of the type specified at server startup.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index 385883939fd..6849dd52418 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -91,13 +91,16 @@ void DataReplicatorExternalStateImpl::processMetadata(const rpc::ReplSetMetadata
}
}
-bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) {
+ChangeSyncSourceAction DataReplicatorExternalStateImpl::shouldStopFetching(
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
// Re-evaluate quality of sync target.
- if (_replicationCoordinator->shouldChangeSyncSource(
- source, replMetadata, oqMetadata, lastOpTimeFetched)) {
+ auto changeSyncSourceAction = _replicationCoordinator->shouldChangeSyncSource(
+ source, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
+ if (changeSyncSourceAction != ChangeSyncSourceAction::kContinueSyncing) {
LOGV2(21150,
"Canceling oplog query due to OplogQueryMetadata. We have to choose a new "
"sync source. Current source: {syncSource}, OpTime {lastAppliedOpTime}, "
@@ -107,10 +110,8 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour
"syncSource"_attr = source,
"lastAppliedOpTime"_attr = oqMetadata.getLastOpApplied(),
"syncSourceIndex"_attr = oqMetadata.getSyncSourceIndex());
-
- return true;
}
- return false;
+ return changeSyncSourceAction;
}
std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer(
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index 353b9b532e6..3741fbae7c4 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -54,10 +54,11 @@ public:
void processMetadata(const rpc::ReplSetMetadata& replMetadata,
rpc::OplogQueryMetadata oqMetadata) override;
- bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
index 11cbe65f911..f26f2a1dd0a 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.cpp
@@ -40,16 +40,18 @@ DataReplicatorExternalStateInitialSync::DataReplicatorExternalStateInitialSync(
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState) {
}
-bool DataReplicatorExternalStateInitialSync::shouldStopFetching(const HostAndPort&,
- const rpc::ReplSetMetadata&,
- const rpc::OplogQueryMetadata&,
- const OpTime& lastOpTimeFetched) {
+ChangeSyncSourceAction DataReplicatorExternalStateInitialSync::shouldStopFetching(
+ const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ const rpc::OplogQueryMetadata&,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
// Since initial sync does not allow for sync source changes, it should not check if there are
// better sync sources. If there is a problem on the sync source, it will manifest itself in the
// cloning phase as well, and cause a failure there.
- return false;
+ return ChangeSyncSourceAction::kContinueSyncing;
}
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
index e656f0002da..c3c8195928c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
+++ b/src/mongo/db/repl/data_replicator_external_state_initial_sync.h
@@ -44,10 +44,11 @@ public:
ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState);
- bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
};
} // namespace repl
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 0bc385a6708..305c4e1abd2 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -89,10 +89,12 @@ void DataReplicatorExternalStateMock::processMetadata(const rpc::ReplSetMetadata
metadataWasProcessed = true;
}
-bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) {
+ChangeSyncSourceAction DataReplicatorExternalStateMock::shouldStopFetching(
+ const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
lastSyncSourceChecked = source;
syncSourceLastOpTime = oqMetadata.getLastOpApplied();
syncSourceHasSyncSource = oqMetadata.getSyncSourceIndex() != -1;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index ce53a542eb1..a9155c30b9e 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -51,10 +51,11 @@ public:
void processMetadata(const rpc::ReplSetMetadata& metadata,
rpc::OplogQueryMetadata oqMetadata) override;
- bool shouldStopFetching(const HostAndPort& source,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const override;
@@ -86,7 +87,7 @@ public:
bool syncSourceHasSyncSource = false;
// Returned by shouldStopFetching.
- bool shouldStopFetchingResult = false;
+ ChangeSyncSourceAction shouldStopFetchingResult = ChangeSyncSourceAction::kContinueSyncing;
// Override to change applyOplogBatch behavior.
using ApplyOplogBatchFn = std::function<StatusWith<OpTime>(
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 9ea641dfc5a..8adea565621 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -159,12 +159,13 @@ public:
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
}
- bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override {
+ ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override {
return _syncSourceSelector->shouldChangeSyncSource(
- currentSource, replMetadata, oqMetadata, lastOpTimeFetched);
+ currentSource, replMetadata, oqMetadata, previousOpTimeFetched, lastOpTimeFetched);
}
void scheduleNetworkResponse(std::string cmdName, const BSONObj& obj) {
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index ebc0e522a11..7bdd2af9b90 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -680,14 +680,16 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
}
// This lastFetched value is the last OpTime from the previous batch.
- auto lastFetched = _getLastOpTimeFetched();
+ auto previousOpTimeFetched = _getLastOpTimeFetched();
auto validateResult = OplogFetcher::validateDocuments(
- documents, _firstBatch, lastFetched.getTimestamp(), _startingPoint);
+ documents, _firstBatch, previousOpTimeFetched.getTimestamp(), _startingPoint);
if (!validateResult.isOK()) {
return validateResult.getStatus();
}
auto info = validateResult.getValue();
+ // If the batch is empty, set 'lastDocOpTime' to the lastFetched from the previous batch.
+ auto lastDocOpTime = info.lastDocument.isNull() ? previousOpTimeFetched : info.lastDocument;
// Process replset metadata. It is important that this happen after we've validated the
// first batch, so we don't progress our knowledge of the commit point from a
@@ -704,6 +706,24 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
return metadataResult.getStatus();
}
auto replSetMetadata = metadataResult.getValue();
+
+ // Determine if we should stop syncing from our current sync source.
+ auto changeSyncSourceAction = _dataReplicatorExternalState->shouldStopFetching(
+ _source, replSetMetadata, oqMetadata, previousOpTimeFetched, lastDocOpTime);
+ str::stream errMsg;
+ errMsg << "sync source " << _source.toString();
+ errMsg << " (config version: " << replSetMetadata.getConfigVersion();
+ errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString();
+ errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex();
+ errMsg << "; has primary index: " << oqMetadata.hasPrimaryIndex();
+ errMsg << ") is no longer valid";
+ errMsg << " previous batch last fetched optime: " << previousOpTimeFetched.toString();
+ errMsg << " current batch last fetched optime: " << lastDocOpTime.toString();
+
+ if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndDropLastBatch) {
+ return Status(ErrorCodes::InvalidSyncSource, errMsg);
+ }
+
_dataReplicatorExternalState->processMetadata(replSetMetadata, oqMetadata);
// Increment stats. We read all of the docs in the query.
@@ -717,6 +737,10 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
return status;
}
+ if (changeSyncSourceAction == ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch) {
+ return Status(ErrorCodes::InvalidSyncSource, errMsg);
+ }
+
if (MONGO_unlikely(hangOplogFetcherBeforeAdvancingLastFetched.shouldFail())) {
hangOplogFetcherBeforeAdvancingLastFetched.pauseWhileSet();
}
@@ -725,14 +749,9 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
// of this fetcher.
_startingPoint = StartingPoint::kSkipFirstDoc;
- // We have now processed the batch and should move forward our view of _lastFetched.
- if (documents.size() > 0) {
- auto lastDocOpTimeRes = OpTime::parseFromOplogEntry(documents.back());
- if (!lastDocOpTimeRes.isOK()) {
- return lastDocOpTimeRes.getStatus();
- }
-
- auto lastDocOpTime = lastDocOpTimeRes.getValue();
+ // We have now processed the batch. We should only move forward our view of _lastFetched if the
+ // batch was not empty.
+ if (lastDocOpTime != previousOpTimeFetched) {
LOGV2_DEBUG(21273,
3,
"Oplog fetcher setting last fetched optime ahead after batch: {lastDocOpTime}",
@@ -743,22 +762,6 @@ Status OplogFetcher::_onSuccessfulBatch(const Documents& documents) {
_lastFetched = lastDocOpTime;
}
- // Get the last fetched optime from the most recent batch.
- lastFetched = _getLastOpTimeFetched();
-
- if (_dataReplicatorExternalState->shouldStopFetching(
- _source, replSetMetadata, oqMetadata, lastFetched)) {
- str::stream errMsg;
- errMsg << "sync source " << _source.toString();
- errMsg << " (config version: " << replSetMetadata.getConfigVersion();
- errMsg << "; last applied optime: " << oqMetadata.getLastOpApplied().toString();
- errMsg << "; sync source index: " << oqMetadata.getSyncSourceIndex();
- errMsg << "; has primary index: " << oqMetadata.hasPrimaryIndex();
- errMsg << ") is no longer valid";
- errMsg << "last fetched optime: " << lastFetched.toString();
- return Status(ErrorCodes::InvalidSyncSource, errMsg);
- }
-
_firstBatch = false;
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 88086deaeeb..2359c1ba250 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -323,7 +323,9 @@ protected:
* Tests checkSyncSource result handling.
*/
void testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata);
+ const rpc::OplogQueryMetadata& oqMetadata,
+ ChangeSyncSourceAction changeSyncSourceAction =
+ ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch);
void validateLastBatch(bool skipFirstDoc, OplogFetcher::Documents docs, OpTime lastFetched);
@@ -478,14 +480,15 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Messag
}
void OplogFetcherTest::testSyncSourceChecking(const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata) {
+ const rpc::OplogQueryMetadata& oqMetadata,
+ ChangeSyncSourceAction changeSyncSourceAction) {
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()});
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()});
auto metadataObj = makeOplogBatchMetadata(replMetadata, oqMetadata);
- dataReplicatorExternalState->shouldStopFetchingResult = true;
+ dataReplicatorExternalState->shouldStopFetchingResult = changeSyncSourceAction;
auto shutdownState =
processSingleBatch(makeFirstBatch(0, {firstEntry, secondEntry, thirdEntry}, metadataObj),
@@ -1832,6 +1835,10 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc
ASSERT_EQUALS(source, dataReplicatorExternalState->lastSyncSourceChecked);
ASSERT_EQUALS(oqMetadata.getLastOpApplied(), dataReplicatorExternalState->syncSourceLastOpTime);
ASSERT_TRUE(dataReplicatorExternalState->syncSourceHasSyncSource);
+
+ // We should have enqueued the last batch if the 'shouldStopFetching' check returns
+ // kStopSyncingAndEnqueueLastBatch.
+ ASSERT_FALSE(lastEnqueuedDocuments.empty());
}
TEST_F(OplogFetcherTest,
@@ -1845,6 +1852,19 @@ TEST_F(OplogFetcherTest,
ASSERT_EQUALS(oplogQueryMetadata.getLastOpApplied(),
dataReplicatorExternalState->syncSourceLastOpTime);
ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource);
+
+ // We should have enqueued the last batch if the 'shouldStopFetching' check returns
+ // kStopSyncingAndEnqueueLastBatch.
+ ASSERT_FALSE(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(OplogFetcherTest, FailedSyncSourceCheckReturnsStopSyncingAndDropBatch) {
+ testSyncSourceChecking(
+ replSetMetadata, oqMetadata, ChangeSyncSourceAction::kStopSyncingAndDropLastBatch);
+
+ // If the 'shouldStopFetching' check returns kStopSyncingAndDropLastBatch, we should not enqueue
+ // any documents.
+ ASSERT_TRUE(lastEnqueuedDocuments.empty());
}
TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c058325362e..ea39a6edcba 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -4803,20 +4803,30 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
_reportUpstream_inlock(std::move(lock));
}
-bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) {
+ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
stdx::lock_guard<Latch> lock(_mutex);
const auto now = _replExecutor->now();
+
if (_topCoord->shouldChangeSyncSource(
currentSource, replMetadata, oqMetadata, lastOpTimeFetched, now)) {
- return true;
+ return ChangeSyncSourceAction::kStopSyncingAndEnqueueLastBatch;
}
const auto readPreference = _getSyncSourceReadPreference(lock);
- return _topCoord->shouldChangeSyncSourceDueToPingTime(
- currentSource, _getMemberState_inlock(), lastOpTimeFetched, now, readPreference);
+ if (_topCoord->shouldChangeSyncSourceDueToPingTime(
+ currentSource, _getMemberState_inlock(), previousOpTimeFetched, now, readPreference)) {
+ // We should drop the last batch if we find a significantly closer node. This is to
+ // avoid advancing our 'lastFetched', which makes it more likely that we will be able to
+ // choose the closer node as our sync source.
+ return ChangeSyncSourceAction::kStopSyncingAndDropLastBatch;
+ }
+
+ return ChangeSyncSourceAction::kContinueSyncing;
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index cd9d487b4bc..354304e2a5f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -283,10 +283,11 @@ public:
virtual void resetLastOpTimesFromOplog(OperationContext* opCtx,
DataConsistency consistency) override;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
virtual OpTime getLastCommittedOpTime() const override;
virtual OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index dd7ca0f8e07..a531cb41da2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -501,10 +501,12 @@ bool ReplicationCoordinatorMock::lastOpTimesWereReset() const {
return _resetLastOpTimesCalled;
}
-bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) {
+ChangeSyncSourceAction ReplicationCoordinatorMock::shouldChangeSyncSource(
+ const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index a79fc32478d..cafb302372a 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -247,10 +247,11 @@ public:
bool lastOpTimesWereReset() const;
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched);
+ virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched);
virtual OpTime getLastCommittedOpTime() const;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index d6d2232bcc4..a85689d66e3 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -396,10 +396,12 @@ void ReplicationCoordinatorNoOp::resetLastOpTimesFromOplog(OperationContext*, Da
MONGO_UNREACHABLE;
}
-bool ReplicationCoordinatorNoOp::shouldChangeSyncSource(const HostAndPort&,
- const rpc::ReplSetMetadata&,
- const rpc::OplogQueryMetadata&,
- const OpTime&) {
+ChangeSyncSourceAction ReplicationCoordinatorNoOp::shouldChangeSyncSource(
+ const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ const rpc::OplogQueryMetadata&,
+ const OpTime&,
+ const OpTime&) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index a5c96eec3b0..cef32c185e2 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -212,10 +212,11 @@ public:
void resetLastOpTimesFromOplog(OperationContext*, DataConsistency) final;
- bool shouldChangeSyncSource(const HostAndPort&,
- const rpc::ReplSetMetadata&,
- const rpc::OplogQueryMetadata&,
- const OpTime&) final;
+ ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ const rpc::OplogQueryMetadata&,
+ const OpTime&,
+ const OpTime&) final;
OpTime getLastCommittedOpTime() const final;
diff --git a/src/mongo/db/repl/sync_source_selector.h b/src/mongo/db/repl/sync_source_selector.h
index 2bed6b70249..98646426da4 100644
--- a/src/mongo/db/repl/sync_source_selector.h
+++ b/src/mongo/db/repl/sync_source_selector.h
@@ -48,6 +48,12 @@ namespace repl {
class OpTime;
struct SyncSourceResolverResponse;
+enum class ChangeSyncSourceAction {
+ kContinueSyncing,
+ kStopSyncingAndDropLastBatch,
+ kStopSyncingAndEnqueueLastBatch
+};
+
/**
* Manage list of viable and blocked sync sources that we can replicate from.
*/
@@ -84,10 +90,11 @@ public:
*
* "now" is used to skip over currently blacklisted sync sources.
*/
- virtual bool shouldChangeSyncSource(const HostAndPort& currentSource,
- const rpc::ReplSetMetadata& replMetadata,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) = 0;
+ virtual ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort& currentSource,
+ const rpc::ReplSetMetadata& replMetadata,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_selector_mock.cpp b/src/mongo/db/repl/sync_source_selector_mock.cpp
index e77e5e3e29a..d684c957ef7 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.cpp
+++ b/src/mongo/db/repl/sync_source_selector_mock.cpp
@@ -56,11 +56,13 @@ void SyncSourceSelectorMock::setChooseNewSyncSourceHook_forTest(
_chooseNewSyncSourceHook = hook;
}
-bool SyncSourceSelectorMock::shouldChangeSyncSource(const HostAndPort&,
- const rpc::ReplSetMetadata&,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) {
- return false;
+ChangeSyncSourceAction SyncSourceSelectorMock::shouldChangeSyncSource(
+ const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) {
+ return ChangeSyncSourceAction::kContinueSyncing;
}
void SyncSourceSelectorMock::setChooseNewSyncSourceResult_forTest(const HostAndPort& syncSource) {
diff --git a/src/mongo/db/repl/sync_source_selector_mock.h b/src/mongo/db/repl/sync_source_selector_mock.h
index fd295cf4590..102a16452c8 100644
--- a/src/mongo/db/repl/sync_source_selector_mock.h
+++ b/src/mongo/db/repl/sync_source_selector_mock.h
@@ -53,10 +53,11 @@ public:
void clearSyncSourceBlacklist() override;
HostAndPort chooseNewSyncSource(const OpTime& ot) override;
void blacklistSyncSource(const HostAndPort& host, Date_t until) override;
- bool shouldChangeSyncSource(const HostAndPort&,
- const rpc::ReplSetMetadata&,
- const rpc::OplogQueryMetadata& oqMetadata,
- const OpTime& lastOpTimeFetched) override;
+ ChangeSyncSourceAction shouldChangeSyncSource(const HostAndPort&,
+ const rpc::ReplSetMetadata&,
+ const rpc::OplogQueryMetadata& oqMetadata,
+ const OpTime& previousOpTimeFetched,
+ const OpTime& lastOpTimeFetched) override;
/**
* Sets a function that will be run every time chooseNewSyncSource() is called.
diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp
index 8146e848f0a..8bc7cf36a2c 100644
--- a/src/mongo/db/repl/topology_coordinator.cpp
+++ b/src/mongo/db/repl/topology_coordinator.cpp
@@ -3016,7 +3016,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
"Choosing new sync source. Our current sync source is not primary and does "
"not have a sync source, so we require that it is ahead of us",
"syncSource"_attr = currentSource,
- "lastFetchedOpTime"_attr = lastOpTimeFetched,
+ "lastOpTimeFetched"_attr = lastOpTimeFetched,
"syncSourceLatestOplogOpTime"_attr = currentSourceOpTime,
"isPrimary"_attr = replMetadata.getIsPrimary());
return true;
@@ -3069,7 +3069,7 @@ bool TopologyCoordinator::shouldChangeSyncSource(const HostAndPort& currentSourc
bool TopologyCoordinator::shouldChangeSyncSourceDueToPingTime(const HostAndPort& currentSource,
const MemberState& memberState,
- const OpTime& lastOpTimeFetched,
+ const OpTime& previousOpTimeFetched,
Date_t now,
const ReadPreference readPreference) {
// If we find an eligible sync source that is significantly closer than our current sync source,
@@ -3146,8 +3146,11 @@ bool TopologyCoordinator::shouldChangeSyncSourceDueToPingTime(const HostAndPort&
continue;
}
- if (_isEligibleSyncSource(
- candidateIndex, now, lastOpTimeFetched, readPreference, true /* firstAttempt */)) {
+ if (_isEligibleSyncSource(candidateIndex,
+ now,
+ previousOpTimeFetched,
+ readPreference,
+ true /* firstAttempt */)) {
LOGV2(4744901,
"Choosing new sync source because we have found another potential sync "
"source that is significantly closer than our current sync source",
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index b2675ebad17..3810c0e1010 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -277,7 +277,7 @@ public:
*/
bool shouldChangeSyncSourceDueToPingTime(const HostAndPort& currentSource,
const MemberState& memberState,
- const OpTime& lastOpTimeFetched,
+ const OpTime& previousOpTimeFetched,
Date_t now,
const ReadPreference readPreference);