author     Jason Chan <jason.chan@mongodb.com>               2021-02-17 20:11:31 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-02-24 17:49:12 +0000
commit     275bc8977cfd8845cb4773d639504698cd99de90 (patch)
tree       b9be23d4c09f420adceb4db8bb72ccf306813182
parent     686dc9a29c3444759d538c3e0055f66dba4031d9 (diff)
download   mongo-275bc8977cfd8845cb4773d639504698cd99de90.tar.gz
SERVER-52719 Restart TenantMigrationRecipientService future chain on donor failure (tag: r4.9.0-alpha5)
-rw-r--r--  jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js  127
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp                  443
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.h                     21
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service_test.cpp              289
-rw-r--r--  src/mongo/db/repl/tenant_migration_state_machine.idl                         5
5 files changed, 700 insertions(+), 185 deletions(-)
diff --git a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
new file mode 100644
index 00000000000..587b3a11434
--- /dev/null
+++ b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
@@ -0,0 +1,127 @@
+/*
+ * Tests that the recipient will retry a migration on donor sync source failure in the following
+ * scenarios:
+ * - donor shuts down when the recipient oplog fetcher is created but cloning has yet to start
+ * - donor shuts down in the middle of the cloning phase
+ * - donor shuts down after cloning is finished but the recipient has yet to declare that the data
+ * is consistent
+ *
+ * @tags: [requires_majority_read_concern, requires_fcv_49, incompatible_with_windows_tls]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");
+load("jstests/libs/write_concern_util.js");
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load('jstests/replsets/rslib.js');
+
+function runTest(failPoint) {
+ const recipientRst = new ReplSetTest({
+ nodes: 2,
+ name: jsTestName() + "_recipient",
+        // Use a batch size of 2 so that the collection cloner requires more than a single batch
+        // to complete.
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().recipient,
+ {setParameter: {collectionClonerBatchSize: 2}})
+ });
+ recipientRst.startSet();
+ recipientRst.initiateWithHighElectionTimeout();
+
+ const tenantMigrationTest =
+ new TenantMigrationTest({name: jsTestName(), recipientRst, sharedOptions: {nodes: 3}});
+ if (!tenantMigrationTest.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ recipientRst.stopSet();
+ return false;
+ }
+ jsTestLog("Running test with failpoint: " + failPoint);
+ const tenantId = "testTenantId";
+ const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB");
+ const collName = "testColl";
+
+ const donorRst = tenantMigrationTest.getDonorRst();
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+ const donorSecondary = donorRst.getSecondary();
+
+ const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+ const recipientDb = recipientPrimary.getDB(tenantDB);
+ let recipientColl = recipientDb.getCollection(collName);
+
+ tenantMigrationTest.insertDonorDB(tenantDB, collName);
+
+ let waitInFailPoint;
+ if (failPoint === 'tenantMigrationHangCollectionClonerAfterHandlingBatchResponse') {
+ waitInFailPoint =
+ configureFailPoint(recipientPrimary, failPoint, {nss: recipientColl.getFullName()});
+ } else {
+ waitInFailPoint = configureFailPoint(recipientPrimary, failPoint, {action: "hang"});
+ }
+
+ const migrationUuid = UUID();
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(migrationUuid),
+ tenantId,
+ readPreference: {mode: 'primary'}
+ };
+
+ jsTestLog("Starting the tenant migration to wait in failpoint: " + failPoint);
+ assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
+ waitInFailPoint.wait();
+ let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+ let currOp = res.inprog[0];
+ // We should start the migration syncing from the primary.
+ assert.eq(donorPrimary.host,
+ currOp.donorSyncSource,
+ `the recipient should start with 'donorPrimary' as the sync source`);
+ let configRecipientNs = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
+ let recipientDoc = configRecipientNs.find({"_id": migrationUuid}).toArray();
+ assert.eq(recipientDoc[0].state, "started", recipientDoc[0]);
+ assert.eq(recipientDoc[0].numRestartsDueToDonorConnectionFailure, 0, recipientDoc[0]);
+
+ jsTestLog("Stopping the donor primary");
+ donorRst.stop(donorPrimary);
+ waitInFailPoint.off();
+ assert.soon(() => {
+        // We expect the recipient to retry the migration because the donor has shut down. Sync
+        // source selection will fail until a new donor primary is discovered, since the
+        // recipient honors the original read preference.
+ let recipientDoc = configRecipientNs.find({"_id": migrationUuid}).toArray();
+ jsTestLog("recipientDoc:" + tojson(recipientDoc));
+ return recipientDoc[0].numRestartsDueToDonorConnectionFailure == 1;
+ });
+
+ let hangOnRetry = configureFailPoint(recipientPrimary,
+ 'fpAfterStartingOplogFetcherMigrationRecipientInstance',
+ {action: "hang"});
+ // Step up a new donor primary.
+ assert.commandWorked(donorSecondary.adminCommand({replSetStepUp: 1}));
+ hangOnRetry.wait();
+ res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
+ currOp = res.inprog[0];
+ // The recipient should resume the migration against the new donor primary.
+ assert.eq(donorSecondary.host, currOp.donorSyncSource, currOp);
+ hangOnRetry.off();
+
+ assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ // Remove 'donorPrimary' so that the test can complete properly.
+ donorRst.remove(donorPrimary);
+ recipientRst.stopSet();
+ tenantMigrationTest.stop();
+ return true;
+}
+
+// Test case where the donor is shut down after the recipient has started the oplog fetcher but
+// not the cloner.
+let testEnabled = runTest('fpAfterStartingOplogFetcherMigrationRecipientInstance');
+if (testEnabled) {
+    // Test case where the donor is shut down in the middle of the cloning phase.
+ runTest('tenantMigrationHangCollectionClonerAfterHandlingBatchResponse');
+    // Test case where the donor is shut down after cloning has finished but before the donor is
+    // notified that the recipient is in the consistent state.
+ runTest('fpAfterStartingOplogApplierMigrationRecipientInstance');
+}
+})();
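
The test above depends on the recipient re-running donor host selection with its original read preference while temporarily excluding hosts whose connections failed; that is what makes the "fail until a new donor primary is discovered" window observable. The service change below does this via _getExcludedDonorHosts() and getHostOrRefresh(). A minimal, self-contained sketch of the exclusion bookkeeping — the types and names here are illustrative stand-ins, not MongoDB's ReplicaSetMonitor API:

    #include <algorithm>
    #include <chrono>
    #include <optional>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for the recipient's excluded-host bookkeeping.
    struct ExcludedHost {
        std::string host;
        std::chrono::steady_clock::time_point excludedUntil;
    };

    class DonorHostSelector {
    public:
        // Exclude a host for 'duration' after a connection failure.
        void excludeHost(const std::string& host, std::chrono::seconds duration) {
            _excluded.push_back({host, std::chrono::steady_clock::now() + duration});
        }

        // Return the first candidate (already filtered by read preference) that
        // is not currently excluded, or nullopt so the caller can back off and
        // retry host selection.
        std::optional<std::string> selectHost(const std::vector<std::string>& candidates) const {
            const auto now = std::chrono::steady_clock::now();
            for (const auto& host : candidates) {
                const bool excluded = std::any_of(
                    _excluded.begin(), _excluded.end(), [&](const ExcludedHost& e) {
                        return e.host == host && e.excludedUntil > now;
                    });
                if (!excluded) {
                    return host;
                }
            }
            return std::nullopt;
        }

    private:
        std::vector<ExcludedHost> _excluded;
    };

In the real service the candidate list comes from the donor ReplicaSetMonitor filtered by the migration's read preference; only the exclusion logic is modeled here.
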
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 297ee928c6f..ab648a0526d 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -481,7 +481,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
}
const auto kDelayedMajorityOpTimeErrorCode = 5272000;
-
return AsyncTry([this,
self = shared_from_this(),
getHostCancelSource,
@@ -490,7 +489,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() {
// Get all donor hosts that we have excluded.
const auto& excludedHosts = _getExcludedDonorHosts(lk);
-
return _donorReplicaSetMonitor
->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token())
.thenRunOn(**_scopedExecutor)
@@ -784,14 +782,18 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() {
options.dropCollectionAtStartup = false;
options.dropCollectionAtShutdown = false;
options.useTemporaryCollection = false;
- // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+
auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
- auto bufferCollection = std::make_unique<OplogBufferCollection>(
- StorageInterface::get(opCtx.get()), oplogBufferNS, options);
+ if (!_donorOplogBuffer) {
+ // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown.
+ auto bufferCollection = std::make_unique<OplogBufferCollection>(
+ StorageInterface::get(opCtx.get()), oplogBufferNS, options);
+ stdx::lock_guard lk(_mutex);
+ _donorOplogBuffer = std::move(bufferCollection);
+ }
+
stdx::unique_lock lk(_mutex);
invariant(_stateDoc.getStartFetchingDonorOpTime());
- _donorOplogBuffer = std::move(bufferCollection);
-
{
// Ensure we are primary when trying to startup and create the oplog buffer collection.
auto coordinator = repl::ReplicationCoordinator::get(opCtx.get());
@@ -926,6 +928,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl
"error"_attr = oplogFetcherStatus);
_interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false);
}
+ _oplogFetcherStatus = oplogFetcherStatus;
}
void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp) {
@@ -1250,7 +1253,8 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status,
}
}
- _taskState.setState(TaskState::kInterrupted, status);
+ _taskState.setState(
+ TaskState::kInterrupted, status, skipWaitingForForgetMigration /* isExternalInterrupt */);
}
void TenantMigrationRecipientService::Instance::interrupt(Status status) {
@@ -1352,7 +1356,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa
void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs(
const CancelationToken& token) {
std::vector<ExternalKeysCollectionDocument> keyDocs;
-
auto cursor =
_client->query(NamespaceString::kKeysCollectionNamespace,
Query().readPref(_readPreference.pref, _readPreference.tags.getTagBSON()));
@@ -1411,179 +1414,257 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
"readPreference"_attr = _readPreference);
pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet();
+ // The 'AsyncTry' is run on the cleanup executor as we rely on the 'CancelationToken' to signal
+ // when work should be canceled rather than letting the scoped executor be destroyed on
+ // shutdown/stepdown.
+ return AsyncTry([this, self = shared_from_this(), executor, token] {
+ return ExecutorFuture(**executor)
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard lk(_mutex);
+ // Instance task can be started only once for the current term on a primary.
+ invariant(!_taskState.isDone());
+ // If the task state is interrupted, then don't start the task.
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
- return ExecutorFuture(**executor)
- .then([this, self = shared_from_this()] {
- stdx::lock_guard lk(_mutex);
- // Instance task can be started only once for the current term on a primary.
- invariant(!_taskState.isDone());
- // If the task state is interrupted, then don't start the task.
- if (_taskState.isInterrupted()) {
- uassertStatusOK(_taskState.getInterruptStatus());
- }
-
- _taskState.setState(TaskState::kRunning);
-
- pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
-
- return _initializeStateDoc(lk);
- })
- .then([this, self = shared_from_this()] {
- _stateDocPersistedPromise.emplaceValue();
- uassert(ErrorCodes::TenantMigrationForgotten,
- str::stream() << "Migration " << getMigrationUUID()
- << " already marked for garbage collect",
- !_stateDoc.getExpireAt());
- _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
- return _createAndConnectClients();
- })
- .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
- stdx::lock_guard lk(_mutex);
- if (_taskState.isInterrupted()) {
- uassertStatusOK(_taskState.getInterruptStatus());
- }
-
- // interrupt() called after this code block will interrupt the cloner and fetcher.
- _client = std::move(ConnectionPair.first);
- _oplogFetcherClient = std::move(ConnectionPair.second);
+ // The task state will already have been set to 'kRunning' if we restarted
+ // the future chain on donor failover.
+ if (!_taskState.isRunning()) {
+ _taskState.setState(TaskState::kRunning);
+ }
+ pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
+ return _initializeStateDoc(lk);
+ })
+ .then([this, self = shared_from_this()] {
+ if (_stateDocPersistedPromise.getFuture().isReady()) {
+ // This is a retry of the future chain.
+ auto opCtx = cc().makeOperationContext();
+ TenantMigrationRecipientDocument stateDoc;
+ {
+ stdx::lock_guard lk(_mutex);
+ _stateDoc.setNumRestartsDueToDonorConnectionFailure(
+ _stateDoc.getNumRestartsDueToDonorConnectionFailure() + 1);
+ stateDoc = _stateDoc;
+ }
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(
+                        opCtx.get(), stateDoc));
+ } else {
+ // Avoid fulfilling the promise twice on restart of the future chain.
+ _stateDocPersistedPromise.emplaceValue();
+ }
+ uassert(ErrorCodes::TenantMigrationForgotten,
+ str::stream() << "Migration " << getMigrationUUID()
+ << " already marked for garbage collect",
+ !_stateDoc.getExpireAt());
+ _stopOrHangOnFailPoint(
+ &fpAfterPersistingTenantMigrationRecipientInstanceStateDoc);
+ return _createAndConnectClients();
+ })
+ .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) {
+ stdx::lock_guard lk(_mutex);
+ if (_taskState.isInterrupted()) {
+ uassertStatusOK(_taskState.getInterruptStatus());
+ }
- // Create the writer pool and shared data.
- _writerPool = makeTenantMigrationWriterPool();
- _sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(),
- getMigrationUUID(),
- _stateDoc.getStartFetchingDonorOpTime().has_value());
- })
- .then([this, self = shared_from_this(), token] {
- _fetchAndStoreDonorClusterTimeKeyDocs(token);
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
+ // interrupt() called after this code block will interrupt the cloner and
+ // fetcher.
+ _client = std::move(ConnectionPair.first);
+ _oplogFetcherClient = std::move(ConnectionPair.second);
- // Record the FCV at the start of a migration and check for changes in every subsequent
- // attempt. Fail if there is any mismatch in FCV or upgrade/downgrade state.
- // (Generic FCV reference): This FCV check should exist across LTS binary versions.
- auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
- auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
+ if (!_writerPool) {
+ // Create the writer pool and shared data.
+ _writerPool = makeTenantMigrationWriterPool();
+ }
+ _sharedData = std::make_unique<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(),
+ getMigrationUUID(),
+ _stateDoc.getStartFetchingDonorOpTime().has_value());
+ })
+ .then([this, self = shared_from_this(), token] {
+ _fetchAndStoreDonorClusterTimeKeyDocs(token);
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance);
+ stdx::lock_guard lk(_mutex);
+
+ // Record the FCV at the start of a migration and check for changes in every
+ // subsequent attempt. Fail if there is any mismatch in FCV or
+ // upgrade/downgrade state. (Generic FCV reference): This FCV check should
+ // exist across LTS binary versions.
+ auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
+ auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV();
+
+ if (!startingFCV) {
+ _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
+ return _updateStateDocForMajority(lk);
+ }
- if (!startingFCV) {
- _stateDoc.setRecipientPrimaryStartingFCV(currentFCV);
- return _updateStateDocForMajority(lk);
- }
+ if (startingFCV != currentFCV) {
+ LOGV2_ERROR(5356200,
+ "FCV may not change during migration",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startingFCV"_attr = startingFCV,
+ "currentFCV"_attr = currentFCV);
+ uasserted(5356201, "Detected FCV change from last migration attempt.");
+ }
- if (startingFCV != currentFCV) {
- LOGV2_ERROR(5356200,
- "FCV may not change during migration",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startingFCV"_attr = startingFCV,
- "currentFCV"_attr = currentFCV);
- uasserted(5356201, "Detected FCV change from last migration attempt.");
- }
+ return SemiFuture<void>::makeReady();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
+ _compareRecipientAndDonorFCV();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV);
+ stdx::lock_guard lk(_mutex);
+ _getStartOpTimesFromDonor(lk);
+ return _updateStateDocForMajority(lk);
+ })
+ .then([this, self = shared_from_this()] {
+ _fetchRetryableWritesOplogBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(
+ &fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
+ _startOplogFetcher();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(
+ &fpAfterStartingOplogFetcherMigrationRecipientInstance);
+
+ stdx::unique_lock lk(_mutex);
- return SemiFuture<void>::makeReady();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
- _compareRecipientAndDonorFCV();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV);
- stdx::lock_guard lk(_mutex);
- _getStartOpTimesFromDonor(lk);
- return _updateStateDocForMajority(lk);
- })
- .then([this, self = shared_from_this()] { _fetchRetryableWritesOplogBeforeStartOpTime(); })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
- _startOplogFetcher();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance);
+ {
+                        // Throwing an error when the cloner is canceled externally via
+                        // interrupt() makes the instance skip the remaining tasks (i.e.,
+                        // starting the oplog applier). This is necessary to prevent a race
+                        // between interrupt() and starting the oplog applier in failover
+                        // scenarios where we don't start the cloner because the tenant data
+                        // is already in a consistent state.
+ stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
+ uassertStatusOK(_sharedData->getStatus(sharedDatalk));
+ }
+ // Create the oplog applier but do not start it yet.
+ invariant(_stateDoc.getStartApplyingDonorOpTime());
+
+ OpTime beginApplyingAfterOpTime;
+ bool isResuming = false;
+ if (_isCloneCompletedMarkerSet(lk)) {
+ const auto startApplyingDonorOpTime =
+ *_stateDoc.getStartApplyingDonorOpTime();
+ const auto cloneFinishedRecipientOptime =
+ *_stateDoc.getCloneFinishedRecipientOpTime();
+ lk.unlock();
+                            // We avoid holding the mutex while scanning the local oplog, which
+                            // acquires the RSTL in IX mode. This allows us to be interruptible
+                            // via a concurrent stepDown, which acquires the RSTL in X mode.
+ beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime(
+ startApplyingDonorOpTime, cloneFinishedRecipientOptime);
+ isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime;
+ lk.lock();
+ } else {
+ beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
+ }
+ LOGV2_DEBUG(4881202,
+ 1,
+ "Recipient migration service creating oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID(),
+ "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
+ _tenantOplogApplier =
+ std::make_shared<TenantOplogApplier>(_migrationUuid,
+ _tenantId,
+ beginApplyingAfterOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ isResuming);
+
+ // Start the cloner.
+ auto clonerFuture = _startTenantAllDatabaseCloner(lk);
+
+ // Signal that the data sync has started successfully.
+ if (!_dataSyncStartedPromise.getFuture().isReady()) {
+ _dataSyncStartedPromise.emplaceValue();
+ }
+ return clonerFuture;
+ })
+ .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
+ .then([this, self = shared_from_this()] {
+ _fetchCommittedTransactionsBeforeStartOpTime();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
+ LOGV2_DEBUG(4881200,
+ 1,
+ "Recipient migration service starting oplog applier",
+ "tenantId"_attr = getTenantId(),
+ "migrationId"_attr = getMigrationUUID());
+ {
+ stdx::lock_guard lk(_mutex);
+ uassertStatusOK(_tenantOplogApplier->startup());
+ }
+ _stopOrHangOnFailPoint(
+ &fpAfterStartingOplogApplierMigrationRecipientInstance);
+ return _getDataConsistentFuture();
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
+ stdx::lock_guard lk(_mutex);
+ LOGV2_DEBUG(4881101,
+ 1,
+ "Tenant migration recipient instance is in consistent state",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "donorConsistentOpTime"_attr =
+ _stateDoc.getDataConsistentStopDonorOpTime());
+
+ if (!_dataConsistentPromise.getFuture().isReady()) {
+ _dataConsistentPromise.emplaceValue(
+ _stateDoc.getDataConsistentStopDonorOpTime().get());
+ }
+ });
+ })
+ .until([this, self = shared_from_this()](Status status) {
stdx::unique_lock lk(_mutex);
-
- {
- // Throwing error when cloner is canceled externally via interrupt(), makes the
- // instance to skip the remaining task (i.e., starting oplog applier) in the
- // sync process. This step is necessary to prevent race between interrupt()
- // and starting oplog applier for the failover scenarios where we don't start
- // the cloner if the tenant data is already in consistent state.
- stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
- uassertStatusOK(_sharedData->getStatus(sharedDatalk));
+ if (_taskState.isInterrupted()) {
+ status = _taskState.getInterruptStatus();
}
-
- // Create the oplog applier but do not start it yet.
- invariant(_stateDoc.getStartApplyingDonorOpTime());
-
- OpTime beginApplyingAfterOpTime;
- bool isResuming = false;
- if (_isCloneCompletedMarkerSet(lk)) {
- const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime();
- const auto cloneFinishedRecipientOptime =
- *_stateDoc.getCloneFinishedRecipientOpTime();
+ if (ErrorCodes::isRetriableError(status) && !_taskState.isExternalInterrupt() &&
+ _stateDocPersistedPromise.getFuture().isReady()) {
+ // Reset the task state and clear the interrupt status.
+ if (!_taskState.isRunning()) {
+ _taskState.setState(TaskState::kRunning);
+ }
+ _taskState.clearInterruptStatus();
+ // Clean up the async components before retrying the future chain.
+ _oplogFetcherStatus = boost::none;
+ std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
+ std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier;
+
+ _cancelRemainingWork(lk);
+ shutdownTarget(lk, _donorOplogFetcher);
+
+ // Swap the oplog fetcher and applier to join() outside of '_mutex'.
+ using std::swap;
+ swap(savedDonorOplogFetcher, _donorOplogFetcher);
+ swap(savedTenantOplogApplier, _tenantOplogApplier);
lk.unlock();
- // We avoid holding the mutex while scanning the local oplog which acquires the RSTL
- // in IX mode. This is to allow us to be interruptable via a concurrent stepDown
- // which acquires the RSTL in X mode.
- beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime(
- startApplyingDonorOpTime, cloneFinishedRecipientOptime);
- isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime;
- lk.lock();
- } else {
- beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime();
- }
- LOGV2_DEBUG(4881202,
- 1,
- "Recipient migration service creating oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID(),
- "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime);
- _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
- _tenantId,
- beginApplyingAfterOpTime,
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get(),
- isResuming);
-
- // Start the cloner.
- auto clonerFuture = _startTenantAllDatabaseCloner(lk);
-
- // Signal that the data sync has started successfully.
- _dataSyncStartedPromise.emplaceValue();
- return clonerFuture;
- })
- .then([this, self = shared_from_this()] { return _onCloneSuccess(); })
- .then([this, self = shared_from_this()] { _fetchCommittedTransactionsBeforeStartOpTime(); })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone);
- LOGV2_DEBUG(4881200,
- 1,
- "Recipient migration service starting oplog applier",
- "tenantId"_attr = getTenantId(),
- "migrationId"_attr = getMigrationUUID());
- {
- stdx::lock_guard lk(_mutex);
- uassertStatusOK(_tenantOplogApplier->startup());
- }
- _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance);
- return _getDataConsistentFuture();
- })
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise);
- stdx::lock_guard lk(_mutex);
- LOGV2_DEBUG(4881101,
- 1,
- "Tenant migration recipient instance is in consistent state",
- "migrationId"_attr = getMigrationUUID(),
- "tenantId"_attr = getTenantId(),
- "donorConsistentOpTime"_attr =
- _stateDoc.getDataConsistentStopDonorOpTime());
- _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get());
+ // Perform join outside the lock to avoid deadlocks.
+ joinTarget(savedDonorOplogFetcher);
+ joinTarget(savedTenantOplogApplier);
+ return false;
+ }
+ return true;
})
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(_recipientService->getInstanceCleanupExecutor(), token)
+ .semi()
+ .thenRunOn(**_scopedExecutor)
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
stdx::lock_guard lk(_mutex);
@@ -1595,8 +1676,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
.onCompletion([this, self = shared_from_this()](
StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
- // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need
- // to schedule the clean up work on the parent executor.
+ // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we
+ // need to schedule the clean up work on the parent executor.
// We don't need the final optime from the oplog applier. The data sync does not
// normally stop by itself on success. It completes only on errors or on external
@@ -1646,8 +1727,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
})
.thenRunOn(**_scopedExecutor)
.then([this, self = shared_from_this()] {
- // Schedule on the _scopedExecutor to make sure we are still the primary when waiting
- // for the recipientForgetMigration command.
+ // Schedule on the _scopedExecutor to make sure we are still the primary when
+ // waiting for the recipientForgetMigration command.
return _receivedRecipientForgetMigrationPromise.getFuture();
})
.then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); })
@@ -1656,22 +1737,22 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
auto opCtx = cc().makeOperationContext();
auto storageInterface = StorageInterface::get(opCtx.get());
- // The oplog buffer collection can be safely dropped at this point. In case it doesn't
- // exist, dropping will be a no-op.
- // It isn't necessary that the drop is majority-committed. A new primary will attempt
- // to drop the collection anyway.
+ // The oplog buffer collection can be safely dropped at this point. In case it
+ // doesn't exist, dropping will be a no-op. It isn't necessary that the drop is
+ // majority-committed. A new primary will attempt to drop the collection anyway.
return storageInterface->dropCollection(opCtx.get(),
getOplogBufferNs(getMigrationUUID()));
})
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
.onCompletion([this, self = shared_from_this()](Status status) {
- // Schedule on the parent executor to mark the completion of the whole chain so this is
- // safe even on shutDown/stepDown.
+ // Schedule on the parent executor to mark the completion of the whole chain so this
+ // is safe even on shutDown/stepDown.
stdx::lock_guard lk(_mutex);
invariant(_dataSyncCompletionPromise.getFuture().isReady());
if (status.isOK()) {
LOGV2(4881401,
- "Migration marked to be garbage collectable due to recipientForgetMigration "
+ "Migration marked to be garbage collectable due to "
+ "recipientForgetMigration "
"command",
"migrationId"_attr = getMigrationUUID(),
"tenantId"_attr = getTenantId(),
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 504fffb0596..fde90d1327a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -209,14 +209,16 @@ public:
case kRunning:
return newState == kInterrupted || newState == kDone;
case kInterrupted:
- return newState == kDone;
+ return newState == kDone || newState == kRunning;
case kDone:
return false;
}
MONGO_UNREACHABLE;
}
- void setState(StateFlag state, boost::optional<Status> interruptStatus = boost::none) {
+ void setState(StateFlag state,
+ boost::optional<Status> interruptStatus = boost::none,
+ bool isExternalInterrupt = false) {
invariant(checkIfValidTransition(state),
str::stream() << "current state: " << toString(_state)
<< ", new state: " << toString(state));
@@ -230,6 +232,16 @@ public:
_state = state;
_interruptStatus = (interruptStatus) ? interruptStatus.get() : _interruptStatus;
+ _isExternalInterrupt = isExternalInterrupt;
+ }
+
+ void clearInterruptStatus() {
+ _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"};
+ _isExternalInterrupt = false;
+ }
+
+ bool isExternalInterrupt() const {
+ return (_state == kInterrupted) && _isExternalInterrupt;
}
bool isNotStarted() const {
@@ -276,6 +288,9 @@ public:
// task interrupt status. Set to Status::OK() only when the recipient service has not
// been interrupted so far, and is used to remember the initial interrupt error.
Status _interruptStatus = Status::OK();
+ // Indicates if the task was interrupted externally due to a 'recipientForgetMigration'
+ // or stepdown/shutdown.
+ bool _isExternalInterrupt = false;
};
/*
@@ -495,6 +510,8 @@ public:
std::unique_ptr<TenantMigrationSharedData> _sharedData; // (S)
// Indicates whether the main task future continuation chain state kicked off by run().
TaskState _taskState; // (M)
+    // Indicates whether the migration can be retried after an oplog fetcher error.
+ boost::optional<Status> _oplogFetcherStatus; // (M)
// Promise that is resolved when the state document is initialized and persisted.
SharedPromise<void> _stateDocPersistedPromise; // (W)
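
The header's only behavioral change to the state machine is a single new edge, kInterrupted -> kRunning, paired with clearInterruptStatus() so a resumed chain starts from a clean slate. A self-contained sketch of the resulting transition table (enum values as in the diff; the kNotStarted row is an assumption, since that case falls outside the hunk):

    #include <cassert>

    enum StateFlag { kNotStarted, kRunning, kInterrupted, kDone };

    bool checkIfValidTransition(StateFlag current, StateFlag next) {
        switch (current) {
            case kNotStarted:  // Assumed: start, or interrupt/finish before starting.
                return next == kRunning || next == kInterrupted || next == kDone;
            case kRunning:
                return next == kInterrupted || next == kDone;
            case kInterrupted:
                return next == kDone || next == kRunning;  // kRunning edge is new here.
            case kDone:
                return false;  // Terminal.
        }
        return false;
    }

    int main() {
        assert(checkIfValidTransition(kInterrupted, kRunning));  // The retry path.
        assert(!checkIfValidTransition(kDone, kRunning));        // Done stays done.
        return 0;
    }

Because isExternalInterrupt() reports true only while the state is kInterrupted, and setState() resets the flag on every transition, a stale external-interrupt flag cannot suppress a later retry once the loop moves the task back to kRunning.
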
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index ab6829fb324..e060a53ac9e 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -461,8 +461,7 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService
ASSERT_OK(instance->getCompletionFuture().getNoThrow());
}
-
-TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersisitingStateDoc) {
+TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersistingStateDoc) {
stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc");
const UUID migrationUUID = UUID::gen();
@@ -2988,6 +2987,292 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilTimestampIsMajorityCommitte
ASSERT_EQ(oplogBSON["o"]["msg"].String(), "Noop write for recipientSyncData");
}
+TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) {
+ stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
+ auto fp =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ auto doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+ // Kill the oplog fetcher with a retriable error and wait for the migration to retry.
+ const auto retriableErrorCode = ErrorCodes::SocketException;
+ ASSERT_TRUE(ErrorCodes::isRetriableError(retriableErrorCode));
+ oplogFetcher->shutdownWith({retriableErrorCode, "Injected retriable error"});
+
+ // Skip the cloners in this test, so we provide an empty list of databases.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+
+ fp->setMode(FailPoint::off);
+ // Wait for task completion.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 1);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesNonRetriableFetcherError) {
+ auto fp =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ auto doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+ // Kill the oplog fetcher with a non-retriable error.
+ const auto nonRetriableErrorCode = ErrorCodes::Error(5271901);
+ ASSERT_FALSE(ErrorCodes::isRetriableError(nonRetriableErrorCode));
+ oplogFetcher->shutdownWith({nonRetriableErrorCode, "Injected non-retriable error"});
+
+ fp->setMode(FailPoint::off);
+ // Wait for task completion failure.
+ auto status = instance->getDataSyncCompletionFuture().getNoThrow();
+ ASSERT_EQ(nonRetriableErrorCode, status.code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientWillNotRetryOnExternalInterrupt) {
+ auto fp =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ std::shared_ptr<TenantMigrationRecipientService::Instance> instance;
+ {
+ FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance");
+ // Create and start the instance.
+ instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>());
+ }
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get()));
+ ASSERT_TRUE(oplogFetcher != nullptr);
+ ASSERT_TRUE(oplogFetcher->isActive());
+
+ auto doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+ ASSERT_TRUE(ErrorCodes::isRetriableError(ErrorCodes::SocketException));
+ // Interrupt the task with 'skipWaitingForForgetMigration' = true.
+ instance->interrupt(
+ {ErrorCodes::SocketException, "Test retriable error with external interrupt"});
+
+ fp->setMode(FailPoint::off);
+ // Wait for task completion failure.
+ ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::SocketException);
+
+ doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableClonerError) {
+ stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
+ auto fp =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ auto doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+
+ // Have the cloner fail on a retriable error (from the point of view of the recipient service).
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ const auto retriableErrorCode = ErrorCodes::HostUnreachable;
+ ASSERT_TRUE(ErrorCodes::isRetriableError(retriableErrorCode));
+ _donorServer->setCommandReply("listDatabases",
+ Status(retriableErrorCode, "Injecting retriable error."));
+
+ auto retryFp =
+ globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance");
+ initialTimesEntered = retryFp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+ fp->setMode(FailPoint::off);
+
+ retryFp->waitForTimesEntered(initialTimesEntered + 1);
+ // Let cloner run successfully on retry.
+ _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({}));
+ _donorServer->setCommandReply("find", makeFindResponse());
+ retryFp->setMode(FailPoint::off);
+
+ // Wait for task completion.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 1);
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesNonRetriableClonerError) {
+ stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone");
+ auto fp =
+ globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance");
+ auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
+ 0,
+ BSON("action"
+ << "hang"));
+
+ const UUID migrationUUID = UUID::gen();
+ const OpTime topOfOplogOpTime(Timestamp(5, 1), 1);
+
+ MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
+ insertTopOfOplog(&replSet, topOfOplogOpTime);
+
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ replSet.getConnectionString(),
+ "tenantA",
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto opCtx = makeOperationContext();
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ auto doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+
+ // Have the cloner fail on a non-retriable error.
+ MockRemoteDBServer* const _donorServer =
+ mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary());
+ const auto nonRetriableErrorCode = ErrorCodes::Error(5271902);
+ ASSERT_FALSE(ErrorCodes::isRetriableError(nonRetriableErrorCode));
+ _donorServer->setCommandReply("listDatabases",
+ Status(nonRetriableErrorCode, "Injecting non-retriable error."));
+
+ fp->setMode(FailPoint::off);
+
+ // Wait for task completion.
+ ASSERT_EQ(nonRetriableErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getCompletionFuture().getNoThrow());
+
+ doc = getStateDoc(instance.get());
+ ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0);
+}
+
#endif
} // namespace repl
} // namespace mongo
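
Each of the five new tests follows the same choreography: arm a "hang" failpoint, wait for the instance to pause there, inject a fetcher or cloner failure, disarm, and assert on numRestartsDueToDonorConnectionFailure. That recurring pattern could be factored into a helper along these lines — a sketch only, reusing the globalFailPointRegistry()/setMode/waitForTimesEntered calls already present above, with 'injectError' standing in for the per-test failure injection:

    #include <functional>
    #include <string>

    // Pause the instance at 'failPointName', run 'injectError' while it is
    // paused, then let the future chain continue. Test-local helper sketch.
    void pauseInjectAndResume(const std::string& failPointName,
                              const std::function<void()>& injectError) {
        auto fp = globalFailPointRegistry().find(failPointName);
        auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn,
                                               0,
                                               BSON("action"
                                                    << "hang"));
        fp->waitForTimesEntered(initialTimesEntered + 1);  // The instance is now hanging.
        injectError();  // e.g. oplogFetcher->shutdownWith({ErrorCodes::SocketException, "..."})
        fp->setMode(FailPoint::off);  // Resume and let the retry (or failure) play out.
    }
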
diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl
index 296fd91dd72..56c307809c3 100644
--- a/src/mongo/db/repl/tenant_migration_state_machine.idl
+++ b/src/mongo/db/repl/tenant_migration_state_machine.idl
@@ -214,3 +214,8 @@ structs:
downgrade state.
type: fcv_string
optional: true
+ numRestartsDueToDonorConnectionFailure:
+ description: >-
+ A counter that is incremented on each restart due to a donor connection failure.
+ type: long
+ default: 0
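
The IDL compiler generates getNumRestartsDueToDonorConnectionFailure() and setNumRestartsDueToDonorConnectionFailure() for the new field, and 'default: 0' means pre-existing state documents deserialize with a zero count. The retry branch in run() uses them with the copy-then-persist pattern, shown here in isolation:

    // Increment the restart counter under '_mutex' and persist a consistent
    // snapshot of the state document without holding the lock (as in the
    // retry branch of run() above).
    auto opCtx = cc().makeOperationContext();
    TenantMigrationRecipientDocument stateDoc;
    {
        stdx::lock_guard lk(_mutex);
        _stateDoc.setNumRestartsDueToDonorConnectionFailure(
            _stateDoc.getNumRestartsDueToDonorConnectionFailure() + 1);
        stateDoc = _stateDoc;  // Copy while the mutex protects _stateDoc...
    }
    // ...then run the potentially blocking update on the copy.
    uassertStatusOK(
        tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));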