diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator.h | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 73 |
3 files changed, 63 insertions, 60 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index d43bd82cc2f..e5f21d6f9c0 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -164,27 +164,18 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration( if (!_releaseRecipientCriticalSectionFuture) { launchReleaseRecipientCriticalSection(opCtx); } - - try { - invariant(_releaseRecipientCriticalSectionFuture); - _releaseRecipientCriticalSectionFuture->get(opCtx); - } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) { - LOGV2(5899100, - "Failed to releaseCriticalSectionOnRecipient", - "shardId"_attr = _migrationInfo.getRecipientShardId(), - "error"_attr = exShardNotFound); - } } boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none; switch (*decision) { case DecisionEnum::kAborted: - _abortMigrationOnDonorAndRecipient(opCtx); + _abortMigrationOnDonorAndRecipient(opCtx, acquireCSOnRecipient); hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet(); break; case DecisionEnum::kCommitted: - cleanupCompleteFuture = _commitMigrationOnDonorAndRecipient(opCtx); + cleanupCompleteFuture = + _commitMigrationOnDonorAndRecipient(opCtx, acquireCSOnRecipient); hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet(); break; } @@ -195,13 +186,17 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration( } SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( - OperationContext* opCtx) { + OperationContext* opCtx, bool acquireCSOnRecipient) { hangBeforeMakingCommitDecisionDurable.pauseWhileSet(); LOGV2_DEBUG( 23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId()); migrationutil::persistCommitDecision(opCtx, _migrationInfo); + if (acquireCSOnRecipient) { + waitForReleaseRecipientCriticalSectionFuture(opCtx); + } + LOGV2_DEBUG( 23895, 2, @@ -250,7 +245,8 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).semi(); } -void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) { +void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx, + bool acquireCSOnRecipient) { hangBeforeMakingAbortDecisionDurable.pauseWhileSet(); LOGV2_DEBUG( @@ -259,6 +255,10 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* hangBeforeSendingAbortDecision.pauseWhileSet(); + if (acquireCSOnRecipient) { + waitForReleaseRecipientCriticalSectionFuture(opCtx); + } + // Ensure removing the local range deletion document to prevent incoming migrations with // overlapping ranges to hang. LOGV2_DEBUG(23901, @@ -320,6 +320,18 @@ void MigrationCoordinator::launchReleaseRecipientCriticalSection(OperationContex _migrationInfo.getMigrationSessionId()); } +void MigrationCoordinator::waitForReleaseRecipientCriticalSectionFuture(OperationContext* opCtx) { + invariant(_releaseRecipientCriticalSectionFuture); + try { + _releaseRecipientCriticalSectionFuture->get(opCtx); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) { + LOGV2(5899100, + "Failed to releaseCriticalSectionOnRecipient", + "shardId"_attr = _migrationInfo.getRecipientShardId(), + "error"_attr = exShardNotFound); + } +} + } // namespace migrationutil } // namespace mongo diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h index 9f8b9d34d67..0c96d1eab15 100644 --- a/src/mongo/db/s/migration_coordinator.h +++ b/src/mongo/db/s/migration_coordinator.h @@ -108,13 +108,19 @@ private: * the donor as ready to be processed. Returns a future that is set when range deletion for * the donated range completes. */ - SemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx); + SemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx, + bool acquireCSOnRecipient); /** * Deletes the range deletion task from the donor node and marks the range deletion task on the * recipient node as ready to be processed. */ - void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx); + void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx, bool acquireCSOnRecipient); + + /** + * Waits for the completion of _releaseRecipientCriticalSectionFuture + */ + void waitForReleaseRecipientCriticalSectionFuture(OperationContext* opCtx); MigrationCoordinatorDocument _migrationInfo; bool _waitForDelete = false; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 875e99ce5b9..e51b7f8bf29 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -1531,7 +1531,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, sleepmillis(10); } - if (getState() == FAIL) { + if (getState() == FAIL || getState() == ABORT) { _setStateFail("timed out waiting for commit"); return; } @@ -1572,36 +1572,17 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx, timeInCriticalSection.emplace(); }); - if (getState() == FAIL) { + if (getState() == FAIL || getState() == ABORT) { _setStateFail("timed out waiting for critical section acquisition"); - - runWithoutSession(outerOpCtx, [&] { - RecoverableCriticalSectionService::get(opCtx) - ->releaseRecoverableCriticalSection( - opCtx, - _nss, - critSecReason, - ShardingCatalogClient::kMajorityWriteConcern); - - invariant(timeInCriticalSection); - const auto timeInCriticalSectionMs = timeInCriticalSection->millis(); - ShardingStatistics::get(opCtx) - .totalRecipientCriticalSectionTimeMillis.addAndFetch( - timeInCriticalSectionMs); - - LOGV2(5899115, - "Exited migration recipient critical section", - "nss"_attr = _nss, - "durationMillis"_attr = timeInCriticalSectionMs); - - // Delete the recovery document - migrationutil::deleteMigrationRecipientRecoveryDocument(opCtx, *_migrationId); - }); - - return; } - _setState(ENTERED_CRIT_SEC); + { + stdx::lock_guard<Latch> sl(_mutex); + if (_state != FAIL || _state != ABORT) { + _state = ENTERED_CRIT_SEC; + _stateChangedCV.notify_all(); + } + } } } @@ -1779,28 +1760,32 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi { stdx::unique_lock<Latch> lock(_mutex); opCtx->waitForConditionOrInterrupt( - _stateChangedCV, lock, [&]() { return _state == EXIT_CRIT_SEC; }); + _stateChangedCV, lock, [&]() { return _state != ENTERED_CRIT_SEC; }); } + invariant(_state == EXIT_CRIT_SEC || _state == FAIL || _state == ABORT); + // Refresh the filtering metadata - LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section"); + if (_state == EXIT_CRIT_SEC) { + LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section"); - try { - if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) { - uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint"); - } + try { + if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) { + uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint"); + } - forceShardFilteringMetadataRefresh(opCtx, _nss); - } catch (const DBException& ex) { - LOGV2_DEBUG(5899103, - 2, - "Post-migration commit refresh failed on recipient", - "migrationId"_attr = _migrationId, - "error"_attr = redact(ex)); + forceShardFilteringMetadataRefresh(opCtx, _nss); + } catch (const DBException& ex) { + LOGV2_DEBUG(5899103, + 2, + "Post-migration commit refresh failed on recipient", + "migrationId"_attr = _migrationId, + "error"_attr = redact(ex)); - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, _nss, MODE_IX); - CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + } } // Release the critical section |