summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2021-03-09 02:23:04 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-17 01:55:09 +0000
commit06ec07e0aab34e645fa116543afc88695d2d63dc (patch)
tree24296fbacf08223f4ab2672684939205f9b2c5fb
parent8da9f544f8952dc7bfcca112127c8c9eb0714cf4 (diff)
downloadmongo-06ec07e0aab34e645fa116543afc88695d2d63dc.tar.gz
SERVER-54785 Complete TODO listed in SERVER-52719 and allow recipient to
retry on donor failover after data consistent
-rw-r--r--jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js13
-rw-r--r--jstests/replsets/tenant_migration_donor_rollback_recovery.js4
-rw-r--r--jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js8
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp35
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h11
5 files changed, 38 insertions, 33 deletions
diff --git a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
index bad84337a06..8e915d9e724 100644
--- a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
+++ b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
@@ -43,9 +43,7 @@ function testDonorStartMigrationInterrupt(interruptFunc, donorRestarted) {
donorRst.startSet();
donorRst.initiate();
- // TODO SERVER-52719: Remove 'enableRecipientTesting: false'.
- const tenantMigrationTest =
- new TenantMigrationTest({name: jsTestName(), donorRst, enableRecipientTesting: false});
+ const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
if (!tenantMigrationTest.isFeatureFlagEnabled()) {
jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
donorRst.stopSet();
@@ -125,9 +123,6 @@ function testDonorForgetMigrationInterrupt(interruptFunc) {
name: "recipientRst",
nodeOptions: Object.assign(migrationX509Options.recipient, {
setParameter: {
- // TODO SERVER-52719: Remove the failpoint
- // 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
}
@@ -209,9 +204,6 @@ function testDonorAbortMigrationInterrupt(interruptFunc, fpName, isShutdown = fa
name: "recipientRst",
nodeOptions: Object.assign(migrationX509Options.recipient, {
setParameter: {
- // TODO SERVER-52719: Remove the failpoint
- // 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
}
@@ -311,9 +303,6 @@ function testStateDocPersistenceOnFailover(interruptFunc, fpName, isShutdown = f
name: "recipientRst",
nodeOptions: Object.assign(migrationX509Options.recipient, {
setParameter: {
- // TODO SERVER-52719: Remove the failpoint
- // 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
}
diff --git a/jstests/replsets/tenant_migration_donor_rollback_recovery.js b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
index bb339dc7029..01dc59ed0bf 100644
--- a/jstests/replsets/tenant_migration_donor_rollback_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
@@ -30,10 +30,9 @@ const recipientRst = new ReplSetTest({
nodes: 1,
nodeOptions: Object.assign(migrationX509Options.recipient, {
setParameter: {
- // TODO SERVER-52719: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
- 'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
ttlMonitorSleepSecs: 1,
+ tenantMigrationDisableX509Auth: true,
}
})
});
@@ -72,6 +71,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) {
setParameter: {
tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
ttlMonitorSleepSecs: 1,
+ tenantMigrationDisableX509Auth: true,
}
})
});
diff --git a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
index 587b3a11434..b1fc37d8ecd 100644
--- a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
+++ b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
@@ -79,7 +79,10 @@ function runTest(failPoint) {
`the recipient should start with 'donorPrimary' as the sync source`);
let configRecipientNs = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
let recipientDoc = configRecipientNs.find({"_id": migrationUuid}).toArray();
- assert.eq(recipientDoc[0].state, "started", recipientDoc[0]);
+ const expectedMigrationState = (failPoint === "fpAfterDataConsistentMigrationRecipientInstance")
+ ? "consistent"
+ : "started";
+ assert.eq(recipientDoc[0].state, expectedMigrationState, recipientDoc[0]);
assert.eq(recipientDoc[0].numRestartsDueToDonorConnectionFailure, 0, recipientDoc[0]);
jsTestLog("Stopping the donor primary");
@@ -123,5 +126,8 @@ if (testEnabled) {
// Test case where donor is shutdown after cloning has finished but before the donor is notified
// that the recipient is in the consistent state.
runTest('fpAfterStartingOplogApplierMigrationRecipientInstance');
+ // Test case where donor is shutdown after the recipient responds to the first
+ // 'RecipientSyncData' cmd, indicating that the data is consistent.
+ runTest('fpAfterDataConsistentMigrationRecipientInstance');
}
})();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 182d6ef092c..b21ac4f0872 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -407,8 +407,12 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
}
auto getWaitOpTimeFuture = [&]() {
- stdx::lock_guard lk(_mutex);
-
+ stdx::unique_lock lk(_mutex);
+ // In the event of a donor failover, it is possible that a new donor has stepped up and
+ // initiated this 'recipientSyncData' cmd. Make sure the recipient is not in the middle of
+ // restarting the oplog applier to retry the future chain.
+ opCtx->waitForConditionOrInterrupt(
+ _restartOplogApplierCondVar, lk, [&] { return !_isRestartingOplogApplier; });
if (_dataSyncCompletionPromise.getFuture().isReady()) {
// When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync
// completion future result.
@@ -1652,6 +1656,8 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta
std::unique_ptr<ThreadPool> savedWriterPool;
{
stdx::lock_guard lk(_mutex);
+ _isRestartingOplogApplier = false;
+ _restartOplogApplierCondVar.notify_all();
_cancelRemainingWork(lk);
@@ -2030,6 +2036,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
{
stdx::lock_guard lk(_mutex);
uassertStatusOK(_tenantOplogApplier->startup());
+ _isRestartingOplogApplier = false;
+ _restartOplogApplierCondVar.notify_all();
}
_stopOrHangOnFailPoint(
&fpAfterStartingOplogApplierMigrationRecipientInstance);
@@ -2050,9 +2058,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
_dataConsistentPromise.emplaceValue(
_stateDoc.getDataConsistentStopDonorOpTime().get());
}
+ })
+ .then([this, self = shared_from_this()] {
+ _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+ stdx::lock_guard lk(_mutex);
+ // wait for oplog applier to complete/stop.
+ // The oplog applier does not exit normally; it must be shut down externally,
+ // e.g. by recipientForgetMigration.
+ return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
});
})
- .until([this, self = shared_from_this()](Status status) {
+ .until([this, self = shared_from_this()](
+ StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+ auto status = applierStatus.getStatus();
stdx::unique_lock lk(_mutex);
if (_taskState.isInterrupted()) {
status = _taskState.getInterruptStatus();
@@ -2063,7 +2081,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
if (!_taskState.isRunning()) {
_taskState.setState(TaskState::kRunning);
}
- _taskState.clearInterruptStatus();
+ _isRestartingOplogApplier = true;
// Clean up the async components before retrying the future chain.
_oplogFetcherStatus = boost::none;
std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
@@ -2088,15 +2106,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
.withBackoffBetweenIterations(kExponentialBackoff)
.on(_recipientService->getInstanceCleanupExecutor(), token)
.semi()
- .thenRunOn(**_scopedExecutor)
- .then([this, self = shared_from_this()] {
- _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
- stdx::lock_guard lk(_mutex);
- // wait for oplog applier to complete/stop.
- // The oplog applier does not exit normally; it must be shut down externally,
- // e.g. by recipientForgetMigration.
- return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
- })
.thenRunOn(_recipientService->getInstanceCleanupExecutor())
.onCompletion([this, self = shared_from_this()](
StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index eb7d10e6057..2d40b6cda81 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -242,11 +242,6 @@ public:
_isExternalInterrupt = isExternalInterrupt;
}
- void clearInterruptStatus() {
- _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"};
- _isExternalInterrupt = false;
- }
-
bool isExternalInterrupt() const {
return (_state == kInterrupted) && _isExternalInterrupt;
}
@@ -563,6 +558,12 @@ public:
// Promise that is resolved when the chain of work kicked off by run() has completed to
// indicate whether the state doc is successfully marked as garbage collectable.
SharedPromise<void> _taskCompletionPromise; // (W)
+
+ // Waiters are notified when '_tenantOplogApplier' is valid on restart.
+ stdx::condition_variable _restartOplogApplierCondVar; // (M)
+ // Indicates that the oplog applier is being cleaned up due to restart of the future chain.
+ // This is set to false when the oplog applier is started up again.
+ bool _isRestartingOplogApplier = false; // (M)
};
private: