author     Esha Maharishi <esha.maharishi@mongodb.com>   2020-01-24 02:34:42 +0000
committer  evergreen <evergreen@mongodb.com>             2020-01-24 02:34:42 +0000
commit     e791a2ea966bb302ff180dd4538d87c078e74747 (patch)
tree       42f37fe0222fcf47acb5bd92f5d451799f193b83
parent     281973f2c819cd8dcca2b10c0637091ffdcff2c3 (diff)
download   mongo-e791a2ea966bb302ff180dd4538d87c078e74747.tar.gz
SERVER-44162 Resume coordinating active migrations on stepup
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml           |   1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml  |   1
-rw-r--r--  jstests/sharding/migration_coordinator_failover.js                                   | 155
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp                    |   4
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp                                             |  23
-rw-r--r--  src/mongo/db/s/migration_coordinator.h                                               |  10
-rw-r--r--  src/mongo/db/s/migration_coordinator_document.idl                                    |  12
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                                          |  62
-rw-r--r--  src/mongo/db/s/migration_util.cpp                                                    | 177
-rw-r--r--  src/mongo/db/s/migration_util.h                                                      |  21
-rw-r--r--  src/mongo/s/request_types/move_chunk_request.h                                       |   4
11 files changed, 401 insertions(+), 69 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index cd46d0cb144..6d5b596da65 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -96,6 +96,7 @@ selector:
   - jstests/sharding/key_rotation.js
   - jstests/sharding/keys_rotation_interval_sec.js
   - jstests/sharding/migration_coordinator_basic.js # sets a failpoint on the config primary
+  - jstests/sharding/migration_coordinator_failover.js # sets a failpoint on the config primary
   - jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
   - jstests/sharding/move_chunk_insert_with_write_retryability.js
   - jstests/sharding/move_chunk_remove_with_write_retryability.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index b1b648d8f36..ec758617272 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -22,6 +22,7 @@ selector:
   - jstests/sharding/out_fails_to_replace_sharded_collection.js
   - jstests/sharding/merge_from_stale_mongos.js
   - jstests/sharding/migration_coordinator_basic.js
+  - jstests/sharding/migration_coordinator_failover.js
   - jstests/sharding/count1.js
   # Enable when SERVER-44733 is backported
   - jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
