author     Suganthi Mani <suganthi.mani@mongodb.com>         2022-04-26 12:03:36 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-26 12:36:10 +0000
commit     61087a02978226541e7f17f51a8755b680e26364
tree       7c3713b2c5a0b8517c1858695650f084e55d4967
parent     35f03b7ef5739264c9de4b9a6d3e38bdc19b5c17
download   mongo-61087a02978226541e7f17f51a8755b680e26364.tar.gz
SERVER-65300 Refactor tenant migration recipient state machinery code.
20 files changed, 869 insertions, 579 deletions
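The central change in this refactor, visible throughout the hunks below, is replacing TenantMigrationSharedData's boolean resume flag (`isResuming()` / `/*resuming=*/true`) with a three-valued resume phase (`getResumePhase()`). The enum's actual definition is not part of this diff (it likely lives in src/mongo/db/repl/tenant_migration_shared_data.h), so the sketch below is inferred from the call sites, not copied from the source:

// Sketch only: values and comments inferred from call sites in this commit,
// e.g. `getSharedData()->getResumePhase() == ResumePhase::kNone`.
enum class ResumePhase {
    kNone,          // fresh migration attempt; nothing to resume
    kDataSync,      // failover occurred while the tenant cloner was still running
    kOplogCatchup,  // cloning had already finished; resume at oplog application
};

Compared with the old boolean, the enum lets callers distinguish a resume during cloning from a resume after cloning, which is the distinction `_startTenantAllDatabaseCloner()` and `_onCloneSuccess()` below rely on to decide whether the cloner can be skipped.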
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 58959b05063..b6e63985bde 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -206,6 +206,8 @@ last-continuous: test_file: jstests/sharding/change_stream_shard_failover.js - ticket: SERVER-63049 test_file: jstests/noPassthrough/operator_counters_accumulators.js + - ticket: SERVER-65300 + test_file: jstests/replsets/tenant_migration_recipient_rollback_recovery.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: @@ -545,6 +547,8 @@ last-lts: test_file: jstests/replsets/capped_deletes.js - ticket: SERVER-65022 test_file: jstests/sharding/change_stream_shard_failover.js + - ticket: SERVER-65300 + test_file: jstests/replsets/tenant_migration_recipient_rollback_recovery.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/replsets/tenant_migration_commit_transaction_retry.js b/jstests/replsets/tenant_migration_commit_transaction_retry.js index 2d2eedb4bfe..597aebadebc 100644 --- a/jstests/replsets/tenant_migration_commit_transaction_retry.js +++ b/jstests/replsets/tenant_migration_commit_transaction_retry.js @@ -56,8 +56,8 @@ assert.commandWorked(donorPrimary.getCollection(kNs).insert( session.endSession(); } -const waitAfterStartingOplogApplier = configureFailPoint( - recipientPrimary, "fpAfterStartingOplogApplierMigrationRecipientInstance", {action: "hang"}); +const pauseTenantMigrationBeforeLeavingDataSyncState = + configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingDataSyncState"); jsTestLog("Run a migration to completion"); const migrationId = UUID(); @@ -69,7 +69,7 @@ tenantMigrationTest.startMigration(migrationOpts); // Hang the recipient during oplog application before we continue to run more transactions on the // donor. This is to test applying multiple transactions on multiple sessions in the same batch. -waitAfterStartingOplogApplier.wait(); +pauseTenantMigrationBeforeLeavingDataSyncState.wait(); const waitInOplogApplier = configureFailPoint(recipientPrimary, "hangInTenantOplogApplication"); tenantMigrationTest.insertDonorDB(kDbName, kCollName, [{_id: 3, x: 3}, {_id: 4, x: 4}]); @@ -92,7 +92,7 @@ for (let i = 0; i < 5; i++) { session.endSession(); } -waitAfterStartingOplogApplier.off(); +pauseTenantMigrationBeforeLeavingDataSyncState.off(); waitInOplogApplier.off(); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js index c5d88734a00..3117790bf1e 100644 --- a/jstests/replsets/tenant_migration_recipient_current_op.js +++ b/jstests/replsets/tenant_migration_recipient_current_op.js @@ -1,11 +1,14 @@ /** - * Tests the currentOp command during a tenant migration. A tenant migration is started, and the - * currentOp command is tested as the recipient moves through the kStarted, kConsistent and kDone - * states. + * Tests the currentOp command during a multi-tenant migration protocol. A tenant migration + * is started, and the currentOp command is tested as the recipient moves through below + * state sequence. + * + * kStarted ---> kConsistent ---> kDone. * * Tenant migrations are not expected to be run on servers with ephemeralForTest. 
* * @tags: [ + * incompatible_with_shard_merge, * incompatible_with_eft, * incompatible_with_macos, * incompatible_with_windows_tls, @@ -39,9 +42,6 @@ const migrationOpts = { const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); -const shardMergeIsEnabled = - TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin")); - // Initial inserts to test cloner stats. const dbsToClone = ["db0", "db1", "db2"]; const collsToClone = ["coll0", "coll1"]; @@ -75,27 +75,25 @@ function checkPostConsistentFieldsOK(res) { assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && checkOptime(currOp.startApplyingDonorOpTime), res); + assert(currOp.hasOwnProperty("cloneFinishedRecipientOpTime") && + checkOptime(currOp.cloneFinishedRecipientOpTime), + res); assert(currOp.hasOwnProperty("dataConsistentStopDonorOpTime") && checkOptime(currOp.dataConsistentStopDonorOpTime), res); - if (!shardMergeIsEnabled) { - assert(currOp.hasOwnProperty("cloneFinishedRecipientOpTime") && - checkOptime(currOp.cloneFinishedRecipientOpTime), - res); - assert(currOp.hasOwnProperty("approxTotalDataSize") && - currOp.approxTotalDataSize instanceof NumberLong, - res); - assert(currOp.hasOwnProperty("approxTotalBytesCopied") && - currOp.approxTotalBytesCopied instanceof NumberLong, - res); - assert(currOp.hasOwnProperty("totalReceiveElapsedMillis") && - currOp.totalReceiveElapsedMillis instanceof NumberLong, - res); - assert(currOp.hasOwnProperty("remainingReceiveEstimatedMillis") && - currOp.remainingReceiveEstimatedMillis instanceof NumberLong, - res); - } + assert(currOp.hasOwnProperty("approxTotalDataSize") && + currOp.approxTotalDataSize instanceof NumberLong, + res); + assert(currOp.hasOwnProperty("approxTotalBytesCopied") && + currOp.approxTotalBytesCopied instanceof NumberLong, + res); + assert(currOp.hasOwnProperty("totalReceiveElapsedMillis") && + currOp.totalReceiveElapsedMillis instanceof NumberLong, + res); + assert(currOp.hasOwnProperty("remainingReceiveEstimatedMillis") && + currOp.remainingReceiveEstimatedMillis instanceof NumberLong, + res); } // Validates the fields of an optime object. @@ -127,88 +125,87 @@ jsTestLog("Starting tenant migration with migrationId: " + kMigrationId + assert.commandWorked( tenantMigrationTest.startMigration(migrationOpts, {enableDonorStartMigrationFsync: true})); -// Wait until a current operation corresponding to "tenant recipient migration" with state kStarted -// is visible on the recipientPrimary. -jsTestLog("Waiting until current operation with state kStarted is visible."); -fpAfterPersistingStateDoc.wait(); - -let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -let currOp = res.inprog[0]; -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); -assert.eq(currOp.migrationCompleted, false, res); -assert.eq(currOp.dataSyncCompleted, false, res); -assert(!currOp.hasOwnProperty("startFetchingDonorOpTime"), res); -assert(!currOp.hasOwnProperty("startApplyingDonorOpTime"), res); -assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); -assert(!currOp.hasOwnProperty("expireAt"), res); -assert(!currOp.hasOwnProperty("donorSyncSource"), res); -if (!shardMergeIsEnabled) { +{ + // Wait until a current operation corresponding to "tenant recipient migration" with state + // kStarted is visible on the recipientPrimary. 
+ jsTestLog("Waiting until current operation with state kStarted is visible."); + fpAfterPersistingStateDoc.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + let currOp = res.inprog[0]; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("startFetchingDonorOpTime"), res); + assert(!currOp.hasOwnProperty("startApplyingDonorOpTime"), res); + assert(!currOp.hasOwnProperty("expireAt"), res); + assert(!currOp.hasOwnProperty("donorSyncSource"), res); assert(!currOp.hasOwnProperty("cloneFinishedRecipientOpTime"), res); + assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); assert(!currOp.hasOwnProperty("approxTotalDataSize"), res); assert(!currOp.hasOwnProperty("approxTotalBytesCopied"), res); assert(!currOp.hasOwnProperty("totalReceiveElapsedMillis"), res); assert(!currOp.hasOwnProperty("remainingReceiveEstimatedMillis"), res); + fpAfterPersistingStateDoc.off(); } -fpAfterPersistingStateDoc.off(); - -// Allow the migration to move to the point where the startFetchingDonorOpTime has been obtained. -jsTestLog("Waiting for startFetchingDonorOpTime to exist."); -fpAfterRetrievingStartOpTime.wait(); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -currOp = res.inprog[0]; -assert.gt(new Date(), currOp.receiveStart, tojson(res)); -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); -assert.eq(currOp.migrationCompleted, false, res); -assert.eq(currOp.dataSyncCompleted, false, res); -assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); -assert(!currOp.hasOwnProperty("expireAt"), res); -if (!shardMergeIsEnabled) { + +{ + // Allow the migration to move to the point where the startFetchingDonorOpTime has been + // obtained. 
+ jsTestLog("Waiting for startFetchingDonorOpTime to exist."); + fpAfterRetrievingStartOpTime.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + let currOp = res.inprog[0]; + assert.gt(new Date(), currOp.receiveStart, tojson(res)); + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); assert(!currOp.hasOwnProperty("cloneFinishedRecipientOpTime"), res); + assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && + checkOptime(currOp.startFetchingDonorOpTime), + res); + assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && + checkOptime(currOp.startApplyingDonorOpTime), + res); + assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', + res); + assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); assert(!currOp.hasOwnProperty("approxTotalDataSize"), res); assert(!currOp.hasOwnProperty("approxTotalBytesCopied"), res); assert(!currOp.hasOwnProperty("totalReceiveElapsedMillis"), res); assert(!currOp.hasOwnProperty("remainingReceiveEstimatedMillis"), res); + fpAfterRetrievingStartOpTime.off(); } -assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && - checkOptime(currOp.startFetchingDonorOpTime), - res); -assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && - checkOptime(currOp.startApplyingDonorOpTime), - res); -assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', res); -fpAfterRetrievingStartOpTime.off(); - -jsTestLog("Waiting until we are ready to fetch committed transactions."); -fpBeforeFetchingTransactions.wait(); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -currOp = res.inprog[0]; - -if (shardMergeIsEnabled) { - assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); -} else { + +{ + jsTestLog("Waiting until we are ready to fetch committed transactions."); + fpBeforeFetchingTransactions.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + let currOp = res.inprog[0]; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); -} -assert.eq(currOp.migrationCompleted, false, res); -assert.eq(currOp.dataSyncCompleted, false, res); -assert(!currOp.hasOwnProperty("expireAt"), res); -// Must exist now. -assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && - checkOptime(currOp.startFetchingDonorOpTime), - res); -assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && - checkOptime(currOp.startApplyingDonorOpTime), - res); -assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', res); -assert(currOp.hasOwnProperty("dataConsistentStopDonorOpTime") && - checkOptime(currOp.dataConsistentStopDonorOpTime), - res); -if (!shardMergeIsEnabled) { + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + // Must exist now. 
+ assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && + checkOptime(currOp.startFetchingDonorOpTime), + res); + assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && + checkOptime(currOp.startApplyingDonorOpTime), + res); + assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', + res); + assert(currOp.hasOwnProperty("dataConsistentStopDonorOpTime") && + checkOptime(currOp.dataConsistentStopDonorOpTime), + res); assert(currOp.hasOwnProperty("cloneFinishedRecipientOpTime") && checkOptime(currOp.cloneFinishedRecipientOpTime), res); @@ -224,44 +221,47 @@ if (!shardMergeIsEnabled) { assert(currOp.hasOwnProperty("remainingReceiveEstimatedMillis") && currOp.remainingReceiveEstimatedMillis instanceof NumberLong, res); + fpBeforeFetchingTransactions.off(); +} + +{ + // Wait for the "kConsistent" state to be reached. + jsTestLog("Waiting for the kConsistent state to be reached."); + fpAfterDataConsistent.wait(); + const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( + recipientPrimary, "fpBeforePersistingRejectReadsBeforeTimestamp", {action: "hang"}); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + let currOp = res.inprog[0]; + // State should have changed. + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + + // Wait to receive recipientSyncData with returnAfterReachingDonorTimestamp. + fpAfterDataConsistent.off(); + fpBeforePersistingRejectReadsBeforeTimestamp.wait(); + + res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + currOp = res.inprog[0]; + // State should have changed. + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + // The oplog applier should have applied at least the noop resume token. + assert.gte(currOp.numOpsApplied, 1, tojson(res)); + fpBeforePersistingRejectReadsBeforeTimestamp.off(); + + jsTestLog("Waiting for migration to complete."); + TenantMigrationTest.assertCommitted( + tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); } -fpBeforeFetchingTransactions.off(); - -// Wait for the "kConsistent" state to be reached. -jsTestLog("Waiting for the kConsistent state to be reached."); -fpAfterDataConsistent.wait(); -const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( - recipientPrimary, "fpBeforePersistingRejectReadsBeforeTimestamp", {action: "hang"}); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -checkPostConsistentFieldsOK(res); -currOp = res.inprog[0]; -// State should have changed. -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); -assert.eq(currOp.migrationCompleted, false, res); -assert.eq(currOp.dataSyncCompleted, false, res); -assert(!currOp.hasOwnProperty("expireAt"), res); - -// Wait to receive recipientSyncData with returnAfterReachingDonorTimestamp. 
-fpAfterDataConsistent.off(); -fpBeforePersistingRejectReadsBeforeTimestamp.wait(); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -checkPostConsistentFieldsOK(res); -currOp = res.inprog[0]; -// State should have changed. -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); -assert.eq(currOp.migrationCompleted, false, res); -assert.eq(currOp.dataSyncCompleted, false, res); -assert(!currOp.hasOwnProperty("expireAt"), res); -// The oplog applier should have applied at least the noop resume token. -assert.gte(currOp.numOpsApplied, 1, tojson(res)); -fpBeforePersistingRejectReadsBeforeTimestamp.off(); - -jsTestLog("Waiting for migration to complete."); -TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); jsTestLog("Issuing a forget migration command."); const forgetMigrationThread = @@ -271,33 +271,34 @@ const forgetMigrationThread = true /* retryOnRetryableErrors */); forgetMigrationThread.start(); -jsTestLog("Waiting for the recipient to receive the forgetMigration, and pause at failpoint"); -fpAfterForgetMigration.wait(); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -checkPostConsistentFieldsOK(res); -currOp = res.inprog[0]; -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); -assert.eq(currOp.migrationCompleted, false, res); -// dataSyncCompleted should have changed. -assert.eq(currOp.dataSyncCompleted, true, res); -assert(!currOp.hasOwnProperty("expireAt"), res); - -jsTestLog("Allow the forgetMigration to complete."); -fpAfterForgetMigration.off(); -assert.commandWorked(forgetMigrationThread.returnData()); - -res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); -checkStandardFieldsOK(res); -checkPostConsistentFieldsOK(res); -currOp = res.inprog[0]; -assert.eq(currOp.dataSyncCompleted, true, res); -// State, completion status and expireAt should have changed. -assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kDone, res); -assert.eq(currOp.migrationCompleted, true, res); -assert(currOp.hasOwnProperty("expireAt") && currOp.expireAt instanceof Date, res); -if (!shardMergeIsEnabled) { +{ + jsTestLog("Waiting for the recipient to receive the forgetMigration, and pause at failpoint"); + fpAfterForgetMigration.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + let currOp = res.inprog[0]; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + // dataSyncCompleted should have changed. + assert.eq(currOp.dataSyncCompleted, true, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + + jsTestLog("Allow the forgetMigration to complete."); + fpAfterForgetMigration.off(); + assert.commandWorked(forgetMigrationThread.returnData()); + + res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + currOp = res.inprog[0]; + assert.eq(currOp.dataSyncCompleted, true, res); + // State, completion status and expireAt should have changed. 
+ assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kDone, res); + assert.eq(currOp.migrationCompleted, true, res); + assert(currOp.hasOwnProperty("expireAt") && currOp.expireAt instanceof Date, res); + assert(currOp.hasOwnProperty("databases")); assert.eq(0, currOp.databases.databasesClonedBeforeFailover, tojson(res)); assert.eq(dbsToClone.length, currOp.databases.databasesToClone, tojson(res)); @@ -327,4 +328,4 @@ if (!shardMergeIsEnabled) { } tenantMigrationTest.stop(); -})(); +})();
\ No newline at end of file diff --git a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js index 60344d7910c..4d00500f9ba 100644 --- a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js @@ -314,7 +314,7 @@ testRollbackInitialState(); jsTest.log("Test roll back recipient's state doc update"); [{ - pauseFailPoint: "fpBeforeMarkingCollectionClonerDone", + pauseFailPoint: "fpBeforeMarkingCloneSuccess", nextState: "reject", query: {dataConsistentStopDonorOpTime: {$exists: 1}} }, diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js index 2f3856cb0b6..4ec16d15feb 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js @@ -42,7 +42,7 @@ const donorPrimary = tenantMigrationTest.getDonorPrimary(); // Do a majority write. tenantMigrationTest.insertDonorDB(tenantDB, collName); -const failpoint = "fpAfterStartingOplogApplierMigrationRecipientInstance"; +const failpoint = "fpBeforeMarkingCloneSuccess"; const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"}); // In order to prevent the copying of "testTenantId" databases via logical cloning from donor to diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js b/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js new file mode 100644 index 00000000000..93460e63558 --- /dev/null +++ b/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js @@ -0,0 +1,242 @@ +/** + * Tests the currentOp command during a shard merge protocol. A tenant migration is started, and the + * currentOp command is tested as the recipient moves through below state sequence. + * + * kStarted ---> kLearnedFilenames ---> kConsistent ---> kDone. + * + * Tenant migrations are not expected to be run on servers with ephemeralForTest. + * + * @tags: [ + * featureFlagShardMerge, + * incompatible_with_eft, + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * serverless, + * ] + */ + +(function() { + +"use strict"; +load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject(). +load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). +load("jstests/libs/parallelTester.js"); // For the Thread(). +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + +const kMigrationId = UUID(); +const kTenantId = 'testTenantId'; +const kReadPreference = { + mode: "primary" +}; +const migrationOpts = { + migrationIdString: extractUUIDFromObject(kMigrationId), + tenantId: kTenantId, + readPreference: kReadPreference +}; + +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +// Initial inserts to test cloner stats. 
+const dbsToClone = ["db0", "db1", "db2"]; +const collsToClone = ["coll0", "coll1"]; +const docs = [...Array(10).keys()].map((i) => ({x: i})); +for (const db of dbsToClone) { + const tenantDB = tenantMigrationTest.tenantDB(kTenantId, db); + for (const coll of collsToClone) { + tenantMigrationTest.insertDonorDB(tenantDB, coll, docs); + } +} + +// Makes sure the fields that are always expected to exist, such as the donorConnectionString, are +// correct. +function checkStandardFieldsOK(res) { + assert.eq(res.inprog.length, 1, res); + assert.eq(bsonWoCompare(res.inprog[0].instanceID, kMigrationId), 0, res); + assert.eq(res.inprog[0].donorConnectionString, tenantMigrationTest.getDonorRst().getURL(), res); + assert.eq(bsonWoCompare(res.inprog[0].readPreference, kReadPreference), 0, res); + // We don't test failovers in this test so we don't expect these counters to be incremented. + assert.eq(res.inprog[0].numRestartsDueToDonorConnectionFailure, 0, res); + assert.eq(res.inprog[0].numRestartsDueToRecipientFailure, 0, res); + assert.eq(bsonWoCompare(res.inprog[0].tenantId, kTenantId), 0, res); +} + +// Check currentOp fields' expected value once the recipient is in state "consistent" or later. +function checkPostConsistentFieldsOK(res) { + const currOp = res.inprog[0]; + assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && + checkOptime(currOp.startFetchingDonorOpTime), + res); + assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && + checkOptime(currOp.startApplyingDonorOpTime), + res); + assert(currOp.hasOwnProperty("cloneFinishedRecipientOpTime") && + checkOptime(currOp.cloneFinishedRecipientOpTime), + res); + // Not applicable to shard merge protocol. + assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime")); +} + +// Validates the fields of an optime object. +function checkOptime(optime) { + assert(optime.ts instanceof Timestamp); + assert(optime.t instanceof NumberLong); + return true; +} + +// Set all failPoints up on the recipient's end to block the migration at certain points. The +// migration will be unblocked through the test to allow transitions to different states. +jsTestLog("Setting up all failPoints."); + +const fpAfterPersistingStateDoc = + configureFailPoint(recipientPrimary, + "fpAfterPersistingTenantMigrationRecipientInstanceStateDoc", + {action: "hang"}); +const fpAfterRetrievingStartOpTime = configureFailPoint( + recipientPrimary, "fpAfterRetrievingStartOpTimesMigrationRecipientInstance", {action: "hang"}); +const fpAfterDataConsistent = configureFailPoint( + recipientPrimary, "fpAfterDataConsistentMigrationRecipientInstance", {action: "hang"}); +const fpAfterForgetMigration = configureFailPoint( + recipientPrimary, "fpAfterReceivingRecipientForgetMigration", {action: "hang"}); + +jsTestLog("Starting tenant migration with migrationId: " + kMigrationId + + ", tenantId: " + kTenantId); +assert.commandWorked( + tenantMigrationTest.startMigration(migrationOpts, {enableDonorStartMigrationFsync: true})); + +{ + // Wait until a current operation corresponding to "tenant recipient migration" with state + // kStarted is visible on the recipientPrimary. 
+ jsTestLog("Waiting until current operation with state kStarted is visible."); + fpAfterPersistingStateDoc.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + let currOp = res.inprog[0]; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("startFetchingDonorOpTime"), res); + assert(!currOp.hasOwnProperty("startApplyingDonorOpTime"), res); + assert(!currOp.hasOwnProperty("expireAt"), res); + assert(!currOp.hasOwnProperty("donorSyncSource"), res); + assert(!currOp.hasOwnProperty("cloneFinishedRecipientOpTime"), res); + // Not applicable to shard merge protocol. + assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); + + fpAfterPersistingStateDoc.off(); +} + +{ + // Allow the migration to move to the point where the startFetchingDonorOpTime has been + // obtained. + jsTestLog("Waiting for startFetchingDonorOpTime to exist."); + fpAfterRetrievingStartOpTime.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + let currOp = res.inprog[0]; + assert.gt(new Date(), currOp.receiveStart, tojson(res)); + + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); + + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + assert(!currOp.hasOwnProperty("cloneFinishedRecipientOpTime"), res); + assert(currOp.hasOwnProperty("startFetchingDonorOpTime") && + checkOptime(currOp.startFetchingDonorOpTime), + res); + assert(currOp.hasOwnProperty("startApplyingDonorOpTime") && + checkOptime(currOp.startApplyingDonorOpTime), + res); + assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', + res); + // Not applicable to shard merge protocol. + assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); + + fpAfterRetrievingStartOpTime.off(); +} + +{ + // Wait for the "kConsistent" state to be reached. + jsTestLog("Waiting for the kConsistent state to be reached."); + fpAfterDataConsistent.wait(); + const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( + recipientPrimary, "fpBeforePersistingRejectReadsBeforeTimestamp", {action: "hang"}); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + let currOp = res.inprog[0]; + // State should have changed. + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + + // Wait to receive recipientSyncData with returnAfterReachingDonorTimestamp. + fpAfterDataConsistent.off(); + fpBeforePersistingRejectReadsBeforeTimestamp.wait(); + + res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + currOp = res.inprog[0]; + // State should have changed. 
+ assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + assert.eq(currOp.dataSyncCompleted, false, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + // The oplog applier should have applied at least the noop resume token. + assert.gte(currOp.numOpsApplied, 1, tojson(res)); + fpBeforePersistingRejectReadsBeforeTimestamp.off(); + + jsTestLog("Waiting for migration to complete."); + TenantMigrationTest.assertCommitted( + tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); +} + +jsTestLog("Issuing a forget migration command."); +const forgetMigrationThread = + new Thread(TenantMigrationUtil.forgetMigrationAsync, + migrationOpts.migrationIdString, + TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst()), + true /* retryOnRetryableErrors */); +forgetMigrationThread.start(); + +{ + jsTestLog("Waiting for the recipient to receive the forgetMigration, and pause at failpoint"); + fpAfterForgetMigration.wait(); + + let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + let currOp = res.inprog[0]; + assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kConsistent, res); + assert.eq(currOp.migrationCompleted, false, res); + // dataSyncCompleted should have changed. + assert.eq(currOp.dataSyncCompleted, true, res); + assert(!currOp.hasOwnProperty("expireAt"), res); + + jsTestLog("Allow the forgetMigration to complete."); + fpAfterForgetMigration.off(); + assert.commandWorked(forgetMigrationThread.returnData()); + + res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + checkStandardFieldsOK(res); + checkPostConsistentFieldsOK(res); + currOp = res.inprog[0]; + assert.eq(currOp.dataSyncCompleted, true, res); + // State, completion status and expireAt should have changed. 
+ assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kDone, res); + assert.eq(currOp.migrationCompleted, true, res); + assert(currOp.hasOwnProperty("expireAt") && currOp.expireAt instanceof Date, res); +} + +tenantMigrationTest.stop(); +})(); diff --git a/jstests/replsets/tenant_migration_timeseries_retryable_write_retry_on_recipient.js b/jstests/replsets/tenant_migration_timeseries_retryable_write_retry_on_recipient.js index 5fc0eb8b9ee..7fce50f5dcc 100644 --- a/jstests/replsets/tenant_migration_timeseries_retryable_write_retry_on_recipient.js +++ b/jstests/replsets/tenant_migration_timeseries_retryable_write_retry_on_recipient.js @@ -41,8 +41,8 @@ function testRetryOnRecipient(ordered) { const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); const recipientDb = recipientPrimary.getDB(kDbName); - const waitBeforeFetchingTransactions = configureFailPoint( - recipientPrimary, "fpBeforeFetchingCommittedTransactions", {action: "hang"}); + const pauseTenantMigrationBeforeLeavingDataSyncState = + configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingDataSyncState"); const migrationId = UUID(); const migrationOpts = { @@ -96,14 +96,14 @@ function testRetryOnRecipient(ordered) { new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); migrationThread.start(); - waitBeforeFetchingTransactions.wait(); + pauseTenantMigrationBeforeLeavingDataSyncState.wait(); jsTestLog("Run retryable writes during the migration"); assert.commandWorked(donorDb.runCommand(duringWrites.retryableInsertCommand)); // Wait for the migration to complete. jsTest.log("Waiting for migration to complete"); - waitBeforeFetchingTransactions.off(); + pauseTenantMigrationBeforeLeavingDataSyncState.off(); TenantMigrationTest.assertCommitted(migrationThread.returnData()); // Print the no-op oplog entries for debugging purposes. 
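The jstest changes above (and in the next file) all make the same substitution: instead of hanging the recipient at fpAfterStartingOplogApplierMigrationRecipientInstance, whose position in the future chain this refactor moves, they pause the donor before it leaves the data-sync state. A hedged C++ sketch of that donor-side pause point follows; the real definition lives in the donor service code, which this diff does not show, so the surrounding hook is assumed even though the failpoint idiom itself is the standard one:

#include "mongo/db/operation_context.h"
#include "mongo/util/fail_point.h"

namespace mongo {
// Assumed to be defined in the donor service translation unit (not shown here).
MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationBeforeLeavingDataSyncState);

void onLeavingDataSyncState(OperationContext* opCtx) {  // hypothetical hook
    // Blocks while the failpoint is enabled; the jstests above enable it with
    // configureFailPoint(donorPrimary, ...) and release it with .off().
    pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx);
}
}  // namespace mongo

Pausing on the donor side decouples these tests from recipient-internal failpoints, so reordering the recipient's internal steps — the point of this refactor — no longer invalidates them.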
diff --git a/jstests/replsets/tenant_migrations_transaction_with_create_collection.js b/jstests/replsets/tenant_migrations_transaction_with_create_collection.js index 57508e2fafc..44207266628 100644 --- a/jstests/replsets/tenant_migrations_transaction_with_create_collection.js +++ b/jstests/replsets/tenant_migrations_transaction_with_create_collection.js @@ -31,8 +31,8 @@ const transactionsNS = "config.transactions"; const donorPrimary = tenantMigrationTest.getDonorPrimary(); const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); -const hangAfterStartingOplogApplier = configureFailPoint( - recipientPrimary, "fpAfterStartingOplogApplierMigrationRecipientInstance", {action: "hang"}); +const pauseTenantMigrationBeforeLeavingDataSyncState = + configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingDataSyncState"); jsTestLog("Starting a migration"); const migrationId = UUID(); @@ -42,7 +42,7 @@ const migrationOpts = { }; assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); -hangAfterStartingOplogApplier.wait(); +pauseTenantMigrationBeforeLeavingDataSyncState.wait(); jsTestLog("Running transaction while the migration is running"); const session = donorPrimary.startSession(); @@ -60,7 +60,7 @@ assert.commandWorked(sessionColl.insert(doc3)); assert.commandWorked(session.commitTransaction_forTesting()); session.endSession(); -hangAfterStartingOplogApplier.off(); +pauseTenantMigrationBeforeLeavingDataSyncState.off(); jsTestLog("Waiting for migration to complete"); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); diff --git a/src/mongo/db/repl/tenant_all_database_cloner.cpp b/src/mongo/db/repl/tenant_all_database_cloner.cpp index 1b633cf7ecf..97f89ab2fe7 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_all_database_cloner.cpp @@ -175,7 +175,7 @@ BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listExistingDatabasesSta } } - if (!getSharedData()->isResuming()) { + if (getSharedData()->getResumePhase() == ResumePhase::kNone) { uassert(ErrorCodes::NamespaceExists, str::stream() << "Tenant '" << _tenantId << "': databases already exist prior to data sync", diff --git a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp index 1326a042403..22765d9eb37 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp +++ b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp @@ -402,7 +402,7 @@ TEST_F(TenantAllDatabaseClonerTest, ResumingFromLastClonedDb) { _mockServer->setCommandReply("find", createFindResponse()); _mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}")); - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeAllDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("initializeStatsStage"); @@ -445,7 +445,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_AllGreater) { _mockServer->setCommandReply("find", createFindResponse()); _mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}")); - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeAllDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("initializeStatsStage"); 
@@ -502,7 +502,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_SomeGreater) { _mockServer->setCommandReply("find", createFindResponse()); _mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}")); - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeAllDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("initializeStatsStage"); @@ -570,7 +570,7 @@ TEST_F(TenantAllDatabaseClonerTest, LastClonedDbDeleted_AllLess) { _mockServer->setCommandReply("find", createFindResponse()); _mockServer->setCommandReply("dbStats", fromjson("{ok:1, dataSize: 30}")); - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeAllDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("initializeStatsStage"); diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp index d74b0760cd4..cd6ea25a650 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner.cpp @@ -324,7 +324,7 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() { uassert(ErrorCodes::NamespaceExists, str::stream() << "Tenant '" << _tenantId << "': collection '" << collection->ns() << "' already exists prior to data sync", - getSharedData()->isResuming()); + getSharedData()->getResumePhase() == ResumePhase::kDataSync); _existingNss = collection->ns(); LOGV2(5342502, @@ -393,7 +393,8 @@ BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() { _collectionOptions, !_idIndexSpec.isEmpty() /* createIdIndex */, _idIndexSpec); - if (status == ErrorCodes::NamespaceExists && getSharedData()->isResuming()) { + if (status == ErrorCodes::NamespaceExists && + getSharedData()->getResumePhase() == ResumePhase::kDataSync) { // If we are resuming from a recipient failover we can get ErrorCodes::NamespaceExists // due to following conditions: // diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp index a7ae564e6e8..978a5efe234 100644 --- a/src/mongo/db/repl/tenant_collection_cloner_test.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp @@ -890,7 +890,7 @@ TEST_F(TenantCollectionClonerTest, QueryPlanKilledThenNamespaceNotFoundSubsequen } TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingAllSecondaryIndexes) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the collection already exists with no data and no secondary index. 
@@ -922,7 +922,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingAllSecondaryI } TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingSomeSecondaryIndexes) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the collection already exists with no data and some secondary indexes. @@ -956,7 +956,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingSomeSecondary } TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingNoSecondaryIndexes) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the collection already exists with no data and all matching secondary indexes. @@ -982,7 +982,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromEmptyCollectionMissingNoSecondaryIn } TEST_F(TenantCollectionClonerTest, ResumeFromNonEmptyCollection) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the collection already exists with some data. @@ -1015,7 +1015,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromNonEmptyCollection) { } TEST_F(TenantCollectionClonerTest, ResumeFromRecreatedCollection) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the namespace already exists under a different uuid. @@ -1045,7 +1045,7 @@ TEST_F(TenantCollectionClonerTest, ResumeFromRecreatedCollection) { } TEST_F(TenantCollectionClonerTest, ResumeFromRenamedCollection) { - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeCollectionCloner(CollectionOptions(), &resumingSharedData); // Simulate that the collection already exists under a different name with no index and no data. 
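In the cloner unit tests above, every resuming fixture now constructs its TenantMigrationSharedData with ResumePhase::kDataSync rather than /*resuming=*/true, since those tests simulate a failover that interrupts the cloner itself. The service-side hunks further down suggest how the two resume phases are told apart: where the old code checked _isCloneCompletedMarkerSet() (i.e., whether cloneFinishedRecipientOpTime was persisted), the new code checks getResumePhase() == ResumePhase::kOplogCatchup. A small self-contained sketch of that mapping; the helper name is hypothetical, not from the source:

enum class ResumePhase { kNone, kDataSync, kOplogCatchup };

// Hypothetical helper illustrating the mapping implied by this diff:
// cloneFinishedRecipientOpTime is only persisted once cloning succeeds, so
// its presence on a resumed attempt means the cloner can be skipped.
ResumePhase deriveResumePhase(bool resumingFromFailover, bool cloneFinishedOpTimePersisted) {
    if (!resumingFromFailover) {
        return ResumePhase::kNone;
    }
    return cloneFinishedOpTimePersisted ? ResumePhase::kOplogCatchup
                                        : ResumePhase::kDataSync;
}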
diff --git a/src/mongo/db/repl/tenant_database_cloner.cpp b/src/mongo/db/repl/tenant_database_cloner.cpp index 0cde8bfa265..7885c67eaa5 100644 --- a/src/mongo/db/repl/tenant_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_database_cloner.cpp @@ -238,7 +238,7 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listExistingCollectionsStag } } - if (!getSharedData()->isResuming()) { + if (getSharedData()->getResumePhase() == ResumePhase::kNone) { uassert(ErrorCodes::NamespaceExists, str::stream() << "Tenant '" << _tenantId << "': collections already exist prior to data sync", diff --git a/src/mongo/db/repl/tenant_database_cloner_test.cpp b/src/mongo/db/repl/tenant_database_cloner_test.cpp index 01540fe3d57..4b089101465 100644 --- a/src/mongo/db/repl/tenant_database_cloner_test.cpp +++ b/src/mongo/db/repl/tenant_database_cloner_test.cpp @@ -694,7 +694,7 @@ TEST_F(TenantDatabaseClonerTest, ResumingFromLastClonedCollection) { sizeOfOneCollection = swSize.getValue(); } - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("listExistingCollections"); @@ -758,7 +758,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_AllGreater) { sizeANss = swSize.getValue(); } - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("listExistingCollections"); @@ -837,7 +837,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_SomeGreater) { ANssBNssSize += swSizeBNss.getValue(); } - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("listExistingCollections"); @@ -927,7 +927,7 @@ TEST_F(TenantDatabaseClonerTest, LastClonedCollectionDeleted_AllLess) { sizeOfAllColls += swSizeCNss.getValue(); } - TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, /*resuming=*/true); + TenantMigrationSharedData resumingSharedData(&_clock, _migrationId, ResumePhase::kDataSync); auto cloner = makeDatabaseCloner(&resumingSharedData); cloner->setStopAfterStage_forTest("listExistingCollections"); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 3f32411c555..32666685c4b 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -141,7 +141,6 @@ MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance); MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth); MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV); MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); -MONGO_FAIL_POINT_DEFINE(pauseAfterCreatingOplogBuffer); MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions); MONGO_FAIL_POINT_DEFINE(skipFetchingRetryableWritesEntriesBeforeStartOpTime); @@ -160,7 +159,7 @@ MONGO_FAIL_POINT_DEFINE(fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance); 
MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCollectionClonerDone); +MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCloneSuccess); MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingCommittedTransactions); MONGO_FAIL_POINT_DEFINE(fpAfterFetchingCommittedTransactions); MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); @@ -515,8 +514,14 @@ TenantMigrationRecipientService::Instance::waitUntilMigrationReachesReturnAfterR // In the event of a donor failover, it is possible that a new donor has stepped up and // initiated this 'recipientSyncData' cmd. Make sure the recipient is not in the middle of // restarting the oplog applier to retry the future chain. - opCtx->waitForConditionOrInterrupt( - _restartOplogApplierCondVar, lk, [&] { return !_isRestartingOplogApplier; }); + // + // For shard merge protocol, we start tenant oplog applier after recipient informs donor, + // the data is in consistent state. So, there is a possibility, recipient might receive + // recipientSyncData cmd with `returnAfterReachingDonorTimestamp` from donor before the + // recipient has started the tenant oplog applier. + opCtx->waitForConditionOrInterrupt(_oplogApplierReadyCondVar, lk, [&] { + return _oplogApplierReady || _dataSyncCompletionPromise.getFuture().isReady(); + }); if (_dataSyncCompletionPromise.getFuture().isReady()) { // When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync // completion future result. @@ -649,8 +654,7 @@ OpTime TenantMigrationRecipientService::Instance::_getDonorMajorityOpTime( return majorityOpTime; } -SemiFuture<TenantMigrationRecipientService::Instance::ConnectionPair> -TenantMigrationRecipientService::Instance::_createAndConnectClients() { +SemiFuture<void> TenantMigrationRecipientService::Instance::_createAndConnectClients() { LOGV2_DEBUG(4880401, 1, "Recipient migration service connecting clients", @@ -763,12 +767,19 @@ TenantMigrationRecipientService::Instance::_createAndConnectClients() { applicationName += "_oplogFetcher"; auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName); return ConnectionPair(std::move(client), std::move(oplogFetcherClient)); + }) + .then([this, self = shared_from_this()](ConnectionPair connectionPair) { + stdx::lock_guard lk(_mutex); + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } + + _client = std::move(connectionPair.first); + _oplogFetcherClient = std::move(connectionPair.second); }); }) .until([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode]( - const StatusWith<ConnectionPair>& statusWith) { - auto status = statusWith.getStatus(); - + const Status& status) { if (status.isOK()) { return true; } @@ -1105,12 +1116,13 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam }); } -void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) { +SemiFuture<void> TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor() { + stdx::lock_guard lk(_mutex); // Get the last oplog entry at the read concern majority optime in the remote oplog. It // does not matter which tenant it is for. - if (_sharedData->isResuming()) { + if (_sharedData->getResumePhase() != ResumePhase::kNone) { // We are resuming a migration. 
- return; + return SemiFuture<void>::makeReady(); } auto isShardMerge = _stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge; @@ -1180,6 +1192,8 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo // between startFetchingDonorOpTime and startApplyingDonorOpTime. _stateDoc.setStartFetchingDonorOpTime( std::min(startFetchingDonorOpTime, *_stateDoc.getStartApplyingDonorOpTime())); + + return _updateStateDocForMajority(lk); } AggregateCommandRequest @@ -1328,6 +1342,11 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr SemiFuture<void> TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() { + { + auto opCtx = cc().makeOperationContext(); + _stopOrHangOnFailPoint(&fpBeforeFetchingCommittedTransactions, opCtx.get()); + } + if (MONGO_unlikely(skipFetchingCommittedTransactions.shouldFail())) { // Test-only. return SemiFuture<void>::makeReady(); } @@ -1378,8 +1397,8 @@ TenantMigrationRecipientService::Instance::_fetchCommittedTransactionsBeforeStar .semi(); } -void TenantMigrationRecipientService::Instance::_createOplogBuffer() { - auto opCtx = cc().makeOperationContext(); +void TenantMigrationRecipientService::Instance::_createOplogBuffer(WithLock, + OperationContext* opCtx) { OplogBufferCollection::Options options; options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize); options.dropCollectionAtStartup = false; @@ -1388,35 +1407,16 @@ void TenantMigrationRecipientService::Instance::_createOplogBuffer() { auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); if (!_donorOplogBuffer) { - // Create the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + auto bufferCollection = std::make_unique<OplogBufferCollection>( - StorageInterface::get(opCtx.get()), oplogBufferNS, options); - stdx::lock_guard lk(_mutex); + StorageInterface::get(opCtx), oplogBufferNS, options); _donorOplogBuffer = std::move(bufferCollection); } - - { - stdx::lock_guard lk(_mutex); - invariant(_stateDoc.getStartFetchingDonorOpTime()); - } - - { - // Ensure we are primary when trying to startup and create the oplog buffer collection. - auto coordinator = repl::ReplicationCoordinator::get(opCtx.get()); - Lock::GlobalLock globalLock(opCtx.get(), MODE_IX); - if (!coordinator->canAcceptWritesForDatabase(opCtx.get(), oplogBufferNS.db())) { - uassertStatusOK( - Status(ErrorCodes::NotWritablePrimary, - "Recipient node is not primary, cannot create oplog buffer collection.")); - } - _donorOplogBuffer->startup(opCtx.get()); - } - - pauseAfterCreatingOplogBuffer.pauseWhileSet(); } SemiFuture<void> TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStartOpTime() { + _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance); if (MONGO_unlikely( skipFetchingRetryableWritesEntriesBeforeStartOpTime.shouldFail())) { // Test-only. 
return SemiFuture<void>::makeReady(); @@ -1575,6 +1575,8 @@ TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStart } void TenantMigrationRecipientService::Instance::_startOplogFetcher() { + _stopOrHangOnFailPoint(&fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); + auto opCtx = cc().makeOperationContext(); OpTime startFetchOpTime; auto resumingFromOplogBuffer = false; @@ -1586,7 +1588,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { startFetchOpTime = *_stateDoc.getStartFetchingDonorOpTime(); } - if (_sharedData->isResuming()) { + if (_sharedData->getResumePhase() != ResumePhase::kNone) { // If the oplog buffer already contains fetched documents, we must be resuming a // migration. if (auto topOfOplogBuffer = _donorOplogBuffer->lastObjectPushed(opCtx.get())) { @@ -1614,7 +1616,7 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { "sync source selection"); } - stdx::lock_guard lk(_mutex); + stdx::unique_lock lk(_mutex); OplogFetcher::Config oplogFetcherConfig( startFetchOpTime, _oplogFetcherClient->getServerHostAndPort(), @@ -1650,6 +1652,9 @@ void TenantMigrationRecipientService::Instance::_startOplogFetcher() { std::move(oplogFetcherConfig)); _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient)); uassertStatusOK(_donorOplogFetcher->startup()); + + lk.unlock(); + _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); } Status TenantMigrationRecipientService::Instance::_enqueueDocuments( @@ -1772,17 +1777,8 @@ void TenantMigrationRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint } } -bool TenantMigrationRecipientService::Instance::_isCloneCompletedMarkerSet(WithLock) const { - return _stateDoc.getCloneFinishedRecipientOpTime().has_value(); -} - -OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime() const { - auto cloneFinishedRecipientOpTime = [this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - auto opt = _stateDoc.getCloneFinishedRecipientOpTime(); - invariant(opt.has_value()); - return *opt; - }(); +OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOptime( + const OpTime& cloneFinishedRecipientOpTime) const { auto opCtx = cc().makeOperationContext(); OplogInterfaceLocal oplog(opCtx.get()); auto oplogIter = oplog.makeIterator(); @@ -1820,7 +1816,7 @@ OpTime TenantMigrationRecipientService::Instance::_getOplogResumeApplyingDonorOp Future<void> TenantMigrationRecipientService::Instance::_startTenantAllDatabaseCloner(WithLock lk) { // If the state is data consistent, do not start the cloner. - if (_isCloneCompletedMarkerSet(lk)) { + if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) { return {Future<void>::makeReady()}; } @@ -1908,39 +1904,42 @@ void TenantMigrationRecipientService::Instance::_advanceStableTimestampToStartAp } SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() { + _stopOrHangOnFailPoint(&fpBeforeMarkingCloneSuccess); stdx::lock_guard lk(_mutex); // PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc // is majority committed, so we can also skip waiting for it to be majority replicated. 
- if (_isCloneCompletedMarkerSet(lk)) { + if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) { return SemiFuture<void>::makeReady(); } - { + if (_protocol == MigrationProtocolEnum::kMultitenantMigrations) { stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); auto lastVisibleMajorityCommittedDonorOpTime = _sharedData->getLastVisibleOpTime(sharedDatalk); invariant(!lastVisibleMajorityCommittedDonorOpTime.isNull()); _stateDoc.setDataConsistentStopDonorOpTime(lastVisibleMajorityCommittedDonorOpTime); } + _stateDoc.setCloneFinishedRecipientOpTime( repl::ReplicationCoordinator::get(cc().getServiceContext())->getMyLastAppliedOpTime()); + return _updateStateDocForMajority(lk); +} - return ExecutorFuture(**_scopedExecutor) - .then([this, self = shared_from_this(), stateDoc = _stateDoc] { - auto opCtx = cc().makeOperationContext(); - - _stopOrHangOnFailPoint(&fpBeforeMarkingCollectionClonerDone, opCtx.get()); - uassertStatusOK( - tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); +SemiFuture<TenantOplogApplier::OpTimePair> +TenantMigrationRecipientService::Instance::_waitForDataToBecomeConsistent() { + stdx::lock_guard lk(_mutex); + // PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc + // is majority committed, so we can also skip waiting for it to be majority replicated. + if (_stateDoc.getState() == TenantMigrationRecipientStateEnum::kConsistent) { + return SemiFuture<TenantOplogApplier::OpTimePair>::makeReady( + TenantOplogApplier::OpTimePair()); + } - auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(writeOpTime, CancellationToken::uncancelable()); - }) - .semi(); + return _tenantOplogApplier->getNotificationForOpTime( + _stateDoc.getDataConsistentStopDonorOpTime().get()); } -SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFuture() { +SemiFuture<void> TenantMigrationRecipientService::Instance::_persistConsistentState() { stdx::lock_guard lk(_mutex); // PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc // is majority committed, so we can also skip waiting for it to be majority replicated. @@ -1948,24 +1947,41 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu return SemiFuture<void>::makeReady(); } - return _tenantOplogApplier - ->getNotificationForOpTime(_stateDoc.getDataConsistentStopDonorOpTime().get()) + // Persist the state that tenant migration instance has reached + // consistent state. + _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent); + return _updateStateDocForMajority(lk); +} + +SemiFuture<void> TenantMigrationRecipientService::Instance::_enterConsistentState() { + return _persistConsistentState() .thenRunOn(**_scopedExecutor) - .then( - [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) { - stdx::lock_guard lk(_mutex); - // Persist the state that tenant migration instance has reached - // consistent state. 
- _stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent); - return _stateDoc; - }) - .then([this, self = shared_from_this()](TenantMigrationRecipientDocument stateDoc) { - auto opCtx = cc().makeOperationContext(); - uassertStatusOK( - tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - CancellationToken::uncancelable()); + .then([this, self = shared_from_this()]() { + _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); + stdx::lock_guard lk(_mutex); + + auto donorConsistentOpTime = [&]() { + switch (_protocol) { + case MigrationProtocolEnum::kMultitenantMigrations: + return _stateDoc.getDataConsistentStopDonorOpTime(); + case MigrationProtocolEnum::kShardMerge: + return _stateDoc.getStartApplyingDonorOpTime(); + default: + MONGO_UNREACHABLE; + } + return boost::optional<repl::OpTime>(); + }(); + invariant(donorConsistentOpTime && !donorConsistentOpTime->isNull()); + + LOGV2_DEBUG(4881101, + 1, + "Tenant migration recipient instance is in consistent state", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "donorConsistentOpTime"_attr = *donorConsistentOpTime); + if (!_dataConsistentPromise.getFuture().isReady()) { + _dataConsistentPromise.emplaceValue(*donorConsistentOpTime); + } }) .semi(); } @@ -2208,9 +2224,6 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta std::unique_ptr<ThreadPool> savedWriterPool; { stdx::lock_guard lk(_mutex); - _isRestartingOplogApplier = false; - _restartOplogApplierCondVar.notify_all(); - _cancelRemainingWork(lk); shutdownTarget(lk, _donorOplogFetcher); @@ -2223,6 +2236,9 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta setPromiseErrorifNotReady(lk, _dataConsistentPromise, status); setPromiseErrorifNotReady(lk, _dataSyncCompletionPromise, status); + _oplogApplierReady = false; + _oplogApplierReadyCondVar.notify_all(); + // Save them to join() with it outside of _mutex. using std::swap; swap(savedDonorOplogFetcher, _donorOplogFetcher); @@ -2288,6 +2304,35 @@ void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKe tenant_migration_util::storeExternalClusterTimeKeyDocs(std::move(keyDocs)); } +SemiFuture<void> +TenantMigrationRecipientService::Instance::_checkIfFcvHasChangedSinceLastAttempt() { + stdx::lock_guard lk(_mutex); + + // Record the FCV at the start of a migration and check for changes in every + // subsequent attempt. Fail if there is any mismatch in FCV or + // upgrade/downgrade state. (Generic FCV reference): This FCV check should + // exist across LTS binary versions.
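The FCV pinning that follows is a record-then-compare pattern across attempts. A simplified standalone model (the FCV values and the persistence step are stand-ins, not the real serverGlobalParams API):

#include <optional>
#include <stdexcept>

// Hypothetical FCV values for illustration only.
enum class FCV { kV5_0, kV6_0, kUpgradingFrom5_0To6_0 };

struct StateDoc {
    std::optional<FCV> recipientPrimaryStartingFCV;
};

// Models _checkIfFcvHasChangedSinceLastAttempt(): the first attempt pins the
// FCV in the state document; every retry compares against the pinned value and
// fails the migration on any mismatch, including upgrade/downgrade states.
void checkFcvUnchangedSinceLastAttempt(StateDoc& doc, FCV currentFcv) {
    if (!doc.recipientPrimaryStartingFCV) {
        doc.recipientPrimaryStartingFCV = currentFcv;  // first attempt: record it
        return;  // the real code persists this with a majority write
    }
    if (*doc.recipientPrimaryStartingFCV != currentFcv) {
        throw std::runtime_error("detected FCV change from last migration attempt");
    }
}

int main() {
    StateDoc doc;
    checkFcvUnchangedSinceLastAttempt(doc, FCV::kV6_0);  // records
    checkFcvUnchangedSinceLastAttempt(doc, FCV::kV6_0);  // passes
}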
+ auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); + auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); + + if (!startingFCV) { + _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); + return _updateStateDocForMajority(lk); + } + + if (startingFCV != currentFCV) { + LOGV2_ERROR(5356200, + "FCV may not change during migration", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startingFCV"_attr = startingFCV, + "currentFCV"_attr = currentFCV); + uasserted(5356201, "Detected FCV change from last migration attempt."); + } + + return SemiFuture<void>::makeReady(); +} + void TenantMigrationRecipientService::Instance::_compareRecipientAndDonorFCV() const { if (skipComparingRecipientAndDonorFCV.shouldFail()) { // Test-only. return; @@ -2347,6 +2392,207 @@ bool TenantMigrationRecipientService::Instance::_checkifProtocolRemainsFCVCompat return true; } +void TenantMigrationRecipientService::Instance::_startOplogApplier() { + _stopOrHangOnFailPoint(&fpAfterFetchingCommittedTransactions); + + stdx::unique_lock lk(_mutex); + const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime(); + invariant(cloneFinishedRecipientOpTime); + + OpTime resumeOpTime; + if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) { + lk.unlock(); + // We avoid holding the mutex while scanning the local oplog which + // acquires the RSTL in IX mode. This is to allow us to be interruptible + // via a concurrent stepDown which acquires the RSTL in X mode. + resumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime); + lk.lock(); + } + + // Throwing an error when the cloner is canceled externally via interrupt() + // makes the instance skip the remaining task (i.e., starting the oplog + // applier) in the sync process. This step is necessary to prevent a race + // between interrupt() and starting the oplog applier in failover + // scenarios where we don't start the cloner because the tenant data is + // already in a consistent state. + if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } + + const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime(); + invariant(startApplyingDonorOpTime); + + _tenantOplogApplier = std::make_shared<TenantOplogApplier>( + _migrationUuid, + _tenantId, + (!resumeOpTime.isNull()) ? std::max(resumeOpTime, *startApplyingDonorOpTime) + : *startApplyingDonorOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get(), + resumeOpTime.getTimestamp()); + _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime); + + LOGV2_DEBUG(4881202, + 1, + "Recipient migration service starting oplog applier", + "tenantId"_attr = getTenantId(), + "migrationId"_attr = getMigrationUUID(), + "startApplyingAfterDonorOpTime"_attr = + _tenantOplogApplier->getStartApplyingAfterOpTime(), + "resumeBatchingTs"_attr = _tenantOplogApplier->getResumeBatchingTs()); + + uassertStatusOK(_tenantOplogApplier->startup()); + _oplogApplierReady = true; + _oplogApplierReadyCondVar.notify_all(); + + lk.unlock(); + _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); +} + +void TenantMigrationRecipientService::Instance::_setup() { + auto opCtx = cc().makeOperationContext(); + { + stdx::lock_guard lk(_mutex); + // Do not set the internal states if the migration is already interrupted.
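The resume arithmetic in _startOplogApplier() is subtle: retryable-write no-op entries may have been applied with donor optimes earlier than startApplyingDonorOpTime, so batching may legitimately resume earlier than the apply point. A toy model of that computation, with a one-field optime standing in for repl::OpTime:

#include <algorithm>
#include <cassert>

// Toy optime: just a timestamp, standing in for repl::OpTime.
struct OpTime {
    long long ts = 0;
    bool isNull() const { return ts == 0; }
};

struct ResumePoint {
    OpTime startApplyingAfter;   // applier skips entries at or before this
    long long resumeBatchingTs;  // batcher may restart earlier than the apply point
};

// Mirrors the resume arithmetic above: when resuming from a previous attempt,
// apply from max(resumeOpTime, startApplyingDonorOpTime) but batch from
// resumeOpTime's timestamp, which may be earlier.
ResumePoint computeResumePoint(OpTime resumeOpTime, OpTime startApplyingDonorOpTime) {
    ResumePoint r;
    r.startApplyingAfter = resumeOpTime.isNull()
        ? startApplyingDonorOpTime
        : OpTime{std::max(resumeOpTime.ts, startApplyingDonorOpTime.ts)};
    r.resumeBatchingTs = resumeOpTime.ts;  // 0 (null) on a fresh start
    return r;
}

int main() {
    auto r = computeResumePoint(OpTime{5}, OpTime{7});
    assert(r.startApplyingAfter.ts == 7 && r.resumeBatchingTs == 5);
}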
+ if (_taskState.isInterrupted()) { + uassertStatusOK(_taskState.getInterruptStatus()); + } + + // Reuse the _writerPool for retry of the future chain. + if (!_writerPool) { + _writerPool = makeTenantMigrationWriterPool(); + } + + ResumePhase resumePhase = [&] { + if (_stateDoc.getCloneFinishedRecipientOpTime()) { + invariant(_stateDoc.getStartFetchingDonorOpTime()); + return ResumePhase::kOplogCatchup; + } + if (_stateDoc.getStartFetchingDonorOpTime()) { + return ResumePhase::kDataSync; + } + return ResumePhase::kNone; + }(); + + _sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), getMigrationUUID(), resumePhase); + + _createOplogBuffer(lk, opCtx.get()); + } + + // Start the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + try { + _donorOplogBuffer->startup(opCtx.get()); + } catch (DBException& ex) { + ex.addContext("Failed to create oplog buffer collection."); + throw; + } +} + +SemiFuture<TenantOplogApplier::OpTimePair> +TenantMigrationRecipientService::Instance::_waitForOplogApplierToStop() { + _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); + + stdx::lock_guard lk(_mutex); + // wait for oplog applier to complete/stop. + // The oplog applier does not exit normally; it must be shut down externally, + // e.g. by recipientForgetMigration. + return _tenantOplogApplier->getNotificationForOpTime(OpTime::max()); +} + +SemiFuture<TenantOplogApplier::OpTimePair> +TenantMigrationRecipientService::Instance::_migrateUsingMTMProtocol( + const CancellationToken& token) { + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this()] { return _getStartOpTimesFromDonor(); }) + .then([this, self = shared_from_this()] { + return _fetchRetryableWritesOplogBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { _startOplogFetcher(); }) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + return _startTenantAllDatabaseCloner(lk); + }) + .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this()] { + return _fetchCommittedTransactionsBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { return _startOplogApplier(); }) + .then([this, self = shared_from_this()] { + _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); + return _waitForDataToBecomeConsistent(); + }) + .then( + [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) { + return _enterConsistentState(); + }) + .then([this, self = shared_from_this()] { return _waitForOplogApplierToStop(); }) + .semi(); +} + +SemiFuture<TenantOplogApplier::OpTimePair> +TenantMigrationRecipientService::Instance::_migrateUsingShardMergeProtocol( + const CancellationToken& token) { + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), token] { + return AsyncTry([this, self = shared_from_this(), token] { + return _getDonorFilenames(token); + }) + .until([](Status status) { + if (status.code() == ErrorCodes::BackupCursorOpenConflictWithCheckpoint) { + LOGV2_DEBUG(6113008, + 1, + "Retrying backup cursor creation after error", + "status"_attr = status); + // A checkpoint took place while opening a backup cursor. We + // should retry and *not* cancel migration. 
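The ResumePhase computed in _setup() is derived entirely from which optimes the previous attempt managed to persist in the state document. A standalone sketch of that derivation, using plain integers for optimes:

#include <cassert>
#include <optional>

enum class ResumePhase { kNone, kDataSync, kOplogCatchup };

// Mirrors the lambda in _setup(): cloneFinishedRecipientOpTime implies the
// clone phase completed, so only oplog catch-up remains; startFetchingDonorOpTime
// alone means the previous attempt died mid data sync.
ResumePhase deriveResumePhase(std::optional<long long> startFetchingDonorOpTime,
                              std::optional<long long> cloneFinishedRecipientOpTime) {
    if (cloneFinishedRecipientOpTime) {
        assert(startFetchingDonorOpTime);  // clone cannot finish before fetching starts
        return ResumePhase::kOplogCatchup;
    }
    if (startFetchingDonorOpTime) {
        return ResumePhase::kDataSync;
    }
    return ResumePhase::kNone;
}

int main() {
    assert(deriveResumePhase(std::nullopt, std::nullopt) == ResumePhase::kNone);
    assert(deriveResumePhase(10, std::nullopt) == ResumePhase::kDataSync);
    assert(deriveResumePhase(10, 20) == ResumePhase::kOplogCatchup);
}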
+ return false; + } + + return true; + }) + .on(**_scopedExecutor, token); + }) + .then([this, self = shared_from_this(), token] { + LOGV2_DEBUG(6113200, + 1, + "Starting periodic 'getMore' requests to keep " + "backup cursor alive."); + stdx::lock_guard lk(_mutex); + _backupCursorKeepAliveCancellation = CancellationSource(token); + _backupCursorKeepAliveFuture = + shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation, + **_scopedExecutor, + _client->getServerHostAndPort(), + _donorFilenameBackupCursorId, + _donorFilenameBackupCursorNamespaceString); + }) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); + return _updateStateDocForMajority(lk); + }) + .then([this, self = shared_from_this()] { return _getStartOpTimesFromDonor(); }) + .then([this, self = shared_from_this()] { + return _fetchRetryableWritesOplogBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { _startOplogFetcher(); }) + .then([this, self = shared_from_this()] { + LOGV2_INFO(6113402, "Waiting for all nodes to call recipientVoteImportedFiles"); + return _importedFilesPromise.getFuture().semi(); + }) + .then([this, self = shared_from_this()] { return _killBackupCursor(); }) + .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this()] { return _enterConsistentState(); }) + .then([this, self = shared_from_this()] { + return _fetchCommittedTransactionsBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { return _startOplogApplier(); }) + .then([this, self = shared_from_this()] { return _waitForOplogApplierToStop(); }) + .semi(); +} + SemiFuture<void> TenantMigrationRecipientService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { @@ -2474,7 +2720,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( stdx::lock_guard<Latch> lg(_mutex); uassert(ErrorCodes::TenantMigrationForgotten, str::stream() << "Migration " << getMigrationUUID() - << " already marked for garbage collect", + << " already marked for garbage collection", _stateDoc.getState() != TenantMigrationRecipientStateEnum::kDone && !_stateDoc.getExpireAt()); @@ -2489,277 +2735,32 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( &fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); return _createAndConnectClients(); }) - .then([this, self = shared_from_this()](ConnectionPair ConnectionPair) { - stdx::lock_guard lk(_mutex); - if (_taskState.isInterrupted()) { - uassertStatusOK(_taskState.getInterruptStatus()); - } - - // interrupt() called after this code block will interrupt the cloner and - // fetcher. - _client = std::move(ConnectionPair.first); - _oplogFetcherClient = std::move(ConnectionPair.second); - - if (!_writerPool) { - // Create the writer pool and shared data. 
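The AsyncTry/until predicate above treats exactly one error as transient. A synchronous model of that retry policy (error codes here are stand-ins for the real Status machinery):

#include <functional>
#include <iostream>

enum class ErrorCode { OK, BackupCursorOpenConflictWithCheckpoint, HostUnreachable };

// A checkpoint racing the backup-cursor open is the only error treated as
// retryable; success or any other error ends the loop.
ErrorCode openBackupCursorWithRetry(const std::function<ErrorCode()>& attempt) {
    ErrorCode status;
    do {
        status = attempt();
    } while (status == ErrorCode::BackupCursorOpenConflictWithCheckpoint);
    return status;
}

int main() {
    int tries = 0;
    auto status = openBackupCursorWithRetry([&] {
        return ++tries < 3 ? ErrorCode::BackupCursorOpenConflictWithCheckpoint
                           : ErrorCode::OK;
    });
    std::cout << "succeeded after " << tries << " tries\n";
    return status == ErrorCode::OK ? 0 : 1;
}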
- _writerPool = makeTenantMigrationWriterPool(); - } - _sharedData = std::make_unique<TenantMigrationSharedData>( - getGlobalServiceContext()->getFastClockSource(), - getMigrationUUID(), - _stateDoc.getStartFetchingDonorOpTime().has_value()); - }) .then([this, self = shared_from_this(), token] { _stopOrHangOnFailPoint(&fpBeforeFetchingDonorClusterTimeKeys); _fetchAndStoreDonorClusterTimeKeyDocs(token); }) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); - stdx::lock_guard lk(_mutex); - - // Record the FCV at the start of a migration and check for changes in every - // subsequent attempt. Fail if there is any mismatch in FCV or - // upgrade/downgrade state. (Generic FCV reference): This FCV check should - // exist across LTS binary versions. - auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); - auto startingFCV = _stateDoc.getRecipientPrimaryStartingFCV(); - - if (!startingFCV) { - _stateDoc.setRecipientPrimaryStartingFCV(currentFCV); - return _updateStateDocForMajority(lk); - } - - if (startingFCV != currentFCV) { - LOGV2_ERROR(5356200, - "FCV may not change during migration", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startingFCV"_attr = startingFCV, - "currentFCV"_attr = currentFCV); - uasserted(5356201, "Detected FCV change from last migration attempt."); - } - - return SemiFuture<void>::makeReady(); + return _checkIfFcvHasChangedSinceLastAttempt(); }) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); _compareRecipientAndDonorFCV(); }) - .then([this, self = shared_from_this(), token] { + .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor); - } - - return AsyncTry([this, self = shared_from_this(), token] { - return _getDonorFilenames(token); - }) - .until([](Status status) { - if (status.code() == - ErrorCodes::BackupCursorOpenConflictWithCheckpoint) { - LOGV2_DEBUG(6113008, - 1, - "Retrying backup cursor creation after error", - "status"_attr = status); - // A checkpoint took place while opening a backup cursor. We - // should retry and *not* cancel migration. - return false; - } - - return true; - }) - .on(**_scopedExecutor, token); + // Sets up internal state to begin migration. 
+ _setup(); }) .then([this, self = shared_from_this(), token] { - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return; - } - - LOGV2_DEBUG(6113200, - 1, - "Starting periodic 'getMore' requests to keep " - "backup cursor alive."); - stdx::lock_guard lk(_mutex); - _backupCursorKeepAliveCancellation = CancellationSource(token); - _backupCursorKeepAliveFuture = shard_merge_utils::keepBackupCursorAlive( - _backupCursorKeepAliveCancellation, - **_scopedExecutor, - _client->getServerHostAndPort(), - _donorFilenameBackupCursorId, - _donorFilenameBackupCursorNamespaceString); - }) - .then([this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - _getStartOpTimesFromDonor(lk); - return _updateStateDocForMajority(lk); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint( - &fpAfterRetrievingStartOpTimesMigrationRecipientInstance); - _createOplogBuffer(); - return _fetchRetryableWritesOplogBeforeStartOpTime(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint( - &fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); - _startOplogFetcher(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint( - &fpAfterStartingOplogFetcherMigrationRecipientInstance); - - stdx::unique_lock lk(_mutex); - - // Create the oplog applier but do not start it yet. - invariant(_stateDoc.getStartApplyingDonorOpTime()); - - OpTime beginApplyingAfterOpTime; - Timestamp resumeBatchingTs; - if (_isCloneCompletedMarkerSet(lk)) { - lk.unlock(); - // We avoid holding the mutex while scanning the local oplog which - // acquires the RSTL in IX mode. This is to allow us to be interruptable - // via a concurrent stepDown which acquires the RSTL in X mode. - const auto resumeOpTime = _getOplogResumeApplyingDonorOptime(); - if (!resumeOpTime.isNull()) { - // It's possible we've applied retryable writes no-op oplog entries - // with donor opTimes earlier than 'startApplyingDonorOpTime'. In - // this case, we resume batching from a timestamp earlier than the - // 'beginApplyingAfterOpTime'. - resumeBatchingTs = resumeOpTime.getTimestamp(); - } - - lk.lock(); - - // We are retrying from failure. Find the point at which we should resume - // oplog batching and oplog application. - beginApplyingAfterOpTime = - std::max(resumeOpTime, *_stateDoc.getStartApplyingDonorOpTime()); - LOGV2_DEBUG(5394601, - 1, - "Resuming oplog application from previous tenant " - "migration attempt", - "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime, - "resumeBatchingOpTime"_attr = resumeOpTime); - } else { - beginApplyingAfterOpTime = *_stateDoc.getStartApplyingDonorOpTime(); - } - - { - // Throwing error when cloner is canceled externally via interrupt(), - // makes the instance to skip the remaining task (i.e., starting oplog - // applier) in the sync process. This step is necessary to prevent race - // between interrupt() and starting oplog applier for the failover - // scenarios where we don't start the cloner if the tenant data is - // already in consistent state. 
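The old chain, removed above, interleaved per-step protocol guards through every continuation; the new code dispatches once into a protocol-specific chain, as the switch further below shows. A compressed model of that dispatch:

#include <iostream>
#include <stdexcept>

enum class MigrationProtocol { kMultitenantMigrations, kShardMerge };

// Stand-ins for _migrateUsingMTMProtocol / _migrateUsingShardMergeProtocol.
void migrateUsingMTMProtocol() { std::cout << "MTM chain\n"; }
void migrateUsingShardMergeProtocol() { std::cout << "shard merge chain\n"; }

void runMigration(MigrationProtocol protocol) {
    switch (protocol) {
        case MigrationProtocol::kMultitenantMigrations:
            return migrateUsingMTMProtocol();
        case MigrationProtocol::kShardMerge:
            return migrateUsingShardMergeProtocol();
    }
    throw std::logic_error("unknown protocol");  // MONGO_UNREACHABLE analogue
}

int main() { runMigration(MigrationProtocol::kMultitenantMigrations); }

Each chain repeats the shared steps in its own order instead of branching inside them, which is what lets the shard merge path persist kLearnedFilenames and wait on recipientVoteImportedFiles without guards leaking into the MTM path.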
- stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); - uassertStatusOK(_sharedData->getStatus(sharedDatalk)); - } - - LOGV2_DEBUG(4881202, - 1, - "Recipient migration service creating oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID(), - "startApplyingDonorOpTime"_attr = beginApplyingAfterOpTime); - _tenantOplogApplier = - std::make_shared<TenantOplogApplier>(_migrationUuid, - _tenantId, - beginApplyingAfterOpTime, - _donorOplogBuffer.get(), - **_scopedExecutor, - _writerPool.get(), - resumeBatchingTs); - }) - .then([this, self = shared_from_this()] { - if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) { - return Future<void>::makeReady(); + switch (_protocol) { + case MigrationProtocolEnum::kMultitenantMigrations: + return _migrateUsingMTMProtocol(token); + case MigrationProtocolEnum::kShardMerge: + return _migrateUsingShardMergeProtocol(token); + default: + MONGO_UNREACHABLE; } - stdx::lock_guard lk(_mutex); - return _startTenantAllDatabaseCloner(lk); - }) - .then([this, self = shared_from_this()] { - if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - return _onCloneSuccess(); - }) - .then([this, self = shared_from_this()] { - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - - stdx::lock_guard lk(_mutex); - _stateDoc.setDataConsistentStopDonorOpTime( - _stateDoc.getStartApplyingDonorOpTime()); - _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); - return _updateStateDocForMajority(lk); - }) - .then([this, self = shared_from_this()] { - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - - LOGV2_INFO(6113402, - "Waiting for all nodes to call recipientVoteImportedFiles"); - return _importedFilesPromise.getFuture().semi(); - }) - .then([this, self = shared_from_this()] { return _killBackupCursor(); }) - .then([this, self = shared_from_this()] { - { - auto opCtx = cc().makeOperationContext(); - _stopOrHangOnFailPoint(&fpBeforeFetchingCommittedTransactions, - opCtx.get()); - } - return _fetchCommittedTransactionsBeforeStartOpTime(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterFetchingCommittedTransactions); - LOGV2_DEBUG(4881200, - 1, - "Recipient migration service starting oplog applier", - "tenantId"_attr = getTenantId(), - "migrationId"_attr = getMigrationUUID()); - { - stdx::lock_guard lk(_mutex); - auto cloneFinishedRecipientOpTime = - _stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge - ? 
repl::ReplicationCoordinator::get(cc().getServiceContext()) - ->getMyLastAppliedOpTime() - : *_stateDoc.getCloneFinishedRecipientOpTime(); - _tenantOplogApplier->setCloneFinishedRecipientOpTime( - cloneFinishedRecipientOpTime); - uassertStatusOK(_tenantOplogApplier->startup()); - _isRestartingOplogApplier = false; - _restartOplogApplierCondVar.notify_all(); - } - _stopOrHangOnFailPoint( - &fpAfterStartingOplogApplierMigrationRecipientInstance); - return _getDataConsistentFuture(); - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); - stdx::lock_guard lk(_mutex); - LOGV2_DEBUG(4881101, - 1, - "Tenant migration recipient instance is in consistent state", - "migrationId"_attr = getMigrationUUID(), - "tenantId"_attr = getTenantId(), - "donorConsistentOpTime"_attr = - _stateDoc.getDataConsistentStopDonorOpTime()); - - if (!_dataConsistentPromise.getFuture().isReady()) { - _dataConsistentPromise.emplaceValue( - _stateDoc.getDataConsistentStopDonorOpTime().get()); - } - }) - .then([this, self = shared_from_this()] { - _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); - stdx::lock_guard lk(_mutex); - // wait for oplog applier to complete/stop. - // The oplog applier does not exit normally; it must be shut down externally, - // e.g. by recipientForgetMigration. - return _tenantOplogApplier->getNotificationForOpTime(OpTime::max()); }); }) .until([this, self = shared_from_this()]( @@ -2792,7 +2793,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( if (!_taskState.isRunning()) { _taskState.setState(TaskState::kRunning); } - _isRestartingOplogApplier = true; + _oplogApplierReady = false; // Clean up the async components before retrying the future chain. std::unique_ptr<OplogFetcher> savedDonorOplogFetcher; std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 6972379144f..3378ffac337 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -357,7 +357,7 @@ public: * Creates and connects both the oplog fetcher client and the client used for other * operations. */ - SemiFuture<ConnectionPair> _createAndConnectClients(); + SemiFuture<void> _createAndConnectClients(); /** * Fetches all key documents from the donor's admin.system.keys collection, stores them in @@ -376,9 +376,9 @@ public: ExecutorFuture<void> _killBackupCursor(); /** - * Retrieves the start optimes from the donor and updates the in-memory state accordingly. + * Retrieves the start optimes from the donor and updates the on-disk state accordingly. */ - void _getStartOpTimesFromDonor(WithLock lk); + SemiFuture<void> _getStartOpTimesFromDonor(); /** * Pushes documents from oplog fetcher to oplog buffer. @@ -394,7 +394,7 @@ public: * Creates the oplog buffer that will be populated by donor oplog entries from the retryable * writes fetching stage and oplog fetching stage. */ - void _createOplogBuffer(); + void _createOplogBuffer(WithLock, OperationContext* opCtx); /** * Runs an aggregation that gets the entire oplog chain for every retryable write entry in @@ -439,16 +439,11 @@ public: BSONObj _getOplogFetcherFilter() const; /* - * Indicates that the recipient has completed the tenant cloning phase. 
- */ - bool _isCloneCompletedMarkerSet(WithLock) const; - - /* * Traverse backwards through the oplog to find the optime which tenant oplog application * should resume from. The oplog applier should resume applying entries that have a greater * optime than the returned value. */ - OpTime _getOplogResumeApplyingDonorOptime() const; + OpTime _getOplogResumeApplyingDonorOptime(const OpTime& cloneFinishedRecipientOpTime) const; /* * Starts the tenant cloner. @@ -457,6 +452,16 @@ public: Future<void> _startTenantAllDatabaseCloner(WithLock lk); /* + * Starts the tenant oplog applier. + */ + void _startOplogApplier(); + + /* + * Waits for the tenant oplog applier to stop. + */ + SemiFuture<TenantOplogApplier::OpTimePair> _waitForOplogApplierToStop(); + + /* * Advances the stableTimestamp to be >= startApplyingDonorOpTime by: * 1. Advancing the clusterTime to startApplyingDonorOpTime * 2. Writing a no-op oplog entry with ts > startApplyingDonorOpTime @@ -466,7 +471,7 @@ public: const CancellationToken& token); /* - * Gets called when the cloner completes cloning data successfully. + * Gets called when the logical/file cloner completes cloning data successfully. * And, it is responsible to populate the 'dataConsistentStopDonorOpTime' * and 'cloneFinishedRecipientOpTime' fields in the state doc. */ @@ -479,6 +484,22 @@ public: SemiFuture<void> _getDataConsistentFuture(); /* + * Waits for the data cloned by the logical cloner to become consistent. + */ + SemiFuture<TenantOplogApplier::OpTimePair> _waitForDataToBecomeConsistent(); + + /* + * Transitions the instance state to 'kConsistent'. + */ + SemiFuture<void> _enterConsistentState(); + + /* + * Persists the instance state doc and waits for it to be majority replicated. + * Throws a user assertion on failure. + */ + SemiFuture<void> _persistConsistentState(); + + /* * Cancels the tenant migration recipient instance task work. */ void _cancelRemainingWork(WithLock lk); @@ -514,19 +535,35 @@ public: /* * Returns the majority OpTime on the donor node that 'client' is connected to. */ - OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client); + + /* + * Detects recipient FCV changes during migration. + */ + SemiFuture<void> _checkIfFcvHasChangedSinceLastAttempt(); + /** * Enforces that the donor and recipient share the same featureCompatibilityVersion. */ void _compareRecipientAndDonorFCV() const; - /** + /* * Increments either 'totalSuccessfulMigrationsReceived' or 'totalFailedMigrationsReceived' * in TenantMigrationStatistics by examining status and promises. */ void _setMigrationStatsOnCompletion(Status completionStatus) const; + /* + * Sets up internal state to begin migration. + */ + void _setup(); + + SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingMTMProtocol( + const CancellationToken& token); + + SemiFuture<TenantOplogApplier::OpTimePair> _migrateUsingShardMergeProtocol( + const CancellationToken& token); + mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationRecipientService::_mutex"); // All member variables are labeled with one of the following codes indicating the @@ -616,9 +653,10 @@ public: // Waiters are notified when 'tenantOplogApplier' is valid on restart. stdx::condition_variable _restartOplogApplierCondVar; // (M) - // Indicates that the oplog applier is being cleaned up due to restart of the future chain. - // This is set to true when the oplog applier is started up again. - bool _isRestartingOplogApplier = false; // (M) + // Waiters are notified when 'tenantOplogApplier' is ready to use.
+ stdx::condition_variable _oplogApplierReadyCondVar; // (M) + // Indicates whether 'tenantOplogApplier' is ready to use or not. + bool _oplogApplierReady = false; // (M) }; private: diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp index 589b3e63cd0..02ba6104543 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp @@ -1725,7 +1725,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherResumesFromTopOfOplogBuf // Hang after creating the oplog buffer collection but before starting the oplog fetcher. const auto hangBeforeFetcherFp = - globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, 0, BSON("action" @@ -1839,7 +1839,7 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogFetcherNoDocInBufferToResumeFro // Hang after creating the oplog buffer collection but before starting the oplog fetcher. const auto hangBeforeFetcherFp = - globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, 0, BSON("action" @@ -2040,8 +2040,8 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromLastNoOpOplog // The oplog applier should have started batching and applying at the donor opTime equal to // 'resumeOpTime'. const auto oplogApplier = getTenantOplogApplier(instance.get()); - ASSERT_EQUALS(resumeOpTime, oplogApplier->getBeginApplyingOpTime_forTest()); - ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs_forTest()); + ASSERT_EQUALS(resumeOpTime, oplogApplier->getStartApplyingAfterOpTime()); + ASSERT_EQUALS(resumeOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs()); // Stop the oplog applier. instance->stopOplogApplier_forTest(); @@ -2213,10 +2213,9 @@ TEST_F(TenantMigrationRecipientServiceTest, const auto oplogApplier = getTenantOplogApplier(instance.get()); // Resume batching from the first migration no-op oplog entry. In this test, this is before // the 'startApplyingDonorOpTime'. - ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(), - oplogApplier->getResumeBatchingTs_forTest()); + ASSERT_EQUALS(beforeStartApplyingOpTime.getTimestamp(), oplogApplier->getResumeBatchingTs()); // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'. - ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest()); + ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime()); // Stop the oplog applier. instance->stopOplogApplier_forTest(); @@ -2360,9 +2359,9 @@ TEST_F(TenantMigrationRecipientServiceTest, OplogApplierResumesFromStartDonorApp const auto oplogApplier = getTenantOplogApplier(instance.get()); // There is no oplog entry to resume batching from, so we treat it as if we are resuming // oplog application from the start. The 'resumeBatchingTs' will be a null timestamp. - ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs_forTest()); + ASSERT_EQUALS(Timestamp(), oplogApplier->getResumeBatchingTs()); // The oplog applier starts applying from the donor opTime equal to 'beginApplyingOpTime'. 
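The _oplogApplierReady/_oplogApplierReadyCondVar pair introduced in the header above forms a classic readiness gate. A self-contained sketch of the handshake (the real code also wakes waiters during cleanup so they can re-check for interruption rather than sleep forever):

#include <condition_variable>
#include <mutex>
#include <thread>

struct ApplierGate {
    std::mutex mutex;
    std::condition_variable readyCondVar;
    bool ready = false;

    void markReady() {  // _startOplogApplier(): applier started, wake waiters
        { std::lock_guard<std::mutex> lk(mutex); ready = true; }
        readyCondVar.notify_all();
    }
    void markNotReady() {  // cleanup before a retry of the future chain
        { std::lock_guard<std::mutex> lk(mutex); ready = false; }
        readyCondVar.notify_all();
    }
    void waitUntilReady() {
        std::unique_lock<std::mutex> lk(mutex);
        readyCondVar.wait(lk, [&] { return ready; });
    }
};

int main() {
    ApplierGate gate;
    std::thread waiter([&] { gate.waitUntilReady(); });
    gate.markReady();
    waiter.join();
}

Renaming the flag from _isRestartingOplogApplier to _oplogApplierReady also flips its polarity: waiters now wait for a positive condition instead of the absence of a restart, which is harder to get wrong across the retry path.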
- ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getBeginApplyingOpTime_forTest()); + ASSERT_EQUALS(startApplyingOpTime, oplogApplier->getStartApplyingAfterOpTime()); // Stop the oplog applier. instance->stopOplogApplier_forTest(); @@ -2401,7 +2400,7 @@ TEST_F(TenantMigrationRecipientServiceTest, // Hang after creating the oplog buffer collection but before starting the oplog fetcher. const auto hangBeforeFetcherFp = - globalFailPointRegistry().find("pauseAfterCreatingOplogBuffer"); + globalFailPointRegistry().find("fpAfterRetrievingStartOpTimesMigrationRecipientInstance"); auto initialTimesEntered = hangBeforeFetcherFp->setMode(FailPoint::alwaysOn, 0, BSON("action" diff --git a/src/mongo/db/repl/tenant_migration_shared_data.h b/src/mongo/db/repl/tenant_migration_shared_data.h index 49c06c7ee7b..32ac179b13b 100644 --- a/src/mongo/db/repl/tenant_migration_shared_data.h +++ b/src/mongo/db/repl/tenant_migration_shared_data.h @@ -34,12 +34,15 @@ namespace mongo { namespace repl { + +enum class ResumePhase { kNone, kDataSync, kOplogCatchup }; + class TenantMigrationSharedData final : public ReplSyncSharedData { public: TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId) - : ReplSyncSharedData(clock), _migrationId(migrationId), _resuming(false) {} - TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId, bool resuming) - : ReplSyncSharedData(clock), _migrationId(migrationId), _resuming(resuming) {} + : ReplSyncSharedData(clock), _migrationId(migrationId), _resumePhase(ResumePhase::kNone) {} + TenantMigrationSharedData(ClockSource* clock, const UUID& migrationId, ResumePhase resumePhase) + : ReplSyncSharedData(clock), _migrationId(migrationId), _resumePhase(resumePhase) {} void setLastVisibleOpTime(WithLock, OpTime opTime); @@ -49,8 +52,8 @@ public: return _migrationId; } - bool isResuming() const { - return _resuming; + ResumePhase getResumePhase() const { + return _resumePhase; } private: @@ -61,8 +64,9 @@ private: // Id of the current tenant migration. const UUID _migrationId; - // Indicate whether the tenant migration is resuming from a failover. - const bool _resuming; + // Indicate the phase from which the tenant migration is resuming due to recipient/donor + // failovers. + const ResumePhase _resumePhase; }; } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 8a76189a52c..d92bd245187 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -65,7 +65,7 @@ MONGO_FAIL_POINT_DEFINE(fpBeforeTenantOplogApplyingBatch); TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, - OpTime applyFromOpTime, + OpTime startApplyingAfterOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, ThreadPool* writerPool, @@ -73,7 +73,7 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId), _migrationUuid(migrationUuid), _tenantId(tenantId), - _beginApplyingAfterOpTime(applyFromOpTime), + _startApplyingAfterOpTime(startApplyingAfterOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), _writerPool(writerPool), @@ -93,7 +93,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo } // If this optime has already passed, just return a ready future. 
if (_lastAppliedOpTimesUpToLastBatch.donorOpTime >= donorOpTime || - _beginApplyingAfterOpTime >= donorOpTime) { + _startApplyingAfterOpTime >= donorOpTime) { return SemiFuture<OpTimePair>::makeReady(_lastAppliedOpTimesUpToLastBatch); } @@ -103,11 +103,11 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo return iter->second.getFuture().semi(); } -OpTime TenantOplogApplier::getBeginApplyingOpTime_forTest() const { - return _beginApplyingAfterOpTime; +OpTime TenantOplogApplier::getStartApplyingAfterOpTime() const { + return _startApplyingAfterOpTime; } -Timestamp TenantOplogApplier::getResumeBatchingTs_forTest() const { +Timestamp TenantOplogApplier::getResumeBatchingTs() const { return _resumeBatchingTs; } @@ -121,7 +121,7 @@ void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRec Status TenantOplogApplier::_doStartup_inlock() noexcept { _oplogBatcher = std::make_shared<TenantOplogBatcher>( - _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _beginApplyingAfterOpTime); + _tenantId, _oplogBuffer, _executor, _resumeBatchingTs, _startApplyingAfterOpTime); auto status = _oplogBatcher->startup(); if (!status.isOK()) return status; @@ -906,9 +906,9 @@ std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVecto CachedCollectionProperties collPropertiesCache; for (auto&& op : batch->ops) { - // If the operation's optime is before or the same as the beginApplyingAfterOpTime we don't + // If the operation's optime is before or the same as the startApplyingAfterOpTime we don't // want to apply it, so don't include it in writerVectors. - if (op.entry.getOpTime() <= _beginApplyingAfterOpTime) + if (op.entry.getOpTime() <= _startApplyingAfterOpTime) continue; uassert(4886006, "Tenant oplog application does not support prepared transactions.", diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 58c4533b0d6..2ee03341c43 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -73,7 +73,7 @@ public: TenantOplogApplier(const UUID& migrationUuid, const std::string& tenantId, - OpTime applyFromOpTime, + OpTime startApplyingAfterOpTime, RandomAccessOplogBuffer* oplogBuffer, std::shared_ptr<executor::TaskExecutor> executor, ThreadPool* writerPool, @@ -101,14 +101,14 @@ public: void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime); /** - * Returns the optime the applier will start applying from. Used for testing. + * Returns the optime the applier will start applying from. */ - OpTime getBeginApplyingOpTime_forTest() const; + OpTime getStartApplyingAfterOpTime() const; /** - * Returns the timestamp the applier will resume batching from. Used for testing. + * Returns the timestamp the applier will resume batching from. */ - Timestamp getResumeBatchingTs_forTest() const; + Timestamp getResumeBatchingTs() const; private: Status _doStartup_inlock() noexcept final; @@ -160,7 +160,7 @@ private: std::shared_ptr<TenantOplogBatcher> _oplogBatcher; // (R) const UUID _migrationUuid; // (R) const std::string _tenantId; // (R) - const OpTime _beginApplyingAfterOpTime; // (R) + const OpTime _startApplyingAfterOpTime; // (R) RandomAccessOplogBuffer* _oplogBuffer; // (R) std::shared_ptr<executor::TaskExecutor> _executor; // (R) // All no-op entries written by this tenant migration should have OpTime greater than this
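getNotificationForOpTime() above is a promise-map pattern: already-satisfied optimes get a ready future, everything else parks a promise keyed by donor optime and is fulfilled as batches land. A simplified single-threaded model (the real code uses SharedPromise under _mutex; this sketch allows one waiter per optime):

#include <future>
#include <map>

struct OpTimeNotifier {
    long long lastApplied = 0;
    long long startApplyingAfter = 0;
    std::map<long long, std::promise<long long>> waiters;

    // Ready future if the optime has already passed; otherwise park a promise.
    std::future<long long> getNotificationForOpTime(long long donorOpTime) {
        if (lastApplied >= donorOpTime || startApplyingAfter >= donorOpTime) {
            std::promise<long long> ready;
            ready.set_value(lastApplied);
            return ready.get_future();
        }
        return waiters[donorOpTime].get_future();
    }

    // Called after a batch is applied; fulfills every waiter now satisfied.
    void onBatchApplied(long long newLastApplied) {
        lastApplied = newLastApplied;
        for (auto it = waiters.begin();
             it != waiters.end() && it->first <= lastApplied;
             it = waiters.erase(it)) {
            it->second.set_value(lastApplied);
        }
    }
};

int main() {
    OpTimeNotifier n;
    auto f = n.getNotificationForOpTime(5);
    n.onBatchApplied(7);
    return f.get() == 7 ? 0 : 1;
}

Passing OpTime::max(), as _waitForOplogApplierToStop() does, turns the same mechanism into a shutdown signal: no real optime can satisfy it, so the future only resolves when the applier is torn down.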