author | Alex Taskov <alex.taskov@mongodb.com> | 2020-01-14 21:55:45 +0000
---|---|---
committer | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2020-01-27 15:40:33 -0500
commit | 31fdaa40c14e9cfd008ab29e390800ca111283c7 (patch) |
tree | d869cb9d1d8db11acca6b7a46a5ac8c1d059f171 |
parent | 3ef44b94bfd27071cb0ca58c4e794bdc082e3395 (diff) |
download | mongo-31fdaa40c14e9cfd008ab29e390800ca111283c7.tar.gz |
SERVER-45179 Add the FCV 4.4 behavior to the MigrationDestinationManager
24 files changed, 235 insertions, 109 deletions
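The donor-side half of this change is in MigrationChunkClonerSourceLegacy::startClone: the migration UUID generated by MigrationSourceManager is forwarded to the recipient in the _recvChunkStart command only when the cluster is fully upgraded to FCV 4.4, while pre-4.4 recipients keep receiving the legacy request shape. Below is a minimal, self-contained sketch of that dispatch, using hypothetical simplified types rather than the server's actual BSON/IDL machinery:

```cpp
// Hypothetical sketch only: FCV-gated construction of the start-clone request.
#include <iostream>
#include <optional>
#include <string>
#include <utility>

enum class FCV { kLastStable42, kFullyUpgradedTo44 };

struct StartChunkCloneCmd {
    std::string nss;
    std::optional<std::string> migrationId;  // sent only with the FCV 4.4 protocol
};

StartChunkCloneCmd makeStartCloneCmd(FCV fcv, std::string nss, std::string migrationId) {
    StartChunkCloneCmd cmd{std::move(nss), std::nullopt};
    if (fcv == FCV::kFullyUpgradedTo44) {
        // 4.4 recipients persist a range deletion task keyed by this UUID.
        cmd.migrationId = std::move(migrationId);
    }
    // Otherwise the request keeps the pre-4.4 shape that older recipients expect.
    return cmd;
}

int main() {
    auto cmd = makeStartCloneCmd(FCV::kFullyUpgradedTo44, "test.foo", "generated-migration-uuid");
    std::cout << cmd.nss << " migrationId=" << cmd.migrationId.value_or("<none>") << "\n";
    return 0;
}
```

The legacy overload of StartChunkCloneRequest::appendAsCommand exists for exactly this mixed-version case and is marked for removal once 4.4 ships (SERVER-44787).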
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
index 4ed710e4a3a..7d5b72c4dba 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_causal_consistency_and_balancer.yml
@@ -101,6 +101,9 @@ selector:
   # TODO Unblacklist (SERVER-35538).
   - jstests/concurrency/fsm_workloads/database_versioning.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - does_not_support_causal_consistency
   # This suite uses secondary read preference, which isn't currently compatible with transactions.
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer.yml
index 3913e53d8e2..009bce0e23b 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_local_read_write_multi_stmt_txn_with_balancer.yml
@@ -133,6 +133,9 @@ selector:
   - jstests/concurrency/fsm_workloads/update_rename.js
   - jstests/concurrency/fsm_workloads/update_rename_noindex.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - assumes_balancer_off
   - does_not_support_causal_consistency
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn.yml
index 8c3c01d7899..8a67591e1fb 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn.yml
@@ -133,6 +133,9 @@ selector:
   - jstests/concurrency/fsm_workloads/update_rename.js
   - jstests/concurrency/fsm_workloads/update_rename_noindex.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - does_not_support_causal_consistency
   - requires_replication
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml
index 55fab6ab7bb..8ab3b284880 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml
@@ -133,6 +133,9 @@ selector:
   - jstests/concurrency/fsm_workloads/update_rename.js
   - jstests/concurrency/fsm_workloads/update_rename_noindex.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - assumes_balancer_off
   - does_not_support_causal_consistency
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
index 5d1f00fe7ac..e089cda1cce 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication.yml
@@ -105,6 +105,9 @@ selector:
   # router tries to commit, it may not know the full participant list.
   - jstests/concurrency/fsm_workloads/multi_statement_transaction_all_commands_same_session.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - requires_replication
   - assumes_balancer_on
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
index c83a1cb142a..03278dff1fb 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_replication_with_balancer.yml
@@ -110,6 +110,9 @@ selector:
   # router tries to commit, it may not know the full participant list.
   - jstests/concurrency/fsm_workloads/multi_statement_transaction_all_commands_same_session.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - assumes_balancer_off
   - requires_replication
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
index 8864ad30dee..2c723371ed6 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml
@@ -179,6 +179,9 @@ selector:
   - jstests/concurrency/fsm_workloads/snapshot_read_kill_op_only.js
   - jstests/concurrency/fsm_workloads/snapshot_read_kill_operations.js
 
+  # SERVER-44160 Modify cleanupOrphaned to wait for overlapping ranges to finish
+  - jstests/concurrency/fsm_workloads/cleanupOrphanedWhileMigrating.js
+
   exclude_with_any_tags:
   - assumes_balancer_off
   - requires_replication
diff --git a/buildscripts/resmokeconfig/suites/sharding_misc.yml b/buildscripts/resmokeconfig/suites/sharding_misc.yml
index c4538a8ce4b..4bdc42ea1f4 100644
--- a/buildscripts/resmokeconfig/suites/sharding_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_misc.yml
@@ -174,7 +174,6 @@ selector:
   - jstests/sharding/causal_consistency_shell_support.js
   - jstests/sharding/change_streams_establishment_finds_new_shards.js
   - jstests/sharding/retryable_writes.js
-  - jstests/sharding/pending_chunk.js
   - jstests/sharding/basic_merge.js
   - jstests/sharding/migration_critical_section_concurrency.js
   - jstests/sharding/sort1.js
diff --git a/jstests/multiVersion/migrations_with_mixed_fcv.js b/jstests/multiVersion/migrations_with_mixed_fcv.js
index c91d84a5b1d..3b6c9178439 100644
--- a/jstests/multiVersion/migrations_with_mixed_fcv.js
+++ b/jstests/multiVersion/migrations_with_mixed_fcv.js
@@ -204,11 +204,8 @@ function testSetFCVBlocksWhileMigratingChunk() {
     assert.commandWorked(
         st.s.getDB("admin").runCommand({setFeatureCompatibilityVersion: lastStableFCV}));
 
-    // Set shard1 to FCV 4.4 and leave shard0 at FCV 4.2
-    assert.commandWorked(
-        st.shard1.getDB("admin").runCommand({setFeatureCompatibilityVersion: latestFCV}));
     checkFCV(st.shard0.getDB("admin"), lastStableFCV);
-    checkFCV(st.shard1.getDB("admin"), latestFCV);
+    checkFCV(st.shard1.getDB("admin"), lastStableFCV);
 
     // Start migration and block with failpoint.
     let shard0Primary = st.rs0.getPrimary();
diff --git a/jstests/sharding/pending_chunk.js b/jstests/multiVersion/pending_chunk.js
index 06f9a2afec0..5d8b938e535 100644
--- a/jstests/sharding/pending_chunk.js
+++ b/jstests/multiVersion/pending_chunk.js
@@ -13,6 +13,9 @@ var coll = mongos.getCollection('foo.bar');
 var ns = coll.getFullName();
 var dbName = coll.getDB().getName();
 
+assert.commandWorked(
+    st.s.getDB("admin").runCommand({setFeatureCompatibilityVersion: lastStableFCV}));
+
 assert.commandWorked(admin.runCommand({enableSharding: dbName}));
 printjson(admin.runCommand({movePrimary: dbName, to: st.shard0.shardName}));
 assert.commandWorked(admin.runCommand({shardCollection: ns, key: {_id: 1}}));
diff --git a/jstests/sharding/migration_coordinator_basic.js b/jstests/sharding/migration_coordinator_basic.js
index e81fd5ed138..040e90d2407 100644
--- a/jstests/sharding/migration_coordinator_basic.js
+++ b/jstests/sharding/migration_coordinator_basic.js
@@ -65,7 +65,7 @@ function assertEventuallyDoesNotHaveMigrationCoordinatorDoc(conn) {
     });
 }
 
-function assertHasRangeDeletionDoc({conn, pending, ns, uuid}) {
+function assertHasRangeDeletionDoc({conn, pending, whenToClean, ns, uuid}) {
     const query = {
         nss: ns,
         collectionUuid: uuid,
@@ -73,7 +73,7 @@ function assertHasRangeDeletionDoc({conn, pending, ns, uuid}) {
         "range.min._id": MinKey,
         "range.max._id": MaxKey,
         pending: (pending ? true : {$exists: false}),
-        whenToClean: "delayed"
+        whenToClean: whenToClean
     };
     assert.neq(null,
                conn.getDB("config").getCollection("rangeDeletions").findOne(query),
@@ -106,19 +106,18 @@ function assertEventuallyDoesNotHaveRangeDeletionDoc(conn) {
 
     // Run the moveChunk asynchronously, pausing during cloning to allow the test to make
    // assertions.
-    let step3Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep3");
+    let step4Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep4");
     const awaitResult = startParallelShell(
         funWithArgs(function(ns, toShardName) {
             assert.commandWorked(db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName}));
         }, ns, st.shard1.shardName), st.s.port);
 
     // Assert that the durable state for coordinating the migration was written correctly.
-    step3Failpoint.wait();
+    step4Failpoint.wait();
     assertHasMigrationCoordinatorDoc({conn: st.shard0, ns, uuid, epoch});
     assertHasRangeDeletionDoc({conn: st.shard0, pending: true, whenToClean: "delayed", ns, uuid});
-    // TODO (SERVER-45179): Add the FCV 4.4 behavior to the MigrationDestinationManager
-    // assertHasRangeDeletionDoc({conn: st.shard1, pending: true, whenToClean: "now", ns, uuid});
-    step3Failpoint.off();
+    assertHasRangeDeletionDoc({conn: st.shard1, pending: true, whenToClean: "now", ns, uuid});
+    step4Failpoint.off();
 
     // Allow the moveChunk to finish.
     awaitResult();
@@ -159,7 +158,7 @@ function assertEventuallyDoesNotHaveRangeDeletionDoc(conn) {
 
     // Run the moveChunk asynchronously, pausing during cloning to allow the test to make
     // assertions.
-    let step3Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep3");
+    let step4Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep4");
     let step5Failpoint = configureFailPoint(st.shard0, "moveChunkHangAtStep5");
     const awaitResult = startParallelShell(
         funWithArgs(function(ns, toShardName) {
@@ -170,12 +169,11 @@ function assertEventuallyDoesNotHaveRangeDeletionDoc(conn) {
         }, ns, st.shard1.shardName), st.s.port);
 
     // Assert that the durable state for coordinating the migration was written correctly.
-    step3Failpoint.wait();
+    step4Failpoint.wait();
     assertHasMigrationCoordinatorDoc({conn: st.shard0, ns, uuid, epoch});
     assertHasRangeDeletionDoc({conn: st.shard0, pending: true, whenToClean: "delayed", ns, uuid});
-    // TODO (SERVER-45179): Add the FCV 4.4 behavior to the MigrationDestinationManager
-    // assertHasRangeDeletionDoc({conn: st.shard1, pending: true, whenToClean: "now", ns, uuid});
-    step3Failpoint.off();
+    assertHasRangeDeletionDoc({conn: st.shard1, pending: true, whenToClean: "now", ns, uuid});
+    step4Failpoint.off();
 
     // Assert that the recipient has 'numDocs' orphans.
     step5Failpoint.wait();
@@ -186,10 +184,9 @@ function assertEventuallyDoesNotHaveRangeDeletionDoc(conn) {
     awaitResult();
 
     // Recipient shard eventually cleans up the orphans.
-    // TODO (SERVER-45179): Add the FCV 4.4 behavior to the MigrationDestinationManager
-    // assert.soon(function() {
-    //    return st.shard1.getDB(dbName).getCollection(collName).count() === 0;
-    //});
+    assert.soon(function() {
+        return st.shard1.getDB(dbName).getCollection(collName).count() === 0;
+    });
     assert.eq(numDocs, st.s.getDB(dbName).getCollection(collName).find().itcount());
 
     // The durable state for coordinating the migration is eventually cleaned up.
diff --git a/jstests/sharding/migration_sets_fromMigrate_flag.js b/jstests/sharding/migration_sets_fromMigrate_flag.js
index 73859a88aab..78f91705c08 100644
--- a/jstests/sharding/migration_sets_fromMigrate_flag.js
+++ b/jstests/sharding/migration_sets_fromMigrate_flag.js
@@ -57,12 +57,11 @@ jsTest.log('Inserting 5 docs into donor shard, ensuring one orphan on the recipi
 assert.commandWorked(coll.insert({_id: 2}));
 assert.eq(1, donorColl.count());
 assert.commandWorked(
-    recipient.adminCommand({configureFailPoint: "failMigrationLeaveOrphans", mode: "alwaysOn"}));
+    recipient.adminCommand({configureFailPoint: "failMigrationOnRecipient", mode: "alwaysOn"}));
 
 assert.commandFailed(
     admin.runCommand({moveChunk: coll.getFullName(), find: {_id: 2}, to: st.shard1.shardName}));
-assert.eq(1, recipientColl.count());
 assert.commandWorked(
-    recipient.adminCommand({configureFailPoint: "failMigrationLeaveOrphans", mode: "off"}));
+    recipient.adminCommand({configureFailPoint: "failMigrationOnRecipient", mode: "off"}));
 
 // Insert the remaining documents into the collection.
 assert.commandWorked(coll.insert({_id: 0}));
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index c871a3e08a8..f59efc0f16b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -37,6 +37,7 @@ class BSONObj;
 class OperationContext;
 class Status;
 class Timestamp;
+class UUID;
 
 namespace repl {
 class OpTime;
@@ -71,7 +72,7 @@ public:
      * NOTE: Must be called without any locks and must succeed, before any other methods are called
     * (except for cancelClone and [insert/update/delete]Op).
     */
-    virtual Status startClone(OperationContext* opCtx) = 0;
+    virtual Status startClone(OperationContext* opCtx, const UUID& migrationId) = 0;
 
    /**
     * Blocking method, which uses some custom selected logic for deciding whether it is appropriate
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index f8920c66800..5ba3a2068e7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -236,7 +236,8 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
     invariant(_state == kDone);
 }
 
-Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
+Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
+                                                    const UUID& migrationId) {
     invariant(_state == kNew);
     invariant(!opCtx->lockState()->isLocked());
 
@@ -276,19 +277,33 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
 
     // Tell the recipient shard to start cloning
     BSONObjBuilder cmdBuilder;
-    StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
-                                            _args.getNss(),
-                                            // TODO (SERVER-44161): Replace with UUID provided by
-                                            // migration donor.
-                                            UUID::gen(),
-                                            _sessionId,
-                                            _donorConnStr,
-                                            _args.getFromShardId(),
-                                            _args.getToShardId(),
-                                            _args.getMinKey(),
-                                            _args.getMaxKey(),
-                                            _shardKeyPattern.toBSON(),
-                                            _args.getSecondaryThrottle());
+
+    auto fcvVersion = serverGlobalParams.featureCompatibility.getVersion();
+    if (fcvVersion == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) {
+        StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
+                                                _args.getNss(),
+                                                migrationId,
+                                                _sessionId,
+                                                _donorConnStr,
+                                                _args.getFromShardId(),
+                                                _args.getToShardId(),
+                                                _args.getMinKey(),
+                                                _args.getMaxKey(),
+                                                _shardKeyPattern.toBSON(),
+                                                _args.getSecondaryThrottle());
+    } else {
+        // TODO (SERVER-44787): Remove this overload after 4.4 is released.
+        StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
+                                                _args.getNss(),
+                                                _sessionId,
+                                                _donorConnStr,
+                                                _args.getFromShardId(),
+                                                _args.getToShardId(),
+                                                _args.getMinKey(),
+                                                _args.getMaxKey(),
+                                                _shardKeyPattern.toBSON(),
+                                                _args.getSecondaryThrottle());
+    }
 
     auto startChunkCloneResponseStatus = _callRecipient(cmdBuilder.obj());
     if (!startChunkCloneResponseStatus.isOK()) {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 2653f401ef1..a7236c136ce 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -90,7 +90,7 @@ public:
                                      HostAndPort recipientHost);
     ~MigrationChunkClonerSourceLegacy();
 
-    Status startClone(OperationContext* opCtx) override;
+    Status startClone(OperationContext* opCtx, const UUID& migrationId) override;
 
     Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx,
                                                   Milliseconds maxTimeToWait) override;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index c117aa5b26a..89ca8742cfc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -214,7 +214,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
             onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
         });
 
-        ASSERT_OK(cloner.startClone(operationContext()));
+        ASSERT_OK(cloner.startClone(operationContext(), UUID::gen()));
         futureStartClone.default_timed_get();
     }
 
@@ -312,7 +312,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
                                             kDonorConnStr,
                                             kRecipientConnStr.getServers()[0]);
 
-    ASSERT_NOT_OK(cloner.startClone(operationContext()));
+    ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen()));
     cloner.cancelClone(operationContext());
 }
 
@@ -325,7 +325,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) {
                                             kDonorConnStr,
                                             kRecipientConnStr.getServers()[0]);
 
-    ASSERT_NOT_OK(cloner.startClone(operationContext()));
+    ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen()));
     cloner.cancelClone(operationContext());
 }
 
@@ -351,7 +351,7 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
         });
     });
 
-    auto startCloneStatus = cloner.startClone(operationContext());
+    auto startCloneStatus = cloner.startClone(operationContext(), UUID::gen());
     ASSERT_EQ(ErrorCodes::NetworkTimeout, startCloneStatus.code());
     futureStartClone.default_timed_get();
 }
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ac34640554c..d33e5fbcbc9 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -213,7 +213,7 @@ MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep4);
 MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep5);
 MONGO_FAIL_POINT_DEFINE(migrateThreadHangAtStep6);
 
-MONGO_FAIL_POINT_DEFINE(failMigrationLeaveOrphans);
+MONGO_FAIL_POINT_DEFINE(failMigrationOnRecipient);
 MONGO_FAIL_POINT_DEFINE(failMigrationReceivedOutOfRangeOperation);
 
 }  // namespace
@@ -341,10 +341,24 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
         return Status(ErrorCodes::ConflictingOperationInProgress,
                       "Can't receive chunk while FCV is upgrading/downgrading");
 
+    // Note: It is expected that the FCV cannot change while the node is donating or receiving a
+    // chunk. This is guaranteed by the setFCV command serializing with donating and receiving
+    // chunks via the ActiveMigrationsRegistry.
+    _useFCV44Protocol =
+        fcvVersion == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44;
+
     _state = READY;
     _stateChangedCV.notify_all();
     _errmsg = "";
 
+    if (_useFCV44Protocol) {
+        uassert(ErrorCodes::ConflictingOperationInProgress,
+                "Missing migrationId in FCV 4.4",
+                cloneRequest.hasMigrationId());
+
+        _migrationId = cloneRequest.getMigrationId();
+    }
+
     _nss = nss;
     _fromShard = cloneRequest.getFromShardId();
     _fromShardConnString =
@@ -748,8 +762,10 @@ void MigrationDestinationManager::_migrateThread() {
         _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
     }
 
-    if (getState() != DONE && !MONGO_unlikely(failMigrationLeaveOrphans.shouldFail())) {
-        _forgetPending(opCtx.get(), ChunkRange(_min, _max));
+    if (!_useFCV44Protocol) {
+        if (getState() != DONE) {
+            _forgetPending(opCtx.get(), ChunkRange(_min, _max));
+        }
     }
 
     stdx::lock_guard<Latch> lk(_mutex);
@@ -799,39 +815,46 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
     {
         const ChunkRange range(_min, _max);
 
-        while (migrationutil::checkForConflictingDeletions(opCtx, range, collectionUuid)) {
-            LOG(0) << "Migration paused because range overlaps with a "
-                      "range that is scheduled for deletion: collection: "
-                   << _nss.ns() << " range: " << redact(range.toString());
+        // 2. Ensure any data which might have been left orphaned in the range being moved has been
+        // deleted.
+        if (_useFCV44Protocol) {
+            while (migrationutil::checkForConflictingDeletions(opCtx, range, collectionUuid)) {
+                LOG(0) << "Migration paused because range overlaps with a "
+                          "range that is scheduled for deletion: collection: "
+                       << _nss.ns() << " range: " << redact(range.toString());
 
-            auto status = CollectionShardingRuntime::waitForClean(opCtx, _nss, _epoch, range);
+                auto status = CollectionShardingRuntime::waitForClean(opCtx, _nss, _epoch, range);
 
-            if (!status.isOK()) {
-                _setStateFail(redact(status.reason()));
-                return;
-            }
+                if (!status.isOK()) {
+                    _setStateFail(redact(status.reason()));
+                    return;
+                }
 
-            opCtx->sleepFor(Milliseconds(1000));
-        }
+                opCtx->sleepFor(Milliseconds(1000));
+            }
 
-        // TODO(SERVER-44163): Delete this block after the MigrationCoordinator has been integrated
-        // into the source. It will be replaced by the checkForOverlapping call.
+            RangeDeletionTask recipientDeletionTask(
+                _migrationId, _nss, collectionUuid, _fromShard, range, CleanWhenEnum::kNow);
+            recipientDeletionTask.setPending(true);
 
-        // 2. Synchronously delete any data which might have been left orphaned in the range
-        // being moved, and wait for completion
+            migrationutil::persistRangeDeletionTaskLocally(opCtx, recipientDeletionTask);
+        } else {
+            // Synchronously delete any data which might have been left orphaned in the range
+            // being moved, and wait for completion
 
-        auto notification = _notePending(opCtx, range);
-        // Wait for the range deletion to report back
-        if (!notification.waitStatus(opCtx).isOK()) {
-            _setStateFail(redact(notification.waitStatus(opCtx).reason()));
-            return;
-        }
+            auto notification = _notePending(opCtx, range);
+            // Wait for the range deletion to report back
+            if (!notification.waitStatus(opCtx).isOK()) {
+                _setStateFail(redact(notification.waitStatus(opCtx).reason()));
+                return;
+            }
 
-        // Wait for any other, overlapping queued deletions to drain
-        auto status = CollectionShardingRuntime::waitForClean(opCtx, _nss, _epoch, range);
-        if (!status.isOK()) {
-            _setStateFail(redact(status.reason()));
-            return;
+            // Wait for any other, overlapping queued deletions to drain
+            auto status = CollectionShardingRuntime::waitForClean(opCtx, _nss, _epoch, range);
+            if (!status.isOK()) {
+                _setStateFail(redact(status.reason()));
+                return;
+            }
         }
 
         timing.done(2);
@@ -934,9 +957,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
         timing.done(3);
         migrateThreadHangAtStep3.pauseWhileSet();
 
-        if (MONGO_unlikely(failMigrationLeaveOrphans.shouldFail())) {
+        if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) {
             _setStateFail(str::stream() << "failing migration after cloning " << _numCloned
-                                        << " docs due to failMigrationLeaveOrphans failpoint");
+                                        << " docs due to failMigrationOnRecipient failpoint");
             return;
         }
     }
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index eff5aae8eb5..41841c96abe 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -188,6 +188,9 @@ private:
     stdx::thread _migrateThreadHandle;
 
+    bool _useFCV44Protocol{false};
+
+    UUID _migrationId;
     NamespaceString _nss;
     ConnectionString _fromShardConnString;
     ShardId _fromShard;
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 0644506b536..dd131735106 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -251,6 +251,8 @@ Status MigrationSourceManager::startClone() {
     auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
     auto replEnabled = replCoord->isReplEnabled();
 
+    UUID migrationId = UUID::gen();
+
     {
         const auto metadata = _getCurrentMetadataAndCheckEpoch();
 
@@ -283,10 +285,6 @@ Status MigrationSourceManager::startClone() {
         invariant(nullptr == std::exchange(msmForCsr(csr), this));
 
         if (_useFCV44Protocol) {
-            // TODO (SERVER-45175): Unify the migration UUID used by the MigrationCoordinator and
-            // MigrationChunkClonerSourceLegacy
-            UUID migrationId = UUID::gen();
-
             // TODO (SERVER-xxx): Allow re-using the same session (though different transaction
             // number) across migrations.
             auto lsid = makeLogicalSessionId(_opCtx);
@@ -320,7 +318,7 @@ Status MigrationSourceManager::startClone() {
             _coordinator->startMigration(_opCtx, _args.getWaitForDelete());
         }
 
-        Status startCloneStatus = _cloneDriver->startClone(_opCtx);
+        Status startCloneStatus = _cloneDriver->startClone(_opCtx, migrationId);
         if (!startCloneStatus.isOK()) {
             return startCloneStatus;
         }
@@ -648,16 +646,9 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
 
     const ChunkRange range(_args.getMinKey(), _args.getMaxKey());
 
-    auto notification = [&] {
-        auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingRuntime::kNow
-                                                          : CollectionShardingRuntime::kDelayed;
-        UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
-        AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
-        return CollectionShardingRuntime::get(_opCtx, getNss())->cleanUpRange(range, whenToClean);
-    }();
-
     if (!MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) {
-        // Best-effort make the recipient refresh its routing table to the new collection version.
+        // Best-effort make the recipient refresh its routing table to the new collection
+        // version.
         refreshRecipientRoutingTable(_opCtx,
                                      getNss(),
                                      _args.getToShardId(),
@@ -669,24 +660,51 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() {
         << "Moved chunks successfully but failed to clean up " << getNss().ns() << " range "
         << redact(range.toString()) << " due to: ";
 
-    if (_args.getWaitForDelete()) {
-        log() << "Waiting for cleanup of " << getNss().ns() << " range "
-              << redact(range.toString());
-        auto deleteStatus = notification.waitStatus(_opCtx);
-        if (!deleteStatus.isOK()) {
-            return {ErrorCodes::OrphanedRangeCleanUpFailed,
-                    orphanedRangeCleanUpErrMsg + redact(deleteStatus)};
-        }
-        return Status::OK();
-    }
+    if (_useFCV44Protocol) {
+        if (_args.getWaitForDelete()) {
+            log() << "Waiting for cleanup of " << getNss().ns() << " range "
+                  << redact(range.toString());
 
-    if (notification.ready() && !notification.waitStatus(_opCtx).isOK()) {
-        return {ErrorCodes::OrphanedRangeCleanUpFailed,
-                orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(_opCtx))};
+            auto deleteStatus =
+                CollectionShardingRuntime::waitForClean(_opCtx, getNss(), _collectionEpoch, range);
+
+            if (!deleteStatus.isOK()) {
+                return {ErrorCodes::OrphanedRangeCleanUpFailed,
+                        orphanedRangeCleanUpErrMsg + redact(deleteStatus)};
+            }
+        }
     } else {
-        log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
-              << " to complete in background";
-        notification.abandon();
+        auto notification = [&] {
+            auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingRuntime::kNow
                                                               : CollectionShardingRuntime::kDelayed;
+            UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+            AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
+            return CollectionShardingRuntime::get(_opCtx, getNss())
+                ->cleanUpRange(range, whenToClean);
+        }();
+
+        if (_args.getWaitForDelete()) {
+            log() << "Waiting for cleanup of " << getNss().ns() << " range "
+                  << redact(range.toString());
+
+            auto deleteStatus = notification.waitStatus(_opCtx);
+
+            if (!deleteStatus.isOK()) {
+                return {ErrorCodes::OrphanedRangeCleanUpFailed,
+                        orphanedRangeCleanUpErrMsg + redact(deleteStatus)};
+            }
+
+            return Status::OK();
+        }
+
+        if (notification.ready() && !notification.waitStatus(_opCtx).isOK()) {
+            return {ErrorCodes::OrphanedRangeCleanUpFailed,
+                    orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(_opCtx))};
+        } else {
+            log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
+                  << " to complete in background";
+            notification.abandon();
+        }
     }
 
     return Status::OK();
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index c6bee0d5377..7994dce7251 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -348,6 +348,7 @@ void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
                QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
                BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "aborted")));
 }
+
 void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
                                         const ShardId& recipientId,
                                         const UUID& migrationId,
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 1fa019e7ebd..68abc0bbb00 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -72,7 +72,7 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                 // writeConcernMajorityJournalDefault is set to true
                                                 // in the ReplSetConfig.
                                                 WriteConcernOptions::SyncMode::UNSET,
-                                                -1);
+                                                WriteConcernOptions::kWriteConcernTimeoutSharding);
 
 // Tests can pause and resume moveChunk's progress at each step by enabling/disabling each failpoint
 MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep1);
@@ -174,6 +174,7 @@ public:
             writeConcernResult.wTimedOut = false;
             Status majorityStatus = waitForWriteConcern(
                 opCtx, replClient.getLastOp(), kMajorityWriteConcern, &writeConcernResult);
+
             if (!majorityStatus.isOK()) {
                 if (!writeConcernResult.wTimedOut) {
                     uassertStatusOK(majorityStatus);
diff --git a/src/mongo/db/s/start_chunk_clone_request.cpp b/src/mongo/db/s/start_chunk_clone_request.cpp
index ad350452346..30d2813a428 100644
--- a/src/mongo/db/s/start_chunk_clone_request.cpp
+++ b/src/mongo/db/s/start_chunk_clone_request.cpp
@@ -77,7 +77,8 @@ StatusWith<StartChunkCloneRequest> StartChunkCloneRequest::createFromCommand(Nam
     // TODO (SERVER-44787): Remove this FCV check after 4.4 is released.
     if (serverGlobalParams.featureCompatibility.getVersion() ==
         ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo44) {
-        request._migrationId = UUID::parse(obj);
+        if (obj.getField("uuid"))
+            request._migrationId = UUID::parse(obj);
     }
 
     {
@@ -187,4 +188,31 @@ void StartChunkCloneRequest::appendAsCommand(
     secondaryThrottle.append(builder);
 }
 
+// TODO (SERVER-44787): Remove this overload after 4.4 is released.
+void StartChunkCloneRequest::appendAsCommand(
+    BSONObjBuilder* builder,
+    const NamespaceString& nss,
+    const MigrationSessionId& sessionId,
+    const ConnectionString& fromShardConnectionString,
+    const ShardId& fromShardId,
+    const ShardId& toShardId,
+    const BSONObj& chunkMinKey,
+    const BSONObj& chunkMaxKey,
+    const BSONObj& shardKeyPattern,
+    const MigrationSecondaryThrottleOptions& secondaryThrottle) {
+    invariant(builder->asTempObj().isEmpty());
+    invariant(nss.isValid());
+    invariant(fromShardConnectionString.isValid());
+
+    builder->append(kRecvChunkStart, nss.ns());
+    sessionId.append(builder);
+    builder->append(kFromShardConnectionString, fromShardConnectionString.toString());
+    builder->append(kFromShardId, fromShardId.toString());
+    builder->append(kToShardId, toShardId.toString());
+    builder->append(kChunkMinKey, chunkMinKey);
+    builder->append(kChunkMaxKey, chunkMaxKey);
+    builder->append(kShardKeyPattern, shardKeyPattern);
+    secondaryThrottle.append(builder);
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h
index 055ba59a459..826c6e94371 100644
--- a/src/mongo/db/s/start_chunk_clone_request.h
+++ b/src/mongo/db/s/start_chunk_clone_request.h
@@ -71,6 +71,18 @@ public:
                                 const BSONObj& shardKeyPattern,
                                 const MigrationSecondaryThrottleOptions& secondaryThrottle);
 
+    // TODO (SERVER-44787): Remove this overload after 4.4 is released.
+    static void appendAsCommand(BSONObjBuilder* builder,
+                                const NamespaceString& nss,
+                                const MigrationSessionId& sessionId,
+                                const ConnectionString& fromShardConnectionString,
+                                const ShardId& fromShardId,
+                                const ShardId& toShardId,
+                                const BSONObj& chunkMinKey,
+                                const BSONObj& chunkMaxKey,
+                                const BSONObj& shardKeyPattern,
+                                const MigrationSecondaryThrottleOptions& secondaryThrottle);
+
     const NamespaceString& getNss() const {
         return _nss;
     }
@@ -83,9 +95,16 @@ public:
         return _fromShardCS;
     }
 
+    // TODO (SERVER-44787): Remove this function after 4.4 is released.
+    // Use this check so that getMigrationId() is never called in a cluster that's not fully
+    // upgraded to 4.4.
+    bool hasMigrationId() const {
+        return _migrationId.is_initialized();
+    }
+
     const UUID& getMigrationId() const {
-        // getMigrationId() should never be called in a cluster that's not fully upgraded to 4.4.
-        // TODO (SERVER-44787): Remove this invariant after 4.4 is released.
+        // TODO (SERVER-44787): change _migrationId to non-optional and remove invariant after 4.4
+        // is released.
         invariant(_migrationId);
         return *_migrationId;
     }
diff --git a/src/mongo/util/uuid.h b/src/mongo/util/uuid.h
index 5f7e27d35b9..b90b4d29d30 100644
--- a/src/mongo/util/uuid.h
+++ b/src/mongo/util/uuid.h
@@ -77,6 +77,7 @@ class UUID {
     friend class LogicalSessionIdToClient;
     friend class LogicalSessionFromClient;
     friend class MigrationCoordinatorDocument;
+    friend class MigrationDestinationManager;
    friend class RangeDeletionTask;
    friend class ResolvedKeyId;
    friend class repl::CollectionInfo;
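The recipient-side half is what the commit title describes: under FCV 4.4 the MigrationDestinationManager requires the donor-supplied migration UUID and persists a pending range deletion task marked to be cleaned "now", replacing the in-memory _notePending/_forgetPending bookkeeping used before 4.4. A condensed, hypothetical sketch of that branch (simplified standalone types, not the actual server classes):

```cpp
// Hypothetical sketch only: the FCV 4.4 branch added to the recipient of a chunk migration.
#include <optional>
#include <stdexcept>
#include <string>

enum class CleanWhen { kNow, kDelayed };

struct RangeDeletionTask {
    std::string migrationId;
    std::string nss;
    CleanWhen whenToClean;
    bool pending;
};

struct CloneRequest {
    std::string nss;
    std::optional<std::string> migrationId;  // absent when the donor is still on 4.2
};

// Returns the task to persist in config.rangeDeletions, or nullopt on the legacy path.
std::optional<RangeDeletionTask> prepareReceive(bool useFCV44Protocol, const CloneRequest& req) {
    if (!useFCV44Protocol) {
        // Pre-4.4: the recipient only tracks the incoming range in memory
        // (_notePending/_forgetPending); no rangeDeletions document is written.
        return std::nullopt;
    }
    if (!req.migrationId) {
        throw std::invalid_argument("Missing migrationId in FCV 4.4");
    }
    // Persist the task as pending and "now" so orphans left by a failed migration
    // are cleaned up promptly once the migration finishes or aborts.
    return RangeDeletionTask{*req.migrationId, req.nss, CleanWhen::kNow, /*pending=*/true};
}
```

This is why migration_coordinator_basic.js can now assert a rangeDeletions document with whenToClean: "now" on the recipient shard while the migration is paused mid-clone.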