diff --git a/jstests/sharding/migration_coordinator_failover.js b/jstests/sharding/migration_coordinator_failover.js
new file mode 100644
index 00000000000..ea690027d2a
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_failover.js
@@ -0,0 +1,155 @@
+/**
+ * Tests that a donor resumes coordinating a migration if it fails over after creating the
+ * migration coordinator document but before deleting it.
+ *
+ * @tags: [requires_fcv_44]
+ */
+
+// This test induces failovers on shards.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+    if (typeof getNewNs.counter == 'undefined') {
+        getNewNs.counter = 0;
+    }
+    getNewNs.counter++;
+    const collName = "ns" + getNewNs.counter;
+    return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+var st = new ShardingTest({shards: 2, rs: {nodes: 2}});
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+
+function runMoveChunkMakeDonorStepDownAfterFailpoint(
+    failpointName, shouldMakeMigrationFailToCommitOnConfig, expectAbortDecisionWithCode) {
+    const [collName, ns] = getNewNs(dbName);
+    jsTest.log("Running migration, making donor step down after failpoint " + failpointName +
+               "; shouldMakeMigrationFailToCommitOnConfig is " +
+               shouldMakeMigrationFailToCommitOnConfig + "; expectAbortDecisionWithCode is " +
+               expectAbortDecisionWithCode + "; ns is " + ns);
+
+    // Wait for mongos to see a primary node on the primary shard, because mongos does not retry
+    // writes on NotMaster errors, and we are about to insert docs through mongos.
+    awaitRSClientHosts(st.s, st.rs0.getPrimary(), {ok: true, ismaster: true});
+
+    // Insert some docs into the collection so that the migration leaves orphans on either the
+    // donor or recipient, depending on the decision.
+    const numDocs = 1000;
+    var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+    for (var i = 0; i < numDocs; i++) {
+        bulk.insert({_id: i});
+    }
+    assert.commandWorked(bulk.execute());
+
+    // Shard the collection.
+    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+    if (shouldMakeMigrationFailToCommitOnConfig) {
+        // Turn on a failpoint to make the migration commit fail on the config server.
+        assert.commandWorked(st.configRS.getPrimary().adminCommand(
+            {configureFailPoint: "migrationCommitVersionError", mode: "alwaysOn"}));
+    }
+
+    jsTest.log("Run the moveChunk asynchronously and wait for " + failpointName + " to be hit.");
+    let failpointHandle = configureFailPoint(st.rs0.getPrimary(), failpointName);
+    const awaitResult = startParallelShell(
+        funWithArgs(function(ns, toShardName, expectAbortDecisionWithCode) {
+            if (expectAbortDecisionWithCode) {
+                assert.commandFailedWithCode(
+                    db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}),
+                    expectAbortDecisionWithCode);
+            } else {
+                assert.commandWorked(
+                    db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+            }
+        }, ns, st.shard1.shardName, expectAbortDecisionWithCode), st.s.port);
+    failpointHandle.wait();
+
+    jsTest.log("Make the donor primary step down.");
+    assert.commandWorked(
+        st.rs0.getPrimary().adminCommand({replSetStepDown: 10 /* stepDownSecs */, force: true}));
+    failpointHandle.off();
+
+    jsTest.log("Allow the moveChunk to finish.");
+    awaitResult();
+
+    if (expectAbortDecisionWithCode) {
+        jsTest.log("Expect abort decision, so wait for recipient to clean up the orphans.");
+        assert.soon(() => {
+            return 0 === st.rs1.getPrimary().getDB(dbName).getCollection(collName).count();
+        });
+
+    } else {
+        jsTest.log("Expect commit decision, so wait for donor to clean up the orphans.");
+        assert.soon(() => {
+            return 0 === st.rs0.getPrimary().getDB(dbName).getCollection(collName).count();
+        });
+    }
+
+    // The data should still be present on the shard that owns the chunk.
+    assert.eq(numDocs, st.s.getDB(dbName).getCollection(collName).count());
+
+    jsTest.log("Wait for the donor to delete the migration coordinator doc");
+    assert.soon(() => {
+        return 0 ===
+            st.rs0.getPrimary().getDB("config").getCollection("migrationCoordinators").count();
+    });
+
+    if (shouldMakeMigrationFailToCommitOnConfig) {
+        // Turn off the failpoint on the config server before returning.
+        assert.commandWorked(st.configRS.getPrimary().adminCommand(
+            {configureFailPoint: "migrationCommitVersionError", mode: "off"}));
+    }
+}
+
+//
+// Decision is commit
+//
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingCommitDecisionDurable",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeSendingCommitDecision",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeForgettingMigrationAfterCommitDecision",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */);
+
+//
+// Decision is abort
+//
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep3",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep4",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep5",
+                                            false /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingAbortDecisionDurable",
+                                            true /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.StaleEpoch);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeSendingAbortDecision",
+                                            true /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.StaleEpoch);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeForgettingMigrationAfterAbortDecision",
+                                            true /* shouldMakeMigrationFailToCommitOnConfig */,
+                                            ErrorCodes.StaleEpoch);
+
+st.stop();
+})();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 7e8fb6ba89a..6db23585c2d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -819,7 +819,11 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
         PeriodicBalancerConfigRefresher::get(_service).onStepUp(_service);
         TransactionCoordinatorService::get(_service)->onStepUp(opCtx);
 
+        // Note, these must be done after the configOpTime is recovered via
+        // ShardingStateRecovery::recover above, because they may trigger filtering metadata
+        // refreshes which should use the recovered configOpTime.
         migrationutil::resubmitRangeDeletionsOnStepUp(_service);
