diff options
5 files changed, 700 insertions, 185 deletions
diff --git a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js new file mode 100644 index 00000000000..587b3a11434 --- /dev/null +++ b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js @@ -0,0 +1,127 @@ +/* + * Tests that the recipient will retry a migration on donor sync source failure in the following + * scenarios: + * - donor shuts down when the recipient oplog fetcher is created but cloning has yet to start + * - donor shuts down in the middle of the cloning phase + * - donor shuts down after cloning is finished but the recipient has yet to declare that the data + * is consistent + * + * @tags: [requires_majority_read_concern, requires_fcv_49, incompatible_with_windows_tls] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); +load("jstests/libs/write_concern_util.js"); +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load('jstests/replsets/rslib.js'); + +function runTest(failPoint) { + const recipientRst = new ReplSetTest({ + nodes: 2, + name: jsTestName() + "_recipient", + // Use a batch size of 2 so that collection cloner requires more than a single batch to + // complete. 
+ nodeOptions: Object.assign(TenantMigrationUtil.makeX509OptionsForTest().recipient, + {setParameter: {collectionClonerBatchSize: 2}}) + }); + recipientRst.startSet(); + recipientRst.initiateWithHighElectionTimeout(); + + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), recipientRst, sharedOptions: {nodes: 3}}); + if (!tenantMigrationTest.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + recipientRst.stopSet(); + return false; + } + jsTestLog("Running test with failpoint: " + failPoint); + const tenantId = "testTenantId"; + const tenantDB = tenantMigrationTest.tenantDB(tenantId, "DB"); + const collName = "testColl"; + + const donorRst = tenantMigrationTest.getDonorRst(); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + const donorSecondary = donorRst.getSecondary(); + + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + const recipientDb = recipientPrimary.getDB(tenantDB); + let recipientColl = recipientDb.getCollection(collName); + + tenantMigrationTest.insertDonorDB(tenantDB, collName); + + let waitInFailPoint; + if (failPoint === 'tenantMigrationHangCollectionClonerAfterHandlingBatchResponse') { + waitInFailPoint = + configureFailPoint(recipientPrimary, failPoint, {nss: recipientColl.getFullName()}); + } else { + waitInFailPoint = configureFailPoint(recipientPrimary, failPoint, {action: "hang"}); + } + + const migrationUuid = UUID(); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(migrationUuid), + tenantId, + readPreference: {mode: 'primary'} + }; + + jsTestLog("Starting the tenant migration to wait in failpoint: " + failPoint); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); + waitInFailPoint.wait(); + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + let currOp = res.inprog[0]; + // We should start the migration syncing from the primary. 
+ assert.eq(donorPrimary.host, + currOp.donorSyncSource, + `the recipient should start with 'donorPrimary' as the sync source`); + const configRecipientNs = recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS); + const recipientDoc = configRecipientNs.find({"_id": migrationUuid}).toArray(); + assert.eq(recipientDoc[0].state, "started", recipientDoc[0]); + assert.eq(recipientDoc[0].numRestartsDueToDonorConnectionFailure, 0, recipientDoc[0]); + + jsTestLog("Stopping the donor primary"); + donorRst.stop(donorPrimary); + waitInFailPoint.off(); + assert.soon(() => { + // We expect that the recipient is retrying the migration as the donor has shut down. We will + // fail trying to find a sync source until a new donor primary is discovered as we will + // honor the original read preference. + const retriedDoc = configRecipientNs.find({"_id": migrationUuid}).toArray(); + jsTestLog("recipientDoc:" + tojson(retriedDoc)); + return retriedDoc[0].numRestartsDueToDonorConnectionFailure == 1; + }); + + const hangOnRetry = configureFailPoint(recipientPrimary, + 'fpAfterStartingOplogFetcherMigrationRecipientInstance', + {action: "hang"}); + // Step up a new donor primary. + assert.commandWorked(donorSecondary.adminCommand({replSetStepUp: 1})); + hangOnRetry.wait(); + res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + currOp = res.inprog[0]; + // The recipient should resume the migration against the new donor primary. + assert.eq(donorSecondary.host, currOp.donorSyncSource, currOp); + hangOnRetry.off(); + + assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + // Remove 'donorPrimary' so that the test can complete properly. + donorRst.remove(donorPrimary); + recipientRst.stopSet(); + tenantMigrationTest.stop(); + return true; +} + +// Test case where donor is shut down after the recipient has started the oplog fetcher but not the +// cloner. 
+let testEnabled = runTest('fpAfterStartingOplogFetcherMigrationRecipientInstance'); +if (testEnabled) { + // Test case where donor is shutdown in the middle of the cloning phase. + runTest('tenantMigrationHangCollectionClonerAfterHandlingBatchResponse'); + // Test case where donor is shutdown after cloning has finished but before the donor is notified + // that the recipient is in the consistent state. + runTest('fpAfterStartingOplogApplierMigrationRecipientInstance'); +} +})(); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 297ee928c6f..ab648a0526d 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -481,7 +481,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { } const auto kDelayedMajorityOpTimeErrorCode = 5272000; - return AsyncTry([this, self = shared_from_this(), getHostCancelSource, @@ -490,7 +489,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { // Get all donor hosts that we have excluded. const auto& excludedHosts = _getExcludedDonorHosts(lk); - return _donorReplicaSetMonitor ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token()) .thenRunOn(**_scopedExecutor) @@ -784,14 +782,18 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { options.dropCollectionAtStartup = false; options.dropCollectionAtShutdown = false; options.useTemporaryCollection = false; - // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); - auto bufferCollection = std::make_unique<OplogBufferCollection>( - StorageInterface::get(opCtx.get()), oplogBufferNS, options); + if (!_donorOplogBuffer) { + // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. 
+ auto bufferCollection = std::make_unique<OplogBufferCollection>( + StorageInterface::get(opCtx.get()), oplogBufferNS, options); + stdx::lock_guard lk(_mutex); + _donorOplogBuffer = std::move(bufferCollection); + } + stdx::unique_lock lk(_mutex); invariant(_stateDoc.getStartFetchingDonorOpTime()); - _donorOplogBuffer = std::move(bufferCollection); - { // Ensure we are primary when trying to startup and create the oplog buffer collection. auto coordinator = repl::ReplicationCoordinator::get(opCtx.get()); @@ -926,6 +928,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl "error"_attr = oplogFetcherStatus); _interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false); } + _oplogFetcherStatus = oplogFetcherStatus; } void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp) { @@ -1250,7 +1253,8 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status, } } - _taskState.setState(TaskState::kInterrupted, status); + _taskState.setState( + TaskState::kInterrupted, status, skipWaitingForForgetMigration /* isExternalInterrupt */); } void TenantMigrationRecipientService::Instance::interrupt(Status status) { @@ -1352,7 +1356,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs( const CancelationToken& token) { std::vector<ExternalKeysCollectionDocument> keyDocs; - auto cursor = _client->query(NamespaceString::kKeysCollectionNamespace, Query().readPref(_readPreference.pref, _readPreference.tags.getTagBSON())); @@ -1411,179 +1414,257 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( "readPreference"_attr = _readPreference); pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet(); + // The 'AsyncTry' is run on the cleanup executor as we rely on the 'CancelationToken' to signal + // when work should be canceled rather than letting the scoped executor 
be destroyed on + // shutdown/stepdown. + return AsyncTry([this, self = shared_from_this(), executor, token] { + return ExecutorFuture(**executor) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + // Instance task can be started only once for the current term on a primary. + invariant(!_taskState.isDone()); + // If the task state is interrupted, then don't start the task. + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } - return ExecutorFuture(**executor) - .then([this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - // Instance task can be started only once for the current term on a primary. - invariant(!_taskState.isDone()); - // If the task state is interrupted, then don't start the task. - if (_taskState.isInterrupted()) { - uassertStatusOK(_taskState.getInterruptStatus()); - } - - _taskState.setState(TaskState::kRunning); - - pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet(); - - return _initializeStateDoc(lk); - }) - .then([this, self = shared_from_this()] { - _stateDocPersistedPromise.emplaceValue(); - uassert(ErrorCodes::TenantMigrationForgotten, - str::stream() << "Migration " << getMigrationUUID() - << " already marked for garbage collect", - !_stateDoc.getExpireAt()); - _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); - return _createAndConnectClients(); - }) - .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { - stdx::lock_guard lk(_mutex); - if (_taskState.isInterrupted()) { - uassertStatusOK(_taskState.getInterruptStatus()); - } - - // interrupt() called after this code block will interrupt the cloner and fetcher. - _client = std::move(ConnectionPair.first); - _oplogFetcherClient = std::move(ConnectionPair.second); + // The task state will already have been set to 'kRunning' if we restarted + // the future chain on donor failover. 
+ if (!_taskState.isRunning()) { + _taskState.setState(TaskState::kRunning); + } + pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet(); + return _initializeStateDoc(lk); + }) + .then([this, self = shared_from_this()] { + if (_stateDocPersistedPromise.getFuture().isReady()) { + // This is a retry of the future chain; persist the copy made under '_mutex'. + auto opCtx = cc().makeOperationContext(); + TenantMigrationRecipientDocument stateDoc; + { + stdx::lock_guard lk(_mutex); + _stateDoc.setNumRestartsDueToDonorConnectionFailure( + _stateDoc.getNumRestartsDueToDonorConnectionFailure() + 1); + stateDoc = _stateDoc; + } + uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc( + opCtx.get(), stateDoc)); + } else { + // Avoid fulfilling the promise twice on restart of the future chain. + _stateDocPersistedPromise.emplaceValue(); + } + uassert(ErrorCodes::TenantMigrationForgotten, + str::stream() << "Migration " << getMigrationUUID() + << " already marked for garbage collect", + !_stateDoc.getExpireAt()); + _stopOrHangOnFailPoint( + &fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); + return _createAndConnectClients(); + }) + .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { + stdx::lock_guard lk(_mutex); + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } - // Create the writer pool and shared data. - _writerPool = makeTenantMigrationWriterPool(); - _sharedData = std::make_unique<TenantMigrationSharedData>( - getGlobalServiceContext()->getFastClockSource(), - getMigrationUUID(), - _stateDoc.getStartFetchingDonorOpTime().has_value()); - }) - .then([this, self = shared_from_this(), token] { - _fetchAndStoreDonorClusterTimeKeyDocs(token); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); - stdx::lock_guard lk(_mutex); + // interrupt() called after this code block will interrupt the cloner and + // fetcher. 
+ _client = std::move(ConnectionPair.first); + _oplogFetcherClient = std::move(ConnectionPair.second); - // Record the FCV at the start of a migration and check for changes in every subsequent - // attempt. Fail if there is any mismatch in FCV or upgrade/downgrade state. - // (Generic FCV reference): This FCV check should exist across LTS binary versions. - auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); - auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); + if (!_writerPool) { + // Create the writer pool and shared data. + _writerPool = makeTenantMigrationWriterPool(); + } + _sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), + getMigrationUUID(), + _stateDoc.getStartFetchingDonorOpTime().has_value()); + }) + .then([this, self = shared_from_this(), token] { + _fetchAndStoreDonorClusterTimeKeyDocs(token); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); + stdx::lock_guard lk(_mutex); + + // Record the FCV at the start of a migration and check for changes in every + // subsequent attempt. Fail if there is any mismatch in FCV or + // upgrade/downgrade state. (Generic FCV reference): This FCV check should + // exist across LTS binary versions. 
+ auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); + auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); + + if (!startingFCV) { + _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); + return _updateStateDocForMajority(lk); + } - if (!startingFCV) { - _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); - return _updateStateDocForMajority(lk); - } + if (startingFCV != currentFCV) { + LOGV2_ERROR(5356200, + "FCV may not change during migration", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startingFCV"_attr = startingFCV, + "currentFCV"_attr = currentFCV); + uasserted(5356201, "Detected FCV change from last migration attempt."); + } - if (startingFCV != currentFCV) { - LOGV2_ERROR(5356200, - "FCV may not change during migration", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startingFCV"_attr = startingFCV, - "currentFCV"_attr = currentFCV); - uasserted(5356201, "Detected FCV change from last migration attempt."); - } + return SemiFuture<void>::makeReady(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); + _compareRecipientAndDonorFCV(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); + stdx::lock_guard lk(_mutex); + _getStartOpTimesFromDonor(lk); + return _updateStateDocForMajority(lk); + }) + .then([this, self = shared_from_this()] { + _fetchRetryableWritesOplogBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint( + &fpAfterRetrievingStartOpTimesMigrationRecipientInstance); + _startOplogFetcher(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint( + &fpAfterStartingOplogFetcherMigrationRecipientInstance); + + stdx::unique_lock lk(_mutex); - return SemiFuture<void>::makeReady(); - }) - .then([this, self = shared_from_this()] { - 
_stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); - _compareRecipientAndDonorFCV(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); - stdx::lock_guard lk(_mutex); - _getStartOpTimesFromDonor(lk); - return _updateStateDocForMajority(lk); - }) - .then([this, self = shared_from_this()] { _fetchRetryableWritesOplogBeforeStartOpTime(); }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance); - _startOplogFetcher(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); + { + // Throwing error when cloner is canceled externally via interrupt(), + // makes the instance to skip the remaining task (i.e., starting oplog + // applier) in the sync process. This step is necessary to prevent race + // between interrupt() and starting oplog applier for the failover + // scenarios where we don't start the cloner if the tenant data is + // already in consistent state. + stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); + uassertStatusOK(_sharedData->getStatus(sharedDatalk)); + } + // Create the oplog applier but do not start it yet. + invariant(_stateDoc.getStartApplyingDonorOpTime()); + + OpTime beginApplyingAfterOpTime; + bool isResuming = false; + if (_isCloneCompletedMarkerSet(lk)) { + const auto startApplyingDonorOpTime = + *_stateDoc.getStartApplyingDonorOpTime(); + const auto cloneFinishedRecipientOptime = + *_stateDoc.getCloneFinishedRecipientOpTime(); + lk.unlock(); + // We avoid holding the mutex while scanning the local oplog which + // acquires the RSTL in IX mode. This is to allow us to be interruptable + // via a concurrent stepDown which acquires the RSTL in X mode. 
+ beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime( + startApplyingDonorOpTime, cloneFinishedRecipientOptime); + isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime; + lk.lock(); + } else { + beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); + } + LOGV2_DEBUG(4881202, + 1, + "Recipient migration service creating oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); + _tenantOplogApplier = + std::make_shared<TenantOplogApplier>(_migrationUuid, + _tenantId, + beginApplyingAfterOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get(), + isResuming); + + // Start the cloner. + auto clonerFuture = _startTenantAllDatabaseCloner(lk); + + // Signal that the data sync has started successfully. + if (!_dataSyncStartedPromise.getFuture().isReady()) { + _dataSyncStartedPromise.emplaceValue(); + } + return clonerFuture; + }) + .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this()] { + _fetchCommittedTransactionsBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); + LOGV2_DEBUG(4881200, + 1, + "Recipient migration service starting oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID()); + { + stdx::lock_guard lk(_mutex); + uassertStatusOK(_tenantOplogApplier->startup()); + } + _stopOrHangOnFailPoint( + &fpAfterStartingOplogApplierMigrationRecipientInstance); + return _getDataConsistentFuture(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); + stdx::lock_guard lk(_mutex); + LOGV2_DEBUG(4881101, + 1, + "Tenant migration recipient instance is in consistent state", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "donorConsistentOpTime"_attr = + 
_stateDoc.getDataConsistentStopDonorOpTime()); + + if (!_dataConsistentPromise.getFuture().isReady()) { + _dataConsistentPromise.emplaceValue( + _stateDoc.getDataConsistentStopDonorOpTime().get()); + } + }); + }) + .until([this, self = shared_from_this()](Status status) { stdx::unique_lock lk(_mutex); - - { - // Throwing error when cloner is canceled externally via interrupt(), makes the - // instance to skip the remaining task (i.e., starting oplog applier) in the - // sync process. This step is necessary to prevent race between interrupt() - // and starting oplog applier for the failover scenarios where we don't start - // the cloner if the tenant data is already in consistent state. - stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); - uassertStatusOK(_sharedData->getStatus(sharedDatalk)); + if (_taskState.isInterrupted()) { + status = _taskState.getInterruptStatus(); } - - // Create the oplog applier but do not start it yet. - invariant(_stateDoc.getStartApplyingDonorOpTime()); - - OpTime beginApplyingAfterOpTime; - bool isResuming = false; - if (_isCloneCompletedMarkerSet(lk)) { - const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime(); - const auto cloneFinishedRecipientOptime = - *_stateDoc.getCloneFinishedRecipientOpTime(); + if (ErrorCodes::isRetriableError(status) && !_taskState.isExternalInterrupt() && + _stateDocPersistedPromise.getFuture().isReady()) { + // Reset the task state and clear the interrupt status. + if (!_taskState.isRunning()) { + _taskState.setState(TaskState::kRunning); + } + _taskState.clearInterruptStatus(); + // Clean up the async components before retrying the future chain. + _oplogFetcherStatus = boost::none; + std::unique_ptr<OplogFetcher> savedDonorOplogFetcher; + std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier; + + _cancelRemainingWork(lk); + shutdownTarget(lk, _donorOplogFetcher); + + // Swap the oplog fetcher and applier to join() outside of '_mutex'. 
+ using std::swap; + swap(savedDonorOplogFetcher, _donorOplogFetcher); + swap(savedTenantOplogApplier, _tenantOplogApplier); lk.unlock(); - // We avoid holding the mutex while scanning the local oplog which acquires the RSTL - // in IX mode. This is to allow us to be interruptable via a concurrent stepDown - // which acquires the RSTL in X mode. - beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime( - startApplyingDonorOpTime, cloneFinishedRecipientOptime); - isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime; - lk.lock(); - } else { - beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); - } - LOGV2_DEBUG(4881202, - 1, - "Recipient migration service creating oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); - _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, - _tenantId, - beginApplyingAfterOpTime, - _donorOplogBuffer.get(), - **_scopedExecutor, - _writerPool.get(), - isResuming); - - // Start the cloner. - auto clonerFuture = _startTenantAllDatabaseCloner(lk); - - // Signal that the data sync has started successfully. 
- _dataSyncStartedPromise.emplaceValue(); - return clonerFuture; - }) - .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) - .then([this, self = shared_from_this()] { _fetchCommittedTransactionsBeforeStartOpTime(); }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); - LOGV2_DEBUG(4881200, - 1, - "Recipient migration service starting oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID()); - { - stdx::lock_guard lk(_mutex); - uassertStatusOK(_tenantOplogApplier->startup()); - } - _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); - return _getDataConsistentFuture(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); - stdx::lock_guard lk(_mutex); - LOGV2_DEBUG(4881101, - 1, - "Tenant migration recipient instance is in consistent state", - "migrationId"_attr = getMigrationUUID(), - "tenantId"_attr = getTenantId(), - "donorConsistentOpTime"_attr = - _stateDoc.getDataConsistentStopDonorOpTime()); - _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get()); + // Perform join outside the lock to avoid deadlocks. 
+ joinTarget(savedDonorOplogFetcher); + joinTarget(savedTenantOplogApplier); + return false; + } + return true; }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(_recipientService->getInstanceCleanupExecutor(), token) + .semi() + .thenRunOn(**_scopedExecutor) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); stdx::lock_guard lk(_mutex); @@ -1595,8 +1676,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( .thenRunOn(_recipientService->getInstanceCleanupExecutor()) .onCompletion([this, self = shared_from_this()]( StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) { - // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need - // to schedule the clean up work on the parent executor. + // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we + // need to schedule the clean up work on the parent executor. // We don't need the final optime from the oplog applier. The data sync does not // normally stop by itself on success. It completes only on errors or on external @@ -1646,8 +1727,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( }) .thenRunOn(**_scopedExecutor) .then([this, self = shared_from_this()] { - // Schedule on the _scopedExecutor to make sure we are still the primary when waiting - // for the recipientForgetMigration command. + // Schedule on the _scopedExecutor to make sure we are still the primary when + // waiting for the recipientForgetMigration command. return _receivedRecipientForgetMigrationPromise.getFuture(); }) .then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); }) @@ -1656,22 +1737,22 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( auto opCtx = cc().makeOperationContext(); auto storageInterface = StorageInterface::get(opCtx.get()); - // The oplog buffer collection can be safely dropped at this point. 
In case it doesn't - // exist, dropping will be a no-op. - // It isn't necessary that the drop is majority-committed. A new primary will attempt - // to drop the collection anyway. + // The oplog buffer collection can be safely dropped at this point. In case it + // doesn't exist, dropping will be a no-op. It isn't necessary that the drop is + // majority-committed. A new primary will attempt to drop the collection anyway. return storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID())); }) .thenRunOn(_recipientService->getInstanceCleanupExecutor()) .onCompletion([this, self = shared_from_this()](Status status) { - // Schedule on the parent executor to mark the completion of the whole chain so this is - // safe even on shutDown/stepDown. + // Schedule on the parent executor to mark the completion of the whole chain so this + // is safe even on shutDown/stepDown. stdx::lock_guard lk(_mutex); invariant(_dataSyncCompletionPromise.getFuture().isReady()); if (status.isOK()) { LOGV2(4881401, - "Migration marked to be garbage collectable due to recipientForgetMigration " + "Migration marked to be garbage collectable due to " + "recipientForgetMigration " "command", "migrationId"_attr = getMigrationUUID(), "tenantId"_attr = getTenantId(), diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 504fffb0596..fde90d1327a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -209,14 +209,16 @@ public: case kRunning: return newState == kInterrupted || newState == kDone; case kInterrupted: - return newState == kDone; + return newState == kDone || newState == kRunning; case kDone: return false; } MONGO_UNREACHABLE; } - void setState(StateFlag state, boost::optional<Status> interruptStatus = boost::none) { + void setState(StateFlag state, + boost::optional<Status> interruptStatus = boost::none, + bool 
isExternalInterrupt = false) { invariant(checkIfValidTransition(state), str::stream() << "current state: " << toString(_state) << ", new state: " << toString(state)); @@ -230,6 +232,16 @@ public: _state = state; _interruptStatus = (interruptStatus) ? interruptStatus.get() : _interruptStatus; + _isExternalInterrupt = isExternalInterrupt; + } + + void clearInterruptStatus() { + _interruptStatus = Status::OK(); + _isExternalInterrupt = false; + } + + bool isExternalInterrupt() const { + return (_state == kInterrupted) && _isExternalInterrupt; } bool isNotStarted() const { @@ -276,6 +288,9 @@ public: // task interrupt status. Set to Status::OK() only when the recipient service has not // been interrupted so far, and is used to remember the initial interrupt error. Status _interruptStatus = Status::OK(); + // Indicates if the task was interrupted externally due to a 'recipientForgetMigration' + // or stepdown/shutdown. + bool _isExternalInterrupt = false; }; /* @@ -495,6 +510,8 @@ public: std::unique_ptr<TenantMigrationSharedData> _sharedData; // (S) // Indicates whether the main task future continuation chain state kicked off by run(). TaskState _taskState; // (M) + // Used to indicate whether the migration is able to be retried on fetcher error. + boost::optional<Status> _oplogFetcherStatus; // (M) // Promise that is resolved when the state document is initialized and persisted. 
SharedPromise<void> _stateDocPersistedPromise; // (W) diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index ab6829fb324..e060a53ac9e 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -461,8 +461,7 @@ TEST_F(TenantMigrationRecipientServiceTest, BasicTenantMigrationRecipientService ASSERT_OK(instance->getCompletionFuture().getNoThrow()); } - -TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersisitingStateDoc) { +TEST_F(TenantMigrationRecipientServiceTest, InstanceReportsErrorOnFailureWhilePersistingStateDoc) { stopFailPointEnableBlock fp("failWhilePersistingTenantMigrationRecipientInstanceStateDoc"); const UUID migrationUUID = UUID::gen(); @@ -2988,6 +2987,292 @@ TEST_F(TenantMigrationRecipientServiceTest, WaitUntilTimestampIsMajorityCommitte ASSERT_EQ(oplogBSON["o"]["msg"].String(), "Noop write for recipientSyncData"); } +TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableFetcherError) { + stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone"); + auto fp = + globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Create and start the instance. 
+ auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + fp->waitForTimesEntered(initialTimesEntered + 1); + auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + + auto doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); + // Kill the oplog fetcher with a retriable error and wait for the migration to retry. + const auto retriableErrorCode = ErrorCodes::SocketException; + ASSERT_TRUE(ErrorCodes::isRetriableError(retriableErrorCode)); + oplogFetcher->shutdownWith({retriableErrorCode, "Injected retriable error"}); + + // Skip the cloners in this test, so we provide an empty list of databases. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); + + fp->setMode(FailPoint::off); + // Wait for task completion. 
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 1); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesNonRetriableFetcherError) { + auto fp = + globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Create and start the instance. + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + fp->waitForTimesEntered(initialTimesEntered + 1); + auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + + auto doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); + // Kill the oplog fetcher with a non-retriable error. 
+ const auto nonRetriableErrorCode = ErrorCodes::Error(5271901); + ASSERT_FALSE(ErrorCodes::isRetriableError(nonRetriableErrorCode)); + oplogFetcher->shutdownWith({nonRetriableErrorCode, "Injected non-retriable error"}); + + fp->setMode(FailPoint::off); + // Wait for task completion failure. + auto status = instance->getDataSyncCompletionFuture().getNoThrow(); + ASSERT_EQ(nonRetriableErrorCode, status.code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientWillNotRetryOnExternalInterrupt) { + auto fp = + globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Create and start the instance. + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + FailPointEnableBlock fp("pauseBeforeRunTenantMigrationRecipientInstance"); + // Create and start the instance. 
+ instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + instance->setCreateOplogFetcherFn_forTest(std::make_unique<CreateOplogFetcherMockFn>()); + } + + fp->waitForTimesEntered(initialTimesEntered + 1); + auto oplogFetcher = checked_cast<OplogFetcherMock*>(getDonorOplogFetcher(instance.get())); + ASSERT_TRUE(oplogFetcher != nullptr); + ASSERT_TRUE(oplogFetcher->isActive()); + + auto doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); + ASSERT_TRUE(ErrorCodes::isRetriableError(ErrorCodes::SocketException)); + // Interrupt the task with 'skipWaitingForForgetMigration' = true. + instance->interrupt( + {ErrorCodes::SocketException, "Test retriable error with external interrupt"}); + + fp->setMode(FailPoint::off); + // Wait for task completion failure. + ASSERT_EQ(instance->getCompletionFuture().getNoThrow(), ErrorCodes::SocketException); + + doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesRetriableClonerError) { + stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone"); + auto fp = + globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Create 
and start the instance. + auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + auto doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); + + // Have the cloner fail on a retriable error (from the point of view of the recipient service). + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + const auto retriableErrorCode = ErrorCodes::HostUnreachable; + ASSERT_TRUE(ErrorCodes::isRetriableError(retriableErrorCode)); + _donorServer->setCommandReply("listDatabases", + Status(retriableErrorCode, "Injecting retriable error.")); + + auto retryFp = + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); + initialTimesEntered = retryFp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + fp->setMode(FailPoint::off); + + retryFp->waitForTimesEntered(initialTimesEntered + 1); + // Let cloner run successfully on retry. + _donorServer->setCommandReply("listDatabases", makeListDatabasesResponse({})); + _donorServer->setCommandReply("find", makeFindResponse()); + retryFp->setMode(FailPoint::off); + + // Wait for task completion. 
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 1); +} + +TEST_F(TenantMigrationRecipientServiceTest, RecipientReceivesNonRetriableClonerError) { + stopFailPointEnableBlock stopFp("fpAfterCollectionClonerDone"); + auto fp = + globalFailPointRegistry().find("fpAfterStartingOplogFetcherMigrationRecipientInstance"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + + const UUID migrationUUID = UUID::gen(); + const OpTime topOfOplogOpTime(Timestamp(5, 1), 1); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + insertTopOfOplog(&replSet, topOfOplogOpTime); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "tenantA", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + // Create and start the instance. + auto opCtx = makeOperationContext(); + auto instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + auto doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); + + // Have the cloner fail on a non-retriable error. 
+ MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + const auto nonRetriableErrorCode = ErrorCodes::Error(5271902); + ASSERT_FALSE(ErrorCodes::isRetriableError(nonRetriableErrorCode)); + _donorServer->setCommandReply("listDatabases", + Status(nonRetriableErrorCode, "Injecting non-retriable error.")); + + fp->setMode(FailPoint::off); + + // Wait for task completion. + ASSERT_EQ(nonRetriableErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getCompletionFuture().getNoThrow()); + + doc = getStateDoc(instance.get()); + ASSERT_EQ(doc.getNumRestartsDueToDonorConnectionFailure(), 0); +} + #endif } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 296fd91dd72..56c307809c3 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -214,3 +214,8 @@ structs: downgrade state. type: fcv_string optional: true + numRestartsDueToDonorConnectionFailure: + description: >- + A counter that is incremented on each restart due to a donor connection failure. + type: long + default: 0 |