summary | refs | log | tree | commit | diff
path: root/src/mongo/db
diff options
context:
space:
mode:
author: XueruiFa <xuerui.fa@mongodb.com> 2020-06-22 18:31:41 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-07-01 16:09:42 +0000
commit: 5ffbd8f8322651b4953f29da0cde9e31eab039d4 (patch)
tree: b81486a2abdcf60825b1b9427d964499c366d94b /src/mongo/db
parent: e4f92ef12f233e86be9de019f16db4f5dde47ad5 (diff)
download: mongo-5ffbd8f8322651b4953f29da0cde9e31eab039d4.tar.gz
SERVER-47270: Only run the SyncSourceResolver if the syncing node is in rollback-via-refetch
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 13
-rw-r--r-- src/mongo/db/repl/initial_syncer.cpp 36
-rw-r--r-- src/mongo/db/repl/oplog_fetcher.cpp 17
-rw-r--r-- src/mongo/db/repl/oplog_fetcher.h 12
-rw-r--r-- src/mongo/db/repl/oplog_fetcher_mock.cpp 4
-rw-r--r-- src/mongo/db/repl/oplog_fetcher_test.cpp 106
-rw-r--r-- src/mongo/db/repl/replication_process.h 2
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.cpp 28
-rw-r--r-- src/mongo/db/repl/sync_source_resolver.h 6
-rw-r--r-- src/mongo/db/repl/sync_source_resolver_test.cpp 73
10 files changed, 165 insertions, 132 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 4a6f2928365..b1bdb3eff66 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -491,12 +491,20 @@ void BackgroundSync::_produce() {
// "lastFetched" not used. Already set in _enqueueDocuments.
Status fetcherReturnStatus = Status::OK();
+ int syncSourceRBID = syncSourceResp.rbid;
+
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
OplogFetcher* oplogFetcher;
try {
- auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) {
+ auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus,
+ &syncSourceRBID](const Status& status, int rbid) {
fetcherReturnStatus = status;
+ // If the syncSourceResp rbid is uninitialized, syncSourceRBID will be set to the
+ // rbid obtained in the oplog fetcher.
+ if (syncSourceRBID == ReplicationProcess::kUninitializedRollbackId) {
+ syncSourceRBID = rbid;
+ }
};
// The construction of OplogFetcher has to be outside bgsync mutex, because it calls
// replication coordinator.
@@ -574,8 +582,7 @@ void BackgroundSync::_produce() {
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
auto opCtx = cc().makeOperationContext();
auto storageInterface = StorageInterface::get(opCtx.get());
- _runRollback(
- opCtx.get(), fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface);
+ _runRollback(opCtx.get(), fetcherReturnStatus, source, syncSourceRBID, storageInterface);
if (bgSyncHangAfterRunRollback.shouldFail()) {
LOGV2(21095, "bgSyncHangAfterRunRollback failpoint is set");
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 08dc0b83be5..ccd3bd67b68 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1151,24 +1151,24 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
}
const auto& config = configResult.getValue();
- _oplogFetcher =
- _createOplogFetcherFn(_exec,
- beginFetchingOpTime,
- _syncSource,
- config,
- std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
- _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
- _rollbackChecker->getBaseRBID(),
- false /* requireFresherSyncSource */,
- _dataReplicatorExternalState.get(),
- [=](OplogFetcher::Documents::const_iterator first,
- OplogFetcher::Documents::const_iterator last,
- const OplogFetcher::DocumentsInfo& info) {
- return _enqueueDocuments(first, last, info);
- },
- [=](const Status& s) { _oplogFetcherCallback(s, onCompletionGuard); },
- initialSyncOplogFetcherBatchSize,
- OplogFetcher::StartingPoint::kEnqueueFirstDoc);
+ _oplogFetcher = _createOplogFetcherFn(
+ _exec,
+ beginFetchingOpTime,
+ _syncSource,
+ config,
+ std::make_unique<OplogFetcherRestartDecisionInitialSyncer>(
+ _sharedData.get(), _opts.oplogFetcherMaxFetcherRestarts),
+ _rollbackChecker->getBaseRBID(),
+ false /* requireFresherSyncSource */,
+ _dataReplicatorExternalState.get(),
+ [=](OplogFetcher::Documents::const_iterator first,
+ OplogFetcher::Documents::const_iterator last,
+ const OplogFetcher::DocumentsInfo& info) {
+ return _enqueueDocuments(first, last, info);
+ },
+ [=](const Status& s, int rbid) { _oplogFetcherCallback(s, onCompletionGuard); },
+ initialSyncOplogFetcherBatchSize,
+ OplogFetcher::StartingPoint::kEnqueueFirstDoc);
LOGV2_DEBUG(21178,
2,
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index 7c664838887..43be886e67e 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -307,8 +307,7 @@ Milliseconds OplogFetcher::_getRetriedFindMaxTime() const {
void OplogFetcher::_finishCallback(Status status) {
invariant(isActive());
-
- _onShutdownCallbackFn(status);
+ _onShutdownCallbackFn(status, _requiredRBID);
decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
@@ -764,15 +763,19 @@ Status OplogFetcher::_checkRemoteOplogStart(const OplogFetcher::Documents& docum
int remoteRBID) {
using namespace fmt::literals;
- // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
- // since that could cause it to not have our required minValid point. The cursor will be
- // killed if the upstream node rolls back so we don't need to keep checking once the cursor
- // is established.
- if (remoteRBID != _requiredRBID) {
+ // Once we establish our cursor, if we use rollback-via-refetch, we need to ensure that our
+ // upstream node hasn't rolled back since that could cause it to not have our required minValid
+ // point. The cursor will be killed if the upstream node rolls back so we don't need to keep
+ // checking once the cursor is established. If we do not use rollback-via-refetch, this check is
+ // not necessary, and _requiredRBID will be set to kUninitializedRollbackId in that case.
+ if (_requiredRBID != ReplicationProcess::kUninitializedRollbackId &&
+ remoteRBID != _requiredRBID) {
return Status(ErrorCodes::InvalidSyncSource,
"Upstream node rolled back after choosing it as a sync source. Choosing "
"new sync source.");
}
+ // Set _requiredRBID to remoteRBID so that it can be returned when the oplog fetcher shuts down.
+ _requiredRBID = remoteRBID;
// Sometimes our remoteLastOpApplied may be stale; if we received a document with an
// opTime later than remoteLastApplied, we can assume the remote is at least up to that
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 0292dc5e7b0..4c3a765a780 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -41,6 +41,7 @@
#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/data_replicator_external_state.h"
#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/util/fail_point.h"
namespace mongo {
@@ -84,10 +85,13 @@ public:
* The status will be Status::OK() if we have processed the last batch of operations from the
* cursor.
*
+ * rbid is the rollback id from the oplog query metadata of the first batch once that batch has
+ * passed the rollback id check; before then it is the rbid the fetcher was constructed with.
+ *
* This function will be called 0 times if startup() fails and at most once after startup()
* returns success.
*/
- using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus)>;
+ using OnShutdownCallbackFn = std::function<void(const Status& shutdownStatus, int rbid)>;
/**
* Container for BSON documents extracted from cursor results.
@@ -350,8 +354,7 @@ private:
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'remoteLastOpApplied' is the last OpTime applied on the sync source.
- * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is
- * optional for compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query.
*
* Returns TooStaleToSyncFromSource if we are too stale to sync from our source.
* Returns OplogStartMissing if we should go into rollback.
@@ -377,7 +380,8 @@ private:
// Namespace of the oplog to read.
const NamespaceString _nss = NamespaceString::kRsOplogNamespace;
- // Rollback ID that the sync source is required to have after the first batch.
+ // Rollback ID that the sync source is required to have after the first batch. If the value is
+ // uninitialized, the oplog fetcher has not contacted the sync source yet.
int _requiredRBID;
// Indicates whether the current batch is the first received via this cursor.
diff --git a/src/mongo/db/repl/oplog_fetcher_mock.cpp b/src/mongo/db/repl/oplog_fetcher_mock.cpp
index 271cf8b772a..0331fffe950 100644
--- a/src/mongo/db/repl/oplog_fetcher_mock.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_mock.cpp
@@ -61,7 +61,7 @@ OplogFetcherMock::OplogFetcherMock(
// Pass a dummy EnqueueDocumentsFn to the base OplogFetcher.
[](const auto& a1, const auto& a2, const auto& a3) { return Status::OK(); },
// Pass a dummy OnShutdownCallbackFn to the base OplogFetcher.
- [](const auto& a) {},
+ [](const auto& a, const int b) {},
batchSize,
startingPoint),
_oplogFetcherRestartDecision(std::move(oplogFetcherRestartDecision)),
@@ -210,7 +210,7 @@ void OplogFetcherMock::_finishCallback(Status status) {
invariant(isActive());
// Call _onShutdownCallbackFn outside of the mutex.
- _onShutdownCallbackFn(status);
+ _onShutdownCallbackFn(status, ReplicationProcess::kUninitializedRollbackId);
decltype(_onShutdownCallbackFn) onShutdownCallbackFn;
decltype(_oplogFetcherRestartDecision) oplogFetcherRestartDecision;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 3987c4a6437..61380e4c244 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -250,12 +250,18 @@ public:
Status getStatus() const;
/**
+ * Returns the rbid at shutdown.
+ */
+ int getRBID() const;
+
+ /**
* Use this for oplog fetcher shutdown callback.
*/
- void operator()(const Status& status);
+ void operator()(const Status& status, int rbid);
private:
Status _status = executor::TaskExecutorTest::getDetectableErrorStatus();
+ int _rbid = ReplicationProcess::kUninitializedRollbackId;
};
ShutdownState::ShutdownState() = default;
@@ -264,8 +270,13 @@ Status ShutdownState::getStatus() const {
return _status;
}
-void ShutdownState::operator()(const Status& status) {
+int ShutdownState::getRBID() const {
+ return _rbid;
+}
+
+void ShutdownState::operator()(const Status& status, int rbid) {
_status = status;
+ _rbid = rbid;
}
class OplogFetcherTest : public executor::ThreadPoolExecutorTest,
@@ -274,7 +285,7 @@ protected:
static const OpTime remoteNewerOpTime;
static const OpTime staleOpTime;
static const Date_t staleWallTime;
- static const int rbid = 2;
+ static const int remoteRBID = 2;
static const int primaryIndex = 2;
static const int syncSourceIndex = 2;
static const rpc::OplogQueryMetadata oqMetadata;
@@ -292,17 +303,21 @@ protected:
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
bool requireFresherSyncSource = true,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc);
+ OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
+ int requiredRBID = ReplicationProcess::kUninitializedRollbackId);
std::unique_ptr<OplogFetcher> getOplogFetcherAfterConnectionCreated(
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts = 0,
bool requireFresherSyncSource = true,
- OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc);
+ OplogFetcher::StartingPoint startingPoint = OplogFetcher::StartingPoint::kSkipFirstDoc,
+ int requiredRBID = ReplicationProcess::kUninitializedRollbackId);
- std::unique_ptr<ShutdownState> processSingleBatch(const Message& response,
- bool shouldShutdown = false,
- bool requireFresherSyncSource = true,
- bool lastFetchedShouldAdvance = false);
+ std::unique_ptr<ShutdownState> processSingleBatch(
+ const Message& response,
+ bool shouldShutdown = false,
+ bool requireFresherSyncSource = true,
+ bool lastFetchedShouldAdvance = false,
+ int requiredRBID = ReplicationProcess::kUninitializedRollbackId);
/**
* Tests checkSyncSource result handling.
@@ -324,15 +339,15 @@ protected:
std::unique_ptr<MockRemoteDBServer> _mockServer;
};
-const int OplogFetcherTest::rbid;
+const int OplogFetcherTest::remoteRBID;
const OpTime OplogFetcherTest::remoteNewerOpTime = OpTime(Timestamp(1000, 1), 2);
const rpc::OplogQueryMetadata OplogFetcherTest::oqMetadata = rpc::OplogQueryMetadata(
- {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, syncSourceIndex);
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, remoteRBID, primaryIndex, syncSourceIndex);
const OpTime OplogFetcherTest::staleOpTime = OpTime(Timestamp(1, 1), 0);
const Date_t OplogFetcherTest::staleWallTime = Date_t() + Seconds(staleOpTime.getSecs());
const rpc::OplogQueryMetadata OplogFetcherTest::staleOqMetadata = rpc::OplogQueryMetadata(
- {staleOpTime, staleWallTime}, staleOpTime, rbid, primaryIndex, syncSourceIndex);
+ {staleOpTime, staleWallTime}, staleOpTime, remoteRBID, primaryIndex, syncSourceIndex);
const rpc::ReplSetMetadata OplogFetcherTest::replSetMetadata =
rpc::ReplSetMetadata(1, OpTimeAndWallTime(), OpTime(), 1, 0, OID(), syncSourceIndex, false);
@@ -369,16 +384,17 @@ void OplogFetcherTest::setUp() {
}
std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher() {
- return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status) {});
+ return makeOplogFetcherWithDifferentExecutor(&getExecutor(), [](Status, int) {});
}
std::unique_ptr<OplogFetcher> OplogFetcherTest::getOplogFetcherAfterConnectionCreated(
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts,
bool requireFresherSyncSource,
- OplogFetcher::StartingPoint startingPoint) {
+ OplogFetcher::StartingPoint startingPoint,
+ int requiredRBID) {
auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(
- &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint);
+ &getExecutor(), fn, numRestarts, requireFresherSyncSource, startingPoint, requiredRBID);
auto waitForConnCreatedFailPoint =
globalFailPointRegistry().find("hangAfterOplogFetcherCallbackScheduled");
@@ -400,14 +416,15 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
OplogFetcher::OnShutdownCallbackFn fn,
int numRestarts,
bool requireFresherSyncSource,
- OplogFetcher::StartingPoint startingPoint) {
+ OplogFetcher::StartingPoint startingPoint,
+ int requiredRBID) {
auto oplogFetcher = std::make_unique<OplogFetcher>(
executor,
lastFetched,
source,
_createConfig(),
std::make_unique<OplogFetcher::OplogFetcherRestartDecisionDefault>(numRestarts),
- rbid,
+ requiredRBID,
requireFresherSyncSource,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
@@ -425,12 +442,17 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcherWithDifferentExe
std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(const Message& response,
bool shouldShutdown,
bool requireFresherSyncSource,
- bool lastFetchedShouldAdvance) {
+ bool lastFetchedShouldAdvance,
+ int requiredRBID) {
auto shutdownState = std::make_unique<ShutdownState>();
// Create an oplog fetcher with no retries.
- auto oplogFetcher = getOplogFetcherAfterConnectionCreated(
- std::ref(*shutdownState), 0, requireFresherSyncSource);
+ auto oplogFetcher =
+ getOplogFetcherAfterConnectionCreated(std::ref(*shutdownState),
+ 0,
+ requireFresherSyncSource,
+ OplogFetcher::StartingPoint::kSkipFirstDoc,
+ requiredRBID);
// Update lastFetched before it is updated by getting the next batch.
lastFetched = oplogFetcher->getLastOpTimeFetched_forTest();
@@ -515,8 +537,8 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsOperationFailedIfExecutorFailsToSche
taskExecutorMock.shouldFailScheduleWorkRequest = []() { return true; };
// The onShutdownFn should not be called because the oplog fetcher should fail during startup.
- auto oplogFetcher =
- makeOplogFetcherWithDifferentExecutor(&taskExecutorMock, [](Status) { MONGO_UNREACHABLE; });
+ auto oplogFetcher = makeOplogFetcherWithDifferentExecutor(
+ &taskExecutorMock, [](Status, int) { MONGO_UNREACHABLE; });
// Last optime fetched should match values passed to constructor.
ASSERT_EQUALS(lastFetched, oplogFetcher->getLastOpTimeFetched_forTest());
@@ -549,6 +571,7 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupButBeforeRunQuerySchedu
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQueryScheduled) {
@@ -571,6 +594,7 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownBeforeRunQ
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQueryScheduled) {
@@ -596,6 +620,7 @@ TEST_F(OplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAfterRunQu
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest,
@@ -620,6 +645,7 @@ TEST_F(OplogFetcherTest,
// This is the error that the connection throws if shutdown while blocked on the network.
ASSERT_EQUALS(ErrorCodes::HostUnreachable, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest,
@@ -662,6 +688,7 @@ TEST_F(OplogFetcherTest,
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
bool sharedCallbackStateDestroyed = false;
@@ -693,7 +720,7 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFnOnCompletion) {
auto status = getDetectableErrorStatus();
auto oplogFetcher = getOplogFetcherAfterConnectionCreated(
- [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus) {
+ [&callbackInvoked, sharedCallbackData, &status](const Status& shutdownStatus, int rbid) {
status = shutdownStatus, callbackInvoked = true;
});
@@ -861,12 +888,21 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack)
CursorId cursorId = 22LL;
auto entry = makeNoopOplogEntry(lastFetched);
- rpc::OplogQueryMetadata oplogQueryMetadata(
- {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid + 1, primaryIndex, syncSourceIndex);
+ rpc::OplogQueryMetadata oplogQueryMetadata({staleOpTime, staleWallTime},
+ remoteNewerOpTime,
+ remoteRBID + 1,
+ primaryIndex,
+ syncSourceIndex);
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
- ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
- processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj))->getStatus());
+ auto shutdownState = processSingleBatch(makeFirstBatch(cursorId, {entry}, metadataObj),
+ false /* shouldShutdown */,
+ true /* requireFresherSyncSource */,
+ false /* lastFetchedShouldAdvance */,
+ remoteRBID /* requiredRBID */);
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus());
+ ASSERT_EQUALS(remoteRBID, shutdownState->getRBID());
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
ASSERT(lastEnqueuedDocuments.empty());
@@ -889,7 +925,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead
auto entry = makeNoopOplogEntry(lastFetched);
rpc::OplogQueryMetadata oplogQueryMetadata(
- {staleOpTime, staleWallTime}, lastFetched, rbid, primaryIndex, syncSourceIndex);
+ {staleOpTime, staleWallTime}, lastFetched, remoteRBID, primaryIndex, syncSourceIndex);
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
@@ -935,7 +971,7 @@ TEST_F(OplogFetcherTest,
MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
CursorId cursorId = 0LL;
rpc::OplogQueryMetadata oplogQueryMetadata(
- {staleOpTime, staleWallTime}, lastFetched, rbid, 2, 2);
+ {staleOpTime, staleWallTime}, lastFetched, remoteRBID, 2, 2);
auto metadataObj = makeOplogBatchMetadata(replSetMetadata, oplogQueryMetadata);
auto entry = makeNoopOplogEntry(lastFetched);
@@ -1044,11 +1080,12 @@ TEST_F(OplogFetcherTest, RemoteFirstOplogEntryWithExtraFieldsReturnsOplogStartMi
<< "field");
_mockServer->insert(nss.ns(), remoteFirstOplogEntry);
+ auto shutdownState = processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj}));
+
// We should have parsed the OpTime correctly and realized that we have not fallen off the sync
// source's oplog, so we should go into rollback.
- ASSERT_EQUALS(
- ErrorCodes::OplogStartMissing,
- processSingleBatch(makeFirstBatch(cursorId, {entry}, {metadataObj}))->getStatus());
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing, shutdownState->getStatus());
+ ASSERT_EQUALS(remoteRBID, shutdownState->getRBID());
}
TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorNoRetriesShutsDownOplogFetcher) {
@@ -1070,6 +1107,7 @@ TEST_F(OplogFetcherTest, FailingInitialCreateNewCursorWithRetriesShutsDownOplogF
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState.getStatus());
+ ASSERT_EQUALS(ReplicationProcess::kUninitializedRollbackId, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest,
@@ -1497,6 +1535,7 @@ TEST_F(OplogFetcherTest, OplogFetcherWorksWithoutExhaust) {
oplogFetcher->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus());
+ ASSERT_EQUALS(remoteRBID, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus) {
@@ -1528,6 +1567,7 @@ TEST_F(OplogFetcherTest, CursorIsDeadShutsDownOplogFetcherWithSuccessfulStatus)
true /* skipFirstDoc */, firstBatch, oplogFetcher->getLastOpTimeFetched_forTest());
ASSERT_OK(shutdownState.getStatus());
+ ASSERT_EQ(remoteRBID, shutdownState.getRBID());
}
TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) {
@@ -1797,7 +1837,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc
TEST_F(OplogFetcherTest,
FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) {
rpc::OplogQueryMetadata oplogQueryMetadata(
- {staleOpTime, staleWallTime}, remoteNewerOpTime, rbid, primaryIndex, -1);
+ {staleOpTime, staleWallTime}, remoteNewerOpTime, remoteRBID, primaryIndex, -1);
testSyncSourceChecking(replSetMetadata, oplogQueryMetadata);
// Sync source "hasSyncSource" is derived from metadata.
diff --git a/src/mongo/db/repl/replication_process.h b/src/mongo/db/repl/replication_process.h
index 82c298d363d..a6f2b019f3c 100644
--- a/src/mongo/db/repl/replication_process.h
+++ b/src/mongo/db/repl/replication_process.h
@@ -63,7 +63,7 @@ class ReplicationProcess {
ReplicationProcess& operator=(const ReplicationProcess&) = delete;
public:
- static const int kUninitializedRollbackId = -1;
+ constexpr static int kUninitializedRollbackId = -1;
// Operation Context binding.
static ReplicationProcess* get(ServiceContext* service);
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 4b4a9a178a2..ba7f06c24ea 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -372,6 +371,13 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
+ // If we should not proceed with the rollback-via-refetch checks, we can safely return the
+ // candidate with an uninitialized rbid.
+ if (_requiredOpTime.isNull()) {
+ _finishCallback(candidate, ReplicationProcess::kUninitializedRollbackId).ignore();
+ return;
+ }
+
auto status = _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
if (!status.isOK()) {
_finishCallback(status).ignore();
@@ -434,18 +440,12 @@ void SyncSourceResolver::_rbidRequestCallback(
return;
}
- if (!_requiredOpTime.isNull()) {
- // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
- // Unittest requires that this kind of failure be handled specially.
- auto status =
- _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid));
- if (!status.isOK()) {
- _finishCallback(status).transitional_ignore();
- }
- return;
+ // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
+ // Unittest requires that this kind of failure be handled specially.
+ auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen, rbid));
+ if (!status.isOK()) {
+ _finishCallback(status).ignore();
}
-
- _finishCallback(candidate, rbid).ignore();
}
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
@@ -570,9 +570,7 @@ Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSe
Status SyncSourceResolver::_finishCallback(HostAndPort hostAndPort, int rbid) {
SyncSourceResolverResponse response;
response.syncSourceStatus = std::move(hostAndPort);
- if (rbid != ReplicationProcess::kUninitializedRollbackId) {
- response.rbid = rbid;
- }
+ response.rbid = rbid;
return _finishCallback(response);
}
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index a98db3d8773..3cdeead4f65 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -37,6 +37,7 @@
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -73,8 +74,9 @@ struct SyncSourceResolverResponse {
// Rollback ID of the selected sync source.
// The rbid is fetched before the required optime so callers can be sure that as long as the
- // rbid is the same, the required optime is still present.
- int rbid;
+ // rbid is the same, the required optime is still present. The rbid will remain set to
+ // 'kUninitializedRollbackId' if _requiredOpTime is null.
+ int rbid = ReplicationProcess::kUninitializedRollbackId;
bool isOK() {
return syncSourceStatus.isOK();
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 1e4380d2fca..2d6543c43cb 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -71,7 +71,7 @@ private:
};
class SyncSourceResolverTest : public executor::ThreadPoolExecutorTest {
-private:
+public:
void setUp() override;
void tearDown() override;
@@ -129,6 +129,19 @@ std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver(
const NamespaceString nss("local.oplog.rs");
+const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
+class SyncSourceResolverRequiredOpTimeTest : public SyncSourceResolverTest {
+public:
+ void setUp() override;
+};
+
+void SyncSourceResolverRequiredOpTimeTest::setUp() {
+ SyncSourceResolverTest::setUp();
+ // Initialize a resolver with a non-null _requiredOpTime. This makes the sync source resolver
+ // run the RBID and requiredOpTime checks.
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+}
+
BSONObj makeCursorResponse(CursorId cursorId,
const NamespaceString& nss,
std::vector<BSONObj> docs,
@@ -348,7 +361,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate1);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -369,7 +381,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate1);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -420,7 +431,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -486,7 +496,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -552,7 +561,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -577,7 +585,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -603,7 +610,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -629,7 +635,6 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
- _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -650,8 +655,6 @@ TEST_F(SyncSourceResolverTest, SyncSourceResolverWillSucceedWithExtraFields) {
{BSON("ts" << Timestamp(1, 1) << "t" << 1 << "note"
<< "a")});
- _scheduleRBIDResponse(getNet(), candidate1);
-
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
@@ -705,12 +708,8 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
{_makeOplogEntry(requiredOpTime.getTimestamp(), requiredOpTime.getTerm())});
}
-const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
-
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverWillCheckForRequiredOpTimeIfRequiredOpTimeIsProvided) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
@@ -731,10 +730,8 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQ(_response.rbid, 7);
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
@@ -770,7 +767,7 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(
- SyncSourceResolverTest,
+ SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) {
auto requireOpTimeWithUninitializedTerm =
OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm);
@@ -806,10 +803,8 @@ TEST_F(
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
@@ -840,10 +835,8 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
HostAndPort candidate2("node2", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
@@ -878,10 +871,8 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsScheduleErrorWhenSchedulingRBIDCommandFails) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
_shouldFailRequest = [](const executor::RemoteCommandRequest& request) {
return request.cmdObj.firstElementFieldName() == "replSetGetRBID"_sd;
};
@@ -899,10 +890,8 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
_shouldFailRequest = [](const executor::RemoteCommandRequest& request) {
// Fail find commands reading the oplog with filter containing a "ts" predicate.
if (StringData{request.cmdObj.getStringField("find")} !=
@@ -934,10 +923,8 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(
- SyncSourceResolverTest,
+ SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
@@ -957,10 +944,8 @@ TEST_F(
}
TEST_F(
- SyncSourceResolverTest,
+ SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
@@ -978,10 +963,8 @@ TEST_F(
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRBIDCommand) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
@@ -1002,10 +985,8 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
}
-TEST_F(SyncSourceResolverTest,
+TEST_F(SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterRBIDCommandNotOk) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());
@@ -1033,10 +1014,8 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(
- SyncSourceResolverTest,
+ SyncSourceResolverRequiredOpTimeTest,
SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) {
- _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
-
HostAndPort candidate1("node1", 12345);
_selector->setChooseNewSyncSourceResult_forTest(candidate1);
ASSERT_OK(_resolver->startup());