author     Esha Maharishi <esha.maharishi@mongodb.com>   2020-01-24 02:34:42 +0000
committer  evergreen <evergreen@mongodb.com>             2020-01-24 02:34:42 +0000
commit     e791a2ea966bb302ff180dd4538d87c078e74747 (patch)
tree       42f37fe0222fcf47acb5bd92f5d451799f193b83
parent     281973f2c819cd8dcca2b10c0637091ffdcff2c3 (diff)
download   mongo-e791a2ea966bb302ff180dd4538d87c078e74747.tar.gz
SERVER-44162 Resume coordinating active migrations on stepup
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 1
-rw-r--r--  jstests/sharding/migration_coordinator_failover.js | 155
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 4
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp | 23
-rw-r--r--  src/mongo/db/s/migration_coordinator.h | 10
-rw-r--r--  src/mongo/db/s/migration_coordinator_document.idl | 12
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp | 62
-rw-r--r--  src/mongo/db/s/migration_util.cpp | 177
-rw-r--r--  src/mongo/db/s/migration_util.h | 21
-rw-r--r--  src/mongo/s/request_types/move_chunk_request.h | 4
11 files changed, 401 insertions, 69 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index cd46d0cb144..6d5b596da65 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -96,6 +96,7 @@ selector:
- jstests/sharding/key_rotation.js
- jstests/sharding/keys_rotation_interval_sec.js
- jstests/sharding/migration_coordinator_basic.js # sets a failpoint on the config primary
+ - jstests/sharding/migration_coordinator_failover.js # sets a failpoint on the config primary
- jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
- jstests/sharding/move_chunk_insert_with_write_retryability.js
- jstests/sharding/move_chunk_remove_with_write_retryability.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index b1b648d8f36..ec758617272 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -22,6 +22,7 @@ selector:
- jstests/sharding/out_fails_to_replace_sharded_collection.js
- jstests/sharding/merge_from_stale_mongos.js
- jstests/sharding/migration_coordinator_basic.js
+ - jstests/sharding/migration_coordinator_failover.js
- jstests/sharding/count1.js
# Enable when SERVER-44733 is backported
- jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
diff --git a/jstests/sharding/migration_coordinator_failover.js b/jstests/sharding/migration_coordinator_failover.js
new file mode 100644
index 00000000000..ea690027d2a
--- /dev/null
+++ b/jstests/sharding/migration_coordinator_failover.js
@@ -0,0 +1,155 @@
+/**
+ * Tests that a donor resumes coordinating a migration if it fails over after creating the
+ * migration coordinator document but before deleting it.
+ *
+ * @tags: [requires_fcv_44]
+ */
+
+// This test induces failovers on shards.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallel_shell_helpers.js');
+
+function getNewNs(dbName) {
+ if (typeof getNewNs.counter == 'undefined') {
+ getNewNs.counter = 0;
+ }
+ getNewNs.counter++;
+ const collName = "ns" + getNewNs.counter;
+ return [collName, dbName + "." + collName];
+}
+
+const dbName = "test";
+
+var st = new ShardingTest({shards: 2, rs: {nodes: 2}});
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));
+
+function runMoveChunkMakeDonorStepDownAfterFailpoint(
+ failpointName, shouldMakeMigrationFailToCommitOnConfig, expectAbortDecisionWithCode) {
+ const [collName, ns] = getNewNs(dbName);
+ jsTest.log("Running migration, making donor step down after failpoint " + failpointName +
+ "; shouldMakeMigrationFailToCommitOnConfig is " +
+ shouldMakeMigrationFailToCommitOnConfig + "; expectAbortDecisionWithCode is " +
+ expectAbortDecisionWithCode + "; ns is " + ns);
+
+ // Wait for mongos to see a primary node on the primary shard, because mongos does not retry
+ // writes on NotMaster errors, and we are about to insert docs through mongos.
+ awaitRSClientHosts(st.s, st.rs0.getPrimary(), {ok: true, ismaster: true});
+
+ // Insert some docs into the collection so that the migration leaves orphans on either the
+ // donor or recipient, depending on the decision.
+ const numDocs = 1000;
+ var bulk = st.s.getDB(dbName).getCollection(collName).initializeUnorderedBulkOp();
+ for (var i = 0; i < numDocs; i++) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ // Shard the collection.
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+ if (shouldMakeMigrationFailToCommitOnConfig) {
+ // Turn on a failpoint to make the migration commit fail on the config server.
+ assert.commandWorked(st.configRS.getPrimary().adminCommand(
+ {configureFailPoint: "migrationCommitVersionError", mode: "alwaysOn"}));
+ }
+
+ jsTest.log("Run the moveChunk asynchronously and wait for " + failpointName + " to be hit.");
+ let failpointHandle = configureFailPoint(st.rs0.getPrimary(), failpointName);
+ const awaitResult = startParallelShell(
+ funWithArgs(function(ns, toShardName, expectAbortDecisionWithCode) {
+ if (expectAbortDecisionWithCode) {
+ assert.commandFailedWithCode(
+ db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}),
+ expectAbortDecisionWithCode);
+ } else {
+ assert.commandWorked(
+ db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
+ }
+ }, ns, st.shard1.shardName, expectAbortDecisionWithCode), st.s.port);
+ failpointHandle.wait();
+
+ jsTest.log("Make the donor primary step down.");
+ assert.commandWorked(
+ st.rs0.getPrimary().adminCommand({replSetStepDown: 10 /* stepDownSecs */, force: true}));
+ failpointHandle.off();
+
+ jsTest.log("Allow the moveChunk to finish.");
+ awaitResult();
+
+ if (expectAbortDecisionWithCode) {
+ jsTest.log("Expect abort decision, so wait for recipient to clean up the orphans.");
+ assert.soon(() => {
+ return 0 === st.rs1.getPrimary().getDB(dbName).getCollection(collName).count();
+ });
+
+ } else {
+ jsTest.log("Expect commit decision, so wait for donor to clean up the orphans.");
+ assert.soon(() => {
+ return 0 === st.rs0.getPrimary().getDB(dbName).getCollection(collName).count();
+ });
+ }
+
+ // The data should still be present on the shard that owns the chunk.
+ assert.eq(numDocs, st.s.getDB(dbName).getCollection(collName).count());
+
+ jsTest.log("Wait for the donor to delete the migration coordinator doc");
+ assert.soon(() => {
+ return 0 ===
+ st.rs0.getPrimary().getDB("config").getCollection("migrationCoordinators").count();
+ });
+
+ if (shouldMakeMigrationFailToCommitOnConfig) {
+ // Turn off the failpoint on the config server before returning.
+ assert.commandWorked(st.configRS.getPrimary().adminCommand(
+ {configureFailPoint: "migrationCommitVersionError", mode: "off"}));
+ }
+}
+
+//
+// Decision is commit
+//
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingCommitDecisionDurable",
+ false /* shouldMakeMigrationFailToCommitOnConfig */);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeSendingCommitDecision",
+ false /* shouldMakeMigrationFailToCommitOnConfig */);
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeForgettingMigrationAfterCommitDecision",
+ false /* shouldMakeMigrationFailToCommitOnConfig */);
+
+//
+// Decision is abort
+//
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep3",
+ false /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep4",
+ false /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("moveChunkHangAtStep5",
+ false /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.OperationFailed);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeMakingAbortDecisionDurable",
+ true /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.StaleEpoch);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeSendingAbortDecision",
+ true /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.StaleEpoch);
+
+runMoveChunkMakeDonorStepDownAfterFailpoint("hangBeforeForgettingMigrationAfterAbortDecision",
+ true /* shouldMakeMigrationFailToCommitOnConfig */,
+ ErrorCodes.StaleEpoch);
+
+st.stop();
+})();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 7e8fb6ba89a..6db23585c2d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -819,7 +819,11 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
PeriodicBalancerConfigRefresher::get(_service).onStepUp(_service);
TransactionCoordinatorService::get(_service)->onStepUp(opCtx);
+ // Note, these must be done after the configOpTime is recovered via
+ // ShardingStateRecovery::recover above, because they may trigger filtering metadata
+ // refreshes which should use the recovered configOpTime.
migrationutil::resubmitRangeDeletionsOnStepUp(_service);
+ migrationutil::resumeMigrationCoordinationsOnStepUp(_service);
} else { // unsharded
if (auto validator = LogicalTimeValidator::get(_service)) {
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 38e2d47b5c4..038b23318a2 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -42,6 +42,15 @@ namespace mongo {
MONGO_FAIL_POINT_DEFINE(disableWritingPendingRangeDeletionEntries);
+MONGO_FAIL_POINT_DEFINE(hangBeforeMakingCommitDecisionDurable);
+MONGO_FAIL_POINT_DEFINE(hangBeforeMakingAbortDecisionDurable);
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingCommitDecision);
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbortDecision);
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterCommitDecision);
+MONGO_FAIL_POINT_DEFINE(hangBeforeForgettingMigrationAfterAbortDecision);
+
namespace migrationutil {
MigrationCoordinator::MigrationCoordinator(UUID migrationId,
@@ -103,18 +112,24 @@ void MigrationCoordinator::completeMigration(OperationContext* opCtx) {
switch (*_decision) {
case Decision::kAborted:
_abortMigrationOnDonorAndRecipient(opCtx);
+ hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet();
break;
case Decision::kCommitted:
_commitMigrationOnDonorAndRecipient(opCtx);
+ hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet();
break;
}
- _forgetMigration(opCtx);
+ forgetMigration(opCtx);
}
void MigrationCoordinator::_commitMigrationOnDonorAndRecipient(OperationContext* opCtx) {
+ hangBeforeMakingCommitDecisionDurable.pauseWhileSet();
+
LOG(0) << _logPrefix() << "Making commit decision durable";
migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
+ hangBeforeSendingCommitDecision.pauseWhileSet();
+
LOG(0) << _logPrefix() << "Deleting range deletion task on recipient";
migrationutil::deleteRangeDeletionTaskOnRecipient(
opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getId());
@@ -124,9 +139,13 @@ void MigrationCoordinator::_commitMigrationOnDonorAndRecipient(OperationContext*
}
void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* opCtx) {
+ hangBeforeMakingAbortDecisionDurable.pauseWhileSet();
+
LOG(0) << _logPrefix() << "Making abort decision durable";
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
+ hangBeforeSendingAbortDecision.pauseWhileSet();
+
LOG(0) << _logPrefix() << "Deleting range deletion task on donor";
migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId());
@@ -135,7 +154,7 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
opCtx, _migrationInfo.getRecipientShardId(), _migrationInfo.getId());
}
-void MigrationCoordinator::_forgetMigration(OperationContext* opCtx) {
+void MigrationCoordinator::forgetMigration(OperationContext* opCtx) {
LOG(0) << _logPrefix() << "Deleting migration coordinator document";
migrationutil::deleteMigrationCoordinatorDocumentLocally(opCtx, _migrationInfo.getId());
}
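The hangBefore* failpoints added above are what let jstests/sharding/migration_coordinator_failover.js freeze the donor between durably recording a decision, communicating it, and forgetting the migration. As a rough standalone model of the "pause while set" behavior (this is not MongoDB's FailPoint class, whose real interface is richer; the names here are illustrative):

// Minimal sketch of a "pause while set" hook, loosely modeling how the
// hangBefore* failpoints behave: a test enables the point, the coordinator
// thread blocks at it, the test induces a stepdown, then disables the point.
#include <atomic>
#include <chrono>
#include <thread>

class PausePoint {
public:
    void enable() { _enabled.store(true); }
    void disable() { _enabled.store(false); }

    // Blocks the calling thread for as long as the pause point is enabled.
    void pauseWhileSet() const {
        while (_enabled.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

private:
    std::atomic<bool> _enabled{false};
};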
diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h
index a4f89ecbbc7..1f64e5af3a5 100644
--- a/src/mongo/db/s/migration_coordinator.h
+++ b/src/mongo/db/s/migration_coordinator.h
@@ -82,6 +82,11 @@ public:
*/
void completeMigration(OperationContext* opCtx);
+ /**
+ * Deletes the persistent state for this migration from config.migrationCoordinators.
+ */
+ void forgetMigration(OperationContext* opCtx);
+
private:
/**
* Deletes the range deletion task from the recipient node and marks the range deletion task on
@@ -95,11 +100,6 @@ private:
*/
void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx);
- /**
- * Deletes the persistent state for this migration from config.migrationCoordinators.
- */
- void _forgetMigration(OperationContext* opCtx);
-
// The decision of the migration commit against the config server.
boost::optional<Decision> _decision;
diff --git a/src/mongo/db/s/migration_coordinator_document.idl b/src/mongo/db/s/migration_coordinator_document.idl
index 9e1310fbf79..c1001c0c748 100644
--- a/src/mongo/db/s/migration_coordinator_document.idl
+++ b/src/mongo/db/s/migration_coordinator_document.idl
@@ -38,6 +38,14 @@ imports:
- "mongo/s/chunk_range.idl"
- "mongo/s/chunk_version.idl"
+enums:
+ Decision:
+ description: "Whether the migration committed or aborted."
+ type: string
+ values:
+ kCommitted: "committed"
+ kAborted: "aborted"
+
structs:
migrationCoordinatorDocument:
description: "Represents an in-progress migration on the migration donor."
@@ -67,6 +75,6 @@ structs:
type: ChunkVersion
description: "The version, at the start of the migration, of the chunk being moved."
decision:
- type: string
- description: "Whether the migration committed or aborted."
+ type: Decision
+ description: "Enumeration that defines whether the migration committed or aborted."
optional: true
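Replacing the free-form string with an IDL enum gives the stepup-recovery code a typed value to switch on; the generated type shows up as DecisionEnum in migration_util.cpp below. A simplified guess at the shape of the generated code (the real IDL compiler output and its serializer/parser signatures differ in detail):

// Hand-written approximation of the enum the IDL compiler generates for
// "Decision"; only the value names and their on-disk strings come from the
// IDL above, the helper names and signatures are assumptions.
#include <stdexcept>
#include <string>

enum class DecisionEnum { kCommitted, kAborted };

inline std::string serializeDecision(DecisionEnum value) {
    return value == DecisionEnum::kCommitted ? "committed" : "aborted";
}

inline DecisionEnum parseDecision(const std::string& value) {
    if (value == "committed")
        return DecisionEnum::kCommitted;
    if (value == "aborted")
        return DecisionEnum::kAborted;
    throw std::invalid_argument("unknown Decision value: " + value);
}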
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 85479a801a0..728366a814c 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -60,7 +60,6 @@
#include "mongo/s/catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/commit_chunk_migration_request_type.h"
-#include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h"
#include "mongo/s/request_types/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/duration.h"
@@ -469,48 +468,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
if (!migrationCommitStatus.isOK()) {
if (_useFCV44Protocol) {
- // Send _configsvrEnsureShardVersionIsGreaterThan until hearing success to ensure
- // that if the migration commit has not occurred yet, it will never occur. This
- // makes it safe for the shard to refresh to find out if the migration commit
- // succeeded.
-
- ConfigsvrEnsureChunkVersionIsGreaterThan ensureChunkVersionIsGreaterThanRequest;
- ensureChunkVersionIsGreaterThanRequest.setDbName(NamespaceString::kAdminDb);
- ensureChunkVersionIsGreaterThanRequest.setMinKey(_args.getMinKey());
- ensureChunkVersionIsGreaterThanRequest.setMaxKey(_args.getMaxKey());
- ensureChunkVersionIsGreaterThanRequest.setVersion(_chunkVersion);
- const auto ensureChunkVersionIsGreaterThanRequestBSON =
- ensureChunkVersionIsGreaterThanRequest.toBSON({});
-
- for (int attempts = 1;; attempts++) {
- const auto ensureChunkVersionIsGreaterThanResponse =
- Grid::get(_opCtx)
- ->shardRegistry()
- ->getConfigShard()
- ->runCommandWithFixedRetryAttempts(
- _opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- ensureChunkVersionIsGreaterThanRequestBSON,
- Shard::RetryPolicy::kIdempotent);
- const auto ensureChunkVersionIsGreaterThanStatus =
- Shard::CommandResponse::getEffectiveStatus(
- ensureChunkVersionIsGreaterThanResponse);
- if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
- break;
- }
-
- // If the server is already doing a clean shutdown, join the shutdown. This
- // prevents the cleanup logic from running if the node is shutting down.
- if (globalInShutdownDeprecated()) {
- shutdown(waitForShutdown());
- }
- _opCtx->checkForInterrupt();
-
- LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
- << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
- << " . Will try again.";
- }
+ migrationutil::ensureChunkVersionIsGreaterThan(_opCtx, _args.getRange(), _chunkVersion);
} else {
// This is the FCV 4.2 and below protocol.
@@ -571,23 +529,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
}
// Incrementally refresh the metadata before leaving the critical section.
- for (int attempts = 1;; attempts++) {
- try {
- forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
- break;
- } catch (const DBException& ex) {
- // If the server is already doing a clean shutdown, join the shutdown. This prevents the
- // cleanup logic from running if the node is shutting down.
- if (globalInShutdownDeprecated()) {
- shutdown(waitForShutdown());
- }
- _opCtx->checkForInterrupt();
-
- log() << "Failed to refresh metadata after " << attempts << " attempts, after a "
- << (migrationCommitStatus.isOK() ? "failed commit attempt" : "successful commit")
- << causedBy(redact(ex.toStatus())) << ". Will try to refresh again.";
- }
- }
+ migrationutil::refreshFilteringMetadataUntilSuccess(_opCtx, getNss());
const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
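Both retry loops deleted above keep the same shape in their new migration_util homes: retry indefinitely, but join a clean shutdown and honor interruption between attempts. A generic standalone sketch of that pattern (retryUntilSuccessOrShutdown, isShuttingDown and checkForInterrupt are illustrative stand-ins for the new helpers and for globalInShutdownDeprecated()/opCtx->checkForInterrupt()):

// Generic "retry until success, unless shutting down or interrupted" loop,
// modeling the pattern shared by ensureChunkVersionIsGreaterThan() and
// refreshFilteringMetadataUntilSuccess(). The two hooks below are stubs.
#include <functional>
#include <iostream>
#include <stdexcept>

bool isShuttingDown() { return false; }  // stands in for globalInShutdownDeprecated()
void checkForInterrupt() {}              // stands in for opCtx->checkForInterrupt(); would throw if killed

void retryUntilSuccessOrShutdown(const std::function<void()>& attempt, const char* what) {
    for (int attempts = 1;; attempts++) {
        try {
            attempt();
            return;
        } catch (const std::exception& ex) {
            if (isShuttingDown()) {
                return;  // the real code joins the shutdown here instead of returning
            }
            checkForInterrupt();  // a killed operation stops retrying by throwing
            std::cerr << what << " failed after " << attempts << " attempts: " << ex.what()
                      << ". Will try again." << std::endl;
        }
    }
}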
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 17d7c806a33..964addcd78f 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/migration_coordinator.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor_pool.h"
@@ -54,6 +55,8 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/ensure_chunk_version_is_greater_than_gen.h"
+#include "mongo/util/exit.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -419,5 +422,179 @@ void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UU
QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)});
}
+
+void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
+ const ChunkRange& range,
+ const ChunkVersion& preMigrationChunkVersion) {
+ ConfigsvrEnsureChunkVersionIsGreaterThan ensureChunkVersionIsGreaterThanRequest;
+ ensureChunkVersionIsGreaterThanRequest.setDbName(NamespaceString::kAdminDb);
+ ensureChunkVersionIsGreaterThanRequest.setMinKey(range.getMin());
+ ensureChunkVersionIsGreaterThanRequest.setMaxKey(range.getMax());
+ ensureChunkVersionIsGreaterThanRequest.setVersion(preMigrationChunkVersion);
+ const auto ensureChunkVersionIsGreaterThanRequestBSON =
+ ensureChunkVersionIsGreaterThanRequest.toBSON({});
+
+ for (int attempts = 1;; attempts++) {
+ const auto ensureChunkVersionIsGreaterThanResponse =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ ensureChunkVersionIsGreaterThanRequestBSON,
+ Shard::RetryPolicy::kIdempotent);
+ const auto ensureChunkVersionIsGreaterThanStatus =
+ Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse);
+ if (ensureChunkVersionIsGreaterThanStatus.isOK()) {
+ break;
+ }
+
+ // If the server is already doing a clean shutdown, join the shutdown.
+ if (globalInShutdownDeprecated()) {
+ shutdown(waitForShutdown());
+ }
+ opCtx->checkForInterrupt();
+
+ LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts
+ << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus)
+ << " . Will try again.";
+ }
+}
+
+void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
+ for (int attempts = 1;; attempts++) {
+ try {
+ forceShardFilteringMetadataRefresh(opCtx, nss, true);
+ break;
+ } catch (const DBException& ex) {
+ // If the server is already doing a clean shutdown, join the shutdown.
+ if (globalInShutdownDeprecated()) {
+ shutdown(waitForShutdown());
+ }
+ opCtx->checkForInterrupt();
+
+ LOG(0) << "Failed to refresh metadata for " << nss.ns() << " after " << attempts
+ << " attempts " << causedBy(redact(ex.toStatus()))
+ << ". Will try to refresh again.";
+ }
+ }
+}
+
+void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) {
+ LOG(0) << "Starting migration coordinator stepup recovery thread.";
+
+ auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor();
+ ExecutorFuture<void>(executor).getAsync([serviceContext](const Status& status) {
+ try {
+ ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+
+ auto uniqueOpCtx = tc->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+
+ // Wait for the latest OpTime to be majority committed to ensure any decision that is
+ // read is on the true branch of history.
+            // Note (Esha): I don't think this is strictly required for correctness, but it is
+            // difficult to reason about, and being pessimistic by waiting for the decision to be
+ // majority committed does not cost much, since stepup should be rare. It *is* required
+ // that this node ensure a decision that it itself recovers is majority committed. For
+ // example, it is possible that this node is a stale primary, and the true primary has
+ // already sent a *commit* decision and re-received a chunk containing the minKey of
+ // this migration. In this case, this node would see that the minKey is still owned and
+ // assume the migration *aborted*. If this node communicated the abort decision to the
+ // recipient, the recipient (if it had not heard the decision yet) would delete data
+ // that the recipient actually owns. (The recipient does not currently wait to hear the
+ // range deletion decision for the first migration before being able to donate (any
+ // part of) the chunk again.)
+ auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
+ replClientInfo.setLastOpToSystemLastOpTime(opCtx);
+ const auto lastOpTime = replClientInfo.getLastOp();
+ LOG(0) << "Waiting for OpTime " << lastOpTime << " to become majority committed";
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(
+ waitForWriteConcern(opCtx,
+ lastOpTime,
+ WriteConcernOptions{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout},
+ &unusedWCResult));
+
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ opCtx, NamespaceString::kMigrationCoordinatorsNamespace);
+ Query query;
+ store.forEach(opCtx, query, [&opCtx](const MigrationCoordinatorDocument& doc) {
+ LOG(0) << "Recovering migration " << doc.toBSON();
+
+ // Create a MigrationCoordinator to complete the coordination.
+ MigrationCoordinator coordinator(doc.getId(),
+ doc.getDonorShardId(),
+ doc.getRecipientShardId(),
+ doc.getNss(),
+ doc.getCollectionUuid(),
+ doc.getRange(),
+ doc.getPreMigrationChunkVersion());
+
+ if (doc.getDecision()) {
+ // The decision is already known.
+ coordinator.setMigrationDecision(
+ (*doc.getDecision()) == DecisionEnum::kCommitted
+ ? MigrationCoordinator::Decision::kCommitted
+ : MigrationCoordinator::Decision::kAborted);
+ coordinator.completeMigration(opCtx);
+ return true;
+ }
+
+ // The decision is not known. Recover the decision from the config server.
+
+ ensureChunkVersionIsGreaterThan(
+ opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
+
+ refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
+
+ auto refreshedMetadata = [&] {
+ AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
+ auto* const css = CollectionShardingRuntime::get(opCtx, doc.getNss());
+ return css->getCurrentMetadataIfKnown();
+ }();
+
+ if (!refreshedMetadata || !(*refreshedMetadata)->isSharded() ||
+ !(*refreshedMetadata)->uuidMatches(doc.getCollectionUuid())) {
+ LOG(0) << "Even after forced refresh, filtering metadata for namespace in "
+ "migration coordinator doc "
+ << doc.toBSON()
+ << (!refreshedMetadata || !(*refreshedMetadata)->isSharded()
+ ? "is not known"
+ : "has UUID that does not match the collection UUID in the "
+ "coordinator doc")
+ << ". Deleting the range deletion tasks on the donor and recipient as "
+ "well as the migration coordinator document on this node.";
+
+ // TODO (SERVER-45707): Test that range deletion tasks are eventually
+ // deleted even if the collection is dropped before migration coordination
+ // is resumed.
+ deleteRangeDeletionTaskOnRecipient(
+ opCtx, doc.getRecipientShardId(), doc.getId());
+ deleteRangeDeletionTaskLocally(opCtx, doc.getId());
+ coordinator.forgetMigration(opCtx);
+ return true;
+ }
+
+ if ((*refreshedMetadata)->keyBelongsToMe(doc.getRange().getMin())) {
+ coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+ } else {
+ coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+ }
+ coordinator.completeMigration(opCtx);
+ return true;
+ });
+ } catch (const DBException& ex) {
+ LOG(0) << "Failed to resume coordinating migrations on stepup "
+ << causedBy(ex.toStatus());
+ }
+ });
+}
+
} // namespace migrationutil
} // namespace mongo
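The heart of resumeMigrationCoordinationsOnStepUp() above is the per-document recovery rule: replay a decision that is already durable; otherwise call ensureChunkVersionIsGreaterThan() so a not-yet-applied commit can never apply later, force a filtering metadata refresh, and infer the decision from whether the donor still owns the chunk's min key. A condensed standalone sketch of just that rule, with hypothetical types standing in for the real MongoDB classes:

// Condensed model of the stepup recovery rule. CoordinatorDoc, Metadata and
// Key are stand-ins for the real types; only the branching mirrors the code
// in resumeMigrationCoordinationsOnStepUp() above.
#include <optional>

enum class Decision { kCommitted, kAborted };

struct Key {
    long long value;
};

struct Metadata {
    bool sharded = false;
    bool uuidMatches = false;
    bool ownsKey(const Key&) const { return false; }  // stub for keyBelongsToMe()
};

struct CoordinatorDoc {
    std::optional<Decision> decision;  // decision already made durable, if any
    Key rangeMin{0};                   // min key of the migrated chunk
};

// Returns the recovered decision, or nullopt to mean "metadata is unusable:
// delete the range deletion tasks and forget the migration instead".
std::optional<Decision> recoverDecision(const CoordinatorDoc& doc, const Metadata& refreshed) {
    // 1. A decision persisted before the failover is simply replayed.
    if (doc.decision) {
        return doc.decision;
    }
    // 2. The caller has already ensured the config server can no longer apply the
    //    pending commit and has refreshed the filtering metadata; bail out if the
    //    refreshed metadata is unsharded or belongs to a different collection UUID.
    if (!refreshed.sharded || !refreshed.uuidMatches) {
        return std::nullopt;
    }
    // 3. Still owning the chunk's min key means the commit never happened.
    return refreshed.ownsKey(doc.rangeMin) ? Decision::kAborted : Decision::kCommitted;
}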
diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h
index 2b7de1b39b7..3ba16bf3ffe 100644
--- a/src/mongo/db/s/migration_util.h
+++ b/src/mongo/db/s/migration_util.h
@@ -180,5 +180,26 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
* config.migrationCoordinators without waiting for majority writeConcern.
*/
void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UUID& migrationId);
+
+/**
+ * Sends _configsvrEnsureChunkVersionIsGreaterThan for the range and preMigrationChunkVersion until
+ * hearing success or the node steps down or shuts down.
+ */
+void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
+ const ChunkRange& range,
+ const ChunkVersion& preMigrationChunkVersion);
+
+/**
+ * Forces a filtering metadata refresh of the namespace until the refresh succeeds or the node
+ * steps down or shuts down.
+ */
+void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss);
+
+/**
+ * Submits an asynchronous task to scan config.migrationCoordinators and drive each unfinished
+ * migration coordination to completion.
+ */
+void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext);
+
} // namespace migrationutil
} // namespace mongo
diff --git a/src/mongo/s/request_types/move_chunk_request.h b/src/mongo/s/request_types/move_chunk_request.h
index f92d1ec61e1..e11eec2dd1e 100644
--- a/src/mongo/s/request_types/move_chunk_request.h
+++ b/src/mongo/s/request_types/move_chunk_request.h
@@ -103,6 +103,10 @@ public:
return _toShardId;
}
+ const ChunkRange& getRange() const {
+ return _range;
+ }
+
const BSONObj& getMinKey() const {
return _range.getMin();
}