author     Marcos José Grillo Ramírez <marcos.grillo@mongodb.com>   2021-01-08 18:31:31 +0100
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>         2021-01-08 18:59:59 +0000
commit     40aa110c655b6a3b562881c63d14a83c0848b3a0 (patch)
tree       c1256aceb44955b2fb11722f46e88d3afb9a9dd6 /src/mongo/db/s
parent     9e1f0ea4f371a8101f96c84d2ecd3811d68cafb6 (diff)
download   mongo-40aa110c655b6a3b562881c63d14a83c0848b3a0.tar.gz
SERVER-50890 Improve failover management when completing migrations
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp     48
-rw-r--r--  src/mongo/db/s/migration_coordinator.h        7
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp  115
-rw-r--r--  src/mongo/db/s/migration_source_manager.h     2
-rw-r--r--  src/mongo/db/s/migration_util.cpp             79
-rw-r--r--  src/mongo/db/s/migration_util.h                6
6 files changed, 128 insertions, 129 deletions
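
To orient the reader before the hunks: the central change in migration_coordinator.{h,cpp} is that the migration decision is no longer kept in a transient _decision member of MigrationCoordinator but recorded on the persisted MigrationCoordinatorDocument through the DecisionEnum type, so it can survive a failover. Below is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins for illustration, not the real MongoDB classes.

#include <iostream>
#include <optional>

// Simplified stand-ins for the IDL-generated types (hypothetical, for illustration only).
enum class DecisionEnum { kAborted, kCommitted };

struct MigrationCoordinatorDocument {
    std::optional<DecisionEnum> decision;  // persisted alongside the rest of the document
    void setDecision(DecisionEnum d) { decision = d; }
    std::optional<DecisionEnum> getDecision() const { return decision; }
};

class MigrationCoordinator {
public:
    // Before this commit the decision lived in a separate _decision member that was lost on
    // failover; now it is recorded on the document that gets persisted.
    void setMigrationDecision(DecisionEnum decision) { _migrationInfo.setDecision(decision); }

    void completeMigration() const {
        auto decision = _migrationInfo.getDecision();
        if (!decision) {
            std::cout << "no decision recorded; a recovery path must decide later\n";
            return;
        }
        std::cout << (*decision == DecisionEnum::kCommitted ? "commit\n" : "abort\n");
    }

private:
    MigrationCoordinatorDocument _migrationInfo;
};

int main() {
    MigrationCoordinator coordinator;
    coordinator.setMigrationDecision(DecisionEnum::kCommitted);
    coordinator.completeMigration();  // prints "commit"
}

The real coordinator persists _migrationInfo through the migrationutil helpers shown in the hunks below; the sketch only illustrates where the decision now lives.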
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index d72311a3369..4cc266fa0e3 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -105,10 +105,8 @@ TxnNumber MigrationCoordinator::getTxnNumber() const {
}
void MigrationCoordinator::startMigration(OperationContext* opCtx) {
- LOGV2_DEBUG(23889,
- 2,
- "Persisting migration coordinator doc",
- "migrationDoc"_attr = _migrationInfo.toBSON());
+ LOGV2_DEBUG(
+ 23889, 2, "Persisting migration coordinator doc", "migrationDoc"_attr = _migrationInfo);
migrationutil::persistMigrationCoordinatorLocally(opCtx, _migrationInfo);
LOGV2_DEBUG(23890,
@@ -127,19 +125,20 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) {
opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern);
}
-void MigrationCoordinator::setMigrationDecision(Decision decision) {
+void MigrationCoordinator::setMigrationDecision(DecisionEnum decision) {
LOGV2_DEBUG(23891,
2,
"MigrationCoordinator setting migration decision to {decision}",
"MigrationCoordinator setting migration decision",
- "decision"_attr = (decision == Decision::kCommitted ? "committed" : "aborted"),
+ "decision"_attr = (decision == DecisionEnum::kCommitted ? "committed" : "aborted"),
"migrationId"_attr = _migrationInfo.getId());
- _decision = decision;
+ _migrationInfo.setDecision(decision);
}
boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(OperationContext* opCtx) {
- if (!_decision) {
+ auto decision = _migrationInfo.getDecision();
+ if (!decision) {
LOGV2(
23892,
"Migration completed without setting a decision. This node might have "
@@ -153,17 +152,17 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat
LOGV2(23893,
"MigrationCoordinator delivering decision {decision} to self and to recipient",
"MigrationCoordinator delivering decision to self and to recipient",
- "decision"_attr = (_decision == Decision::kCommitted ? "committed" : "aborted"),
+ "decision"_attr = (decision == DecisionEnum::kCommitted ? "committed" : "aborted"),
"migrationId"_attr = _migrationInfo.getId());
boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none;
- switch (*_decision) {
- case Decision::kAborted:
+ switch (*decision) {
+ case DecisionEnum::kAborted:
_abortMigrationOnDonorAndRecipient(opCtx);
hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet();
break;
- case Decision::kCommitted:
+ case DecisionEnum::kCommitted:
cleanupCompleteFuture = _commitMigrationOnDonorAndRecipient(opCtx);
hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
break;
@@ -180,7 +179,7 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
LOGV2_DEBUG(
23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId());
- migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
+ migrationutil::persistCommitDecision(opCtx, _migrationInfo);
LOGV2_DEBUG(
23895,
@@ -233,7 +232,17 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
LOGV2_DEBUG(
23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId());
- migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
+ migrationutil::persistAbortDecision(opCtx, _migrationInfo);
+
+ hangBeforeSendingAbortDecision.pauseWhileSet();
+
+ // Ensure the local range deletion document is removed to prevent incoming migrations with
+ // overlapping ranges from hanging.
+ LOGV2_DEBUG(23901,
+ 2,
+ "Deleting range deletion task on donor",
+ "migrationId"_attr = _migrationInfo.getId());
+ migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId());
try {
LOGV2_DEBUG(23900,
@@ -254,7 +263,8 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
} catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
LOGV2_DEBUG(4620231,
1,
- "Failed to advance transaction number on recipient shard for abort",
+ "Failed to advance transaction number on recipient shard for abort and/or "
+ "marking range deletion task on recipient as ready for processing",
"namespace"_attr = _migrationInfo.getNss(),
"migrationId"_attr = _migrationInfo.getId(),
"recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
@@ -262,14 +272,6 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
"error"_attr = exShardNotFound);
}
- hangBeforeSendingAbortDecision.pauseWhileSet();
-
- LOGV2_DEBUG(23901,
- 2,
- "Deleting range deletion task on donor",
- "migrationId"_attr = _migrationInfo.getId());
- migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId());
-
LOGV2_DEBUG(23902,
2,
"Marking range deletion task on recipient as ready for processing",
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index 742de204d38..e560dff9d0b 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -42,8 +42,6 @@ namespace migrationutil {
*/
class MigrationCoordinator {
public:
- enum class Decision { kAborted, kCommitted };
-
MigrationCoordinator(MigrationSessionId sessionId,
ShardId donorShard,
ShardId recipientShard,
@@ -80,7 +78,7 @@ public:
*
* This method is non-blocking and does not perform any I/O.
*/
- void setMigrationDecision(Decision decision);
+ void setMigrationDecision(DecisionEnum decision);
/**
* If a decision has been set, makes the decision durable, then communicates the decision by
@@ -110,9 +108,6 @@ private:
*/
void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx);
- // The decision of the migration commit against the config server.
- boost::optional<Decision> _decision;
-
MigrationCoordinatorDocument _migrationInfo;
bool _waitForDelete = false;
};
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 3a32c4400c1..a1ae8f973ed 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -500,7 +500,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
"updatedCollectionVersion"_attr = refreshedMetadata.getCollVersion(),
"migrationId"_attr = _coordinator->getMigrationId());
- _coordinator->setMigrationDecision(migrationutil::MigrationCoordinator::Decision::kCommitted);
+ _coordinator->setMigrationDecision(DecisionEnum::kCommitted);
hangBeforeLeavingCriticalSection.pauseWhileSet();
@@ -567,17 +567,7 @@ void MigrationSourceManager::cleanupOnError() {
<< _args.getFromShardId() << "to" << _args.getToShardId()),
ShardingCatalogClient::kMajorityWriteConcern);
- try {
- _cleanup(true);
- } catch (const DBException& ex) {
- LOGV2_WARNING(22022,
- "Failed to clean up migration with request parameters "
- "{chunkMigrationRequestParameters} due to: {error}",
- "Failed to clean up migration",
- "chunkMigrationRequestParameters"_attr = redact(_args.toString()),
- "error"_attr = redact(ex),
- "migrationId"_attr = _coordinator->getMigrationId());
- }
+ _cleanup(true);
}
void MigrationSourceManager::abortDueToConflictingIndexOperation(OperationContext* opCtx) {
@@ -651,7 +641,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
});
}
-void MigrationSourceManager::_cleanup(bool completeMigration) {
+void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
invariant(_state != kDone);
auto cloneDriver = [&]() {
@@ -692,49 +682,66 @@ void MigrationSourceManager::_cleanup(bool completeMigration) {
cloneDriver->cancelClone(_opCtx);
}
- if (_state == kCriticalSection || _state == kCloneCompleted || _state == kCommittingOnConfig) {
- _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
-
- // NOTE: The order of the operations below is important and the comments explain the
- // reasoning behind it
-
- // Wait for the updates to the cache of the routing table to be fully written to disk before
- // clearing the 'minOpTime recovery' document. This way, we ensure that all nodes from a
- // shard, which donated a chunk will always be at the shard version of the last migration it
- // performed.
- //
- // If the metadata is not persisted before clearing the 'inMigration' flag below, it is
- // possible that the persisted metadata is rolled back after step down, but the write which
- // cleared the 'inMigration' flag is not, a secondary node will report itself at an older
- // shard version.
- CatalogCacheLoader::get(_opCtx).waitForCollectionFlush(_opCtx, getNss());
-
- // Clear the 'minOpTime recovery' document so that the next time a node from this shard
- // becomes a primary, it won't have to recover the config server optime.
- ShardingStateRecovery::endMetadataOp(_opCtx);
- }
-
- if (completeMigration && _state >= kCloning) {
- invariant(_coordinator);
- if (_state < kCommittingOnConfig) {
- _coordinator->setMigrationDecision(
- migrationutil::MigrationCoordinator::Decision::kAborted);
- }
- // This can be called on an exception path after the OperationContext has been interrupted,
- // so use a new OperationContext. Note, it's valid to call getServiceContext on an
- // interrupted OperationContext.
- auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator");
- {
- stdx::lock_guard<Client> lk(*newClient.get());
- newClient->setSystemOperationKillableByStepdown(lk);
+ try {
+ if (_state >= kCloning) {
+ invariant(_coordinator);
+ if (_state < kCommittingOnConfig) {
+ _coordinator->setMigrationDecision(DecisionEnum::kAborted);
+ }
+
+ auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtxPtr = cc().makeOperationContext();
+ auto newOpCtx = newOpCtxPtr.get();
+
+ if (_state >= kCriticalSection && _state <= kCommittingOnConfig) {
+ _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
+
+ // NOTE: The order of the operations below is important and the comments explain the
+ // reasoning behind it.
+ //
+ // Wait for the updates to the cache of the routing table to be fully written to
+ // disk before clearing the 'minOpTime recovery' document. This way, we ensure that
+ // all nodes from a shard, which donated a chunk will always be at the shard version
+ // of the last migration it performed.
+ //
+ // If the metadata is not persisted before clearing the 'inMigration' flag below, it
+ // is possible that the persisted metadata is rolled back after step down, but the
+ // write which cleared the 'inMigration' flag is not, a secondary node will report
+ // itself at an older shard version.
+ CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, getNss());
+
+ // Clear the 'minOpTime recovery' document so that the next time a node from this
+ // shard becomes a primary, it won't have to recover the config server optime.
+ ShardingStateRecovery::endMetadataOp(newOpCtx);
+ }
+ if (completeMigration) {
+ // This can be called on an exception path after the OperationContext has been
+ // interrupted, so use a new OperationContext. Note, it's valid to call
+ // getServiceContext on an interrupted OperationContext.
+ _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
+ }
}
- AlternativeClientRegion acr(newClient);
- auto newOpCtxPtr = cc().makeOperationContext();
- auto newOpCtx = newOpCtxPtr.get();
- _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx);
- }
- _state = kDone;
+ _state = kDone;
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(5089001,
+ "Failed to complete the migration {migrationId} with "
+ "{chunkMigrationRequestParameters} due to: {error}",
+ "Failed to complete the migration",
+ "chunkMigrationRequestParameters"_attr = redact(_args.toString()),
+ "error"_attr = redact(ex),
+ "migrationId"_attr = _coordinator->getMigrationId());
+ // Something went really wrong when completing the migration; just unset the metadata and let
+ // the next operation recover.
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+ CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata(_opCtx);
+ }
}
BSONObj MigrationSourceManager::getMigrationStatusReport() const {
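
As the hunks above show, _cleanup() becomes noexcept and wraps the completion work in a try/catch: the post-clone steps run on a fresh OperationContext obtained through an AlternativeClientRegion, and if anything throws, the filtering metadata is cleared so the next operation can recover. Below is a stripped-down, self-contained sketch of that structure; the helper functions are stand-ins for the MongoDB calls in the hunk.

#include <iostream>
#include <stdexcept>

// Hypothetical stand-ins for the pieces used by MigrationSourceManager::_cleanup().
void waitForCollectionFlush() { std::cout << "routing table cache flushed to disk\n"; }
void endMetadataOp() { std::cout << "'minOpTime recovery' document cleared\n"; }
void completeMigration() { throw std::runtime_error("stepdown during completion"); }
void clearFilteringMetadata() { std::cout << "filtering metadata cleared for recovery\n"; }

void cleanup(bool completeMigrationRequested) noexcept {
    try {
        // Order matters: persist the routing table cache before clearing the recovery
        // document, otherwise a rolled-back metadata write could leave a secondary
        // reporting an older shard version (see the NOTE in the hunk above).
        waitForCollectionFlush();
        endMetadataOp();

        if (completeMigrationRequested) {
            completeMigration();
        }
    } catch (const std::exception& ex) {
        // On failure, give up on completing here and let the next operation recover.
        std::cout << "failed to complete migration: " << ex.what() << '\n';
        clearFilteringMetadata();
    }
}

int main() {
    cleanup(true);
}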
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 3bbae9988af..d032047c4fb 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -214,7 +214,7 @@ private:
* Called when any of the states fails. May only be called once and will put the migration
* manager into the kDone state.
*/
- void _cleanup(bool completeMigration);
+ void _cleanup(bool completeMigration) noexcept;
// This is the opCtx of the moveChunk request that constructed the MigrationSourceManager.
// The caller must guarantee it outlives the MigrationSourceManager.
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 01651ece011..26d1abcbb48 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -583,15 +583,18 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
}
}
-void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
+void persistCommitDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc) {
+ invariant(migrationDoc.getDecision() &&
+ *migrationDoc.getDecision() == DecisionEnum::kCommitted);
+
hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx);
PersistentTaskStore<MigrationCoordinatorDocument> store(
NamespaceString::kMigrationCoordinatorsNamespace);
- store.update(
- opCtx,
- QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
- BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed")));
+ store.upsert(opCtx,
+ QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()),
+ migrationDoc.toBSON());
if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
@@ -600,25 +603,23 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
}
}
-void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) {
- hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx);
-
- PersistentTaskStore<MigrationCoordinatorDocument> store(
- NamespaceString::kMigrationCoordinatorsNamespace);
- store.update(newOpCtx,
- QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
- BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName
- << "aborted")));
-
- if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) {
- hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when persisting migrate abort decision");
- }
- });
+void persistAbortDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc) {
+ invariant(migrationDoc.getDecision() && *migrationDoc.getDecision() == DecisionEnum::kAborted);
+
+ hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(opCtx);
+
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ NamespaceString::kMigrationCoordinatorsNamespace);
+ store.upsert(opCtx,
+ QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()),
+ migrationDoc.toBSON());
+
+ if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when persisting migrate abort decision");
+ }
}
void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
@@ -646,20 +647,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
const UUID& deletionTaskId,
const WriteConcernOptions& writeConcern) {
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) {
- hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.remove(
- newOpCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern);
-
- if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
- hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when deleting range deletion locally");
- }
- });
+ hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx);
+ PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
+ store.remove(opCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern);
+
+ if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when deleting range deletion locally");
+ }
}
void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
@@ -891,9 +887,6 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss)
if (doc.getDecision()) {
// The decision is already known.
- coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted
- ? MigrationCoordinator::Decision::kCommitted
- : MigrationCoordinator::Decision::kAborted);
coordinator.completeMigration(opCtx);
return true;
}
@@ -957,9 +950,9 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss)
}
if (currentMetadata.keyBelongsToMe(doc.getRange().getMin())) {
- coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+ coordinator.setMigrationDecision(DecisionEnum::kAborted);
} else {
- coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+ coordinator.setMigrationDecision(DecisionEnum::kCommitted);
}
coordinator.completeMigration(opCtx);
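
Both persist helpers in migration_util.cpp now accept the whole MigrationCoordinatorDocument, assert that the expected decision is set, and upsert the full document instead of $set-ing only the decision field. Below is a minimal sketch of that invariant-then-upsert pattern against a toy in-memory store; ToyStore is a simplified stand-in for PersistentTaskStore keyed on the migration id.

#include <cassert>
#include <iostream>
#include <map>
#include <optional>
#include <string>

enum class DecisionEnum { kAborted, kCommitted };

// Simplified stand-ins; the real code uses MigrationCoordinatorDocument and
// PersistentTaskStore keyed on the migration UUID.
struct MigrationCoordinatorDocument {
    std::string id;
    std::optional<DecisionEnum> decision;
};

struct ToyStore {
    std::map<std::string, MigrationCoordinatorDocument> docs;
    // Upsert replaces the whole document, inserting it if it is not present yet.
    void upsert(const MigrationCoordinatorDocument& doc) { docs[doc.id] = doc; }
};

void persistCommitDecision(ToyStore& store, const MigrationCoordinatorDocument& doc) {
    assert(doc.decision && *doc.decision == DecisionEnum::kCommitted);
    store.upsert(doc);
}

void persistAbortDecision(ToyStore& store, const MigrationCoordinatorDocument& doc) {
    assert(doc.decision && *doc.decision == DecisionEnum::kAborted);
    store.upsert(doc);
}

int main() {
    ToyStore store;
    MigrationCoordinatorDocument doc{"migration-1", DecisionEnum::kCommitted};
    persistCommitDecision(store, doc);
    std::cout << "stored docs: " << store.docs.size() << '\n';  // prints 1
}

Upserting the whole document keeps the persisted state aligned with the in-memory _migrationInfo, which is what allows a node to recover the decision after a failover.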
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index facef4118e2..09ca6b7c1e0 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -145,13 +145,15 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
* Updates the migration coordinator document to set the decision field to "committed" and waits for
* majority writeConcern.
*/
-void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId);
+void persistCommitDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc);
/**
* Updates the migration coordinator document to set the decision field to "aborted" and waits for
* majority writeConcern.
*/
-void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId);
+void persistAbortDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc);
/**
* Deletes the range deletion task document with the specified id from config.rangeDeletions and