+        migrationutil::resumeMigrationCoordinationsOnStepUp(_service);
     } else {  // unsharded
         if (auto validator = LogicalTimeValidator::get(_service)) {
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 38e2d47b5c4..038b23318a2 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -42,6 +42,15 @@ namespace mongo {
 
 MONGO_FAIL_POINT_DEFINE(disableWritingPendingRangeDeletionEntries);
 
+MONGO_FAIL_POINT_DEFINE(hangBeforeMakingCommitDecisionDurable);
+MONGO_FAIL_POINT_DEFINE(hangBeforeMakingAbortDecisionDurable);
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingCommitDecision);
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbortDecision);
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterCommitDecision);
+MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
+
 namespace migrationutil {
 
 MigrationCoordinator::MigrationCoordinator(UUID migrationId,
@@ -103,18 +112,24 @@ void MigrationCoordinator::completeMigration(OperationContext* opCtx) {
     switch (*_decision) {
         case Decision::kAborted:
            _abortMigrationOnDonorAndRecipient(opCtx);
+           hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet();
            break;
        case Decision::kCommitted:
            _commitMigrationOnDonorAndRecipient(opCtx);
+           hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
            break;
    }
-    _forgetMigration(opCtx);
+    forgetMigration(opCtx);
 }
 
 void MigrationCoordinator::_commitMigrationOnDonorAndRecipient(OperationContext* opCtx) {
+    hangBeforeMakingCommitDecisionDurable.pauseWhileSet();
+
     LOG(0) << _logPrefix() << "Making commit decision durable";
     migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
 
+    hangBeforeSendingCommitDecision.pauseWhileSet();
+
     LOG(0) << _logPrefix() << "Deleting range deletion task on recipient";
     migrationutil::deleteRangeDeletionTaskOnRecipient(
         opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getId());
@@ -124,9 +139,13 @@ void MigrationCoordinator::_commitMigrationOnDonorAndRecipient(OperationContext*
 }
 
 void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) {
+    hangBeforeMakingAbortDecisionDurable.pauseWhileSet();
+
     LOG(0) << _logPrefix() << "Making abort decision durable";
     migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
 
+    hangBeforeSendingAbortDecision.pauseWhileSet();
+
     LOG(0) << _logPrefix() << "Deleting range deletion task on donor";
     migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId());
 
@@ -135,7 +154,7 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
         opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getId());
 }
 
-void MigrationCoordinator::_forgetMigration(OperationContext* opCtx) {
+void MigrationCoordinator::forgetMigration(OperationContext* opCtx) {
     LOG(0) << _logPrefix() << "Deleting migration coordinator document";
     migrationutil::deleteMigrationCoordinatorDocumentLocally(opCtx, _migrationInfo.getId());
 }
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index a4f89ecbbc7..1f64e5af3a5 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -82,6 +82,11 @@ public:
      */
     void completeMigration(OperationContext* opCtx);
 
+    /**
+     * Deletes the persistent state for this migration from config.migrationCoordinators.
+     */
+    void forgetMigration(OperationContext* opCtx);
+
 private:
     /**
      * Deletes the range deletion task from the recipient node and marks the range deletion task on
@@ -95,11 +100,6 @@ private:
      */
     void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx);
 
-    /**
-     * Deletes the persistent state for this migration from config.migrationCoordinators.
-     */
-    void _forgetMigration(OperationContext* opCtx);
-
     // The decision of the migration commit against the config server.
     boost::optional<Decision> _decision;
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index 9e1310fbf79..c1001c0c748 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -38,6 +38,14 @@ imports:
     - "mongo/s/chunk_range.idl"
     - "mongo/s/chunk_version.idl"
 
+enums:
+    Decision:
+        description: "Whether the migration committed or aborted."
+        type: string
+        values:
+            kCommitted: "committed"
+            kAborted: "aborted"
+
 structs:
     migrationCoordinatorDocument:
         description: "Represents an in-progress migration on the migration donor."
@@ -67,6 +75,6 @@ structs:
                 type: ChunkVersion
                 description: "The version, at the start of the migration, of the chunk being moved."
             decision:
-                type: string
-                description: "Whether the migration committed or aborted."
+                type: Decision
+                description: "Enumeration that defines whether the migration committed or aborted."
                 optional: true
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 85479a801a0..728366a814c 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -60,7 +60,6 @@
 #include "mongo/s/catalog_cache_loader.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/request_types/commit_chunk_migration_request_type.h"
-#include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h"
 #include "mongo/s/request_types/set_shard_version_request.h"
 #include "mongo/s/shard_key_pattern.h"
 #include "mongo/util/duration.h"
