author    | Jason Chan <jason.chan@mongodb.com> | 2021-03-09 02:23:04 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-17 01:55:09 +0000
commit    | 06ec07e0aab34e645fa116543afc88695d2d63dc (patch)
tree      | 24296fbacf08223f4ab2672684939205f9b2c5fb
parent    | 8da9f544f8952dc7bfcca112127c8c9eb0714cf4 (diff)
SERVER-54785 Complete TODO listed in SERVER-52719 and allow recipient to
retry on donor failover after data consistent
5 files changed, 38 insertions, 33 deletions
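
At a high level, the recipient must now tolerate a donor failover that happens after it has already reached the data-consistent state: a freshly stepped-up donor can re-issue 'recipientSyncData' while the recipient is tearing down and restarting its oplog applier to retry the future chain. The diff below guards that window with a flag (`_isRestartingOplogApplier`) and a condition variable (`_restartOplogApplierCondVar`) protected by the instance mutex. As a minimal standalone sketch of the waiter side of that guard, the snippet below uses plain std:: types and hypothetical names (`ApplierRestartState`, `waitUntilApplierUsable`) rather than the real service classes; the actual code waits via `opCtx->waitForConditionOrInterrupt()` so the wait stays interruptible.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-ins for _mutex, _restartOplogApplierCondVar and
// _isRestartingOplogApplier from the recipient service instance.
struct ApplierRestartState {
    std::mutex mtx;
    std::condition_variable restartCv;
    bool restarting = false;
};

// Roughly what the recipientSyncData wait path does before it reads the oplog
// applier: block until no restart is in progress, then inspect state under the lock.
void waitUntilApplierUsable(ApplierRestartState& state) {
    std::unique_lock<std::mutex> lk(state.mtx);
    state.restartCv.wait(lk, [&] { return !state.restarting; });
    // From here the applier pointer and the completion promises can be read safely.
}
```

A companion sketch of the retry-loop side of the guard, which sets and clears the flag, follows the diff.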
diff --git a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
index bad84337a06..8e915d9e724 100644
--- a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
+++ b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js
@@ -43,9 +43,7 @@ function testDonorStartMigrationInterrupt(interruptFunc, donorRestarted) {
     donorRst.startSet();
     donorRst.initiate();
 
-    // TODO SERVER-52719: Remove 'enableRecipientTesting: false'.
-    const tenantMigrationTest =
-        new TenantMigrationTest({name: jsTestName(), donorRst, enableRecipientTesting: false});
+    const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst});
     if (!tenantMigrationTest.isFeatureFlagEnabled()) {
         jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
         donorRst.stopSet();
@@ -125,9 +123,6 @@ function testDonorForgetMigrationInterrupt(interruptFunc) {
         name: "recipientRst",
         nodeOptions: Object.assign(migrationX509Options.recipient, {
             setParameter: {
-                // TODO SERVER-52719: Remove the failpoint
-                // 'returnResponseOkForRecipientSyncDataCmd'.
-                'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
                 tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
                 ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
             }
@@ -209,9 +204,6 @@ function testDonorAbortMigrationInterrupt(interruptFunc, fpName, isShutdown = fa
         name: "recipientRst",
         nodeOptions: Object.assign(migrationX509Options.recipient, {
             setParameter: {
-                // TODO SERVER-52719: Remove the failpoint
-                // 'returnResponseOkForRecipientSyncDataCmd'.
-                'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
                 tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
                 ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
             }
@@ -311,9 +303,6 @@ function testStateDocPersistenceOnFailover(interruptFunc, fpName, isShutdown = f
         name: "recipientRst",
         nodeOptions: Object.assign(migrationX509Options.recipient, {
             setParameter: {
-                // TODO SERVER-52719: Remove the failpoint
-                // 'returnResponseOkForRecipientSyncDataCmd'.
-                'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
                 tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
                 ttlMonitorSleepSecs: kTTLMonitorSleepSecs,
             }
diff --git a/jstests/replsets/tenant_migration_donor_rollback_recovery.js b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
index bb339dc7029..01dc59ed0bf 100644
--- a/jstests/replsets/tenant_migration_donor_rollback_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_rollback_recovery.js
@@ -30,10 +30,9 @@ const recipientRst = new ReplSetTest({
     nodes: 1,
     nodeOptions: Object.assign(migrationX509Options.recipient, {
         setParameter: {
-            // TODO SERVER-52719: Remove the failpoint 'returnResponseOkForRecipientSyncDataCmd'.
-            'failpoint.returnResponseOkForRecipientSyncDataCmd': tojson({mode: 'alwaysOn'}),
             tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
             ttlMonitorSleepSecs: 1,
+            tenantMigrationDisableX509Auth: true,
         }
     })
 });
@@ -72,6 +71,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) {
             setParameter: {
                 tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS,
                 ttlMonitorSleepSecs: 1,
+                tenantMigrationDisableX509Auth: true,
             }
         })
     });
diff --git a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
index 587b3a11434..b1fc37d8ecd 100644
--- a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
+++ b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js
@@ -79,7 +79,10 @@ function runTest(failPoint) {
               `the recipient should start with 'donorPrimary' as the sync source`);
     let configRecipientNs = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS);
     let recipientDoc = configRecipientNs.find({"_id": migrationUuid}).toArray();
-    assert.eq(recipientDoc[0].state, "started", recipientDoc[0]);
+    const expectedMigrationState = (failPoint === "fpAfterDataConsistentMigrationRecipientInstance")
+        ? "consistent"
+        : "started";
+    assert.eq(recipientDoc[0].state, expectedMigrationState, recipientDoc[0]);
     assert.eq(recipientDoc[0].numRestartsDueToDonorConnectionFailure, 0, recipientDoc[0]);
 
     jsTestLog("Stopping the donor primary");
@@ -123,5 +126,8 @@ if (testEnabled) {
     // Test case where donor is shutdown after cloning has finished but before the donor is notified
     // that the recipient is in the consistent state.
     runTest('fpAfterStartingOplogApplierMigrationRecipientInstance');
+    // Test case where donor is shutdown after the recipient responds to the first
+    // 'RecipientSyncData' cmd, indicating that the data is consistent.
+    runTest('fpAfterDataConsistentMigrationRecipientInstance');
 }
 })();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 182d6ef092c..b21ac4f0872 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -407,8 +407,12 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR
     }
 
     auto getWaitOpTimeFuture = [&]() {
-        stdx::lock_guard lk(_mutex);
-
+        stdx::unique_lock lk(_mutex);
+        // In the event of a donor failover, it is possible that a new donor has stepped up and
+        // initiated this 'recipientSyncData' cmd. Make sure the recipient is not in the middle of
+        // restarting the oplog applier to retry the future chain.
+        opCtx->waitForConditionOrInterrupt(
+            _restartOplogApplierCondVar, lk, [&] { return !_isRestartingOplogApplier; });
         if (_dataSyncCompletionPromise.getFuture().isReady()) {
             // When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync
             // completion future result.
@@ -1652,6 +1656,8 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta
     std::unique_ptr<ThreadPool> savedWriterPool;
     {
         stdx::lock_guard lk(_mutex);
+        _isRestartingOplogApplier = false;
+        _restartOplogApplierCondVar.notify_all();
 
         _cancelRemainingWork(lk);
@@ -2030,6 +2036,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
             {
                 stdx::lock_guard lk(_mutex);
                 uassertStatusOK(_tenantOplogApplier->startup());
+                _isRestartingOplogApplier = false;
+                _restartOplogApplierCondVar.notify_all();
             }
             _stopOrHangOnFailPoint(
                 &fpAfterStartingOplogApplierMigrationRecipientInstance);
@@ -2050,9 +2058,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
                     _dataConsistentPromise.emplaceValue(
                         _stateDoc.getDataConsistentStopDonorOpTime().get());
                 }
+            })
+            .then([this, self = shared_from_this()] {
+                _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
+                stdx::lock_guard lk(_mutex);
+                // wait for oplog applier to complete/stop.
+                // The oplog applier does not exit normally; it must be shut down externally,
+                // e.g. by recipientForgetMigration.
+                return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
             });
     })
-        .until([this, self = shared_from_this()](Status status) {
+        .until([this, self = shared_from_this()](
+                   StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+            auto status = applierStatus.getStatus();
             stdx::unique_lock lk(_mutex);
             if (_taskState.isInterrupted()) {
                 status = _taskState.getInterruptStatus();
@@ -2063,7 +2081,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
             if (!_taskState.isRunning()) {
                 _taskState.setState(TaskState::kRunning);
             }
-            _taskState.clearInterruptStatus();
+            _isRestartingOplogApplier = true;
             // Clean up the async components before retrying the future chain.
             _oplogFetcherStatus = boost::none;
             std::unique_ptr<OplogFetcher> savedDonorOplogFetcher;
@@ -2088,15 +2106,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
         .withBackoffBetweenIterations(kExponentialBackoff)
         .on(_recipientService->getInstanceCleanupExecutor(), token)
         .semi()
-        .thenRunOn(**_scopedExecutor)
-        .then([this, self = shared_from_this()] {
-            _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance);
-            stdx::lock_guard lk(_mutex);
-            // wait for oplog applier to complete/stop.
-            // The oplog applier does not exit normally; it must be shut down externally,
-            // e.g. by recipientForgetMigration.
-            return _tenantOplogApplier->getNotificationForOpTime(OpTime::max());
-        })
         .thenRunOn(_recipientService->getInstanceCleanupExecutor())
         .onCompletion([this, self = shared_from_this()](
                           StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index eb7d10e6057..2d40b6cda81 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -242,11 +242,6 @@ public:
             _isExternalInterrupt = isExternalInterrupt;
         }
 
-        void clearInterruptStatus() {
-            _interruptStatus = Status{ErrorCodes::InternalError, "Uninitialized value"};
-            _isExternalInterrupt = false;
-        }
-
         bool isExternalInterrupt() const {
             return (_state == kInterrupted) && _isExternalInterrupt;
         }
@@ -563,6 +558,12 @@ public:
     // Promise that is resolved when the chain of work kicked off by run() has completed to
     // indicate whether the state doc is successfully marked as garbage collectable.
     SharedPromise<void> _taskCompletionPromise;  // (W)
+
+    // Waiters are notified when 'tenantOplogApplier' is valid on restart.
+    stdx::condition_variable _restartOplogApplierCondVar;  // (M)
+    // Indicates that the oplog applier is being cleaned up due to restart of the future chain.
+    // This is set to true when the oplog applier is started up again.
+    bool _isRestartingOplogApplier = false;  // (M)
 };
 
 private:
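
For completeness, here is an equally rough sketch of the other side of the guard, the retry path in run(): before the chain's async components are torn down for a retry the flag is raised, and once the oplog applier has been started up again (and likewise in _cleanupOnDataSyncCompletion) the flag is cleared and waiters are notified. This is plain C++ with the same hypothetical names as the earlier sketch; the real code drives this through a retrying future chain (.until() with backoff on an executor) rather than a loop.

```cpp
#include <condition_variable>
#include <mutex>

// Same hypothetical stand-ins as in the earlier sketch.
struct ApplierRestartState {
    std::mutex mtx;
    std::condition_variable restartCv;
    bool restarting = false;
};

// runChainOnce() stands in for one pass of the future chain: start the oplog
// applier, reach data consistent, then wait on the applier. It returns true when
// the chain is finished (success or a non-retryable error) and false when the
// failure is retryable (e.g. the donor failed over).
template <typename RunChainOnce>
void runMigrationWithRetries(ApplierRestartState& state, RunChainOnce runChainOnce) {
    for (;;) {
        {
            // Mirrors the code right after _tenantOplogApplier->startup(): the new
            // applier is usable again, so clear the flag and wake any waiters.
            std::lock_guard<std::mutex> lk(state.mtx);
            state.restarting = false;
        }
        state.restartCv.notify_all();

        if (runChainOnce()) {
            return;
        }

        // Retryable failure: raise the flag before cleaning up the fetcher and
        // applier, so concurrent recipientSyncData waiters block until the restart.
        std::lock_guard<std::mutex> lk(state.mtx);
        state.restarting = true;
    }
}
```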