author     Paolo Polato <paolo.polato@mongodb.com>          2021-07-13 14:12:47 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-07-21 07:59:33 +0000
commit     d735ffcac376e514f266a315f6405976d603aef6
tree       5d667b97ff35e5ce01ac4e3b5df8478e322c2e3a
parent     03c49a3d3b228c68a2a369bd7a4d8f5588c72d09
SERVER-52906 Applying metadata update before dispatching rollback task in moveChunk (BACKPORT-9612)
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml           |  18
-rw-r--r--  jstests/libs/chunk_manipulation_util.js                      |  17
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_1.js            |   6
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_3.js            |  12
-rw-r--r--  jstests/sharding/migration_waits_for_majority_commit.js      |   6
-rw-r--r--  jstests/sharding/movechunk_interrupt_at_primary_stepdown.js  |   6
-rw-r--r--  jstests/sharding/movechunk_parallel.js                       |  12
-rw-r--r--  jstests/sharding/txn_writes_during_movechunk.js              |   6
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp             | 192
9 files changed, 158 insertions, 117 deletions
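
In summary, this backport makes the recipient shard apply the collection metadata update (creating the collection and cloning its indexes and options) before it persists the range-deletion task that would roll back the chunk on failure, and it promotes that rollback-task write to its own timed step. The recipient's migrate thread therefore reports seven steps instead of six, and the test helpers gain a new rangeDeletionTaskScheduled step name. As an orientation sketch only, reconstructed from the hunks below rather than from any authoritative documentation, the post-backport correspondence between the JavaScript step names, the recipient's timing steps, and the hang failpoints is:

    // Orientation sketch: step names from jstests/libs/chunk_manipulation_util.js, matched
    // to the timing.done(n) calls and migrateThreadHangAtStepN failpoints that this commit
    // changes in src/mongo/db/s/migration_destination_manager.cpp.
    const recipientMigrateSteps = {
        deletedPriorDataInRange: 1,     // wait for overlapping orphaned ranges to be cleaned up
        copiedIndexes: 2,               // create the collection and clone its indexes/options
        rangeDeletionTaskScheduled: 3,  // persist the rollback range-deletion task (FCV 4.4 protocol)
        cloned: 4,                      // initial bulk clone of documents
        catchup: 5,                     // transfer of mods
        steady: 6,                      // steady state / wait for commit
        done: 7                         // migration marked DONE; pauses at migrateThreadHangAtStep7
    };
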
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index d4f3ac4d156..572e3d98a42 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -141,6 +141,24 @@ all:
test_file: jstests/concurrency/fsm_workloads/findAndModify_flip_location.js
- ticket: SERVER-57476
test_file: jstests/replsets/assert_on_prepare_conflict_with_hole.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/prepare_transaction_then_migrate.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/migration_waits_for_majority_commit.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/migration_ignore_interrupts_1.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/migration_ignore_interrupts_3.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/movechunk_parallel.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/txn_writes_during_movechunk.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js
+ - ticket: SERVER-52906
+ test_file: jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js
suites:
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 94a9f786b19..fadf461f830 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -127,9 +127,9 @@ function configureMoveChunkFailPoint(shardConnection, stepNumber, mode) {
function waitForMoveChunkStep(shardConnection, stepNumber) {
var searchString = 'step ' + stepNumber, admin = shardConnection.getDB('admin');
- assert.between(migrateStepNames.deletedPriorDataInRange,
+ assert.between(moveChunkStepNames.parsedOptions,
stepNumber,
- migrateStepNames.done,
+ moveChunkStepNames.committed,
"incorrect stepNumber",
true);
@@ -162,14 +162,15 @@ function waitForMoveChunkStep(shardConnection, stepNumber) {
var migrateStepNames = {
deletedPriorDataInRange: 1,
copiedIndexes: 2,
- cloned: 3,
- catchup: 4, // About to enter steady state.
- steady: 5,
- done: 6
+ rangeDeletionTaskScheduled: 3,
+ cloned: 4,
+ catchup: 5, // About to enter steady state.
+ steady: 6,
+ done: 7
};
//
-// Configure a failpoint to make migration thread hang at a step (1 through 5).
+// Configure a failpoint to make migration thread hang at a step (1 through 7).
//
function pauseMigrateAtStep(shardConnection, stepNumber) {
configureMigrateFailPoint(shardConnection, stepNumber, 'alwaysOn');
@@ -205,7 +206,7 @@ function configureMigrateFailPoint(shardConnection, stepNumber, mode) {
}
//
-// Wait for moveChunk to reach a step (1 through 6).
+// Wait for moveChunk to reach a step (1 through 7).
//
function waitForMigrateStep(shardConnection, stepNumber) {
var searchString = 'step ' + stepNumber, admin = shardConnection.getDB('admin');
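
Because of that renumbering, the test changes that follow switch from pausing the recipient at deletedPriorDataInRange to pausing it at the new rangeDeletionTaskScheduled step. A minimal skeleton of the pause/wait/unpause pattern they all share (the shard count, namespace, and chunk key are placeholders, and the usual enableSharding/shardCollection setup is elided):

    load('jstests/libs/chunk_manipulation_util.js');

    // Scratch mongod used by moveChunkParallel/startParallelOps to store its state.
    var staticMongod = MongoRunner.runMongod({});
    var st = new ShardingTest({shards: 2});
    // ... enableSharding / shardCollection / split setup elided ...

    // Hold the recipient once it has created the collection, cloned the indexes and
    // scheduled the rollback range-deletion task, but before it starts cloning documents.
    pauseMigrateAtStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
    var joinMoveChunk = moveChunkParallel(
        staticMongod, st.s0.host, {_id: 0}, null, 'test.user', st.shard1.shardName);
    waitForMigrateStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);

    // ... assertions against the paused migration go here ...

    unpauseMigrateAtStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
    joinMoveChunk();
    MongoRunner.stopMongod(staticMongod);
    st.stop();
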
diff --git a/jstests/sharding/migration_ignore_interrupts_1.js b/jstests/sharding/migration_ignore_interrupts_1.js
index d093f0a8f0b..b0a3db9d9e9 100644
--- a/jstests/sharding/migration_ignore_interrupts_1.js
+++ b/jstests/sharding/migration_ignore_interrupts_1.js
@@ -43,10 +43,10 @@ assert.commandWorked(admin.runCommand(
jsTest.log("Set up complete, now proceeding to test that migration interruptions fail.");
// Start a migration between shard0 and shard1 on coll1 and then pause it
-pauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
+pauseMigrateAtStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
var joinMoveChunk = moveChunkParallel(
staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
-waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
+waitForMigrateStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
assert.commandFailedWithCode(
admin.runCommand({moveChunk: ns1, find: {a: -10}, to: st.shard2.shardName}),
@@ -64,7 +64,7 @@ assert.commandFailedWithCode(
"(3) A shard should not be able to be both a donor and recipient of migrations.");
// Finish migration
-unpauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
+unpauseMigrateAtStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
assert.doesNotThrow(function() {
joinMoveChunk();
});
diff --git a/jstests/sharding/migration_ignore_interrupts_3.js b/jstests/sharding/migration_ignore_interrupts_3.js
index a8138604df2..bea7864275a 100644
--- a/jstests/sharding/migration_ignore_interrupts_3.js
+++ b/jstests/sharding/migration_ignore_interrupts_3.js
@@ -54,11 +54,11 @@ jsTest.log("Set up complete, now proceeding to test that migration interruption
// Start coll1 migration to shard1: pause recipient after delete step, donor before interrupt
// check.
-pauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
+pauseMigrateAtStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
pauseMoveChunkAtStep(shard0, moveChunkStepNames.startedMoveChunk);
const joinMoveChunk = moveChunkParallel(
staticMongod, st.s0.host, {a: 0}, null, coll1.getFullName(), st.shard1.shardName);
-waitForMigrateStep(shard1, migrateStepNames.deletedPriorDataInRange);
+waitForMigrateStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
// Abort migration on donor side, recipient is unaware.
killRunningMoveChunk(admin);
@@ -69,13 +69,13 @@ assert.throws(function() {
});
// Start coll2 migration to shard2, pause recipient after delete step.
-pauseMigrateAtStep(shard2, migrateStepNames.deletedPriorDataInRange);
+pauseMigrateAtStep(shard2, migrateStepNames.rangeDeletionTaskScheduled);
const joinMoveChunk2 = moveChunkParallel(
staticMongod, st.s0.host, {a: 0}, null, coll2.getFullName(), st.shard2.shardName);
-waitForMigrateStep(shard2, migrateStepNames.deletedPriorDataInRange);
+waitForMigrateStep(shard2, migrateStepNames.rangeDeletionTaskScheduled);
jsTest.log('Releasing coll1 migration recipient, whose clone command should fail....');
-unpauseMigrateAtStep(shard1, migrateStepNames.deletedPriorDataInRange);
+unpauseMigrateAtStep(shard1, migrateStepNames.rangeDeletionTaskScheduled);
assert.soon(function() {
// Wait for the destination shard to report that it is not in an active migration.
var res = shard1.adminCommand({'_recvChunkStatus': 1});
@@ -86,7 +86,7 @@ assert.eq(
0, shard1Coll1.find().itcount(), "shard1 cloned documents despite donor migration abortion.");
jsTest.log('Finishing coll2 migration, which should succeed....');
-unpauseMigrateAtStep(shard2, migrateStepNames.deletedPriorDataInRange);
+unpauseMigrateAtStep(shard2, migrateStepNames.rangeDeletionTaskScheduled);
assert.doesNotThrow(function() {
joinMoveChunk2();
});
diff --git a/jstests/sharding/migration_waits_for_majority_commit.js b/jstests/sharding/migration_waits_for_majority_commit.js
index 6581a8da592..eea1592b586 100644
--- a/jstests/sharding/migration_waits_for_majority_commit.js
+++ b/jstests/sharding/migration_waits_for_majority_commit.js
@@ -24,7 +24,7 @@ assert.commandWorked(st.s.adminCommand({split: "test.foo", middle: {_id: 0}}));
assert.eq(1, testDB.foo.find().readConcern("majority").itcount());
// Advance a migration to the beginning of the cloning phase.
-pauseMigrateAtStep(st.rs1.getPrimary(), 2);
+pauseMigrateAtStep(st.rs1.getPrimary(), migrateStepNames.rangeDeletionTaskScheduled);
// For startParallelOps to write its state
let staticMongod = MongoRunner.runMongod({});
@@ -39,7 +39,7 @@ let awaitMigration = moveChunkParallel(staticMongod,
// Wait for the migration to reach the failpoint and allow any writes to become majority committed
// before pausing replication.
-waitForMigrateStep(st.rs1.getPrimary(), 2);
+waitForMigrateStep(st.rs1.getPrimary(), migrateStepNames.rangeDeletionTaskScheduled);
st.rs1.awaitLastOpCommitted();
// Disable replication on the recipient shard's secondary node, so the recipient shard's majority
@@ -50,7 +50,7 @@ assert.commandWorked(
"failed to enable fail point on secondary");
// Allow the migration to begin cloning.
-unpauseMigrateAtStep(st.rs1.getPrimary(), 2);
+unpauseMigrateAtStep(st.rs1.getPrimary(), migrateStepNames.rangeDeletionTaskScheduled);
// The migration should fail to commit without being able to advance the majority commit point.
if (jsTestOptions().mongosBinVersion == "last-stable") {
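
This test previously hard-coded the numeric step 2; with the extra recipient step, raw numbers silently shift meaning, so the change switches to the named constant. Under the hood the helpers are assumed to translate a step value into the matching migrateThreadHangAtStepN failpoint, roughly as follows (a hedged sketch of the chunk_manipulation_util.js internals, not a verbatim copy):

    // Assumed shape of the helper: the step value selects one of the
    // migrateThreadHangAtStep1..7 failpoints defined on the recipient shard.
    function configureMigrateFailPointSketch(shardConnection, stepNumber, mode) {
        var admin = shardConnection.getDB('admin');
        assert.commandWorked(admin.runCommand(
            {configureFailPoint: 'migrateThreadHangAtStep' + stepNumber, mode: mode}));
    }

    // migrateStepNames.rangeDeletionTaskScheduled is 3 after this commit, so pausing
    // there toggles migrateThreadHangAtStep3; the old literal 2 would now pause one
    // step too early, before the rollback range-deletion task is persisted.
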
diff --git a/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js b/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
index 463f67008d4..a22a9993d35 100644
--- a/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
+++ b/jstests/sharding/movechunk_interrupt_at_primary_stepdown.js
@@ -29,7 +29,7 @@ var coll = mongos.getDB('TestDB').TestColl;
var staticMongod = MongoRunner.runMongod({});
function interruptMoveChunkAndRecover(fromShard, toShard, isJumbo) {
- pauseMigrateAtStep(toShard, migrateStepNames.deletedPriorDataInRange);
+ pauseMigrateAtStep(toShard, migrateStepNames.rangeDeletionTaskScheduled);
var joinMoveChunk = moveChunkParallel(staticMongod,
mongos.host,
@@ -39,7 +39,7 @@ function interruptMoveChunkAndRecover(fromShard, toShard, isJumbo) {
toShard.shardName,
true /* expectSuccess */,
isJumbo);
- waitForMigrateStep(toShard, migrateStepNames.deletedPriorDataInRange);
+ waitForMigrateStep(toShard, migrateStepNames.rangeDeletionTaskScheduled);
// Stepdown the primary in order to force the balancer to stop. Use a timeout of 5 seconds for
// both step down operations, because mongos will retry to find the CSRS primary for up to 20
@@ -65,7 +65,7 @@ function interruptMoveChunkAndRecover(fromShard, toShard, isJumbo) {
// Ensure a new primary is found promptly
st.configRS.getPrimary(30000);
- unpauseMigrateAtStep(toShard, migrateStepNames.deletedPriorDataInRange);
+ unpauseMigrateAtStep(toShard, migrateStepNames.rangeDeletionTaskScheduled);
// Ensure that migration succeeded
joinMoveChunk();
diff --git a/jstests/sharding/movechunk_parallel.js b/jstests/sharding/movechunk_parallel.js
index ca16d4caa8b..f03166a6fa7 100644
--- a/jstests/sharding/movechunk_parallel.js
+++ b/jstests/sharding/movechunk_parallel.js
@@ -41,8 +41,8 @@ assert.eq(2,
.itcount());
// Pause migrations at shards 2 and 3
-pauseMigrateAtStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
-pauseMigrateAtStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+pauseMigrateAtStep(st.shard2, migrateStepNames.rangeDeletionTaskScheduled);
+pauseMigrateAtStep(st.shard3, migrateStepNames.rangeDeletionTaskScheduled);
// Both move chunk operations should proceed
var joinMoveChunk1 = moveChunkParallel(
@@ -50,11 +50,11 @@ var joinMoveChunk1 = moveChunkParallel(
var joinMoveChunk2 = moveChunkParallel(
staticMongod, st.s0.host, {Key: 30}, null, 'TestDB.TestColl', st.shard3.shardName);
-waitForMigrateStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
-waitForMigrateStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+waitForMigrateStep(st.shard2, migrateStepNames.rangeDeletionTaskScheduled);
+waitForMigrateStep(st.shard3, migrateStepNames.rangeDeletionTaskScheduled);
-unpauseMigrateAtStep(st.shard2, migrateStepNames.deletedPriorDataInRange);
-unpauseMigrateAtStep(st.shard3, migrateStepNames.deletedPriorDataInRange);
+unpauseMigrateAtStep(st.shard2, migrateStepNames.rangeDeletionTaskScheduled);
+unpauseMigrateAtStep(st.shard3, migrateStepNames.rangeDeletionTaskScheduled);
joinMoveChunk1();
joinMoveChunk2();
diff --git a/jstests/sharding/txn_writes_during_movechunk.js b/jstests/sharding/txn_writes_during_movechunk.js
index 75432ed818c..66cecf302ed 100644
--- a/jstests/sharding/txn_writes_during_movechunk.js
+++ b/jstests/sharding/txn_writes_during_movechunk.js
@@ -17,12 +17,12 @@ assert.commandWorked(coll.insert({_id: 'updateMe'}));
assert.commandWorked(coll.insert({_id: 'deleteMe'}));
assert.commandWorked(coll.insert({_id: 'deleteMeUsingFindAndModify'}));
-pauseMigrateAtStep(st.shard1, migrateStepNames.deletedPriorDataInRange);
+pauseMigrateAtStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
let joinMoveChunk =
moveChunkParallel(staticMongod, st.s0.host, {_id: 0}, null, 'test.user', st.shard1.shardName);
-waitForMigrateStep(st.shard1, migrateStepNames.deletedPriorDataInRange);
+waitForMigrateStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
let session = st.s.startSession();
let sessionDB = session.getDatabase('test');
@@ -35,7 +35,7 @@ sessionColl.remove({_id: 'deleteMe'});
sessionColl.findAndModify({query: {_id: 'deleteMeUsingFindAndModify'}, remove: true});
pauseMoveChunkAtStep(st.shard0, moveChunkStepNames.reachedSteadyState);
-unpauseMigrateAtStep(st.shard1, migrateStepNames.deletedPriorDataInRange);
+unpauseMigrateAtStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
waitForMoveChunkStep(st.shard0, moveChunkStepNames.reachedSteadyState);
let recipientColl = st.rs1.getPrimary().getDB('test').user;
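
txn_writes_during_movechunk.js also coordinates a donor-side failpoint with the recipient-side one: the donor is held at its steady-state step while the recipient is released from the renamed step, so the test can observe which writes have been transferred. The hand-off in isolation (pauseMoveChunkAtStep, waitForMoveChunkStep and the migrate helpers appear in the hunks above; unpauseMoveChunkAtStep is assumed to be the symmetric helper in chunk_manipulation_util.js; shard0 is the donor and shard1 the recipient):

    // Donor paused at steady state, recipient released from the renamed step 3.
    pauseMoveChunkAtStep(st.shard0, moveChunkStepNames.reachedSteadyState);
    unpauseMigrateAtStep(st.shard1, migrateStepNames.rangeDeletionTaskScheduled);
    waitForMoveChunkStep(st.shard0, moveChunkStepNames.reachedSteadyState);

    // ... check what the recipient has received so far ...

    // Releasing the donor lets the migration proceed to commit.
    unpauseMoveChunkAtStep(st.shard0, moveChunkStepNames.reachedSteadyState);
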
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 0f713b99f1c..b6e4b3abf1c 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -272,6 +272,8 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep3);
MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep4);
MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep5);
MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6);
+MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep7);
+
MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient);
MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation);
@@ -933,7 +935,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
"migrationId"_attr = _useFCV44RangeDeleterProtocol ? _migrationId->toBSON() : BSONObj());
MoveTimingHelper timing(
- outerOpCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, _toShard, _fromShard);
+ outerOpCtx, "to", _nss.ns(), _min, _max, 7 /* steps */, &_errmsg, _toShard, _fromShard);
const auto initialState = getState();
@@ -953,41 +955,103 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
auto fromShard =
uassertStatusOK(Grid::get(outerOpCtx)->shardRegistry()->getShard(outerOpCtx, _fromShard));
- {
- const ChunkRange range(_min, _max);
+ const ChunkRange range(_min, _max);
- // 2. Ensure any data which might have been left orphaned in the range being moved has been
- // deleted.
- if (_useFCV44RangeDeleterProtocol) {
- while (migrationutil::checkForConflictingDeletions(
+ // 1. Ensure any data which might have been left orphaned in the range being moved has been
+ // deleted.
+ if (_useFCV44RangeDeleterProtocol) {
+ if (migrationutil::checkForConflictingDeletions(
outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) {
- uassert(ErrorCodes::ResumableRangeDeleterDisabled,
- "Failing migration because the disableResumableRangeDeleter server "
- "parameter is set to true on the recipient shard, which contains range "
- "deletion tasks overlapping the incoming range.",
- !disableResumableRangeDeleter.load());
-
- LOGV2(22001,
- "Migration paused because the requested range {range} for {namespace} "
- "overlaps with a range already scheduled for deletion",
- "Migration paused because the requested range overlaps with a range already "
- "scheduled for deletion",
- "namespace"_attr = _nss.ns(),
- "range"_attr = redact(range.toString()),
- "migrationId"_attr =
- _useFCV44RangeDeleterProtocol ? _migrationId->toBSON() : BSONObj());
-
- auto status = CollectionShardingRuntime::waitForClean(
- outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
+ uassert(ErrorCodes::ResumableRangeDeleterDisabled,
+ "Failing migration because the disableResumableRangeDeleter server "
+ "parameter is set to true on the recipient shard, which contains range "
+ "deletion tasks overlapping the incoming range.",
+ !disableResumableRangeDeleter.load());
+
+ LOGV2(22001,
+ "Migration paused because the requested range {range} for {namespace} "
+ "overlaps with a range already scheduled for deletion",
+ "Migration paused because the requested range overlaps with a range already "
+ "scheduled for deletion",
+ "namespace"_attr = _nss.ns(),
+ "range"_attr = redact(range.toString()),
+ "migrationId"_attr =
+ _useFCV44RangeDeleterProtocol ? _migrationId->toBSON() : BSONObj());
- if (!status.isOK()) {
- _setStateFail(redact(status.toString()));
- return;
- }
+ auto status = CollectionShardingRuntime::waitForClean(
+ outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
- outerOpCtx->sleepFor(Milliseconds(1000));
+ if (!status.isOK()) {
+ _setStateFail(redact(status.toString()));
+ return;
}
+ }
+ } else {
+ // Synchronously delete any data which might have been left orphaned in the range
+ // being moved, and wait for completion
+
+ // Needed for _forgetPending to make sure the collection has the same UUID at the end of
+ // an aborted migration as at the beginning. Must be set before calling _notePending.
+ _collUuid = donorCollectionOptionsAndIndexes.uuid;
+ auto cleanupCompleteFuture = _notePending(outerOpCtx, range);
+ auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx);
+ // Wait for the range deletion to report back. Swallow
+ // RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the
+ // collection could either never exist or get dropped directly from the shard after the
+ // range deletion task got scheduled.
+ if (!cleanupStatus.isOK() &&
+ cleanupStatus !=
+ ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) {
+ _setStateFail(redact(cleanupStatus.toString()));
+ return;
+ }
+
+ // Wait for any other, overlapping queued deletions to drain
+ cleanupStatus = CollectionShardingRuntime::waitForClean(
+ outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
+ if (!cleanupStatus.isOK()) {
+ _setStateFail(redact(cleanupStatus.reason()));
+ return;
+ }
+ }
+ timing.done(1);
+ migrateThreadHangAtStep1.pauseWhileSet();
+
+
+ // 2. Create the parent collection and its indexes, if needed.
+ // The conventional usage of retryable writes is to assign statement id's to all of
+ // the writes done as part of the data copying so that _recvChunkStart is
+ // conceptually a retryable write batch. However, we are using an alternate approach to do those
+ // writes under an AlternativeClientRegion because 1) threading the
+ // statement id's through to all the places where they are needed would make this code more
+ // complex, and 2) some of the operations, like creating the collection or building indexes, are
+ // not currently supported in retryable writes.
+ outerOpCtx->setAlwaysInterruptAtStepDownOrUp();
+ {
+ auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillable(lk);
+ }
+
+ AlternativeClientRegion acr(newClient);
+ auto newOpCtxPtr = cc().makeOperationContext();
+ newOpCtxPtr->setAlwaysInterruptAtStepDownOrUp();
+ {
+ stdx::lock_guard<Client> lk(*outerOpCtx->getClient());
+ outerOpCtx->checkForInterrupt();
+ }
+ auto opCtx = newOpCtxPtr.get();
+
+ cloneCollectionIndexesAndOptions(opCtx, _nss, donorCollectionOptionsAndIndexes);
+ timing.done(2);
+ migrateThreadHangAtStep2.pauseWhileSet();
+ }
+ {
+ // 3. If supported, insert a pending range deletion task for the incoming chunk
+ // (to be executed in case of migration rollback).
+ if (_useFCV44RangeDeleterProtocol) {
RangeDeletionTask recipientDeletionTask(*_migrationId,
_nss,
donorCollectionOptionsAndIndexes.uuid,
@@ -1014,46 +1078,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
&ignoreResult));
},
_useFCV44RangeDeleterProtocol);
- } else {
- // Synchronously delete any data which might have been left orphaned in the range
- // being moved, and wait for completion
-
- // Needed for _forgetPending to make sure the collection has the same UUID at the end of
- // an aborted migration as at the beginning. Must be set before calling _notePending.
- _collUuid = donorCollectionOptionsAndIndexes.uuid;
- auto cleanupCompleteFuture = _notePending(outerOpCtx, range);
- auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx);
- // Wait for the range deletion to report back. Swallow
- // RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the
- // collection could either never exist or get dropped directly from the shard after the
- // range deletion task got scheduled.
- if (!cleanupStatus.isOK() &&
- cleanupStatus !=
- ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) {
- _setStateFail(redact(cleanupStatus.toString()));
- return;
- }
-
- // Wait for any other, overlapping queued deletions to drain
- cleanupStatus = CollectionShardingRuntime::waitForClean(
- outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
- if (!cleanupStatus.isOK()) {
- _setStateFail(redact(cleanupStatus.reason()));
- return;
- }
}
- timing.done(1);
- migrateThreadHangAtStep1.pauseWhileSet();
+ timing.done(3);
+ migrateThreadHangAtStep3.pauseWhileSet();
}
- // The conventional usage of retryable writes is to assign statement id's to all of
- // the writes done as part of the data copying so that _recvChunkStart is
- // conceptually a retryable write batch. However, we are using an alternate approach to do those
- // writes under an AlternativeClientRegion because 1) threading the
- // statement id's through to all the places where they are needed would make this code more
- // complex, and 2) some of the operations, like creating the collection or building indexes, are
- // not currently supported in retryable writes.
auto newClient = outerOpCtx->getServiceContext()->makeClient("MigrationCoordinator");
{
stdx::lock_guard<Client> lk(*newClient.get());
@@ -1068,17 +1098,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
outerOpCtx->checkForInterrupt();
}
auto opCtx = newOpCtxPtr.get();
-
- {
- cloneCollectionIndexesAndOptions(opCtx, _nss, donorCollectionOptionsAndIndexes);
-
- timing.done(2);
- migrateThreadHangAtStep2.pauseWhileSet();
- }
-
repl::OpTime lastOpApplied;
{
- // 3. Initial bulk clone
+ // 4. Initial bulk clone
_setState(CLONE);
_sessionMigration->start(opCtx->getServiceContext());
@@ -1179,8 +1201,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
// secondaries
lastOpApplied = cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
- timing.done(3);
- migrateThreadHangAtStep3.pauseWhileSet();
+ timing.done(4);
+ migrateThreadHangAtStep4.pauseWhileSet();
if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) {
_setStateFail(str::stream() << "failing migration after cloning " << _numCloned
@@ -1192,7 +1214,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId);
{
- // 4. Do bulk of mods
+ // 5. Do bulk of mods
_setState(CATCHUP);
while (true) {
@@ -1256,8 +1278,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
}
}
- timing.done(4);
- migrateThreadHangAtStep4.pauseWhileSet();
+ timing.done(5);
+ migrateThreadHangAtStep5.pauseWhileSet();
}
{
@@ -1287,7 +1309,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
}
{
- // 5. Wait for commit
+ // 6. Wait for commit
_setState(STEADY);
bool transferAfterCommit = false;
@@ -1349,8 +1371,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return;
}
- timing.done(5);
- migrateThreadHangAtStep5.pauseWhileSet();
+ timing.done(6);
+ migrateThreadHangAtStep6.pauseWhileSet();
}
runWithoutSession(
@@ -1362,8 +1384,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
_setState(DONE);
- timing.done(6);
- migrateThreadHangAtStep6.pauseWhileSet();
+ timing.done(7);
+ migrateThreadHangAtStep7.pauseWhileSet();
}
bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
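
Finally, the C++ change adds a seventh hang failpoint, migrateThreadHangAtStep7, so the renumbered helpers have something to latch onto at the final step. It can also be driven directly with the generic configureFailPoint command. A minimal sketch follows; the two-shard setup, the test.user namespace and the parallel-shell wrapper are illustrative only, and whether the donor-side moveChunk returns while the recipient's migrate thread is still held at step 7 depends on how far the commit protocol has progressed, so the sketch only shows toggling the failpoint around a migration:

    const st = new ShardingTest({shards: 2});
    assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
    st.ensurePrimaryShard('test', st.shard0.shardName);
    assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {_id: 1}}));

    const recipient = st.rs1.getPrimary();

    // Hold the recipient's migrate thread after it reports step 7 (DONE).
    assert.commandWorked(recipient.adminCommand(
        {configureFailPoint: 'migrateThreadHangAtStep7', mode: 'alwaysOn'}));

    const awaitMove = startParallelShell(
        "assert.commandWorked(db.adminCommand({moveChunk: 'test.user', find: {_id: 0}, to: '" +
            st.shard1.shardName + "'}));",
        st.s.port);

    // ... inspect recipient logs / currentOp while the failpoint is active ...

    assert.commandWorked(recipient.adminCommand(
        {configureFailPoint: 'migrateThreadHangAtStep7', mode: 'off'}));
    awaitMove();
    st.stop();
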