@@ -469,48 +468,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
 
     if (!migrationCommitStatus.isOK()) {
         if (_useFCV44Protocol) {
-            // Send _configsvrEnsureShardVersionIsGreaterThan until hearing success to ensure
-            // that if the migration commit has not occurred yet, it will never occur. This
-            // makes it safe for the shard to refresh to find out if the migration commit
-            // succeeded.
-
-            ConfigsvrEnsureChunkVersionIsGreaterThan ensureChunkVersionIsGreaterThanRequest;
-            ensureChunkVersionIsGreaterThanRequest.setDbName(NamespaceString::kAdminDb);
-            ensureChunkVersionIsGreaterThanRequest.setMinKey(_args.getMinKey());
-            ensureChunkVersionIsGreaterThanRequest.setMaxKey(_args.getMaxKey());
-            ensureChunkVersionIsGreaterThanRequest.setVersion(_chunkVersion);
-            const auto ensureChunkVersionIsGreaterThanRequestBSON =
-                ensureChunkVersionIsGreaterThanRequest.toBSON({});
-
-            for (int attempts = 1;; attempts++) {
-                const auto ensureChunkVersionIsGreaterThanResponse =
-                    Grid::get(_opCtx)
-                        ->shardRegistry()
-                        ->getConfigShard()
-                        ->runCommandWithFixedRetryAttempts(
-                            _opCtx,
-                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                            "admin",
-                            ensureChunkVersionIsGreaterThanRequestBSON,
-                            Shard::RetryPolicy::kIdempotent);
-                const auto ensureChunkVersionIsGreaterThanStatus =
-                    Shard::CommandResponse::getEffectiveStatus(
-                        ensureChunkVersionIsGreaterThanResponse);
-                if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
-                    break;
-                }
-
-                // If the server is already doing a clean shutdown, join the shutdown. This
-                // prevents the cleanup logic from running if the node is shutting down.
-                if (globalInShutdownDeprecated()) {
-                    shutdown(waitForShutdown());
-                }
-                _opCtx->checkForInterrupt();
-
-                LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
-                       << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
-                       << " . Will try again.";
-            }
+            migrationutil::ensureChunkVersionIsGreaterThan(_opCtx, _args.getRange(), _chunkVersion);
         } else {
             // This is the FCV 4.2 and below protocol.
 
@@ -571,23 +529,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
     }
 
     // Incrementally refresh the metadata before leaving the critical section.
-    for (int attempts = 1;; attempts++) {
-        try {
-            forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
-            break;
-        } catch (const DBException& ex) {
-            // If the server is already doing a clean shutdown, join the shutdown. This prevents
-            // the cleanup logic from running if the node is shutting down.
-            if (globalInShutdownDeprecated()) {
-                shutdown(waitForShutdown());
-            }
-            _opCtx->checkForInterrupt();
-
-            log() << "Failed to refresh metadata after " << attempts << " attempts, after a "
-                  << (migrationCommitStatus.isOK() ? "failed commit attempt" : "successful commit")
-                  << causedBy(redact(ex.toStatus())) << ". Will try to refresh again.";
-        }
-    }
+    migrationutil::refreshFilteringMetadataUntilSuccess(_opCtx, getNss());
 
     const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 17d7c806a33..964addcd78f 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -46,6 +46,7 @@
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/s/active_migrations_registry.h"
 #include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_coordinator.h"
 #include "mongo/db/s/shard_filtering_metadata_refresh.h"
 #include "mongo/db/write_concern.h"
 #include "mongo/executor/task_executor_pool.h"
@@ -54,6 +55,8 @@
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/client/shard.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h"
+#include "mongo/util/exit.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
@@ -419,5 +422,179 @@ void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UU
         QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
         {1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)});
 }
