summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/migration_coordinator.cpp40
-rw-r--r--src/mongo/db/s/migration_coordinator.h10
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp73
3 files changed, 63 insertions, 60 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index d43bd82cc2f..e5f21d6f9c0 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -164,27 +164,18 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(
if (!_releaseRecipientCriticalSectionFuture) {
launchReleaseRecipientCriticalSection(opCtx);
}
-
- try {
- invariant(_releaseRecipientCriticalSectionFuture);
- _releaseRecipientCriticalSectionFuture->get(opCtx);
- } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
- LOGV2(5899100,
- "Failed to releaseCriticalSectionOnRecipient",
- "shardId"_attr = _migrationInfo.getRecipientShardId(),
- "error"_attr = exShardNotFound);
- }
}
boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
switch (*decision) {
case DecisionEnum::kAborted:
- _abortMigrationOnDonorAndRecipient(opCtx);
+ _abortMigrationOnDonorAndRecipient(opCtx, acquireCSOnRecipient);
hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet();
break;
case DecisionEnum::kCommitted:
- cleanupCompleteFuture = _commitMigrationOnDonorAndRecipient(opCtx);
+ cleanupCompleteFuture =
+ _commitMigrationOnDonorAndRecipient(opCtx, acquireCSOnRecipient);
hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
break;
}
@@ -195,13 +186,17 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(
}
SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
- OperationContext* opCtx) {
+ OperationContext* opCtx, bool acquireCSOnRecipient) {
hangBeforeMakingCommitDecisionDurable.pauseWhileSet();
LOGV2_DEBUG(
23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId());
migrationutil::persistCommitDecision(opCtx, _migrationInfo);
+ if (acquireCSOnRecipient) {
+ waitForReleaseRecipientCriticalSectionFuture(opCtx);
+ }
+
LOGV2_DEBUG(
23895,
2,
@@ -250,7 +245,8 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
return migrationutil::submitRangeDeletionTask(opCtx, deletionTask).semi();
}
-void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) {
+void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx,
+ bool acquireCSOnRecipient) {
hangBeforeMakingAbortDecisionDurable.pauseWhileSet();
LOGV2_DEBUG(
@@ -259,6 +255,10 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
hangBeforeSendingAbortDecision.pauseWhileSet();
+ if (acquireCSOnRecipient) {
+ waitForReleaseRecipientCriticalSectionFuture(opCtx);
+ }
+
// Ensure removing the local range deletion document to prevent incoming migrations with
// overlapping ranges to hang.
LOGV2_DEBUG(23901,
@@ -320,6 +320,18 @@ void MigrationCoordinator::launchReleaseRecipientCriticalSection(OperationContex
_migrationInfo.getMigrationSessionId());
}
+void MigrationCoordinator::waitForReleaseRecipientCriticalSectionFuture(OperationContext* opCtx) {
+ invariant(_releaseRecipientCriticalSectionFuture);
+ try {
+ _releaseRecipientCriticalSectionFuture->get(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
+ LOGV2(5899100,
+ "Failed to releaseCriticalSectionOnRecipient",
+ "shardId"_attr = _migrationInfo.getRecipientShardId(),
+ "error"_attr = exShardNotFound);
+ }
+}
+
} // namespace migrationutil
} // namespace mongo
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index 9f8b9d34d67..0c96d1eab15 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -108,13 +108,19 @@ private:
* the donor as ready to be processed. Returns a future that is set when range deletion for
* the donated range completes.
*/
- SemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx);
+ SemiFuture<void> _commitMigrationOnDonorAndRecipient(OperationContext* opCtx,
+ bool acquireCSOnRecipient);
/**
* Deletes the range deletion task from the donor node and marks the range deletion task on the
* recipient node as ready to be processed.
*/
- void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx);
+ void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx, bool acquireCSOnRecipient);
+
+ /**
+ * Waits for the completion of _releaseRecipientCriticalSectionFuture
+ */
+ void waitForReleaseRecipientCriticalSectionFuture(OperationContext* opCtx);
MigrationCoordinatorDocument _migrationInfo;
bool _waitForDelete = false;
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 875e99ce5b9..e51b7f8bf29 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1531,7 +1531,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
sleepmillis(10);
}
- if (getState() == FAIL) {
+ if (getState() == FAIL || getState() == ABORT) {
_setStateFail("timed out waiting for commit");
return;
}
@@ -1572,36 +1572,17 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
timeInCriticalSection.emplace();
});
- if (getState() == FAIL) {
+ if (getState() == FAIL || getState() == ABORT) {
_setStateFail("timed out waiting for critical section acquisition");
-
- runWithoutSession(outerOpCtx, [&] {
- RecoverableCriticalSectionService::get(opCtx)
- ->releaseRecoverableCriticalSection(
- opCtx,
- _nss,
- critSecReason,
- ShardingCatalogClient::kMajorityWriteConcern);
-
- invariant(timeInCriticalSection);
- const auto timeInCriticalSectionMs = timeInCriticalSection->millis();
- ShardingStatistics::get(opCtx)
- .totalRecipientCriticalSectionTimeMillis.addAndFetch(
- timeInCriticalSectionMs);
-
- LOGV2(5899115,
- "Exited migration recipient critical section",
- "nss"_attr = _nss,
- "durationMillis"_attr = timeInCriticalSectionMs);
-
- // Delete the recovery document
- migrationutil::deleteMigrationRecipientRecoveryDocument(opCtx, *_migrationId);
- });
-
- return;
}
- _setState(ENTERED_CRIT_SEC);
+ {
+ stdx::lock_guard<Latch> sl(_mutex);
+ if (_state != FAIL || _state != ABORT) {
+ _state = ENTERED_CRIT_SEC;
+ _stateChangedCV.notify_all();
+ }
+ }
}
}
@@ -1779,28 +1760,32 @@ void MigrationDestinationManager::awaitCriticalSectionReleaseSignalAndCompleteMi
{
stdx::unique_lock<Latch> lock(_mutex);
opCtx->waitForConditionOrInterrupt(
- _stateChangedCV, lock, [&]() { return _state == EXIT_CRIT_SEC; });
+ _stateChangedCV, lock, [&]() { return _state != ENTERED_CRIT_SEC; });
}
+ invariant(_state == EXIT_CRIT_SEC || _state == FAIL || _state == ABORT);
+
// Refresh the filtering metadata
- LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section");
+ if (_state == EXIT_CRIT_SEC) {
+ LOGV2_DEBUG(5899112, 3, "Refreshing filtering metadata before exiting critical section");
- try {
- if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) {
- uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint");
- }
+ try {
+ if (MONGO_unlikely(migrationRecipientFailPostCommitRefresh.shouldFail())) {
+ uasserted(ErrorCodes::InternalError, "skipShardFilteringMetadataRefresh failpoint");
+ }
- forceShardFilteringMetadataRefresh(opCtx, _nss);
- } catch (const DBException& ex) {
- LOGV2_DEBUG(5899103,
- 2,
- "Post-migration commit refresh failed on recipient",
- "migrationId"_attr = _migrationId,
- "error"_attr = redact(ex));
+ forceShardFilteringMetadataRefresh(opCtx, _nss);
+ } catch (const DBException& ex) {
+ LOGV2_DEBUG(5899103,
+ 2,
+ "Post-migration commit refresh failed on recipient",
+ "migrationId"_attr = _migrationId,
+ "error"_attr = redact(ex));
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
- CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
+ }
}
// Release the critical section