diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 443 |
1 files changed, 262 insertions, 181 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 297ee928c6f..ab648a0526d 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -481,7 +481,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { } const auto kDelayedMajorityOpTimeErrorCode = 5272000; - return AsyncTry([this, self = shared_from_this(), getHostCancelSource, @@ -490,7 +489,6 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { // Get all donor hosts that we have excluded. const auto& excludedHosts = _getExcludedDonorHosts(lk); - return _donorReplicaSetMonitor ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token()) .thenRunOn(**_scopedExecutor) @@ -784,14 +782,18 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { options.dropCollectionAtStartup = false; options.dropCollectionAtShutdown = false; options.useTemporaryCollection = false; - // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); - auto bufferCollection = std::make_unique<OplogBufferCollection>( - StorageInterface::get(opCtx.get()), oplogBufferNS, options); + if (!_donorOplogBuffer) { + // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + auto bufferCollection = std::make_unique<OplogBufferCollection>( + StorageInterface::get(opCtx.get()), oplogBufferNS, options); + stdx::lock_guard lk(_mutex); + _donorOplogBuffer = std::move(bufferCollection); + } + stdx::unique_lock lk(_mutex); invariant(_stateDoc.getStartFetchingDonorOpTime()); - _donorOplogBuffer = std::move(bufferCollection); - { // Ensure we are primary when trying to startup and create the oplog buffer collection. auto coordinator = repl::ReplicationCoordinator::get(opCtx.get()); @@ -926,6 +928,7 @@ void TenantMigrationRecipientService::Instance::_oplogFetcherCallback(Status opl "error"_attr = oplogFetcherStatus); _interrupt(oplogFetcherStatus, /*skipWaitingForForgetMigration=*/false); } + _oplogFetcherStatus = oplogFetcherStatus; } void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp) { @@ -1250,7 +1253,8 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status, } } - _taskState.setState(TaskState::kInterrupted, status); + _taskState.setState( + TaskState::kInterrupted, status, skipWaitingForForgetMigration /* isExternalInterrupt */); } void TenantMigrationRecipientService::Instance::interrupt(Status status) { @@ -1352,7 +1356,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs( const CancelationToken& token) { std::vector<ExternalKeysCollectionDocument> keyDocs; - auto cursor = _client->query(NamespaceString::kKeysCollectionNamespace, Query().readPref(_readPreference.pref, _readPreference.tags.getTagBSON())); @@ -1411,179 +1414,257 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( "readPreference"_attr = _readPreference); pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet(); + // The 'AsyncTry' is run on the cleanup executor as we rely on the 'CancelationToken' to signal + // when work should be canceled rather than letting the scoped executor be destroyed on + // shutdown/stepdown. + return AsyncTry([this, self = shared_from_this(), executor, token] { + return ExecutorFuture(**executor) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + // Instance task can be started only once for the current term on a primary. + invariant(!_taskState.isDone()); + // If the task state is interrupted, then don't start the task. + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } - return ExecutorFuture(**executor) - .then([this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - // Instance task can be started only once for the current term on a primary. - invariant(!_taskState.isDone()); - // If the task state is interrupted, then don't start the task. - if (_taskState.isInterrupted()) { - uassertStatusOK(_taskState.getInterruptStatus()); - } - - _taskState.setState(TaskState::kRunning); - - pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet(); - - return _initializeStateDoc(lk); - }) - .then([this, self = shared_from_this()] { - _stateDocPersistedPromise.emplaceValue(); - uassert(ErrorCodes::TenantMigrationForgotten, - str::stream() << "Migration " << getMigrationUUID() - << " already marked for garbage collect", - !_stateDoc.getExpireAt()); - _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); - return _createAndConnectClients(); - }) - .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { - stdx::lock_guard lk(_mutex); - if (_taskState.isInterrupted()) { - uassertStatusOK(_taskState.getInterruptStatus()); - } - - // interrupt() called after this code block will interrupt the cloner and fetcher. - _client = std::move(ConnectionPair.first); - _oplogFetcherClient = std::move(ConnectionPair.second); + // The task state will already have been set to 'kRunning' if we restarted + // the future chain on donor failover. + if (!_taskState.isRunning()) { + _taskState.setState(TaskState::kRunning); + } + pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet(); + return _initializeStateDoc(lk); + }) + .then([this, self = shared_from_this()] { + if (_stateDocPersistedPromise.getFuture().isReady()) { + // This is a retry of the future chain. + auto opCtx = cc().makeOperationContext(); + TenantMigrationRecipientDocument stateDoc; + { + stdx::lock_guard lk(_mutex); + _stateDoc.setNumRestartsDueToDonorConnectionFailure( + _stateDoc.getNumRestartsDueToDonorConnectionFailure() + 1); + stateDoc = _stateDoc; + } + uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc( + opCtx.get(), _stateDoc)); + } else { + // Avoid fulfilling the promise twice on restart of the future chain. + _stateDocPersistedPromise.emplaceValue(); + } + uassert(ErrorCodes::TenantMigrationForgotten, + str::stream() << "Migration " << getMigrationUUID() + << " already marked for garbage collect", + !_stateDoc.getExpireAt()); + _stopOrHangOnFailPoint( + &fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); + return _createAndConnectClients(); + }) + .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { + stdx::lock_guard lk(_mutex); + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } - // Create the writer pool and shared data. - _writerPool = makeTenantMigrationWriterPool(); - _sharedData = std::make_unique<TenantMigrationSharedData>( - getGlobalServiceContext()->getFastClockSource(), - getMigrationUUID(), - _stateDoc.getStartFetchingDonorOpTime().has_value()); - }) - .then([this, self = shared_from_this(), token] { - _fetchAndStoreDonorClusterTimeKeyDocs(token); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); - stdx::lock_guard lk(_mutex); + // interrupt() called after this code block will interrupt the cloner and + // fetcher. + _client = std::move(ConnectionPair.first); + _oplogFetcherClient = std::move(ConnectionPair.second); - // Record the FCV at the start of a migration and check for changes in every subsequent - // attempt. Fail if there is any mismatch in FCV or upgrade/downgrade state. - // (Generic FCV reference): This FCV check should exist across LTS binary versions. - auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); - auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); + if (!_writerPool) { + // Create the writer pool and shared data. + _writerPool = makeTenantMigrationWriterPool(); + } + _sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), + getMigrationUUID(), + _stateDoc.getStartFetchingDonorOpTime().has_value()); + }) + .then([this, self = shared_from_this(), token] { + _fetchAndStoreDonorClusterTimeKeyDocs(token); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); + stdx::lock_guard lk(_mutex); + + // Record the FCV at the start of a migration and check for changes in every + // subsequent attempt. Fail if there is any mismatch in FCV or + // upgrade/downgrade state. (Generic FCV reference): This FCV check should + // exist across LTS binary versions. + auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); + auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); + + if (!startingFCV) { + _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); + return _updateStateDocForMajority(lk); + } - if (!startingFCV) { - _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); - return _updateStateDocForMajority(lk); - } + if (startingFCV != currentFCV) { + LOGV2_ERROR(5356200, + "FCV may not change during migration", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startingFCV"_attr = startingFCV, + "currentFCV"_attr = currentFCV); + uasserted(5356201, "Detected FCV change from last migration attempt."); + } - if (startingFCV != currentFCV) { - LOGV2_ERROR(5356200, - "FCV may not change during migration", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startingFCV"_attr = startingFCV, - "currentFCV"_attr = currentFCV); - uasserted(5356201, "Detected FCV change from last migration attempt."); - } + return SemiFuture<void>::makeReady(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); + _compareRecipientAndDonorFCV(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); + stdx::lock_guard lk(_mutex); + _getStartOpTimesFromDonor(lk); + return _updateStateDocForMajority(lk); + }) + .then([this, self = shared_from_this()] { + _fetchRetryableWritesOplogBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint( + &fpAfterRetrievingStartOpTimesMigrationRecipientInstance); + _startOplogFetcher(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint( + &fpAfterStartingOplogFetcherMigrationRecipientInstance); + + stdx::unique_lock lk(_mutex); - return SemiFuture<void>::makeReady(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); - _compareRecipientAndDonorFCV(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); - stdx::lock_guard lk(_mutex); - _getStartOpTimesFromDonor(lk); - return _updateStateDocForMajority(lk); - }) - .then([this, self = shared_from_this()] { _fetchRetryableWritesOplogBeforeStartOpTime(); }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance); - _startOplogFetcher(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); + { + // Throwing error when cloner is canceled externally via interrupt(), + // makes the instance to skip the remaining task (i.e., starting oplog + // applier) in the sync process. This step is necessary to prevent race + // between interrupt() and starting oplog applier for the failover + // scenarios where we don't start the cloner if the tenant data is + // already in consistent state. + stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); + uassertStatusOK(_sharedData->getStatus(sharedDatalk)); + } + // Create the oplog applier but do not start it yet. + invariant(_stateDoc.getStartApplyingDonorOpTime()); + + OpTime beginApplyingAfterOpTime; + bool isResuming = false; + if (_isCloneCompletedMarkerSet(lk)) { + const auto startApplyingDonorOpTime = + *_stateDoc.getStartApplyingDonorOpTime(); + const auto cloneFinishedRecipientOptime = + *_stateDoc.getCloneFinishedRecipientOpTime(); + lk.unlock(); + // We avoid holding the mutex while scanning the local oplog which + // acquires the RSTL in IX mode. This is to allow us to be interruptable + // via a concurrent stepDown which acquires the RSTL in X mode. + beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime( + startApplyingDonorOpTime, cloneFinishedRecipientOptime); + isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime; + lk.lock(); + } else { + beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); + } + LOGV2_DEBUG(4881202, + 1, + "Recipient migration service creating oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); + _tenantOplogApplier = + std::make_shared<TenantOplogApplier>(_migrationUuid, + _tenantId, + beginApplyingAfterOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get(), + isResuming); + + // Start the cloner. + auto clonerFuture = _startTenantAllDatabaseCloner(lk); + + // Signal that the data sync has started successfully. + if (!_dataSyncStartedPromise.getFuture().isReady()) { + _dataSyncStartedPromise.emplaceValue(); + } + return clonerFuture; + }) + .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this()] { + _fetchCommittedTransactionsBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); + LOGV2_DEBUG(4881200, + 1, + "Recipient migration service starting oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID()); + { + stdx::lock_guard lk(_mutex); + uassertStatusOK(_tenantOplogApplier->startup()); + } + _stopOrHangOnFailPoint( + &fpAfterStartingOplogApplierMigrationRecipientInstance); + return _getDataConsistentFuture(); + }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); + stdx::lock_guard lk(_mutex); + LOGV2_DEBUG(4881101, + 1, + "Tenant migration recipient instance is in consistent state", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "donorConsistentOpTime"_attr = + _stateDoc.getDataConsistentStopDonorOpTime()); + + if (!_dataConsistentPromise.getFuture().isReady()) { + _dataConsistentPromise.emplaceValue( + _stateDoc.getDataConsistentStopDonorOpTime().get()); + } + }); + }) + .until([this, self = shared_from_this()](Status status) { stdx::unique_lock lk(_mutex); - - { - // Throwing error when cloner is canceled externally via interrupt(), makes the - // instance to skip the remaining task (i.e., starting oplog applier) in the - // sync process. This step is necessary to prevent race between interrupt() - // and starting oplog applier for the failover scenarios where we don't start - // the cloner if the tenant data is already in consistent state. - stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); - uassertStatusOK(_sharedData->getStatus(sharedDatalk)); + if (_taskState.isInterrupted()) { + status = _taskState.getInterruptStatus(); } - - // Create the oplog applier but do not start it yet. - invariant(_stateDoc.getStartApplyingDonorOpTime()); - - OpTime beginApplyingAfterOpTime; - bool isResuming = false; - if (_isCloneCompletedMarkerSet(lk)) { - const auto startApplyingDonorOpTime = *_stateDoc.getStartApplyingDonorOpTime(); - const auto cloneFinishedRecipientOptime = - *_stateDoc.getCloneFinishedRecipientOpTime(); + if (ErrorCodes::isRetriableError(status) && !_taskState.isExternalInterrupt() && + _stateDocPersistedPromise.getFuture().isReady()) { + // Reset the task state and clear the interrupt status. + if (!_taskState.isRunning()) { + _taskState.setState(TaskState::kRunning); + } + _taskState.clearInterruptStatus(); + // Clean up the async components before retrying the future chain. + _oplogFetcherStatus = boost::none; + std::unique_ptr<OplogFetcher> savedDonorOplogFetcher; + std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier; + + _cancelRemainingWork(lk); + shutdownTarget(lk, _donorOplogFetcher); + + // Swap the oplog fetcher and applier to join() outside of '_mutex'. + using std::swap; + swap(savedDonorOplogFetcher, _donorOplogFetcher); + swap(savedTenantOplogApplier, _tenantOplogApplier); lk.unlock(); - // We avoid holding the mutex while scanning the local oplog which acquires the RSTL - // in IX mode. This is to allow us to be interruptable via a concurrent stepDown - // which acquires the RSTL in X mode. - beginApplyingAfterOpTime = _getOplogResumeApplyingDonorOptime( - startApplyingDonorOpTime, cloneFinishedRecipientOptime); - isResuming = beginApplyingAfterOpTime > startApplyingDonorOpTime; - lk.lock(); - } else { - beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); - } - LOGV2_DEBUG(4881202, - 1, - "Recipient migration service creating oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); - _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, - _tenantId, - beginApplyingAfterOpTime, - _donorOplogBuffer.get(), - **_scopedExecutor, - _writerPool.get(), - isResuming); - - // Start the cloner. - auto clonerFuture = _startTenantAllDatabaseCloner(lk); - - // Signal that the data sync has started successfully. - _dataSyncStartedPromise.emplaceValue(); - return clonerFuture; - }) - .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) - .then([this, self = shared_from_this()] { _fetchCommittedTransactionsBeforeStartOpTime(); }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterCollectionClonerDone); - LOGV2_DEBUG(4881200, - 1, - "Recipient migration service starting oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID()); - { - stdx::lock_guard lk(_mutex); - uassertStatusOK(_tenantOplogApplier->startup()); - } - _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); - return _getDataConsistentFuture(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); - stdx::lock_guard lk(_mutex); - LOGV2_DEBUG(4881101, - 1, - "Tenant migration recipient instance is in consistent state", - "migrationId"_attr = getMigrationUUID(), - "tenantId"_attr = getTenantId(), - "donorConsistentOpTime"_attr = - _stateDoc.getDataConsistentStopDonorOpTime()); - _dataConsistentPromise.emplaceValue(_stateDoc.getDataConsistentStopDonorOpTime().get()); + // Perform join outside the lock to avoid deadlocks. + joinTarget(savedDonorOplogFetcher); + joinTarget(savedTenantOplogApplier); + return false; + } + return true; }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(_recipientService->getInstanceCleanupExecutor(), token) + .semi() + .thenRunOn(**_scopedExecutor) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); stdx::lock_guard lk(_mutex); @@ -1595,8 +1676,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( .thenRunOn(_recipientService->getInstanceCleanupExecutor()) .onCompletion([this, self = shared_from_this()]( StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) { - // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we need - // to schedule the clean up work on the parent executor. + // On shutDown/stepDown, the _scopedExecutor may have already been shut down. So we + // need to schedule the clean up work on the parent executor. // We don't need the final optime from the oplog applier. The data sync does not // normally stop by itself on success. It completes only on errors or on external @@ -1646,8 +1727,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( }) .thenRunOn(**_scopedExecutor) .then([this, self = shared_from_this()] { - // Schedule on the _scopedExecutor to make sure we are still the primary when waiting - // for the recipientForgetMigration command. + // Schedule on the _scopedExecutor to make sure we are still the primary when + // waiting for the recipientForgetMigration command. return _receivedRecipientForgetMigrationPromise.getFuture(); }) .then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); }) @@ -1656,22 +1737,22 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( auto opCtx = cc().makeOperationContext(); auto storageInterface = StorageInterface::get(opCtx.get()); - // The oplog buffer collection can be safely dropped at this point. In case it doesn't - // exist, dropping will be a no-op. - // It isn't necessary that the drop is majority-committed. A new primary will attempt - // to drop the collection anyway. + // The oplog buffer collection can be safely dropped at this point. In case it + // doesn't exist, dropping will be a no-op. It isn't necessary that the drop is + // majority-committed. A new primary will attempt to drop the collection anyway. return storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID())); }) .thenRunOn(_recipientService->getInstanceCleanupExecutor()) .onCompletion([this, self = shared_from_this()](Status status) { - // Schedule on the parent executor to mark the completion of the whole chain so this is - // safe even on shutDown/stepDown. + // Schedule on the parent executor to mark the completion of the whole chain so this + // is safe even on shutDown/stepDown. stdx::lock_guard lk(_mutex); invariant(_dataSyncCompletionPromise.getFuture().isReady()); if (status.isOK()) { LOGV2(4881401, - "Migration marked to be garbage collectable due to recipientForgetMigration " + "Migration marked to be garbage collectable due to " + "recipientForgetMigration " "command", "migrationId"_attr = getMigrationUUID(), "tenantId"_attr = getTenantId(), |