+
+void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
+                                     const ChunkRange& range,
+                                     const ChunkVersion& preMigrationChunkVersion) {
+    ConfigsvrEnsureChunkVersionIsGreaterThan ensureChunkVersionIsGreaterThanRequest;
+    ensureChunkVersionIsGreaterThanRequest.setDbName(NamespaceString::kAdminDb);
+    ensureChunkVersionIsGreaterThanRequest.setMinKey(range.getMin());
+    ensureChunkVersionIsGreaterThanRequest.setMaxKey(range.getMax());
+    ensureChunkVersionIsGreaterThanRequest.setVersion(preMigrationChunkVersion);
+    const auto ensureChunkVersionIsGreaterThanRequestBSON =
+        ensureChunkVersionIsGreaterThanRequest.toBSON({});
+
+    for (int attempts = 1;; attempts++) {
+        const auto ensureChunkVersionIsGreaterThanResponse =
+            Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+                opCtx,
+                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                "admin",
+                ensureChunkVersionIsGreaterThanRequestBSON,
+                Shard::RetryPolicy::kIdempotent);
+        const auto ensureChunkVersionIsGreaterThanStatus =
+            Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+        if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
+            break;
+        }
+
+        // If the server is already doing a clean shutdown, join the shutdown.
+        if (globalInShutdownDeprecated()) {
+            shutdown(waitForShutdown());
+        }
+        opCtx->checkForInterrupt();
+
+        LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
+               << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
+               << " . Will try again.";
+    }
+}
+
+void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
+    for (int attempts = 1;; attempts++) {
+        try {
+            forceShardFilteringMetadataRefresh(opCtx, nss, true);
+            break;
+        } catch (const DBException& ex) {
+            // If the server is already doing a clean shutdown, join the shutdown.
+            if (globalInShutdownDeprecated()) {
+                shutdown(waitForShutdown());
+            }
+            opCtx->checkForInterrupt();
+
+            LOG(0) << "Failed to refresh metadata for " << nss.ns() << " after " << attempts
+                   << " attempts " << causedBy(redact(ex.toStatus()))
+                   << ". Will try to refresh again.";
+        }
+    }
+}
+
+void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
+    LOG(0) << "Starting migration coordinator stepup recovery thread.";
+
+    auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
+    ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) {
+        try {
+            ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
+            {
+                stdx::lock_guard<Client> lk(*tc.get());
+                tc->setSystemOperationKillable(lk);
+            }
+
+            auto uniqueOpCtx = tc->makeOperationContext();
+            auto opCtx = uniqueOpCtx.get();
+
+            // Wait for the latest OpTime to be majority committed to ensure any decision that is
+            // read is on the true branch of history.
+            // Note (Esha): I don't think this is strictly required for correctness, but it is
+            // difficult to reason about, and being pessimistic by waiting for the decision to be
+            // majority committed does not cost much, since stepup should be rare. It *is* required
+            // that this node ensure a decision that it itself recovers is majority committed. For
+            // example, it is possible that this node is a stale primary, and the true primary has
+            // already sent a *commit* decision and re-received a chunk containing the minKey of
+            // this migration. In this case, this node would see that the minKey is still owned and
+            // assume the migration *aborted*. If this node communicated the abort decision to the
+            // recipient, the recipient (if it had not heard the decision yet) would delete data
+            // that the recipient actually owns. (The recipient does not currently wait to hear the
+            // range deletion decision for the first migration before being able to donate (any
+            // part of) the chunk again.)
+            auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
+            replClientInfo.setLastOpToSystemLastOpTime(opCtx);
+            const auto lastOpTime = replClientInfo.getLastOp();
+            LOG(0) << "Waiting for OpTime " << lastOpTime << " to become majority committed";
+            WriteConcernResult unusedWCResult;
+            uassertStatusOK(
+                waitForWriteConcern(opCtx,
+                                    lastOpTime,
+                                    WriteConcernOptions{WriteConcernOptions::kMajority,
+                                                        WriteConcernOptions::SyncMode::UNSET,
+                                                        WriteConcernOptions::kNoTimeout},
+                                    &unusedWCResult));
+
+            PersistentTaskStore<MigrationCoordinatorDocument> store(
+                opCtx, NamespaceString::kMigrationCoordinatorsNamespace);
+            Query query;
+            store.forEach(opCtx, query, [&opCtx](const MigrationCoordinatorDocument& doc) {
+                LOG(0) << "Recovering migration " << doc.toBSON();
+
+                // Create a MigrationCoordinator to complete the coordination.
+                MigrationCoordinator coordinator(doc.getId(),
+                                                 doc.getDonorShardId(),
+                                                 doc.getRecipientShardId(),
+                                                 doc.getNss(),
+                                                 doc.getCollectionUuid(),
+                                                 doc.getRange(),
+                                                 doc.getPreMigrationChunkVersion());
+
+                if (doc.getDecision()) {
+                    // The decision is already known.
+                    coordinator.setMigrationDecision(
+                        (*doc.getDecision()) == DecisionEnum::kCommitted
+                            ? MigrationCoordinator::Decision::kCommitted
+                            : MigrationCoordinator::Decision::kAborted);
+                    coordinator.completeMigration(opCtx);
+                    return true;
+                }
+
+                // The decision is not known. Recover the decision from the config server.
+
+                ensureChunkVersionIsGreaterThan(
+                    opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
+
+                refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
+
+                auto refreshedMetadata = [&] {
+                    AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
+                    auto* const css = CollectionShardingRuntime::get(opCtx, doc.getNss());
+                    return css->getCurrentMetadataIfKnown();
+                }();
+
+                if (!refreshedMetadata || !(*refreshedMetadata)->isSharded() ||
+                    !(*refreshedMetadata)->uuidMatches(doc.getCollectionUuid())) {
+                    LOG(0) << "Even after forced refresh, filtering metadata for namespace in "
+                              "migration coordinator doc "
+                           << doc.toBSON()
+                           << (!refreshedMetadata || !(*refreshedMetadata)->isSharded()
+                                   ? "is not known"
+                                   : "has UUID that does not match the collection UUID in the "
+                                     "coordinator doc")
+                           << ". Deleting the range deletion tasks on the donor and recipient as "
+                              "well as the migration coordinator document on this node.";
+
+                    // TODO (SERVER-45707): Test that range deletion tasks are eventually
+                    // deleted even if the collection is dropped before migration coordination
+                    // is resumed.
+                    deleteRangeDeletionTaskOnRecipient(
+                        opCtx, doc.getRecipientShardId(), doc.getId());
+                    deleteRangeDeletionTaskLocally(opCtx, doc.getId());
+                    coordinator.forgetMigration(opCtx);
+                    return true;
+                }
+
+                if ((*refreshedMetadata)->keyBelongsToMe(doc.getRange().getMin())) {
+                    coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+                } else {
+                    coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+                }
+                coordinator.completeMigration(opCtx);
+                return true;
+            });
+        } catch (const DBException& ex) {
+            LOG(0) << "Failed to resume coordinating migrations on stepup "
+                   << causedBy(ex.toStatus());
+        }
+    });
+}
+
 }  // namespace migrationutil
 }  // namespace mongo
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 2b7de1b39b7..3ba16bf3ffe 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -180,5 +180,26 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
 * config.migrationCoordinators without waiting for majority writeConcern.
 */
 void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UUID& migrationId);
+
+/**
+ * Sends _configsvrEnsureChunkVersionIsGreaterThan for the range and preMigrationChunkVersion until
+ * hearing success or the node steps down or shuts down.
+ */
+void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
+                                     const ChunkRange& range,
+                                     const ChunkVersion& preMigrationChunkVersion);
+
+/**
+ * Forces a filtering metadata refresh of the namespace until the refresh succeeds or the node
+ * steps down or shuts down.
+ */
+void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss);
+
+/**
+ * Submits an asynchronous task to scan config.migrationCoordinators and drive each unfinished
+ * migration coordination to completion.
+ */
+void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext);
+
 }  // namespace migrationutil
 }  // namespace mongo
diff --git a/src/mongo/s/request_types/move_chunk_request.h b/src/mongo/s/request_types/move_chunk_request.h
index f92d1ec61e1..e11eec2dd1e 100644
--- a/src/mongo/s/request_types/move_chunk_request.h
+++ b/src/mongo/s/request_types/move_chunk_request.h
@@ -103,6 +103,10 @@ public:
         return _toShardId;
     }
 
+    const ChunkRange& getRange() const {
+        return _range;
+    }
+
     const BSONObj& getMinKey() const {
         return _range.getMin();
     }