| author | Suganthi Mani <suganthi.mani@mongodb.com> | 2023-03-17 03:03:16 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-17 04:11:13 +0000 |
| commit | 8604298838c4d2e4414b9b90c13a5af85fe05d96 (patch) | |
| tree | 688d31d97db2df047fded24c189360a90692c747 | |
| parent | 9c5cc01a012b602d72d0c56148da04a6e95699a9 (diff) | |
| download | mongo-8604298838c4d2e4414b9b90c13a5af85fe05d96.tar.gz | |
SERVER-73397 New shard merge recipient POS instance, along with test changes.
89 files changed, 5596 insertions, 240 deletions
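
For orientation before the per-file changes, here is a minimal sketch (not part of the commit) of the recurring pattern the test updates below follow: when featureFlagShardMerge is enabled, recipient state documents are tracked in `config.shardMergeRecipients` rather than `config.tenantMigrationRecipients`, so tests resolve the state-doc namespace at runtime. The `migrationId` below is a placeholder for the id of a migration the test would have started.

```javascript
// Sketch of the runtime namespace selection used by the test changes in this commit.
import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js";
import {isShardMergeEnabled} from "jstests/replsets/libs/tenant_migration_util.js";

const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const recipientPrimary = tenantMigrationTest.getRecipientPrimary();

// With shard merge enabled, recipient state docs live in config.shardMergeRecipients;
// otherwise they remain in config.tenantMigrationRecipients.
const recipientStateDocNss = isShardMergeEnabled(recipientPrimary.getDB("admin"))
    ? TenantMigrationTest.kConfigShardMergeRecipientsNS   // "config.shardMergeRecipients"
    : TenantMigrationTest.kConfigRecipientsNS;            // "config.tenantMigrationRecipients"

// Placeholder id; a real test would use the id of the migration it ran.
const migrationId = UUID();
const recipientStateDoc =
    recipientPrimary.getCollection(recipientStateDocNss).findOne({_id: migrationId});
jsTestLog(`Recipient state doc in ${recipientStateDocNss}: ${tojson(recipientStateDoc)}`);

tenantMigrationTest.stop();
```

The same runtime check drives the related switches below, such as counting `shardMergeRecipients` instead of `tenantMigrationRecipients` in the resmoke shard_merge hook and checking for the new shard-merge log IDs in place of the tenant-migration ones.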
diff --git a/buildscripts/resmokelib/testing/hooks/shard_merge.py b/buildscripts/resmokelib/testing/hooks/shard_merge.py index c8b932bd151..69a42b14c4c 100644 --- a/buildscripts/resmokelib/testing/hooks/shard_merge.py +++ b/buildscripts/resmokelib/testing/hooks/shard_merge.py @@ -627,7 +627,7 @@ class _ShardMergeThread(threading.Thread): # pylint: disable=too-many-instance- try: recipient_node_client = self._create_client(recipient_node) res = recipient_node_client.config.command({ - "count": "tenantMigrationRecipients", "query": { + "count": "shardMergeRecipients", "query": { "_id": Binary(migration_opts.migration_id.bytes, UUID_SUBTYPE) } }) diff --git a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch.js b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch.js index 8c924efe732..c11f570e735 100644 --- a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch.js +++ b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch.js @@ -8,6 +8,7 @@ */ import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import {isShardMergeEnabled} from "jstests/replsets/libs/tenant_migration_util.js"; load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); // for 'extractUUIDFromObject' load("jstests/libs/parallelTester.js"); // for 'Thread' @@ -46,7 +47,11 @@ function runTest(downgradeFCV) { hangAfterSavingFCV.off(); // Make sure we see the FCV mismatch detection message on the recipient. - checkLog.containsJson(recipientPrimary, 5382300); + if (isShardMergeEnabled(recipientDB)) { + checkLog.containsJson(recipientPrimary, 7339749); + } else { + checkLog.containsJson(recipientPrimary, 5382300); + } // Upgrade again to check on the status of the migration from the donor's point of view. assert.commandWorked(donorPrimary.adminCommand({setFeatureCompatibilityVersion: latestFCV})); diff --git a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch_after_failover.js b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch_after_failover.js index 4e40d7e5c3f..191c1e7a802 100644 --- a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch_after_failover.js +++ b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_donor_recipient_fcv_mismatch_after_failover.js @@ -4,6 +4,8 @@ * @tags: [ * requires_majority_read_concern, * incompatible_with_windows_tls, + * # Shard merge is not robust to failovers and restarts. + * incompatible_with_shard_merge, * serverless, * ] */ diff --git a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_save_fcv.js b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_save_fcv.js index 2a88f62439b..16735b65a93 100644 --- a/jstests/multiVersion/genericSetFCVUsage/tenant_migration_save_fcv.js +++ b/jstests/multiVersion/genericSetFCVUsage/tenant_migration_save_fcv.js @@ -4,6 +4,8 @@ * @tags: [ * requires_majority_read_concern, * incompatible_with_windows_tls, + * # Shard merge is not robust to failovers and restarts. 
+ * incompatible_with_shard_merge, * serverless, * ] */ diff --git a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js index 968143b3a8d..521fd89ecf5 100644 --- a/jstests/replsets/libs/tenant_migration_recipient_sync_source.js +++ b/jstests/replsets/libs/tenant_migration_recipient_sync_source.js @@ -23,6 +23,7 @@ export function setUpMigrationSyncSourceTest() { const donorRst = new ReplSetTest({ name: `${jsTestName()}_donor`, nodes: 3, + serverless: true, settings: {chainingAllowed: false}, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: { diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 54b40c95581..270c443c8a1 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -349,12 +349,19 @@ export class TenantMigrationTest { _id: UUID(migrationIdString) }); + const shardMergeRecipientStateDoc = + recipientPrimary.getCollection(TenantMigrationTest.kConfigShardMergeRecipientsNS) + .findOne({_id: UUID(migrationIdString)}); + if (donorStateDoc) { assert(donorStateDoc.expireAt); } if (recipientStateDoc) { assert(recipientStateDoc.expireAt); } + if (shardMergeRecipientStateDoc) { + assert(shardMergeRecipientStateDoc.expireAt); + } const configDBCollections = recipientPrimary.getDB('config').getCollectionNames(); assert(!configDBCollections.includes('repl.migration.oplog_' + migrationIdString), @@ -422,6 +429,10 @@ export class TenantMigrationTest { node.getCollection(TenantMigrationTest.kConfigRecipientsNS); assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId}), tojson(node)); + const configShardMergeRecipientsColl = + node.getCollection(TenantMigrationTest.kConfigShardMergeRecipientsNS); + assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId}), tojson(node)); + let mtab; assert.soon(() => { mtab = @@ -534,7 +545,13 @@ export class TenantMigrationTest { }) { const configRecipientsColl = this.getRecipientPrimary().getCollection("config.tenantMigrationRecipients"); - const configDoc = configRecipientsColl.findOne({_id: migrationId}); + let configDoc = configRecipientsColl.findOne({_id: migrationId}); + if (!configDoc) { + configDoc = this.getRecipientPrimary() + .getCollection(TenantMigrationTest.kConfigShardMergeRecipientsNS) + .findOne({_id: migrationId}); + } + const mtab = this.getTenantMigrationAccessBlocker({recipientNode: node, tenantId}); let checkStates = () => { @@ -673,6 +690,14 @@ TenantMigrationTest.RecipientState = { kAborted: "aborted", }; +TenantMigrationTest.ShardMergeRecipientState = { + kStarted: "started", + kLearnedFilenames: "learned filenames", + kConsistent: "consistent", + kCommitted: "committed", + kAborted: "aborted", +}; + TenantMigrationTest.RecipientStateEnum = Object.keys(TenantMigrationTest.RecipientState).reduce((acc, key, idx) => { acc[key] = idx; @@ -696,3 +721,4 @@ TenantMigrationTest.RecipientAccessState = { TenantMigrationTest.kConfigDonorsNS = "config.tenantMigrationDonors"; TenantMigrationTest.kConfigRecipientsNS = "config.tenantMigrationRecipients"; +TenantMigrationTest.kConfigShardMergeRecipientsNS = "config.shardMergeRecipients"; diff --git a/jstests/replsets/tenant_migration_buildindex.js b/jstests/replsets/tenant_migration_buildindex.js index 9745ddfd049..519debee1e1 100644 --- a/jstests/replsets/tenant_migration_buildindex.js +++ b/jstests/replsets/tenant_migration_buildindex.js @@ -3,6 +3,8 @@ * 
* @tags: [ * incompatible_with_macos, + * # Shard merge protocol will be tested by tenant_migration_buildindex_shard_merge.js. + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, @@ -22,13 +24,6 @@ load("jstests/libs/uuid_util.js"); load("jstests/replsets/rslib.js"); // 'createRstArgs' const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); -// TODO (SERVER-63517): This test assumes the donor blocks only some tenants. Replace this test with -// tenant_migration_buildindex_shard_merge.js. -if (isShardMergeEnabled(tenantMigrationTest.getDonorPrimary().getDB("adminDB"))) { - jsTestLog("Skip: incompatible with featureFlagShardMerge"); - tenantMigrationTest.stop(); - quit(); -} const kTenantId = ObjectId().str; const kUnrelatedTenantId = ObjectId().str; diff --git a/jstests/replsets/tenant_migration_causal_consistency_commit_optime_before_last_cloning_optime.js b/jstests/replsets/tenant_migration_causal_consistency_commit_optime_before_last_cloning_optime.js index 60e8bab1547..4a3796b85f0 100644 --- a/jstests/replsets/tenant_migration_causal_consistency_commit_optime_before_last_cloning_optime.js +++ b/jstests/replsets/tenant_migration_causal_consistency_commit_optime_before_last_cloning_optime.js @@ -36,8 +36,12 @@ function assertCanFindWithReadConcern(conn, dbName, collName, expectedDoc, readC // Simulate a lagged node by setting secondaryDelaySecs on one recipient secondary. Verify this // does not prevent reading all the tenant's data after the migration commits. - const recipientRst = new ReplSetTest( - {name: "recipient_local_case", nodes: 3, nodeOptions: migrationX509Options.recipient}); + const recipientRst = new ReplSetTest({ + name: "recipient_local_case", + nodes: 3, + serverless: true, + nodeOptions: migrationX509Options.recipient + }); recipientRst.startSet(); let config = recipientRst.getReplSetConfig(); diff --git a/jstests/replsets/tenant_migration_cluster_time_keys_cloning.js b/jstests/replsets/tenant_migration_cluster_time_keys_cloning.js index 1ea21a598ca..f3737dd4706 100644 --- a/jstests/replsets/tenant_migration_cluster_time_keys_cloning.js +++ b/jstests/replsets/tenant_migration_cluster_time_keys_cloning.js @@ -104,8 +104,12 @@ const migrationX509Options = makeX509OptionsForTest(); (() => { jsTest.log("Test that the donor and recipient correctly copy each other's cluster time keys " + "when there is no failover but the recipient syncs data from a secondary."); - const recipientRst = new ReplSetTest( - {nodes: 3, name: "recipientRst", nodeOptions: migrationX509Options.recipient}); + const recipientRst = new ReplSetTest({ + nodes: 3, + name: "recipientRst", + serverless: true, + nodeOptions: migrationX509Options.recipient + }); recipientRst.startSet(); recipientRst.initiate(); @@ -131,8 +135,8 @@ const migrationX509Options = makeX509OptionsForTest(); (() => { jsTest.log("Test that the donor and recipient correctly copy each other's cluster time keys " + "when there is donor failover."); - const donorRst = - new ReplSetTest({nodes: 3, name: "donorRst", nodeOptions: migrationX509Options.donor}); + const donorRst = new ReplSetTest( + {nodes: 3, name: "donorRst", serverless: true, nodeOptions: migrationX509Options.donor}); donorRst.startSet(); donorRst.initiate(); if (isShardMergeEnabled(donorRst.getPrimary().getDB("adminDB"))) { @@ -176,8 +180,12 @@ const migrationX509Options = makeX509OptionsForTest(); (() => { jsTest.log("Test that the donor and recipient correctly copy each other's 
cluster time keys " + "when there is recipient failover."); - const recipientRst = new ReplSetTest( - {nodes: 3, name: "recipientRst", nodeOptions: migrationX509Options.recipient}); + const recipientRst = new ReplSetTest({ + nodes: 3, + name: "recipientRst", + serverless: true, + nodeOptions: migrationX509Options.recipient + }); recipientRst.startSet(); recipientRst.initiate(); if (isShardMergeEnabled(recipientRst.getPrimary().getDB("adminDB"))) { @@ -224,6 +232,7 @@ const migrationX509Options = makeX509OptionsForTest(); const donorRst = new ReplSetTest({ nodes: [{}, {}, {rsConfig: {priority: 0}}], name: "donorRst", + serverless: true, settings: {chainingAllowed: false}, nodeOptions: migrationX509Options.donor }); diff --git a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js index 0c1bd67eb7c..6e7d1ca5fd2 100644 --- a/jstests/replsets/tenant_migration_concurrent_bulk_writes.js +++ b/jstests/replsets/tenant_migration_concurrent_bulk_writes.js @@ -37,6 +37,7 @@ function setup() { const donorRst = new ReplSetTest({ nodes: 1, name: 'donor', + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { internalInsertMaxBatchSize: @@ -54,6 +55,7 @@ function setup() { const recipientRst = new ReplSetTest({ nodes: 1, name: 'recipient', + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, { setParameter: { internalInsertMaxBatchSize: diff --git a/jstests/replsets/tenant_migration_concurrent_migrations.js b/jstests/replsets/tenant_migration_concurrent_migrations.js index 2ccc07c6351..4e51dea3634 100644 --- a/jstests/replsets/tenant_migration_concurrent_migrations.js +++ b/jstests/replsets/tenant_migration_concurrent_migrations.js @@ -24,9 +24,9 @@ const x509Options0 = makeX509Options("jstests/libs/rs0.pem"); const x509Options1 = makeX509Options("jstests/libs/rs1.pem"); const x509Options2 = makeX509Options("jstests/libs/rs2.pem"); -const rst0 = new ReplSetTest({nodes: 1, name: 'rst0', nodeOptions: x509Options0}); -const rst1 = new ReplSetTest({nodes: 1, name: 'rst1', nodeOptions: x509Options1}); -const rst2 = new ReplSetTest({nodes: 1, name: 'rst2', nodeOptions: x509Options2}); +const rst0 = new ReplSetTest({nodes: 1, name: 'rst0', serverless: true, nodeOptions: x509Options0}); +const rst1 = new ReplSetTest({nodes: 1, name: 'rst1', serverless: true, nodeOptions: x509Options1}); +const rst2 = new ReplSetTest({nodes: 1, name: 'rst2', serverless: true, nodeOptions: x509Options2}); rst0.startSet(); rst0.initiate(); diff --git a/jstests/replsets/tenant_migration_concurrent_reads_on_recipient.js b/jstests/replsets/tenant_migration_concurrent_reads_on_recipient.js index 21bcb2c55f9..4230e8c1c0f 100644 --- a/jstests/replsets/tenant_migration_concurrent_reads_on_recipient.js +++ b/jstests/replsets/tenant_migration_concurrent_reads_on_recipient.js @@ -18,7 +18,10 @@ */ import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; -import {runMigrationAsync} from "jstests/replsets/libs/tenant_migration_util.js"; +import { + isShardMergeEnabled, + runMigrationAsync +} from "jstests/replsets/libs/tenant_migration_util.js"; load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); @@ -125,10 +128,12 @@ function testRejectOnlyReadsWithAtClusterTimeLessThanRejectReadsBeforeTimestamp( // unspecified atClusterTime have read timestamp >= rejectReadsBeforeTimestamp. 
recipientRst.awaitLastOpCommitted(); - const recipientDoc = - recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS).findOne({ - _id: UUID(migrationOpts.migrationIdString), - }); + const recipientStateDocNss = isShardMergeEnabled(recipientPrimary.getDB("admin")) + ? TenantMigrationTest.kConfigShardMergeRecipientsNS + : TenantMigrationTest.kConfigRecipientsNS; + const recipientDoc = recipientPrimary.getCollection(recipientStateDocNss).findOne({ + _id: UUID(migrationOpts.migrationIdString), + }); assert.lt(preMigrationTimestamp, recipientDoc.rejectReadsBeforeTimestamp); const nodes = testCase.isSupportedOnSecondaries ? recipientRst.nodes : [recipientPrimary]; @@ -270,10 +275,12 @@ function testDoNotRejectReadsAfterMigrationAbortedAfterReachingRejectReadsBefore // unspecified atClusterTime have read timestamp >= rejectReadsBeforeTimestamp. recipientRst.awaitLastOpCommitted(); - const recipientDoc = - recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS).findOne({ - _id: UUID(migrationOpts.migrationIdString), - }); + const recipientStateDocNss = isShardMergeEnabled(recipientPrimary.getDB("admin")) + ? TenantMigrationTest.kConfigShardMergeRecipientsNS + : TenantMigrationTest.kConfigRecipientsNS; + const recipientDoc = recipientPrimary.getCollection(recipientStateDocNss).findOne({ + _id: UUID(migrationOpts.migrationIdString), + }); const nodes = testCase.isSupportedOnSecondaries ? recipientRst.nodes : [recipientPrimary]; nodes.forEach(node => { diff --git a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js index 9cbd20de21f..14a4cd91834 100644 --- a/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js +++ b/jstests/replsets/tenant_migration_conflicting_donor_start_migration_cmds.js @@ -60,6 +60,7 @@ Object.assign(donorNodeOptions.setParameter, { const donorRst = new ReplSetTest({ nodes: 1, name: 'donorRst', + serverless: true, nodeOptions: donorNodeOptions, }); diff --git a/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js b/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js index cd49aa1521d..9983e2bba86 100644 --- a/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js +++ b/jstests/replsets/tenant_migration_conflicting_recipient_sync_data_cmds.js @@ -3,6 +3,9 @@ * * @tags: [ * incompatible_with_macos, + * # Shard merge protocol will be tested by + * # tenant_migration_shard_merge_conflicting_recipient_sync_data_cmds.js. 
+ * incompatible_with_shard_merge, * requires_fcv_52, * incompatible_with_windows_tls, * requires_majority_read_concern, @@ -21,7 +24,8 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); -var rst = new ReplSetTest({nodes: 1, nodeOptions: makeX509OptionsForTest().donor}); +var rst = + new ReplSetTest({nodes: 1, serverless: true, nodeOptions: makeX509OptionsForTest().donor}); rst.startSet(); rst.initiate(); const primary = rst.getPrimary(); diff --git a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js index 25b37f576da..b3b158c7134 100644 --- a/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js +++ b/jstests/replsets/tenant_migration_donor_resume_on_stepup_and_restart.js @@ -54,8 +54,8 @@ const migrationX509Options = makeX509OptionsForTest(); */ function testDonorStartMigrationInterrupt(interruptFunc, {donorRestarted = false, disableForShardMerge = true}) { - const donorRst = - new ReplSetTest({nodes: 3, name: "donorRst", nodeOptions: migrationX509Options.donor}); + const donorRst = new ReplSetTest( + {nodes: 3, name: "donorRst", serverless: true, nodeOptions: migrationX509Options.donor}); donorRst.startSet(); donorRst.initiate(); @@ -128,6 +128,7 @@ function testDonorForgetMigrationInterrupt(interruptFunc) { const donorRst = new ReplSetTest({ nodes: 3, name: "donorRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -138,6 +139,7 @@ function testDonorForgetMigrationInterrupt(interruptFunc) { const recipientRst = new ReplSetTest({ nodes: 1, name: "recipientRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.recipient, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -204,6 +206,7 @@ function testDonorAbortMigrationInterrupt( const donorRst = new ReplSetTest({ nodes: 3, name: "donorRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -214,6 +217,7 @@ function testDonorAbortMigrationInterrupt( const recipientRst = new ReplSetTest({ nodes: 1, name: "recipientRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.recipient, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -298,8 +302,8 @@ function testDonorAbortMigrationInterrupt( * restarting, check the to see if the donorDoc data has persisted. 
*/ function testStateDocPersistenceOnFailover(interruptFunc, fpName, isShutdown = false) { - const donorRst = - new ReplSetTest({nodes: 3, name: "donorRst", nodeOptions: migrationX509Options.donor}); + const donorRst = new ReplSetTest( + {nodes: 3, name: "donorRst", serverless: true, nodeOptions: migrationX509Options.donor}); donorRst.startSet(); donorRst.initiate(); diff --git a/jstests/replsets/tenant_migration_donor_retry.js b/jstests/replsets/tenant_migration_donor_retry.js index 8ebc52b1f62..621babbd33e 100644 --- a/jstests/replsets/tenant_migration_donor_retry.js +++ b/jstests/replsets/tenant_migration_donor_retry.js @@ -27,6 +27,7 @@ function setup() { const donorRst = new ReplSetTest({ name: "donorRst", nodes: 1, + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: { // Set the delay before a donor state doc is garbage collected to be short to speed diff --git a/jstests/replsets/tenant_migration_donor_rollback_during_cloning.js b/jstests/replsets/tenant_migration_donor_rollback_during_cloning.js index 783589cecad..b66ed2f4f7b 100644 --- a/jstests/replsets/tenant_migration_donor_rollback_during_cloning.js +++ b/jstests/replsets/tenant_migration_donor_rollback_during_cloning.js @@ -24,8 +24,12 @@ load("jstests/libs/write_concern_util.js"); // for 'stopReplicationOnSecondarie const migrationX509Options = makeX509OptionsForTest(); -const recipientRst = - new ReplSetTest({name: "recipientRst", nodes: 1, nodeOptions: migrationX509Options.recipient}); +const recipientRst = new ReplSetTest({ + name: "recipientRst", + nodes: 1, + serverless: true, + nodeOptions: migrationX509Options.recipient +}); recipientRst.startSet(); recipientRst.initiateWithHighElectionTimeout(); @@ -55,6 +59,7 @@ function runTest(tenantId, const donorRst = new ReplSetTest({ name: "donorRst", nodes: 5, + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { // Allow non-timestamped reads on donor after migration completes for testing. 
diff --git a/jstests/replsets/tenant_migration_donor_rollback_recovery.js b/jstests/replsets/tenant_migration_donor_rollback_recovery.js index 133f69c0c39..910004be6d1 100644 --- a/jstests/replsets/tenant_migration_donor_rollback_recovery.js +++ b/jstests/replsets/tenant_migration_donor_rollback_recovery.js @@ -40,6 +40,7 @@ const migrationX509Options = makeX509OptionsForTest(); const recipientRst = new ReplSetTest({ name: "recipientRst", nodes: 1, + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.recipient, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -70,6 +71,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) { const donorRst = new ReplSetTest({ name: "donorRst", nodes: 3, + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js index a3c3a59b10a..d825ff01c7e 100644 --- a/jstests/replsets/tenant_migration_donor_startup_recovery.js +++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js @@ -28,6 +28,7 @@ load("jstests/libs/uuid_util.js"); const donorRst = new ReplSetTest({ nodes: 1, name: 'donor', + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: // In order to deterministically validate that in-memory state is preserved during diff --git a/jstests/replsets/tenant_migration_donor_state_machine.js b/jstests/replsets/tenant_migration_donor_state_machine.js index fddbafa7051..c718fb85ed9 100644 --- a/jstests/replsets/tenant_migration_donor_state_machine.js +++ b/jstests/replsets/tenant_migration_donor_state_machine.js @@ -67,10 +67,13 @@ function testDonorForgetMigrationAfterMigrationCompletes( assert.soon(() => null == getTenantMigrationAccessBlocker({recipientNode: node})); }); - assert.soon(() => 0 === - recipientPrimary.getCollection(TenantMigrationTest.kConfigRecipientsNS).count({ - _id: migrationId, - })); + const recipientStateDocNss = isShardMergeEnabled(recipientPrimary.getDB("admin")) + ? 
TenantMigrationTest.kConfigShardMergeRecipientsNS + : TenantMigrationTest.kConfigRecipientsNS; + + assert.soon(() => 0 === recipientPrimary.getCollection(recipientStateDocNss).count({ + _id: migrationId, + })); assert.soon(() => 0 === recipientPrimary.adminCommand({serverStatus: 1}) .repl.primaryOnlyServices.TenantMigrationRecipientService.numInstances); @@ -92,12 +95,14 @@ const x509Options = makeX509OptionsForTest(); const donorRst = new ReplSetTest({ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: "donor", + serverless: true, nodeOptions: Object.assign(x509Options.donor, sharedOptions) }); const recipientRst = new ReplSetTest({ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: "recipient", + serverless: true, nodeOptions: Object.assign(x509Options.recipient, sharedOptions) }); diff --git a/jstests/replsets/tenant_migration_donor_try_abort.js b/jstests/replsets/tenant_migration_donor_try_abort.js index 8fa33cd9d86..f7bd1b80bde 100644 --- a/jstests/replsets/tenant_migration_donor_try_abort.js +++ b/jstests/replsets/tenant_migration_donor_try_abort.js @@ -304,6 +304,7 @@ const migrationX509Options = makeX509OptionsForTest(); const donorRst = new ReplSetTest({ nodes: 3, name: "donorRst", + serverless: true, settings: {chainingAllowed: false}, nodeOptions: migrationX509Options.donor }); @@ -491,8 +492,8 @@ const migrationX509Options = makeX509OptionsForTest(); (() => { jsTestLog("Test sending donorAbortMigration for a non-existent tenant migration."); - const donorRst = - new ReplSetTest({nodes: 2, name: "donorRst", nodeOptions: migrationX509Options.donor}); + const donorRst = new ReplSetTest( + {nodes: 2, name: "donorRst", serverless: true, nodeOptions: migrationX509Options.donor}); donorRst.startSet(); donorRst.initiate(); diff --git a/jstests/replsets/tenant_migration_donor_unblock_reads_and_writes_on_completion.js b/jstests/replsets/tenant_migration_donor_unblock_reads_and_writes_on_completion.js index 2e65e8fe6d8..3367deeb89d 100644 --- a/jstests/replsets/tenant_migration_donor_unblock_reads_and_writes_on_completion.js +++ b/jstests/replsets/tenant_migration_donor_unblock_reads_and_writes_on_completion.js @@ -51,6 +51,7 @@ function setup() { const donorRst = new ReplSetTest({ nodes: 3, name: "donorRst", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: 1, diff --git a/jstests/replsets/tenant_migration_drop_collection.js b/jstests/replsets/tenant_migration_drop_collection.js index aac09835e4d..c6940030dfc 100644 --- a/jstests/replsets/tenant_migration_drop_collection.js +++ b/jstests/replsets/tenant_migration_drop_collection.js @@ -32,6 +32,7 @@ function runDropTest({failPointName, failPointData, expectedLog, createNew}) { const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, {setParameter: {collectionClonerBatchSize: 1}}) }); diff --git a/jstests/replsets/tenant_migration_drop_state_doc_collection.js b/jstests/replsets/tenant_migration_drop_state_doc_collection.js index 505d2c4e2a7..5df9dbba2d2 100644 --- a/jstests/replsets/tenant_migration_drop_state_doc_collection.js +++ b/jstests/replsets/tenant_migration_drop_state_doc_collection.js @@ -4,6 +4,8 @@ * @tags: [ * incompatible_with_macos, * incompatible_with_windows_tls, + * # It is illegal to drop internal state doc collection while migration is active. 
+ * incompatible_with_shard_merge, * requires_majority_read_concern, * requires_persistence, * serverless, diff --git a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js index 4eb0c9378ee..a5c39831204 100644 --- a/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js +++ b/jstests/replsets/tenant_migration_ensure_migration_outcome_visibility_for_blocked_writes.js @@ -35,6 +35,7 @@ const kTenantDefinedDbName = "0"; const donorRst = new ReplSetTest({ nodes: 1, name: 'donor', + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, {setParameter: kGarbageCollectionParams}) }); diff --git a/jstests/replsets/tenant_migration_external_cluster_validation.js b/jstests/replsets/tenant_migration_external_cluster_validation.js index c9d116ffd61..378d6cc8c9c 100644 --- a/jstests/replsets/tenant_migration_external_cluster_validation.js +++ b/jstests/replsets/tenant_migration_external_cluster_validation.js @@ -55,6 +55,7 @@ const x509Options = makeX509OptionsForTest(); const donorRst = new ReplSetTest({ nodes: 2, name: "donor", + serverless: true, keyFile: "jstests/libs/key1", nodeOptions: Object.assign(x509Options.donor, { setParameter: { @@ -68,6 +69,7 @@ const donorRst = new ReplSetTest({ const recipientRst = new ReplSetTest({ nodes: 2, name: "recipient", + serverless: true, keyFile: "jstests/libs/key1", nodeOptions: Object.assign( x509Options.recipient, diff --git a/jstests/replsets/tenant_migration_external_keys_ttl.js b/jstests/replsets/tenant_migration_external_keys_ttl.js index 41d3165b46c..28611f9abaf 100644 --- a/jstests/replsets/tenant_migration_external_keys_ttl.js +++ b/jstests/replsets/tenant_migration_external_keys_ttl.js @@ -223,6 +223,7 @@ function makeTestParams() { const donorRst = new ReplSetTest({ nodes: 3, name: "donorRst", + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: ttlMonitorOptions}) }); @@ -232,6 +233,7 @@ function makeTestParams() { const recipientRst = new ReplSetTest({ nodes: 3, name: "recipientRst", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: ttlMonitorOptions}) }); diff --git a/jstests/replsets/tenant_migration_index_oplog_entries.js b/jstests/replsets/tenant_migration_index_oplog_entries.js index 3a1d5af954d..6aa3ca4b9c0 100644 --- a/jstests/replsets/tenant_migration_index_oplog_entries.js +++ b/jstests/replsets/tenant_migration_index_oplog_entries.js @@ -18,7 +18,7 @@ const kDbName = "testDb"; const kCollName = "testColl"; const kNs = kDbName + "." 
+ kCollName; -const rst = new ReplSetTest({nodes: 1}); +const rst = new ReplSetTest({nodes: 1, serverless: true}); rst.startSet(); rst.initiate(); diff --git a/jstests/replsets/tenant_migration_metrics_output.js b/jstests/replsets/tenant_migration_metrics_output.js index 49b763ec058..bb2e29a9e28 100644 --- a/jstests/replsets/tenant_migration_metrics_output.js +++ b/jstests/replsets/tenant_migration_metrics_output.js @@ -36,6 +36,7 @@ const testPath = MongoRunner.toRealPath("ftdc_dir_repl_node"); const donorRst = new ReplSetTest({ nodes: 1, name: "donorRst", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, {setParameter: {diagnosticDataCollectionDirectoryPath: testPath}}) }); diff --git a/jstests/replsets/tenant_migration_multi_writes.js b/jstests/replsets/tenant_migration_multi_writes.js index 6ff86a2db0a..8a8fe468d82 100644 --- a/jstests/replsets/tenant_migration_multi_writes.js +++ b/jstests/replsets/tenant_migration_multi_writes.js @@ -28,6 +28,7 @@ load("jstests/libs/uuid_util.js"); const donorRst = new ReplSetTest({ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], name: "TenantMigrationTest_donor", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: { // Set the delay before a donor state doc is garbage collected to be short to speed up diff --git a/jstests/replsets/tenant_migration_multikey_index.js b/jstests/replsets/tenant_migration_multikey_index.js index 14e6ff23bd0..925547bc1f1 100644 --- a/jstests/replsets/tenant_migration_multikey_index.js +++ b/jstests/replsets/tenant_migration_multikey_index.js @@ -35,6 +35,7 @@ const verifyMultiKeyIndex = function(coll, isMultiKey, multiKeyPath) { const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Allow reads on recipient before migration completes for testing. diff --git a/jstests/replsets/tenant_migration_network_error_via_rollback.js b/jstests/replsets/tenant_migration_network_error_via_rollback.js index eeea7ad02e8..b8d4d4540f6 100644 --- a/jstests/replsets/tenant_migration_network_error_via_rollback.js +++ b/jstests/replsets/tenant_migration_network_error_via_rollback.js @@ -38,6 +38,7 @@ function runTest({failPointName, failPointData = {}, batchSize = 10 * 1000}) { {rsConfig: {priority: 0, hidden: true}}, {rsConfig: {priority: 0, hidden: true}} ], + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { // Allow non-timestamped reads on donor after migration completes for testing. 
diff --git a/jstests/replsets/tenant_migration_oplog_view.js b/jstests/replsets/tenant_migration_oplog_view.js index 764cf86494e..29eb83733e8 100644 --- a/jstests/replsets/tenant_migration_oplog_view.js +++ b/jstests/replsets/tenant_migration_oplog_view.js @@ -17,6 +17,7 @@ const kGarbageCollectionDelayMS = 5 * 1000; const donorRst = new ReplSetTest({ name: "donorRst", nodes: 1, + serverless: true, nodeOptions: { setParameter: { // Set the delay before a donor state doc is garbage collected to be short to speed diff --git a/jstests/replsets/tenant_migration_recipient_access_blocker_rollback.js b/jstests/replsets/tenant_migration_recipient_access_blocker_rollback.js index 5cc3a9dd7c5..35720c08fbc 100644 --- a/jstests/replsets/tenant_migration_recipient_access_blocker_rollback.js +++ b/jstests/replsets/tenant_migration_recipient_access_blocker_rollback.js @@ -4,6 +4,9 @@ * * @tags: [ * incompatible_with_macos, + * # Shard merge protocol will be tested by + * # tenant_migration_shard_merge_recipient_access_blocker_rollback.js. + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, @@ -27,6 +30,7 @@ const migrationX509Options = makeX509OptionsForTest(); const recipientRst = new ReplSetTest({ name: "recipRst", nodes: 3, + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {}), settings: {catchUpTimeoutMillis: 0, chainingAllowed: false} }); diff --git a/jstests/replsets/tenant_migration_recipient_directly_deletes_its_state_doc.js b/jstests/replsets/tenant_migration_recipient_directly_deletes_its_state_doc.js index 6f6e7e360a2..d081b5c0477 100644 --- a/jstests/replsets/tenant_migration_recipient_directly_deletes_its_state_doc.js +++ b/jstests/replsets/tenant_migration_recipient_directly_deletes_its_state_doc.js @@ -6,6 +6,8 @@ * @tags: [ * incompatible_with_macos, * incompatible_with_windows_tls, + * # Shard merge recipient state doc deletion is no longer managed by TTL monitor. + * incompatible_with_shard_merge, * # Uses pauseTenantMigrationRecipientBeforeDeletingStateDoc failpoint, which was added in 6.2 * requires_fcv_62, * requires_majority_read_concern, diff --git a/jstests/replsets/tenant_migration_recipient_does_not_change_sync_source_after_step_down.js b/jstests/replsets/tenant_migration_recipient_does_not_change_sync_source_after_step_down.js index 809084e79a0..a6f37c9bcbf 100644 --- a/jstests/replsets/tenant_migration_recipient_does_not_change_sync_source_after_step_down.js +++ b/jstests/replsets/tenant_migration_recipient_does_not_change_sync_source_after_step_down.js @@ -34,6 +34,7 @@ const batchSize = 2; const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Use a batch size of 2 so that collection cloner requires more than a single diff --git a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js index 7322544c0c0..bddaa663079 100644 --- a/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js +++ b/jstests/replsets/tenant_migration_recipient_fetches_retryable_writes_entry_after_committed_snapshot.js @@ -6,8 +6,12 @@ * recipient's majority read on 'config.transactions' can miss committed retryable writes at that * majority commit point. 
* + * TODO SERVER-61231: Adapt for shard merge. + * * @tags: [ * incompatible_with_macos, + * # Shard merge only supports 'primary' read preference. + * incompatible_with_shard_merge, * incompatible_with_windows_tls, * requires_majority_read_concern, * requires_persistence, @@ -59,6 +63,7 @@ const donorRst = new ReplSetTest({ n3: {rsConfig: {priority: 0, hidden: true}, setParameter: {bgSyncOplogFetcherBatchSize: 1}}, n4: {rsConfig: {priority: 0, hidden: true}, setParameter: {bgSyncOplogFetcherBatchSize: 1}}, }, + serverless: true, // Force secondaries to sync from the primary to guarantee replication progress with the // stopReplProducerOnDocument failpoint. Also disable primary catchup because some replicated // retryable write statements are intentionally not being made majority committed. @@ -77,14 +82,6 @@ const donorPrimary = donorRst.getPrimary(); const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst: donorRst}); -if (isShardMergeEnabled(donorPrimary.getDB("admin"))) { - jsTestLog( - "Skip: incompatible with featureFlagShardMerge. Only 'primary' read preference is supported."); - donorRst.stopSet(); - tenantMigrationTest.stop(); - quit(); -} - const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); const kTenantId = ObjectId().str; const migrationId = UUID(); diff --git a/jstests/replsets/tenant_migration_recipient_has_tenant_data.js b/jstests/replsets/tenant_migration_recipient_has_tenant_data.js index f5e7b6d7b15..5a7f5778404 100644 --- a/jstests/replsets/tenant_migration_recipient_has_tenant_data.js +++ b/jstests/replsets/tenant_migration_recipient_has_tenant_data.js @@ -30,6 +30,7 @@ const kGarbageCollectionParams = { const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, {setParameter: kGarbageCollectionParams}) }); diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js b/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js index 27f3b4716f2..b0c58169b01 100644 --- a/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js +++ b/jstests/replsets/tenant_migration_recipient_initial_sync_cloning.js @@ -135,6 +135,7 @@ function runTestCase(recipientFailpoint, checkMtab, restartNodeAndCheckStateFunc const donorRst = new ReplSetTest({ name: "donorRst", nodes: 1, + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { // Allow non-timestamped reads on donor after migration completes for testing. diff --git a/jstests/replsets/tenant_migration_recipient_resume_on_stepup_and_restart.js b/jstests/replsets/tenant_migration_recipient_resume_on_stepup_and_restart.js index bde03f9f80b..f00a4c4f083 100644 --- a/jstests/replsets/tenant_migration_recipient_resume_on_stepup_and_restart.js +++ b/jstests/replsets/tenant_migration_recipient_resume_on_stepup_and_restart.js @@ -44,8 +44,12 @@ const migrationX509Options = makeX509OptionsForTest(); * @param {recipientRestarted} bool is needed to properly assert the tenant migrations stat count. 
*/ function testRecipientSyncDataInterrupt(interruptFunc, recipientRestarted) { - const recipientRst = new ReplSetTest( - {nodes: 3, name: "recipientRst", nodeOptions: migrationX509Options.recipient}); + const recipientRst = new ReplSetTest({ + nodes: 3, + name: "recipientRst", + serverless: true, + nodeOptions: migrationX509Options.recipient + }); recipientRst.startSet(); recipientRst.initiate(); @@ -97,6 +101,7 @@ function testRecipientForgetMigrationInterrupt(interruptFunc) { const donorRst = new ReplSetTest({ nodes: 1, name: "donorRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -107,6 +112,7 @@ function testRecipientForgetMigrationInterrupt(interruptFunc) { const recipientRst = new ReplSetTest({ nodes: 3, name: "recipientRst", + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.recipient, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, diff --git a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js index 5966e189671..d64fc1eb096 100644 --- a/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js +++ b/jstests/replsets/tenant_migration_recipient_resumes_on_donor_failover.js @@ -30,6 +30,7 @@ function runTest(failPoint) { const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Use a batch size of 2 so that collection cloner requires more than a single batch diff --git a/jstests/replsets/tenant_migration_recipient_retry_forget_migration.js b/jstests/replsets/tenant_migration_recipient_retry_forget_migration.js index 278cf4a882f..43798c7952b 100644 --- a/jstests/replsets/tenant_migration_recipient_retry_forget_migration.js +++ b/jstests/replsets/tenant_migration_recipient_retry_forget_migration.js @@ -5,6 +5,9 @@ * incompatible_with_macos, * incompatible_with_windows_tls, * requires_majority_read_concern, + * # Shard merge protocol will be tested by + * # tenant_migration_shard_merge_recipient_retry_forget_migration.js. + * incompatible_with_shard_merge, * requires_persistence, * serverless, * # The currentOp output field 'state' was changed from an enum value to a string. 
diff --git a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js index 4e04db234b6..ffb2cc4919e 100644 --- a/jstests/replsets/tenant_migration_recipient_rollback_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_rollback_recovery.js @@ -55,6 +55,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) { const donorRst = new ReplSetTest({ name: "donorRst", nodes: 1, + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.donor, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, @@ -70,6 +71,7 @@ function testRollBack(setUpFunc, rollbackOpsFunc, steadyStateFunc) { const recipientRst = new ReplSetTest({ name: "recipientRst", nodes: 3, + serverless: true, nodeOptions: Object.assign({}, migrationX509Options.recipient, { setParameter: { tenantMigrationGarbageCollectionDelayMS: kGarbageCollectionDelayMS, diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js index 10ccacb62ad..a4b8affa7a3 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js @@ -63,7 +63,7 @@ tenantMigrationTest.assertRecipientNodesInExpectedState({ nodes: tenantMigrationTest.getRecipientRst().nodes, migrationId: migrationUuid, tenantId: tenantId.str, - expectedState: TenantMigrationTest.RecipientState.kLearnedFilenames, + expectedState: TenantMigrationTest.ShardMergeRecipientState.kLearnedFilenames, expectedAccessState: TenantMigrationTest.RecipientAccessState.kReject }); diff --git a/jstests/replsets/tenant_migration_recipient_startup_recovery.js b/jstests/replsets/tenant_migration_recipient_startup_recovery.js index 640a94d4a96..a59601be1be 100644 --- a/jstests/replsets/tenant_migration_recipient_startup_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_startup_recovery.js @@ -22,6 +22,7 @@ load("jstests/libs/uuid_util.js"); const recipientRst = new ReplSetTest({ nodes: 1, name: 'recipient', + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: {"failpoint.PrimaryOnlyServiceSkipRebuildingInstances": tojson({mode: "alwaysOn"})} diff --git a/jstests/replsets/tenant_migration_recipient_ttl.js b/jstests/replsets/tenant_migration_recipient_ttl.js index dd776653a05..75295c9d26d 100644 --- a/jstests/replsets/tenant_migration_recipient_ttl.js +++ b/jstests/replsets/tenant_migration_recipient_ttl.js @@ -5,6 +5,8 @@ * @tags: [ * incompatible_with_macos, * incompatible_with_windows_tls, + * # Shard merge recipient state doc deletion is no longer managed by TTL monitor. 
+ * incompatible_with_shard_merge, * requires_persistence, * serverless, * ] diff --git a/jstests/replsets/tenant_migration_recipient_vote_imported_files.js b/jstests/replsets/tenant_migration_recipient_vote_imported_files.js index 318837ab94d..04e77d5da90 100644 --- a/jstests/replsets/tenant_migration_recipient_vote_imported_files.js +++ b/jstests/replsets/tenant_migration_recipient_vote_imported_files.js @@ -83,7 +83,7 @@ const migrationThread = new Thread(runMigrationAsync, migrationOpts, donorRstArg migrationThread.start(); jsTestLog("Wait for recipient to log 'Waiting for all nodes to call recipientVoteImportedFiles'"); -assert.soon(() => checkLog.checkContainsOnceJson(recipientPrimary, 6113402, {})); +assert.soon(() => checkLog.checkContainsOnceJson(recipientPrimary, 7339751, {})); jsTestLog("Test that recipientVoteImportedFiles succeeds"); voteShouldSucceed(migrationId); diff --git a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js index f4db2594cc8..ea9654d97ac 100644 --- a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js +++ b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover.js @@ -25,6 +25,7 @@ const tenantMigrationFailoverTest = function(isTimeSeries, createCollFn, docs) { const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Use a batch size of 2 so that collection cloner requires more than a single diff --git a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover_with_dropped_views.js b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover_with_dropped_views.js index 5f89b6fe55c..262e40edf00 100644 --- a/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover_with_dropped_views.js +++ b/jstests/replsets/tenant_migration_resume_collection_cloner_after_recipient_failover_with_dropped_views.js @@ -21,6 +21,7 @@ const tenantMigrationFailoverTest = function(isTimeSeries, createCollFn) { const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Allow reads on recipient before migration completes for testing. 
diff --git a/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js b/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js index 2c8288722aa..0919240cf24 100644 --- a/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js +++ b/jstests/replsets/tenant_migration_resume_collection_cloner_after_rename.js @@ -26,6 +26,7 @@ load('jstests/replsets/rslib.js'); // 'createRstArgs' const recipientRst = new ReplSetTest({ nodes: 2, name: jsTestName() + "_recipient", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().recipient, { setParameter: { // Use a batch size of 2 so that collection cloner requires more than a single batch to diff --git a/jstests/replsets/tenant_migration_resume_oplog_application.js b/jstests/replsets/tenant_migration_resume_oplog_application.js index a9331393670..70849750914 100644 --- a/jstests/replsets/tenant_migration_resume_oplog_application.js +++ b/jstests/replsets/tenant_migration_resume_oplog_application.js @@ -28,6 +28,7 @@ load('jstests/replsets/rslib.js'); // For 'createRstArgs' const recipientRst = new ReplSetTest({ nodes: 3, name: jsTestName() + "_recipient", + serverless: true, // Use a batch size of 2 so that we can hang in the middle of tenant oplog application. nodeOptions: Object.assign(makeX509OptionsForTest().recipient, {setParameter: {tenantApplierBatchSizeOps: 2}}) diff --git a/jstests/replsets/tenant_migration_retryable_write_retry.js b/jstests/replsets/tenant_migration_retryable_write_retry.js index 282e3756a97..f127d36bd81 100644 --- a/jstests/replsets/tenant_migration_retryable_write_retry.js +++ b/jstests/replsets/tenant_migration_retryable_write_retry.js @@ -28,6 +28,7 @@ const migrationX509Options = makeX509OptionsForTest(); const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { // Allow non-timestamped reads on donor after migration completes for testing. @@ -35,8 +36,8 @@ const donorRst = new ReplSetTest({ } }) }); -const recipientRst = - new ReplSetTest({nodes: 1, name: "recipient", nodeOptions: migrationX509Options.recipient}); +const recipientRst = new ReplSetTest( + {nodes: 1, name: "recipient", serverless: true, nodeOptions: migrationX509Options.recipient}); donorRst.startSet(); donorRst.initiate(); diff --git a/jstests/replsets/tenant_migration_shard_merge_conflicting_recipient_sync_data_cmds.js b/jstests/replsets/tenant_migration_shard_merge_conflicting_recipient_sync_data_cmds.js new file mode 100644 index 00000000000..2a5a88e1026 --- /dev/null +++ b/jstests/replsets/tenant_migration_shard_merge_conflicting_recipient_sync_data_cmds.js @@ -0,0 +1,224 @@ +/** + * Test that shard merge recipient rejects conflicting recipientSyncData commands. 
+ * + * @tags: [ + * incompatible_with_macos, + * requires_fcv_52, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * featureFlagShardMerge, + * serverless, + * ] + */ + +import { + getCertificateAndPrivateKey, + isShardMergeEnabled, + makeX509OptionsForTest, +} from "jstests/replsets/libs/tenant_migration_util.js"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); + +const standalone = MongoRunner.runMongod({}); +const shardMergeFeatureFlagEnabled = isShardMergeEnabled(standalone.getDB("admin")); +MongoRunner.stopMongod(standalone); + +// Note: including this explicit early return here due to the fact that multiversion +// suites will execute this test without featureFlagShardMerge enabled (despite the +// presence of the featureFlagShardMerge tag above), which means the test will attempt +// to run a multi-tenant migration and fail. +if (!shardMergeFeatureFlagEnabled) { + jsTestLog("Skipping Shard Merge-specific test"); + quit(); +} + +const kDonorConnectionString0 = "foo/bar:12345"; +const kDonorConnectionString1 = "foo/bar:56789"; +const kPrimaryReadPreference = { + mode: "primary" +}; +const kRecipientCertificateForDonor = + getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient.pem"); +const kExpiredRecipientCertificateForDonor = + getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient_expired.pem"); + +TestData.stopFailPointErrorCode = 4880402; + +/** + * Runs recipientSyncData on the given host and returns the response. + */ +function runRecipientSyncDataCmd(primaryHost, { + migrationIdString, + tenantIds, + donorConnectionString, + readPreference, + recipientCertificateForDonor +}) { + jsTestLog("Starting a recipientSyncDataCmd for migrationId: " + migrationIdString + + " tenantIds: '" + tenantIds + "'"); + const primary = new Mongo(primaryHost); + const res = primary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(migrationIdString), + donorConnectionString: donorConnectionString, + tenantIds: eval(tenantIds), + protocol: "shard merge", + readPreference: readPreference, + startMigrationDonorTimestamp: Timestamp(1, 1), + recipientCertificateForDonor: recipientCertificateForDonor + }); + return res; +} + +/** + * Returns an array of currentOp entries for the TenantMigrationRecipientService instances that + * match the given query. + */ +function getTenantMigrationRecipientCurrentOpEntries(recipientPrimary, query) { + const cmdObj = Object.assign({currentOp: true, desc: "shard merge recipient"}, query); + return assert.commandWorked(recipientPrimary.adminCommand(cmdObj)).inprog; +} + +/** + * Asserts that the string does not contain certificate or private pem string. + */ +function assertNoCertificateOrPrivateKey(string) { + assert(!string.includes("CERTIFICATE"), "found certificate"); + assert(!string.includes("PRIVATE KEY"), "found private key"); +} + +/** + * Tests that if the client runs multiple recipientSyncData commands that would start conflicting + * migrations, only one of the migrations will start and succeed. 
+ */ +function testConcurrentConflictingMigration(migrationOpts0, migrationOpts1) { + var rst = + new ReplSetTest({nodes: 1, serverless: true, nodeOptions: makeX509OptionsForTest().donor}); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const configRecipientsColl = primary.getDB("config")["shardMergeRecipients"]; + + // Enable the failpoint to stop the tenant migration after persisting the state doc. + assert.commandWorked(primary.adminCommand({ + configureFailPoint: "fpAfterPersistingTenantMigrationRecipientInstanceStateDoc", + mode: "alwaysOn", + data: {action: "stop", stopErrorCode: NumberInt(TestData.stopFailPointErrorCode)} + })); + + // Start the conflicting recipientSyncData cmds. + const recipientSyncDataThread0 = + new Thread(runRecipientSyncDataCmd, primary.host, migrationOpts0); + const recipientSyncDataThread1 = + new Thread(runRecipientSyncDataCmd, primary.host, migrationOpts1); + recipientSyncDataThread0.start(); + recipientSyncDataThread1.start(); + + const res0 = assert.commandFailed(recipientSyncDataThread0.returnData()); + const res1 = assert.commandFailed(recipientSyncDataThread1.returnData()); + + if (res0.code == TestData.stopFailPointErrorCode) { + assert.commandFailedWithCode(res0, TestData.stopFailPointErrorCode); + assert.commandFailedWithCode(res1, ErrorCodes.ConflictingOperationInProgress); + assertNoCertificateOrPrivateKey(res1.errmsg); + assert.eq(1, configRecipientsColl.count({_id: UUID(migrationOpts0.migrationIdString)})); + assert.eq(1, getTenantMigrationRecipientCurrentOpEntries(primary, { + "instanceID": UUID(migrationOpts0.migrationIdString) + }).length); + if (migrationOpts0.migrationIdString != migrationOpts1.migrationIdString) { + assert.eq(0, configRecipientsColl.count({_id: UUID(migrationOpts1.migrationIdString)})); + assert.eq(0, getTenantMigrationRecipientCurrentOpEntries(primary, { + "instanceID": UUID(migrationOpts1.migrationIdString) + }).length); + } else if (migrationOpts0.tenantIds != migrationOpts1.tenantIds) { + assert.eq(0, configRecipientsColl.count({tenantIds: eval(migrationOpts1.tenantIds)})); + assert.eq(0, getTenantMigrationRecipientCurrentOpEntries(primary, { + tenantIds: eval(migrationOpts1.tenantIds) + }).length); + } + } else { + assert.commandFailedWithCode(res0, ErrorCodes.ConflictingOperationInProgress); + assert.commandFailedWithCode(res1, TestData.stopFailPointErrorCode); + assertNoCertificateOrPrivateKey(res0.errmsg); + assert.eq(1, configRecipientsColl.count({_id: UUID(migrationOpts1.migrationIdString)})); + assert.eq(1, getTenantMigrationRecipientCurrentOpEntries(primary, { + "instanceID": UUID(migrationOpts1.migrationIdString) + }).length); + if (migrationOpts0.migrationIdString != migrationOpts1.migrationIdString) { + assert.eq(0, configRecipientsColl.count({_id: UUID(migrationOpts0.migrationIdString)})); + assert.eq(0, getTenantMigrationRecipientCurrentOpEntries(primary, { + "instanceID": UUID(migrationOpts0.migrationIdString) + }).length); + } else if (migrationOpts0.tenantIds != migrationOpts1.tenantIds) { + assert.eq(0, configRecipientsColl.count({tenantId: eval(migrationOpts0.tenantIds)})); + assert.eq(0, getTenantMigrationRecipientCurrentOpEntries(primary, { + tenantIds: eval(migrationOpts0.tenantIds) + }).length); + } + } + + rst.stopSet(); +} + +// Test migrations with different migrationIds but identical settings. 
+(() => { + const migrationOpts0 = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantIds: tojson([ObjectId()]), + donorConnectionString: kDonorConnectionString0, + readPreference: kPrimaryReadPreference, + recipientCertificateForDonor: kRecipientCertificateForDonor + }; + const migrationOpts1 = Object.extend({}, migrationOpts0, true); + migrationOpts1.migrationIdString = extractUUIDFromObject(UUID()); + testConcurrentConflictingMigration(migrationOpts0, migrationOpts1); +})(); + +// Test reusing a migrationId with different migration settings. + +// Test different tenantIds. +(() => { + const migrationOpts0 = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantIds: tojson([ObjectId()]), + donorConnectionString: kDonorConnectionString0, + readPreference: kPrimaryReadPreference, + recipientCertificateForDonor: kRecipientCertificateForDonor + }; + const migrationOpts1 = Object.extend({}, migrationOpts0, true); + migrationOpts1.tenantIds = tojson([ObjectId()]); + testConcurrentConflictingMigration(migrationOpts0, migrationOpts1); +})(); + +// Test different donor connection strings. +(() => { + const migrationOpts0 = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantIds: tojson([ObjectId()]), + donorConnectionString: kDonorConnectionString0, + readPreference: kPrimaryReadPreference, + recipientCertificateForDonor: kRecipientCertificateForDonor + }; + const migrationOpts1 = Object.extend({}, migrationOpts0, true); + migrationOpts1.donorConnectionString = kDonorConnectionString1; + testConcurrentConflictingMigration(migrationOpts0, migrationOpts1); +})(); + +// Test different certificates. +(() => { + const migrationOpts0 = { + migrationIdString: extractUUIDFromObject(UUID()), + tenantIds: tojson([ObjectId()]), + donorConnectionString: kDonorConnectionString0, + readPreference: kPrimaryReadPreference, + recipientCertificateForDonor: kRecipientCertificateForDonor + }; + const migrationOpts1 = Object.extend({}, migrationOpts0, true); + migrationOpts1.recipientCertificateForDonor = kExpiredRecipientCertificateForDonor; + testConcurrentConflictingMigration(migrationOpts0, migrationOpts1); +})(); diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_access_blocker_rollback.js b/jstests/replsets/tenant_migration_shard_merge_recipient_access_blocker_rollback.js new file mode 100644 index 00000000000..6da482857eb --- /dev/null +++ b/jstests/replsets/tenant_migration_shard_merge_recipient_access_blocker_rollback.js @@ -0,0 +1,239 @@ +/** + * Tests if the recipient is rolled back well after a migration has been committed, the shard + * merge recipient access blocker is initialized in the correct state. + * + * @tags: [ + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * featureFlagShardMerge, + * serverless, + * ] + */ + +import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import { + getCertificateAndPrivateKey, + isShardMergeEnabled, + makeX509OptionsForTest +} from "jstests/replsets/libs/tenant_migration_util.js"; + +load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject(). +load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). 
+load("jstests/libs/write_concern_util.js"); // for 'stopReplicationOnSecondaries' +load("jstests/libs/parallelTester.js"); // For Thread() + +const migrationX509Options = makeX509OptionsForTest(); + +const recipientRst = new ReplSetTest({ + name: "recipRst", + nodes: 3, + serverless: true, + nodeOptions: Object.assign(migrationX509Options.recipient, {}), + settings: {catchUpTimeoutMillis: 0, chainingAllowed: false} +}); + +recipientRst.startSet(); +recipientRst.initiate(); + +// Note: including this explicit early return here due to the fact that multiversion +// suites will execute this test without featureFlagShardMerge enabled (despite the +// presence of the featureFlagShardMerge tag above), which means the test will attempt +// to run a multi-tenant migration and fail. +if (!isShardMergeEnabled(recipientRst.getPrimary().getDB("admin"))) { + recipientRst.stopSet(); + jsTestLog("Skipping Shard Merge-specific test"); + quit(); +} + +// This test case +// 1) Completes and commits a tenant migration. Then forgets the migration (state doc marked with +// 'expireAt', but not yet deleted.) +// 2) Waits until the replica set is stable. +// 3) Rolls back the primary. This makes the primary recover its tenant migration access blockers. +// 4) Ensures that a read is possible from the primary. +function runRollbackAfterMigrationCommitted() { + jsTestLog("Testing a rollback after the migration has been committed and marked forgotten."); + const tenantMigrationTest = new TenantMigrationTest( + {name: jsTestName(), recipientRst: recipientRst, sharedOptions: {nodes: 1}}); + + const kMigrationId = UUID(); + const kTenantId = ObjectId(); + const kReadPreference = {mode: "primary"}; + const migrationOpts = { + migrationIdString: extractUUIDFromObject(kMigrationId), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference + }; + + // Populate the donor side with data. + const dbName = tenantMigrationTest.tenantDB(kTenantId.str, "testDB"); + const collName = "testColl"; + const numDocs = 20; + tenantMigrationTest.insertDonorDB( + dbName, + collName, + [...Array(numDocs).keys()].map((i) => ({a: i, band: "Air", song: "La Femme d'Argent"}))); + + jsTestLog(`Starting tenant migration with migrationId ${kMigrationId}`); + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); + + // Complete and commit the migration, and then forget it as well. + jsTestLog("Waiting for migration to complete and commit."); + TenantMigrationTest.assertCommitted( + tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationOpts.migrationIdString)); + recipientRst.awaitReplication(); + + // It should be possible to read from the recipient now. + jsTestLog("Reading from the recipient primary on the tenant collection."); + const originalPrimary = recipientRst.getPrimary(); + assert.eq(numDocs, originalPrimary.getDB(dbName)[collName].find().itcount()); + + jsTestLog("Halting replication on the secondaries."); + const secondaries = recipientRst.getSecondaries(); + stopServerReplication(secondaries); + + // Prepare the recipient primary for rollback, by inserting non-tenant related data on it while + // replication has been halted. 
+ jsTestLog("Inserting random data on recipient primary."); + const randomColl = originalPrimary.getDB("randomDB")["random_coll"]; + assert.commandWorked(randomColl.insert({x: "The Real Folk Blues"}, {writeConcern: {w: 1}})); + + // Stepping up one of the secondaries should cause the original primary to rollback. + jsTestLog("Stepping up one of the secondaries."); + const newRecipientPrimary = secondaries[0]; + recipientRst.stepUp(newRecipientPrimary, {awaitReplicationBeforeStepUp: false}); + + jsTestLog("Restarting server replication."); + restartServerReplication(secondaries); + recipientRst.awaitReplication(); + + jsTestLog("Stepping up the original primary back to primary."); + recipientRst.stepUp(originalPrimary, {awaitReplicationBeforeStepUp: false}); + + jsTestLog("Perform a read against the original primary on the tenant collection."); + assert.eq(numDocs, originalPrimary.getDB(dbName)[collName].find().itcount()); + + tenantMigrationTest.stop(); +} + +// This test case: +// 1) Sets the replica set up such that the migration has already been committed and forgotten, and +// the state doc has been deleted as well. +// 2) Sends a 'recipientForgetMigration' command to the recipient primary, and waits for the state +// doc to persist. +// 3) Performs a rollback on the recipient primary, so that the access blockers are reconstructed. +// 4) Performs a read on the recipient primary. +function runRollbackAfterLoneRecipientForgetMigrationCommand() { + jsTestLog("Testing a rollback after migration has been committed and completely forgotten."); + const tenantMigrationTest = new TenantMigrationTest( + {name: jsTestName(), recipientRst: recipientRst, sharedOptions: {nodes: 1}}); + + const kMigrationId = UUID(); + const kTenantId = ObjectId(); + const kReadPreference = {mode: "primary"}; + const recipientCertificateForDonor = + getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient.pem"); + + const dbName = tenantMigrationTest.tenantDB(kTenantId.str, "testDB"); + const collName = "testColl"; + + const originalPrimary = recipientRst.getPrimary(); + const newPrimary = recipientRst.getSecondaries()[0]; + + // We will have the tenant database already on the recipient, as though the tenant migration has + // already run to completion, and the state document has been cleaned up already. + assert.commandWorked(originalPrimary.getDB(dbName)[collName].insert( + {x: "Composer", y: "Mysore Vasudevacharya"})); + recipientRst.awaitReplication(); + + // Prevent the "expireAt" field from being populated. + const fpOriginalPrimary = configureFailPoint(originalPrimary, "hangBeforeTaskCompletion"); + // Prevent the new primary from marking the state document as garbage collectable. 
+ const fpNewPrimary = + configureFailPoint(newPrimary, "pauseBeforeRunTenantMigrationRecipientInstance"); + + function runRecipientForgetMigration(host, { + migrationIdString, + donorConnectionString, + tenantIds, + readPreference, + recipientCertificateForDonor + }) { + const db = new Mongo(host); + return db.adminCommand({ + recipientForgetMigration: 1, + migrationId: UUID(migrationIdString), + donorConnectionString, + tenantIds: eval(tenantIds), + protocol: "shard merge", + decision: "committed", + readPreference, + recipientCertificateForDonor + }); + } + + const recipientForgetMigrationThread = + new Thread(runRecipientForgetMigration, originalPrimary.host, { + migrationIdString: extractUUIDFromObject(kMigrationId), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: tojson([kTenantId]), + readPreference: kReadPreference, + recipientCertificateForDonor + }); + + // Run a delayed/retried recipientForgetMigration command after the state doc has been deleted. + recipientForgetMigrationThread.start(); + + jsTestLog("Wait until the right before the state document's 'expireAt' is set."); + fpOriginalPrimary.wait(); + recipientRst.awaitReplication(); + + // It should be possible to read from the recipient now. + assert.eq(1, originalPrimary.getDB(dbName)[collName].find().itcount()); + + // Now perform a rollback on the recipient primary. + jsTestLog("Halting replication on the secondaries."); + const secondaries = recipientRst.getSecondaries(); + stopServerReplication(secondaries); + + jsTestLog("Inserting random data on recipient primary."); + const randomColl = originalPrimary.getDB("randomDB")["random_coll"]; + assert.commandWorked(randomColl.insert({x: "Que Sera Sera"}, {writeConcern: {w: 1}})); + + // Stepping up one of the secondaries should cause the original primary to rollback. + jsTestLog("Stepping up one of the secondaries."); + recipientRst.stepUp(newPrimary, {awaitReplicationBeforeStepUp: false}); + + assert.commandFailedWithCode(recipientForgetMigrationThread.returnData(), + ErrorCodes.InterruptedDueToReplStateChange); + + // It should be possible to read from new recipient primary. + assert.eq(1, newPrimary.getDB(dbName)[collName].find().itcount()); + + jsTestLog("Restarting server replication."); + restartServerReplication(secondaries); + recipientRst.awaitReplication(); + + jsTestLog("Stepping up the original primary back to primary."); + const fpOriginalPrimaryBeforeStarting = + configureFailPoint(originalPrimary, "pauseBeforeRunTenantMigrationRecipientInstance"); + fpOriginalPrimary.off(); + recipientRst.stepUp(originalPrimary, {awaitReplicationBeforeStepUp: false}); + + jsTestLog("Perform another read against the original primary on the tenant collection."); + assert.eq(1, originalPrimary.getDB(dbName)[collName].find().itcount()); + + fpOriginalPrimaryBeforeStarting.off(); + fpNewPrimary.off(); + + tenantMigrationTest.stop(); +} + +runRollbackAfterMigrationCommitted(); +runRollbackAfterLoneRecipientForgetMigrationCommand(); + +recipientRst.stopSet(); diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js b/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js index df9eefd9a93..5d7bcc78929 100644 --- a/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js +++ b/jstests/replsets/tenant_migration_shard_merge_recipient_current_op.js @@ -2,7 +2,7 @@ * Tests the currentOp command during a shard merge protocol. 
A tenant migration is started, and the * currentOp command is tested as the recipient moves through below state sequence. * - * kStarted ---> kLearnedFilenames ---> kConsistent ---> kDone. + * kStarted ---> kLearnedFilenames ---> kConsistent ---> kCommitted. * * @tags: [ * featureFlagShardMerge, @@ -68,9 +68,6 @@ function checkStandardFieldsOK(res) { assert.eq(bsonWoCompare(res.inprog[0].instanceID, kMigrationId), 0, res); assert.eq(res.inprog[0].donorConnectionString, tenantMigrationTest.getDonorRst().getURL(), res); assert.eq(bsonWoCompare(res.inprog[0].readPreference, kReadPreference), 0, res); - // We don't test failovers in this test so we don't expect these counters to be incremented. - assert.eq(res.inprog[0].numRestartsDueToDonorConnectionFailure, 0, res); - assert.eq(res.inprog[0].numRestartsDueToRecipientFailure, 0, res); } // Check currentOp fields' expected value once the recipient is in state "consistent" or later. @@ -85,8 +82,6 @@ function checkPostConsistentFieldsOK(res) { assert(currOp.hasOwnProperty("cloneFinishedRecipientOpTime") && checkOptime(currOp.cloneFinishedRecipientOpTime), res); - // Not applicable to shard merge protocol. - assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime")); } // Validates the fields of an optime object. @@ -112,22 +107,21 @@ const fpAfterForgetMigration = configureFailPoint( recipientPrimary, "fpAfterReceivingRecipientForgetMigration", {action: "hang"}); jsTestLog(`Starting tenant migration with migrationId: ${kMigrationId}`); -assert.commandWorked( - tenantMigrationTest.startMigration(migrationOpts, {enableDonorStartMigrationFsync: true})); +assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( recipientPrimary, "fpBeforePersistingRejectReadsBeforeTimestamp", {action: "hang"}); { - // Wait until a current operation corresponding to "tenant recipient migration" with state + // Wait until a current operation corresponding to "shard merge recipient" with state // kStarted is visible on the recipientPrimary. jsTestLog("Waiting until current operation with state kStarted is visible."); fpAfterPersistingStateDoc.wait(); - let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + let res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); let currOp = res.inprog[0]; - assert.eq(currOp.state, TenantMigrationTest.RecipientState.kStarted, res); + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kStarted, res); assert.eq(currOp.garbageCollectable, false, res); assert.eq(currOp.migrationCompleted, false, res); assert(!currOp.hasOwnProperty("startFetchingDonorOpTime"), res); @@ -135,8 +129,6 @@ const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( assert(!currOp.hasOwnProperty("expireAt"), res); assert(!currOp.hasOwnProperty("donorSyncSource"), res); assert(!currOp.hasOwnProperty("cloneFinishedRecipientOpTime"), res); - // Not applicable to shard merge protocol. 
- assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); fpAfterPersistingStateDoc.off(); } @@ -147,13 +139,11 @@ const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( jsTestLog("Waiting for startFetchingDonorOpTime to exist."); fpAfterRetrievingStartOpTime.wait(); - let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + let res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); let currOp = res.inprog[0]; assert.gt(new Date(), currOp.receiveStart, tojson(res)); - - assert.eq(currOp.state, TenantMigrationTest.RecipientState.kLearnedFilenames, res); - + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kLearnedFilenames, res); assert.eq(currOp.garbageCollectable, false, res); assert.eq(currOp.migrationCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -166,8 +156,6 @@ const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( res); assert(currOp.hasOwnProperty("donorSyncSource") && typeof currOp.donorSyncSource === 'string', res); - // Not applicable to shard merge protocol. - assert(!currOp.hasOwnProperty("dataConsistentStopDonorOpTime"), res); fpAfterRetrievingStartOpTime.off(); } @@ -177,12 +165,12 @@ const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( jsTestLog("Waiting for the kConsistent state to be reached."); fpAfterDataConsistent.wait(); - let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + let res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); let currOp = res.inprog[0]; // State should have changed. - assert.eq(currOp.state, TenantMigrationTest.RecipientState.kConsistent, res); + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kConsistent, res); assert.eq(currOp.garbageCollectable, false, res); assert.eq(currOp.migrationCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -191,12 +179,12 @@ const fpBeforePersistingRejectReadsBeforeTimestamp = configureFailPoint( fpAfterDataConsistent.off(); fpBeforePersistingRejectReadsBeforeTimestamp.wait(); - res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; // State should have changed. 
- assert.eq(currOp.state, TenantMigrationTest.RecipientState.kConsistent, res); + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kConsistent, res); assert.eq(currOp.garbageCollectable, false, res); assert.eq(currOp.migrationCompleted, false, res); assert(!currOp.hasOwnProperty("expireAt"), res); @@ -220,11 +208,11 @@ forgetMigrationThread.start(); jsTestLog("Waiting for the recipient to receive the forgetMigration, and pause at failpoint"); fpAfterForgetMigration.wait(); - let res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + let res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); let currOp = res.inprog[0]; - assert.eq(currOp.state, TenantMigrationTest.RecipientState.kConsistent, res); + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kConsistent, res); assert.eq(currOp.garbageCollectable, false, res); // migrationCompleted should have changed. assert.eq(currOp.migrationCompleted, true, res); @@ -234,13 +222,13 @@ forgetMigrationThread.start(); fpAfterForgetMigration.off(); assert.commandWorked(forgetMigrationThread.returnData()); - res = recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); + res = recipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"}); checkStandardFieldsOK(res); checkPostConsistentFieldsOK(res); currOp = res.inprog[0]; assert.eq(currOp.migrationCompleted, true, res); // State, completion status and expireAt should have changed. - assert.eq(currOp.state, TenantMigrationTest.RecipientState.kCommitted, res); + assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kCommitted, res); assert.eq(currOp.garbageCollectable, true, res); assert(currOp.hasOwnProperty("expireAt") && currOp.expireAt instanceof Date, res); } diff --git a/jstests/replsets/tenant_migration_shard_merge_recipient_retry_forget_migration.js b/jstests/replsets/tenant_migration_shard_merge_recipient_retry_forget_migration.js new file mode 100644 index 00000000000..1dcbacef5b8 --- /dev/null +++ b/jstests/replsets/tenant_migration_shard_merge_recipient_retry_forget_migration.js @@ -0,0 +1,131 @@ +/** + * Tests that a recipientForgetMigration is received after the recipient state doc has been deleted + * for shard merge protocol. + * + * @tags: [ + * incompatible_with_macos, + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * featureFlagShardMerge, + * serverless, + * ] + */ + +import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import { + getCertificateAndPrivateKey, + isShardMergeEnabled +} from "jstests/replsets/libs/tenant_migration_util.js"; + +load("jstests/libs/fail_point_util.js"); // For configureFailPoint(). +load("jstests/libs/parallelTester.js"); // For Thread() +load("jstests/libs/uuid_util.js"); // For extractUUIDFromObject(). + +const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + +const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +// Note: including this explicit early return here due to the fact that multiversion +// suites will execute this test without featureFlagShardMerge enabled (despite the +// presence of the featureFlagShardMerge tag above), which means the test will attempt +// to run a multi-tenant migration and fail. 
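The guard itself continues immediately below. For readers unfamiliar with the helper, one plausible way such a check can be written is sketched here, using only the feature flag server parameter and the node's FCV; the real isShardMergeEnabled in tenant_migration_util.js may differ in detail.

    // Returns true when the shard merge feature flag is enabled and the node's FCV is
    // high enough for the flag to take effect (false on binaries that predate the flag).
    function isShardMergeEnabledSketch(adminDB) {
        const flagRes = adminDB.runCommand({getParameter: 1, featureFlagShardMerge: 1});
        if (!flagRes.ok || !flagRes.featureFlagShardMerge.value) {
            return false;  // Old binary or flag disabled.
        }
        const fcvRes = assert.commandWorked(
            adminDB.runCommand({getParameter: 1, featureCompatibilityVersion: 1}));
        // The flag only takes effect once the FCV has reached the flag's release version.
        return MongoRunner.compareBinVersions(fcvRes.featureCompatibilityVersion.version,
                                              flagRes.featureFlagShardMerge.version) >= 0;
    }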
+if (!isShardMergeEnabled(recipientPrimary.getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping Shard Merge-specific test"); + quit(); +} + +const migrationId = UUID(); +const tenantId = ObjectId(); +const recipientCertificateForDonor = + getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient.pem"); + +const dbName = tenantMigrationTest.tenantDB(tenantId.str, "test"); +const collName = "coll"; + +// Not doing a migration before writing to the recipient to mimic that a migration has completed and +// the state doc has been garbage collected. +assert.commandWorked(recipientPrimary.getDB(dbName)[collName].insert({_id: 1})); + +function runRecipientForgetMigration(host, { + migrationIdString, + donorConnectionString, + tenantIds, + readPreference, + recipientCertificateForDonor +}) { + const db = new Mongo(host); + return db.adminCommand({ + recipientForgetMigration: 1, + migrationId: UUID(migrationIdString), + donorConnectionString, + tenantIds: eval(tenantIds), + protocol: "shard merge", + decision: "committed", + readPreference: {mode: "primary"}, + recipientCertificateForDonor + }); +} + +const fp = configureFailPoint( + recipientPrimary, "fpBeforeMarkingStateDocAsGarbageCollectable", {action: "hang"}); + +const recipientForgetMigrationThread = + new Thread(runRecipientForgetMigration, recipientPrimary.host, { + migrationIdString: extractUUIDFromObject(migrationId), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: tojson([tenantId]), + recipientCertificateForDonor + }); + +// Run a delayed/retried recipientForgetMigration command after the state doc has been deleted. +recipientForgetMigrationThread.start(); + +// Block the recipient before it updates the state doc with an expireAt field. +fp.wait(); + +let currOp = assert + .commandWorked(recipientPrimary.adminCommand( + {currentOp: true, desc: "shard merge recipient"})) + .inprog[0]; +assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kCommitted, currOp); +assert(!currOp.hasOwnProperty("expireAt"), currOp); + +// Test that we can still read from the recipient. +assert.eq(1, recipientPrimary.getDB(dbName)[collName].find().itcount()); + +const newRecipientPrimary = tenantMigrationTest.getRecipientRst().getSecondary(); +const newPrimaryFp = configureFailPoint(newRecipientPrimary, "hangBeforeTaskCompletion"); + +// Step up a new recipient primary before the state doc is truly marked as garbage collectable. +tenantMigrationTest.getRecipientRst().stepUp(newRecipientPrimary); +fp.off(); + +// The new primary should skip all tenant migration steps but wait for another +// recipientForgetMigration command. +newPrimaryFp.wait(); + +assert.commandFailedWithCode(recipientForgetMigrationThread.returnData(), + ErrorCodes.InterruptedDueToReplStateChange); + +// Test that we can still read from the recipient. +assert.eq(1, newRecipientPrimary.getDB(dbName)[collName].find().itcount()); + +// Test that we can retry the recipientForgetMigration on the new primary. 
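The retry against the new primary follows right after this sketch. Worth noting from the helper above: for the shard merge protocol, 'decision' is required ("committed" or "aborted") and a 'tenantIds' array is used instead of the single 'tenantId' of multitenant migrations. A minimal standalone shape with placeholder values (the certificate path is the one this test already loads; the connection string is illustrative):

    // Minimal shard merge recipientForgetMigration shape (placeholder values).
    const forgetCmd = {
        recipientForgetMigration: 1,
        migrationId: UUID(),
        donorConnectionString: "donorRS/donor-host:27017",  // placeholder donor replica set
        tenantIds: [ObjectId()],
        protocol: "shard merge",
        decision: "committed",                               // required for shard merge
        readPreference: {mode: "primary"},
        recipientCertificateForDonor:
            getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient.pem"),
    };
    // assert.commandWorked(recipientPrimary.adminCommand(forgetCmd));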
+newPrimaryFp.off(); +assert.commandWorked(runRecipientForgetMigration(newRecipientPrimary.host, { + migrationIdString: extractUUIDFromObject(migrationId), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: tojson([tenantId]), + recipientCertificateForDonor +})); + +currOp = assert + .commandWorked( + newRecipientPrimary.adminCommand({currentOp: true, desc: "shard merge recipient"})) + .inprog[0]; +assert.eq(currOp.state, TenantMigrationTest.ShardMergeRecipientState.kCommitted, currOp); +assert(currOp.hasOwnProperty("expireAt"), currOp); + +tenantMigrationTest.stop(); diff --git a/jstests/replsets/tenant_migration_shard_merge_ssl_configuration.js b/jstests/replsets/tenant_migration_shard_merge_ssl_configuration.js new file mode 100644 index 00000000000..e52d7ad6af5 --- /dev/null +++ b/jstests/replsets/tenant_migration_shard_merge_ssl_configuration.js @@ -0,0 +1,382 @@ +/** + * Test that tenant migration commands only require and use certificate fields, and require SSL to + * to be enabled when 'tenantMigrationDisableX509Auth' server parameter is false (default). + * Note: If a migration is started and SSL is not enabled on the recipient, we will repeatedly get + * back HostUnreachable on the donor side. + * + * @tags: [ + * incompatible_with_macos, + * requires_majority_read_concern, + * requires_persistence, + * featureFlagShardMerge, + * serverless, + * ] + */ + +import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import { + donorStartMigrationWithProtocol, + getCertificateAndPrivateKey, + isMigrationCompleted, + isShardMergeEnabled, + makeMigrationCertificatesForTest, + makeX509OptionsForTest, + runTenantMigrationCommand, +} from "jstests/replsets/libs/tenant_migration_util.js"; + +const standalone = MongoRunner.runMongod({}); +const shardMergeFeatureFlagEnabled = isShardMergeEnabled(standalone.getDB("admin")); +MongoRunner.stopMongod(standalone); + +// Note: including this explicit early return here due to the fact that multiversion +// suites will execute this test without featureFlagShardMerge enabled (despite the +// presence of the featureFlagShardMerge tag above), which means the test will attempt +// to run a multi-tenant migration and fail. 
+if (!shardMergeFeatureFlagEnabled) { + jsTestLog("Skipping Shard Merge-specific test"); + quit(); +} + +const kTenantId = ObjectId(); +const kReadPreference = { + mode: "primary" +}; +const kValidMigrationCertificates = makeMigrationCertificatesForTest(); +const kExpiredMigrationCertificates = { + donorCertificateForRecipient: + getCertificateAndPrivateKey("jstests/libs/tenant_migration_donor_expired.pem"), + recipientCertificateForDonor: + getCertificateAndPrivateKey("jstests/libs/tenant_migration_recipient_expired.pem") +}; + +(() => { + jsTest.log( + "Test that certificate fields are required when tenantMigrationDisableX509Auth=false"); + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); + + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + jsTest.log("Test that donorStartMigration requires 'donorCertificateForRecipient' when " + + "tenantMigrationDisableX509Auth=false"); + assert.commandFailedWithCode(donorPrimary.adminCommand({ + donorStartMigration: 1, + migrationId: UUID(), + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference, + recipientCertificateForDonor: kValidMigrationCertificates.recipientCertificateForDonor, + }), + ErrorCodes.InvalidOptions); + + jsTest.log("Test that donorStartMigration requires 'recipientCertificateForDonor' when " + + "tenantMigrationDisableX509Auth=false"); + assert.commandFailedWithCode(donorPrimary.adminCommand({ + donorStartMigration: 1, + migrationId: UUID(), + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference, + donorCertificateForRecipient: kValidMigrationCertificates.donorCertificateForRecipient, + }), + ErrorCodes.InvalidOptions); + + jsTest.log("Test that recipientSyncData requires 'recipientCertificateForDonor' when " + + "tenantMigrationDisableX509Auth=false"); + assert.commandFailedWithCode(recipientPrimary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + startMigrationDonorTimestamp: Timestamp(1, 1), + readPreference: kReadPreference + }), + ErrorCodes.InvalidOptions); + + jsTest.log("Test that recipientForgetMigration requires 'recipientCertificateForDonor' when " + + "tenantMigrationDisableX509Auth=false"); + assert.commandFailedWithCode(recipientPrimary.adminCommand({ + recipientForgetMigration: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + decision: "aborted", + readPreference: kReadPreference + }), + ErrorCodes.InvalidOptions); + + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log("Test that donorStartMigration fails if SSL is not enabled on the donor and " + + "tenantMigrationDisableX509Auth=false"); + const donorRst = new ReplSetTest({nodes: 1, name: "donor", serverless: true}); + donorRst.startSet(); + donorRst.initiate(); + + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), donorRst}); + + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + + assert.commandFailedWithCode(donorPrimary.adminCommand({ + donorStartMigration: 1, + migrationId: UUID(), + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + 
tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference, + donorCertificateForRecipient: kValidMigrationCertificates.donorCertificateForRecipient, + recipientCertificateForDonor: kValidMigrationCertificates.recipientCertificateForDonor, + }), + ErrorCodes.IllegalOperation); + + donorRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log("Test that recipientSyncData fails if SSL is not enabled on the recipient and " + + "tenantMigrationDisableX509Auth=false"); + const recipientRst = new ReplSetTest({nodes: 1, name: "recipient", serverless: true}); + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + assert.commandFailedWithCode(recipientPrimary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference, + startMigrationDonorTimestamp: Timestamp(1, 1), + recipientCertificateForDonor: kValidMigrationCertificates.recipientCertificateForDonor, + }), + ErrorCodes.IllegalOperation); + + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log("Test that recipientSyncData doesn't require 'recipientCertificateForDonor' when " + + "tenantMigrationDisableX509Auth=true"); + const migrationX509Options = makeX509OptionsForTest(); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipient", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.recipient, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + // In order to avoid "SnapshotUnavailable" error, fsync all the data on the donor before + // recipient starts migration. 
+ assert.commandWorked(tenantMigrationTest.getDonorPrimary().adminCommand({fsync: 1})); + + assert.commandWorked(recipientPrimary.adminCommand({ + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + startMigrationDonorTimestamp: Timestamp(1, 1), + readPreference: kReadPreference + })); + + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log( + "Test that recipientForgetMigration doesn't require 'recipientCertificateForDonor' when " + + "tenantMigrationDisableX509Auth=true"); + const migrationX509Options = makeX509OptionsForTest(); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipient", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.recipient, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = new TenantMigrationTest({name: jsTestName(), recipientRst}); + + const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + + assert.commandWorked(recipientPrimary.adminCommand({ + recipientForgetMigration: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getDonorRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + decision: "aborted", + readPreference: kReadPreference + })); + + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log("Test that donorStartMigration doesn't require certificate fields when " + + "tenantMigrationDisableX509Auth=true"); + const migrationX509Options = makeX509OptionsForTest(); + const donorRst = new ReplSetTest({ + nodes: 1, + name: "donor", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.donor, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipient", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.recipient, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + + donorRst.startSet(); + donorRst.initiate(); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); + + const migrationId = UUID(); + const donorStartMigrationCmdObj = { + donorStartMigration: 1, + migrationId: migrationId, + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference + }; + const stateRes = assert.commandWorked(runTenantMigrationCommand( + donorStartMigrationCmdObj, + donorRst, + {retryOnRetryableErrors: false, shouldStopFunc: isMigrationCompleted})); + assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted); + assert.commandWorked( + donorRst.getPrimary().adminCommand({donorForgetMigration: 1, migrationId: migrationId})); + + donorRst.stopSet(); + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log("Test that tenant migration doesn't fail if SSL is not enabled on the donor and " + + "the recipient and tenantMigrationDisableX509Auth=true"); + + const donorRst = new ReplSetTest({ + nodes: 1, + name: "donor", + serverless: true, + nodeOptions: {setParameter: {tenantMigrationDisableX509Auth: true}} + }); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipient", + serverless: true, + nodeOptions: {setParameter: 
{tenantMigrationDisableX509Auth: true}} + }); + + donorRst.startSet(); + donorRst.initiate(); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); + + const donorStartMigrationCmdObj = { + donorStartMigration: 1, + migrationId: UUID(), + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference + }; + + const stateRes = assert.commandWorked(runTenantMigrationCommand( + donorStartMigrationCmdObj, + donorRst, + {retryOnRetryableErrors: false, shouldStopFunc: isMigrationCompleted})); + assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted); + + donorRst.stopSet(); + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); + +(() => { + jsTest.log( + "Test that input certificate fields are not used when tenantMigrationDisableX509Auth=true"); + const migrationX509Options = makeX509OptionsForTest(); + const donorRst = new ReplSetTest({ + nodes: 1, + name: "donor", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.donor, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + const recipientRst = new ReplSetTest({ + nodes: 1, + name: "recipient", + serverless: true, + nodeOptions: Object.assign(migrationX509Options.recipient, + {setParameter: {tenantMigrationDisableX509Auth: true}}) + }); + + donorRst.startSet(); + donorRst.initiate(); + + recipientRst.startSet(); + recipientRst.initiate(); + + const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst, recipientRst}); + + const donorStartMigrationCmdObj = { + donorStartMigration: 1, + migrationId: UUID(), + recipientConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [kTenantId], + protocol: "shard merge", + readPreference: kReadPreference, + donorCertificateForRecipient: kExpiredMigrationCertificates.donorCertificateForRecipient, + recipientCertificateForDonor: kExpiredMigrationCertificates.recipientCertificateForDonor, + }; + const stateRes = assert.commandWorked(runTenantMigrationCommand( + donorStartMigrationCmdObj, + donorRst, + {retryOnRetryableErrors: false, shouldStopFunc: isMigrationCompleted})); + assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted); + + donorRst.stopSet(); + recipientRst.stopSet(); + tenantMigrationTest.stop(); +})(); diff --git a/jstests/replsets/tenant_migration_ssl_configuration.js b/jstests/replsets/tenant_migration_ssl_configuration.js index de4032102cc..9e9d7b5e285 100644 --- a/jstests/replsets/tenant_migration_ssl_configuration.js +++ b/jstests/replsets/tenant_migration_ssl_configuration.js @@ -6,6 +6,8 @@ * * @tags: [ * incompatible_with_macos, + * # Shard merge protocol will be tested by tenant_migration_shard_merge_ssl_configuration.js. 
+ * incompatible_with_shard_merge, * requires_majority_read_concern, * requires_persistence, * serverless, @@ -99,7 +101,7 @@ const kExpiredMigrationCertificates = { (() => { jsTest.log("Test that donorStartMigration fails if SSL is not enabled on the donor and " + "tenantMigrationDisableX509Auth=false"); - const donorRst = new ReplSetTest({nodes: 1, name: "donor"}); + const donorRst = new ReplSetTest({nodes: 1, name: "donor", serverless: true}); donorRst.startSet(); donorRst.initiate(); @@ -127,7 +129,7 @@ const kExpiredMigrationCertificates = { (() => { jsTest.log("Test that recipientSyncData fails if SSL is not enabled on the recipient and " + "tenantMigrationDisableX509Auth=false"); - const recipientRst = new ReplSetTest({nodes: 1, name: "recipient"}); + const recipientRst = new ReplSetTest({nodes: 1, name: "recipient", serverless: true}); recipientRst.startSet(); recipientRst.initiate(); @@ -157,6 +159,7 @@ const kExpiredMigrationCertificates = { const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); @@ -188,6 +191,7 @@ const kExpiredMigrationCertificates = { const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); @@ -218,12 +222,14 @@ const kExpiredMigrationCertificates = { const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); @@ -265,11 +271,13 @@ const kExpiredMigrationCertificates = { const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: {setParameter: {tenantMigrationDisableX509Auth: true}} }); const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: {setParameter: {tenantMigrationDisableX509Auth: true}} }); @@ -308,12 +316,14 @@ const kExpiredMigrationCertificates = { const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: {tenantMigrationDisableX509Auth: true}}) }); diff --git a/jstests/replsets/tenant_migration_stepup_recovery_after_abort.js b/jstests/replsets/tenant_migration_stepup_recovery_after_abort.js index ca708b301d5..9e16b82dd27 100644 --- a/jstests/replsets/tenant_migration_stepup_recovery_after_abort.js +++ b/jstests/replsets/tenant_migration_stepup_recovery_after_abort.js @@ -25,6 +25,7 @@ const kGarbageCollectionParams = { const donorRst = new ReplSetTest({ nodes: 3, name: "donor", + serverless: true, nodeOptions: Object.assign(makeX509OptionsForTest().donor, {setParameter: kGarbageCollectionParams}) }); diff --git a/jstests/replsets/tenant_migration_sync_source_too_stale.js b/jstests/replsets/tenant_migration_sync_source_too_stale.js index e72f715fc7d..a60a06db48f 100644 --- a/jstests/replsets/tenant_migration_sync_source_too_stale.js +++ 
b/jstests/replsets/tenant_migration_sync_source_too_stale.js @@ -34,6 +34,7 @@ load('jstests/replsets/rslib.js'); const donorRst = new ReplSetTest({ name: `${jsTestName()}_donor`, nodes: 3, + serverless: true, settings: {chainingAllowed: false}, nodeOptions: Object.assign(makeX509OptionsForTest().donor, { setParameter: { diff --git a/jstests/replsets/tenant_migration_timeseries_retryable_write_oplog_cloning.js b/jstests/replsets/tenant_migration_timeseries_retryable_write_oplog_cloning.js index 96a04e013b2..d0b4b37e496 100644 --- a/jstests/replsets/tenant_migration_timeseries_retryable_write_oplog_cloning.js +++ b/jstests/replsets/tenant_migration_timeseries_retryable_write_oplog_cloning.js @@ -33,12 +33,14 @@ function testOplogCloning(ordered) { const donorRst = new ReplSetTest({ nodes: 1, name: "donor", + serverless: true, nodeOptions: Object.assign(migrationX509Options.donor, {setParameter: kGarbageCollectionParams}) }); const recipientRst = new ReplSetTest({ nodes: 1, name: "recipient", + serverless: true, nodeOptions: Object.assign(migrationX509Options.recipient, {setParameter: kGarbageCollectionParams}) }); diff --git a/jstests/replsets/tenant_migrations_noop_writes.js b/jstests/replsets/tenant_migrations_noop_writes.js index 3cfd8445423..0c43a6f2f4f 100644 --- a/jstests/replsets/tenant_migrations_noop_writes.js +++ b/jstests/replsets/tenant_migrations_noop_writes.js @@ -74,6 +74,7 @@ function setup() { const donorRst = new ReplSetTest({ nodes: 3, name: "donor", + serverless: true, settings: {chainingAllowed: false}, nodeOptions: Object.assign(migrationX509Options.donor, { setParameter: { @@ -89,6 +90,7 @@ function setup() { const recipientRst = new ReplSetTest({ nodes: 3, name: "recipient", + serverless: true, settings: {chainingAllowed: false}, nodeOptions: migrationX509Options.recipient }); diff --git a/jstests/sharding/tenant_migration_disallowed_on_config_server.js b/jstests/sharding/tenant_migration_disallowed_on_config_server.js index 3993794859e..32c64148768 100644 --- a/jstests/sharding/tenant_migration_disallowed_on_config_server.js +++ b/jstests/sharding/tenant_migration_disallowed_on_config_server.js @@ -3,6 +3,9 @@ * * @tags: [ * incompatible_with_windows_tls, + * # Shard merge protocol will be tested by + * # tenant_migration_shard_merge_disallowed_on_config_server.js. + * incompatible_with_shard_merge, * requires_majority_read_concern, * requires_persistence, * does_not_support_stepdowns, diff --git a/jstests/sharding/tenant_migration_shard_merge_disallowed_on_config_server.js b/jstests/sharding/tenant_migration_shard_merge_disallowed_on_config_server.js new file mode 100644 index 00000000000..ffd6ca11ff7 --- /dev/null +++ b/jstests/sharding/tenant_migration_shard_merge_disallowed_on_config_server.js @@ -0,0 +1,100 @@ +/** + * Tests that tenant migration commands cannot be run on sharded clusters for config servers for + * shard merge protocol. 
+ * + * @tags: [ + * incompatible_with_windows_tls, + * requires_majority_read_concern, + * requires_persistence, + * does_not_support_stepdowns, + * featureFlagShardMerge, + * serverless, + * ] + */ + +import {TenantMigrationTest} from "jstests/replsets/libs/tenant_migration_test.js"; +import {isShardMergeEnabled} from "jstests/replsets/libs/tenant_migration_util.js"; + +(function() { +load("jstests/libs/catalog_shard_util.js"); + +const st = new ShardingTest({shards: 1}); +const donorRstShard = st.rs0; +const donorRstConfig = st.configRS; + +if (CatalogShardUtil.isEnabledIgnoringFCV(st)) { + // TODO SERVER-73409: Decide how to handle tenant migrations on a config server then revisit + // this test. Currently it does not pass when the catalog shard feature flag is enabled because + // the config server will have the shard role, so it won't reject tenant migration commands. + jsTestLog("Skipping test because catalog shard mode is enabled"); + st.stop(); + return; +} + +// Note: including this explicit early return here due to the fact that multiversion +// suites will execute this test without featureFlagShardMerge enabled (despite the +// presence of the featureFlagShardMerge tag above), which means the test will attempt +// to run a multi-tenant migration and fail. +if (!isShardMergeEnabled(donorRstShard.getPrimary().getDB("admin"))) { + jsTestLog("Skipping Shard Merge-specific test"); + st.stop(); + quit(); +} + +const recipientRst = new ReplSetTest({nodes: 1}); +recipientRst.startSet(); +recipientRst.initiate(); + +const tenantMigrationTest = + new TenantMigrationTest({name: jsTestName(), donorRst: donorRstShard, recipientRst}); + +// Run tenant migration commands on config servers. +let donorPrimary = donorRstConfig.getPrimary(); + +let cmdObj = { + donorStartMigration: 1, + tenantIds: [ObjectId()], + migrationId: UUID(), + protocol: "shard merge", + recipientConnectionString: tenantMigrationTest.getRecipientConnString(), + readPreference: {mode: "primary"} +}; +assert.commandFailedWithCode(donorPrimary.adminCommand(cmdObj), ErrorCodes.IllegalOperation); + +cmdObj = { + donorForgetMigration: 1, + migrationId: UUID() +}; +assert.commandFailedWithCode(donorPrimary.adminCommand(cmdObj), ErrorCodes.IllegalOperation); + +cmdObj = { + donorAbortMigration: 1, + migrationId: UUID() +}; +assert.commandFailedWithCode(donorPrimary.adminCommand(cmdObj), ErrorCodes.IllegalOperation); + +cmdObj = { + recipientSyncData: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [ObjectId()], + protocol: "shard merge", + readPreference: {mode: "primary"}, + startMigrationDonorTimestamp: Timestamp(1, 1) +}; +assert.commandFailedWithCode(donorPrimary.adminCommand(cmdObj), ErrorCodes.IllegalOperation); + +cmdObj = { + recipientForgetMigration: 1, + migrationId: UUID(), + donorConnectionString: tenantMigrationTest.getRecipientRst().getURL(), + tenantIds: [ObjectId()], + protocol: "shard merge", + readPreference: {mode: "primary"}, +}; +assert.commandFailedWithCode(donorPrimary.adminCommand(cmdObj), ErrorCodes.IllegalOperation); + +tenantMigrationTest.stop(); +recipientRst.stopSet(); +st.stop(); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 1063f38a494..3c32e5da968 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2344,6 +2344,7 @@ env.Library( 'repl/repl_coordinator_impl', 'repl/replication_recovery', 'repl/serveronly_repl', + 'repl/shard_merge_recipient_service', 'repl/storage_interface_impl', 
'repl/tenant_migration_donor_service', 'repl/tenant_migration_recipient_service', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index c25d3ed82dd..3311d717568 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -599,6 +599,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/repl_server_parameters', '$BUILD_DIR/mongo/db/repl/replica_set_messages', + '$BUILD_DIR/mongo/db/repl/shard_merge_recipient_service', '$BUILD_DIR/mongo/db/repl/tenant_migration_donor_service', '$BUILD_DIR/mongo/db/repl/tenant_migration_recipient_service', '$BUILD_DIR/mongo/db/rw_concern_d', diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 0118fd3dc30..4de2aaf792b 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -64,6 +64,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/shard_merge_recipient_service.h" #include "mongo/db/repl/tenant_migration_donor_service.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/s/config/sharding_catalog_manager.h" @@ -1281,6 +1282,12 @@ private: repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) ->lookupServiceByName(ShardSplitDonorService::kServiceName)); splitDonorService->abortAllSplits(opCtx); + + auto mergeRecipientService = checked_cast<repl::ShardMergeRecipientService*>( + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName( + repl::ShardMergeRecipientService::kShardMergeRecipientServiceName)); + mergeRecipientService->abortAllMigrations(opCtx); } } } diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp index 96b04d272bc..ff7c94df325 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.cpp @@ -34,6 +34,7 @@ #include "mongo/db/feature_compatibility_version_parser.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/shard_merge_recipient_service.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/logv2/log.h" @@ -43,39 +44,6 @@ namespace mongo { namespace { -// This function requires the definition of MigrationDecision. It cannot be moved to -// tenant_migration_util.h as this would cause a dependency cycle. -inline TenantMigrationRecipientStateEnum protocolCheckRecipientForgetDecision( - const MigrationProtocolEnum protocol, const boost::optional<MigrationDecisionEnum>& decision) { - switch (protocol) { - case MigrationProtocolEnum::kShardMerge: { - uassert(ErrorCodes::InvalidOptions, - str::stream() << "'decision' is required for protocol '" - << MigrationProtocol_serializer(protocol) << "'", - decision.has_value()); - - // For 'shard merge', return 'kAborted' or 'kCommitted' to allow garbage collection to - // proceed. 
- if (decision == MigrationDecisionEnum::kCommitted) { - return TenantMigrationRecipientStateEnum::kCommitted; - } else { - return TenantMigrationRecipientStateEnum::kAborted; - } - } - case MigrationProtocolEnum::kMultitenantMigrations: { - // For 'multitenant migration' simply return kDone. - uassert(ErrorCodes::InvalidOptions, - str::stream() << "'decision' must be empty for protocol '" - << MigrationProtocol_serializer(protocol) << "'", - !decision.has_value()); - return TenantMigrationRecipientStateEnum::kDone; - } - default: - MONGO_UNREACHABLE; - } -} - - MONGO_FAIL_POINT_DEFINE(returnResponseOkForRecipientSyncDataCmd); MONGO_FAIL_POINT_DEFINE(returnResponseOkForRecipientForgetMigrationCmd); @@ -117,13 +85,33 @@ public: tenant_migration_util::protocolReadPreferenceCompatibilityCheck( opCtx, migrationProtocol, cmd.getReadPreference()); - // tenantId will be set to empty string for the "shard merge" protocol. + if (MONGO_unlikely(returnResponseOkForRecipientSyncDataCmd.shouldFail())) { + LOGV2(4879608, + "Immediately returning OK because failpoint is enabled.", + "migrationId"_attr = cmd.getMigrationId(), + "fpName"_attr = returnResponseOkForRecipientSyncDataCmd.getName()); + return Response(repl::OpTime()); + } + + switch (migrationProtocol) { + case MigrationProtocolEnum::kMultitenantMigrations: + return _handleMTMRecipientSyncDataCmd(opCtx, cmd); + case MigrationProtocolEnum::kShardMerge: + return _handleShardMergeRecipientSyncDataCmd(opCtx, cmd); + default: + MONGO_UNREACHABLE; + } + + MONGO_UNREACHABLE; + } + + private: + Response _handleMTMRecipientSyncDataCmd(OperationContext* opCtx, const Request& cmd) { TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(), cmd.getDonorConnectionString().toString(), - tenantId.value_or("").toString(), + cmd.getTenantId()->toString(), cmd.getStartMigrationDonorTimestamp(), cmd.getReadPreference()); - stateDoc.setTenantIds(tenantIds); if (!repl::tenantMigrationDisableX509Auth) { uassert(ErrorCodes::InvalidOptions, @@ -133,37 +121,54 @@ public: stateDoc.setRecipientCertificateForDonor(cmd.getRecipientCertificateForDonor()); } - stateDoc.setProtocol(migrationProtocol); - - const auto stateDocBson = stateDoc.toBSON(); - - if (MONGO_unlikely(returnResponseOkForRecipientSyncDataCmd.shouldFail())) { - LOGV2(4879608, - "Immediately returning OK because 'returnResponseOkForRecipientSyncDataCmd' " - "failpoint is enabled.", - "tenantMigrationRecipientInstance"_attr = stateDoc.toBSON()); - return Response(repl::OpTime()); - } + stateDoc.setProtocol(MigrationProtocolEnum::kMultitenantMigrations); auto recipientService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) ->lookupServiceByName(repl::TenantMigrationRecipientService:: kTenantMigrationRecipientServiceName); auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate( - opCtx, recipientService, stateDocBson); + opCtx, recipientService, stateDoc.toBSON()); auto returnAfterReachingDonorTs = cmd.getReturnAfterReachingDonorTimestamp(); - if (!returnAfterReachingDonorTs) { - return Response(recipientInstance->waitUntilMigrationReachesConsistentState(opCtx)); + return returnAfterReachingDonorTs + ? 
Response(recipientInstance->waitUntilMigrationReachesReturnAfterReachingTimestamp( + opCtx, *returnAfterReachingDonorTs)) + : Response(recipientInstance->waitUntilMigrationReachesConsistentState(opCtx)); + } + + Response _handleShardMergeRecipientSyncDataCmd(OperationContext* opCtx, + const Request& cmd) { + ShardMergeRecipientDocument stateDoc(cmd.getMigrationId(), + cmd.getDonorConnectionString().toString(), + *cmd.getTenantIds(), + cmd.getStartMigrationDonorTimestamp(), + cmd.getReadPreference()); + + if (!repl::tenantMigrationDisableX509Auth) { + uassert(ErrorCodes::InvalidOptions, + str::stream() << "'" << Request::kRecipientCertificateForDonorFieldName + << "' is a required field", + cmd.getRecipientCertificateForDonor()); + stateDoc.setRecipientCertificateForDonor(cmd.getRecipientCertificateForDonor()); } - return Response( - recipientInstance->waitUntilMigrationReachesReturnAfterReachingTimestamp( - opCtx, *returnAfterReachingDonorTs)); + auto recipientService = + repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName( + repl::ShardMergeRecipientService::kShardMergeRecipientServiceName); + auto recipientInstance = repl::ShardMergeRecipientService::Instance::getOrCreate( + opCtx, recipientService, stateDoc.toBSON()); + + auto returnAfterReachingDonorTs = cmd.getReturnAfterReachingDonorTimestamp(); + + return returnAfterReachingDonorTs + ? Response(recipientInstance->waitUntilMigrationReachesReturnAfterReachingTimestamp( + opCtx, *returnAfterReachingDonorTs)) + : Response(recipientInstance->waitUntilMigrationReachesConsistentState(opCtx)); } - private: void doCheckAuthorization(OperationContext* opCtx) const final { uassert(ErrorCodes::Unauthorized, "Unauthorized", @@ -233,9 +238,9 @@ public: auto recipientService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) - ->lookupServiceByName(repl::TenantMigrationRecipientService:: - kTenantMigrationRecipientServiceName); - auto instance = repl::TenantMigrationRecipientService::Instance::lookup( + ->lookupServiceByName( + repl::ShardMergeRecipientService::kShardMergeRecipientServiceName); + auto instance = repl::ShardMergeRecipientService::Instance::lookup( opCtx, recipientService, BSON("_id" << cmd.getMigrationId())); uassert(8423340, "Unknown migrationId", instance); (*instance)->onMemberImportedFiles(cmd.getFrom(), cmd.getSuccess(), cmd.getReason()); @@ -264,6 +269,12 @@ class RecipientForgetMigrationCmd : public TypedCommand<RecipientForgetMigration public: using Request = RecipientForgetMigration; + // We may not have a document if recipientForgetMigration is received before + // recipientSyncData. But even if that's the case, we still need to create an instance + // and persist a state document that's marked garbage collectable (which is done by the + // main chain). 
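In the shard merge handler further down, such a lone recipientForgetMigration builds a ShardMergeRecipientDocument with an unused start timestamp and 'startGarbageCollect' set, so from the shell its only visible effect is a config.shardMergeRecipients document that is born garbage collectable. A rough, illustrative shape (field names follow the constructor arguments and setters used below; the serialized layout may differ):

    // Rough shape of the document a lone shard merge recipientForgetMigration persists
    // (illustrative only; inspect config.shardMergeRecipients on a live recipient).
    const docSketch = {
        _id: UUID(),                                    // the migration id
        donorConnectionString: "donorRS/donor-host:27017",
        tenantIds: [ObjectId()],
        startMigrationDonorTimestamp: Timestamp(1, 1),  // kUnusedStartMigrationTimestamp
        readPreference: {mode: "primary"},
        startGarbageCollect: true,                      // skip access blockers / serverless lock
    };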
+ static inline const Timestamp kUnusedStartMigrationTimestamp{1, 1}; + std::set<StringData> sensitiveFieldNames() const final { return {Request::kRecipientCertificateForDonorFieldName}; } @@ -286,28 +297,39 @@ public: tenant_migration_util::protocolTenantIdCompatibilityCheck(migrationProtocol, tenantId); tenant_migration_util::protocolTenantIdsCompatibilityCheck(migrationProtocol, tenantIds); - auto nextState = - protocolCheckRecipientForgetDecision(migrationProtocol, cmd.getDecision()); + tenant_migration_util::protocolCheckRecipientForgetDecision(migrationProtocol, + cmd.getDecision()); + + if (MONGO_unlikely(returnResponseOkForRecipientForgetMigrationCmd.shouldFail())) { + LOGV2(5949502, + "Immediately returning ok because failpoint is enabled", + "migrationId"_attr = cmd.getMigrationId(), + "fpName"_attr = returnResponseOkForRecipientForgetMigrationCmd.getName()); + return; + } opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE(); - auto recipientService = - repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) - ->lookupServiceByName(repl::TenantMigrationRecipientService:: - kTenantMigrationRecipientServiceName); - // We may not have a document if recipientForgetMigration is received before - // recipientSyncData. But even if that's the case, we still need to create an instance - // and persist a state document that's marked garbage collectable (which is done by the - // main chain). - const Timestamp kUnusedStartMigrationTimestamp(1, 1); + switch (migrationProtocol) { + case MigrationProtocolEnum::kMultitenantMigrations: + return _handleMTMRecipientForgetMigrationCmd(opCtx, cmd); + case MigrationProtocolEnum::kShardMerge: + return _handleShardMergeRecipientForgetMigrationCmd(opCtx, cmd); + default: + MONGO_UNREACHABLE; + } - // tenantId will be set to empty string for the "shard merge" protocol. + MONGO_UNREACHABLE; + } + + private: + void _handleMTMRecipientForgetMigrationCmd(OperationContext* opCtx, const Request& cmd) { TenantMigrationRecipientDocument stateDoc(cmd.getMigrationId(), cmd.getDonorConnectionString().toString(), - tenantId.value_or("").toString(), + cmd.getTenantId()->toString(), kUnusedStartMigrationTimestamp, cmd.getReadPreference()); - stateDoc.setTenantIds(tenantIds); + if (!repl::tenantMigrationDisableX509Auth) { uassert(ErrorCodes::InvalidOptions, str::stream() << "'" << Request::kRecipientCertificateForDonorFieldName @@ -315,30 +337,60 @@ public: cmd.getRecipientCertificateForDonor()); stateDoc.setRecipientCertificateForDonor(cmd.getRecipientCertificateForDonor()); } - stateDoc.setProtocol(migrationProtocol); - // Set the state to 'kDone' for 'multitenant migration' or 'kCommitted'/'kAborted' for - // 'shard merge' so that we don't create a recipient access blocker unnecessarily if - // this recipientForgetMigration command is received before a recipientSyncData command - // or after the state doc is garbage collected. - stateDoc.setState(nextState); - if (MONGO_unlikely(returnResponseOkForRecipientForgetMigrationCmd.shouldFail())) { - LOGV2(5949502, - "Immediately returning ok because " - "'returnResponseOkForRecipientForgetMigrationCmd' failpoint is enabled", - "tenantMigrationRecipientInstance"_attr = stateDoc.toBSON()); - return; - } + stateDoc.setProtocol(MigrationProtocolEnum::kMultitenantMigrations); + // Set the state to 'kDone' so that we don't create a recipient access blocker + // unnecessarily if this recipientForgetMigration command is received before a + // recipientSyncData command or after the state doc is garbage collected. 
+            stateDoc.setState(TenantMigrationRecipientStateEnum::kDone);
+            auto recipientService =
+                repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+                    ->lookupServiceByName(repl::TenantMigrationRecipientService::
+                                              kTenantMigrationRecipientServiceName);
             auto recipientInstance = repl::TenantMigrationRecipientService::Instance::getOrCreate(
                 opCtx, recipientService, stateDoc.toBSON(), false);
 
             // Instruct the instance run() function to mark this migration garbage collectable.
-            recipientInstance->onReceiveRecipientForgetMigration(opCtx, nextState);
+            recipientInstance->onReceiveRecipientForgetMigration(
+                opCtx, TenantMigrationRecipientStateEnum::kDone);
+            recipientInstance->getForgetMigrationDurableFuture().get(opCtx);
+        }
+
+        void _handleShardMergeRecipientForgetMigrationCmd(OperationContext* opCtx,
+                                                          const Request& cmd) {
+            ShardMergeRecipientDocument stateDoc(cmd.getMigrationId(),
+                                                 cmd.getDonorConnectionString().toString(),
+                                                 *cmd.getTenantIds(),
+                                                 kUnusedStartMigrationTimestamp,
+                                                 cmd.getReadPreference());
+
+            if (!repl::tenantMigrationDisableX509Auth) {
+                uassert(ErrorCodes::InvalidOptions,
+                        str::stream() << "'" << Request::kRecipientCertificateForDonorFieldName
+                                      << "' is a required field",
+                        cmd.getRecipientCertificateForDonor());
+                stateDoc.setRecipientCertificateForDonor(cmd.getRecipientCertificateForDonor());
+            }
+
+            // Set 'startGarbageCollect' true to not start a migration (and install access blocker
+            // or get serverless lock) unnecessarily if this recipientForgetMigration command is
+            // received before a recipientSyncData command or after the state doc is garbage
+            // collected.
+            stateDoc.setStartGarbageCollect(true);
+
+            auto recipientService =
+                repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+                    ->lookupServiceByName(
+                        repl::ShardMergeRecipientService::kShardMergeRecipientServiceName);
+            auto recipientInstance = repl::ShardMergeRecipientService::Instance::getOrCreate(
+                opCtx, recipientService, stateDoc.toBSON(), false);
+
+            // Instruct the instance run() function to mark this migration garbage collectable.
+            recipientInstance->onReceiveRecipientForgetMigration(opCtx, *cmd.getDecision());
+            recipientInstance->getForgetMigrationDurableFuture().get(opCtx);
+        }
 
-    private:
         void doCheckAuthorization(OperationContext* opCtx) const final {
             uassert(ErrorCodes::Unauthorized,
                     "Unauthorized",
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index f3477f0fb4d..0a2f5cd11c7 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -42,14 +42,6 @@ imports:
     - "mongo/db/repl/replication_types.idl"
     - "mongo/util/net/hostandport.idl"
 
-enums:
-    MigrationDecision:
-        description: "Whether the migration committed or aborted."
- type: string - values: - kCommitted: "committed" - kAborted: "aborted" - structs: recipientSyncDataResponse: description: "Response for the 'recipientSyncData' command" diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 710d7ee5990..811ffca829c 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -125,6 +125,8 @@ #include "mongo/db/repl/replication_coordinator_impl_gen.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/replication_recovery.h" +#include "mongo/db/repl/shard_merge_recipient_op_observer.h" +#include "mongo/db/repl/shard_merge_recipient_service.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/repl/tenant_migration_donor_op_observer.h" #include "mongo/db/repl/tenant_migration_donor_service.h" @@ -377,6 +379,9 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { services.push_back(std::make_unique<ReshardingRecipientService>(serviceContext)); services.push_back(std::make_unique<TenantMigrationDonorService>(serviceContext)); services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext)); + if (getGlobalReplSettings().isServerless()) { + services.push_back(std::make_unique<repl::ShardMergeRecipientService>(serviceContext)); + } } if (serverGlobalParams.clusterRole == ClusterRole::None) { @@ -384,6 +389,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext)); if (getGlobalReplSettings().isServerless()) { services.push_back(std::make_unique<ShardSplitDonorService>(serviceContext)); + services.push_back(std::make_unique<repl::ShardMergeRecipientService>(serviceContext)); } } @@ -1233,6 +1239,8 @@ void setUpObservers(ServiceContext* serviceContext) { opObserverRegistry->addObserver(std::make_unique<UserWriteBlockModeOpObserver>()); if (getGlobalReplSettings().isServerless()) { opObserverRegistry->addObserver(std::make_unique<ShardSplitDonorOpObserver>()); + opObserverRegistry->addObserver( + std::make_unique<repl::ShardMergeRecipientOpObserver>()); } } @@ -1257,6 +1265,8 @@ void setUpObservers(ServiceContext* serviceContext) { opObserverRegistry->addObserver(std::make_unique<UserWriteBlockModeOpObserver>()); if (getGlobalReplSettings().isServerless()) { opObserverRegistry->addObserver(std::make_unique<ShardSplitDonorOpObserver>()); + opObserverRegistry->addObserver( + std::make_unique<repl::ShardMergeRecipientOpObserver>()); } } diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index a15cd37fac1..8965f0bcc98 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -157,7 +157,7 @@ bool NamespaceString::isLegalClientSystemNS( * Process updates to 'admin.system.version' individually as well so the secondary's FCV when * processing each operation matches the primary's when committing that operation. * - * Process updates to 'config.tenantMigrationRecipients' individually so they serialize after + * Process updates to 'config.shardMergeRecipients' individually so they serialize after * inserts into 'config.donatedFiles.<migrationId>'. 
* * Oplog entries on 'config.shards' should be processed one at a time, otherwise the in-memory state @@ -170,7 +170,7 @@ bool NamespaceString::mustBeAppliedInOwnOplogBatch() const { return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection() || _ns == kDonorReshardingOperationsNamespace.ns() || _ns == kForceOplogBatchBoundaryNamespace.ns() || - _ns == kTenantMigrationDonorsNamespace.ns() || + _ns == kTenantMigrationDonorsNamespace.ns() || _ns == kShardMergeRecipientsNamespace.ns() || _ns == kTenantMigrationRecipientsNamespace.ns() || _ns == kShardSplitDonorsNamespace.ns() || _ns == kConfigsvrShardsNamespace.ns(); } diff --git a/src/mongo/db/namespace_string_reserved.def.h b/src/mongo/db/namespace_string_reserved.def.h index 30081a9a5d1..0c33040bb00 100644 --- a/src/mongo/db/namespace_string_reserved.def.h +++ b/src/mongo/db/namespace_string_reserved.def.h @@ -98,6 +98,9 @@ NSS_CONSTANT(kTenantMigrationRecipientsNamespace, DatabaseName::kConfig, "tenantMigrationRecipients"_sd) +// Namespace for storing the persisted state of shard merge recipient service instances. +NSS_CONSTANT(kShardMergeRecipientsNamespace, DatabaseName::kConfig, "shardMergeRecipients"_sd) + // Namespace for view on local.oplog.rs for tenant migrations. NSS_CONSTANT(kTenantMigrationOplogView, DatabaseName::kLocal, "system.tenantMigration.oplogView"_sd) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index ec1d257cf25..6a3b2e54e3b 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1406,6 +1406,47 @@ env.Library( ) env.Library( + target='shard_merge_recipient_service', + source=[ + 'shard_merge_recipient_op_observer.cpp', + 'shard_merge_recipient_service.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/client/fetcher', + '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/db/catalog/commit_quorum_options', + '$BUILD_DIR/mongo/db/vector_clock_mutable', + 'tenant_migration_access_blocker', + 'tenant_migration_statistics', + 'tenant_migration_utils', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/client/clientdriver_network', + '$BUILD_DIR/mongo/db/catalog/collection_crud', + '$BUILD_DIR/mongo/db/catalog/local_oplog_info', + '$BUILD_DIR/mongo/db/concurrency/exception_util', + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', + '$BUILD_DIR/mongo/db/session/session_catalog_mongod', + '$BUILD_DIR/mongo/db/transaction/transaction', + 'cloner_utils', + 'oplog', + 'oplog_buffer_collection', + 'oplog_entry', + 'oplog_fetcher', + 'oplog_interface_local', + 'primary_only_service', + 'repl_server_parameters', + 'replica_set_aware_service', + 'replication_auth', + 'tenant_migration_cloners', + 'tenant_migration_state_machine_idl', + 'tenant_oplog_processing', + ], +) + +env.Library( target='tenant_migration_recipient_service', source=[ 'tenant_migration_recipient_op_observer.cpp', @@ -1430,8 +1471,6 @@ env.Library( '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/transaction/transaction', - '$BUILD_DIR/mongo/executor/scoped_task_executor', - '$BUILD_DIR/mongo/idl/cluster_parameter_synchronization_helpers', 'cloner_utils', 'oplog', 'oplog_buffer_collection', diff --git a/src/mongo/db/repl/shard_merge_recipient_op_observer.cpp b/src/mongo/db/repl/shard_merge_recipient_op_observer.cpp new file mode 100644 index 00000000000..ad6c3c5d5ac --- /dev/null +++ 
b/src/mongo/db/repl/shard_merge_recipient_op_observer.cpp @@ -0,0 +1,363 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/db/repl/shard_merge_recipient_op_observer.h" + +#include <fmt/format.h> + +#include "mongo/db/multitenancy_gen.h" +#include "mongo/db/repl/shard_merge_recipient_service.h" +#include "mongo/db/repl/tenant_file_importer_service.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_migration_recipient_access_blocker.h" +#include "mongo/db/repl/tenant_migration_shard_merge_util.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/repl/tenant_migration_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +namespace mongo::repl { +using namespace fmt; +namespace { +void onShardMergeRecipientsNssInsert(OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last) { + if (tenant_migration_access_blocker::inRecoveryMode(opCtx)) + return; + + for (auto it = first; it != last; it++) { + auto recipientStateDoc = + ShardMergeRecipientDocument::parse(IDLParserContext("recipientStateDoc"), it->doc); + switch (recipientStateDoc.getState()) { + case ShardMergeRecipientStateEnum::kStarted: { + invariant(!recipientStateDoc.getStartGarbageCollect()); + + auto migrationId = recipientStateDoc.getId(); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kMergeRecipient, + migrationId); + + auto mtab = std::make_shared<TenantMigrationRecipientAccessBlocker>( + opCtx->getServiceContext(), migrationId); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .add(recipientStateDoc.getTenantIds(), mtab); + + opCtx->recoveryUnit()->onRollback([migrationId](OperationContext* opCtx) { + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + 
.removeAccessBlockersForMigration( + migrationId, TenantMigrationAccessBlocker::BlockerType::kRecipient); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kMergeRecipient, + migrationId); + }); + + opCtx->recoveryUnit()->onCommit([migrationId](OperationContext* opCtx, auto _) { + repl::TenantFileImporterService::get(opCtx)->startMigration(migrationId); + }); + } break; + case ShardMergeRecipientStateEnum::kCommitted: + case ShardMergeRecipientStateEnum::kAborted: + invariant(recipientStateDoc.getStartGarbageCollect()); + break; + default: + MONGO_UNREACHABLE; + } + } +} + +void onDonatedFilesCollNssInsert(OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last) { + if (tenant_migration_access_blocker::inRecoveryMode(opCtx)) + return; + + for (auto it = first; it != last; it++) { + const auto& metadataDoc = it->doc; + auto migrationId = + uassertStatusOK(UUID::parse(metadataDoc[shard_merge_utils::kMigrationIdFieldName])); + repl::TenantFileImporterService::get(opCtx)->learnedFilename(migrationId, metadataDoc); + } +} + +void assertStateTransitionIsValid(ShardMergeRecipientStateEnum prevState, + ShardMergeRecipientStateEnum nextState) { + + auto validPrevStates = [&]() -> stdx::unordered_set<ShardMergeRecipientStateEnum> { + switch (nextState) { + case ShardMergeRecipientStateEnum::kStarted: + return {ShardMergeRecipientStateEnum::kStarted}; + case ShardMergeRecipientStateEnum::kLearnedFilenames: + return {ShardMergeRecipientStateEnum::kStarted, + ShardMergeRecipientStateEnum::kLearnedFilenames}; + case ShardMergeRecipientStateEnum::kConsistent: + return {ShardMergeRecipientStateEnum::kLearnedFilenames, + ShardMergeRecipientStateEnum::kConsistent}; + case ShardMergeRecipientStateEnum::kCommitted: + return {ShardMergeRecipientStateEnum::kConsistent, + ShardMergeRecipientStateEnum::kCommitted}; + case ShardMergeRecipientStateEnum::kAborted: + return {ShardMergeRecipientStateEnum::kStarted, + ShardMergeRecipientStateEnum::kLearnedFilenames, + ShardMergeRecipientStateEnum::kConsistent, + ShardMergeRecipientStateEnum::kAborted}; + default: + MONGO_UNREACHABLE; + } + }(); + + uassert(7339766, "Invalid state transition", validPrevStates.contains(prevState)); +} + +void onTransitioningToLearnedFilenames(OperationContext* opCtx, + const ShardMergeRecipientDocument& recipientStateDoc) { + opCtx->recoveryUnit()->onCommit( + [migrationId = recipientStateDoc.getId()](OperationContext* opCtx, auto _) { + repl::TenantFileImporterService::get(opCtx)->learnedAllFilenames(migrationId); + }); +} + +void onTransitioningToConsistent(OperationContext* opCtx, + const ShardMergeRecipientDocument& recipientStateDoc) { + if (recipientStateDoc.getRejectReadsBeforeTimestamp()) { + opCtx->recoveryUnit()->onCommit([recipientStateDoc](OperationContext* opCtx, auto _) { + auto mtab = tenant_migration_access_blocker::getRecipientAccessBlockerForMigration( + opCtx->getServiceContext(), recipientStateDoc.getId()); + invariant(mtab); + mtab->startRejectingReadsBefore( + recipientStateDoc.getRejectReadsBeforeTimestamp().get()); + }); + } +} + +void onTransitioningToCommitted(OperationContext* opCtx, + const ShardMergeRecipientDocument& recipientStateDoc) { + auto migrationId = recipientStateDoc.getId(); + // It's safe to do interrupt outside of onCommit hook as the decision to forget a migration or + // the migration decision is not reversible. 
+ repl::TenantFileImporterService::get(opCtx)->interrupt(migrationId); + + auto markedGCAfterMigrationStart = [&] { + return !recipientStateDoc.getStartGarbageCollect() && recipientStateDoc.getExpireAt(); + }(); + + if (markedGCAfterMigrationStart) { + opCtx->recoveryUnit()->onCommit([migrationId](OperationContext* opCtx, auto _) { + auto mtab = tenant_migration_access_blocker::getRecipientAccessBlockerForMigration( + opCtx->getServiceContext(), migrationId); + invariant(mtab); + // Once the migration is committed and state doc is marked garbage collectable, + // the TTL deletions should be unblocked for the imported donor collections. + mtab->stopBlockingTTL(); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kMergeRecipient, + migrationId); + }); + } +} + +void onTransitioningToAborted(OperationContext* opCtx, + const ShardMergeRecipientDocument& recipientStateDoc) { + auto migrationId = recipientStateDoc.getId(); + // It's safe to do interrupt outside of onCommit hook as the decision to forget a migration or + // the migration decision is not reversible. + repl::TenantFileImporterService::get(opCtx)->interrupt(migrationId); + + auto markedGCAfterMigrationStart = [&] { + return !recipientStateDoc.getStartGarbageCollect() && recipientStateDoc.getExpireAt(); + }(); + + if (markedGCAfterMigrationStart) { + opCtx->recoveryUnit()->onCommit([migrationId](OperationContext* opCtx, auto _) { + // Remove access blocker and release locks to allow faster migration retry. + // (Note: Not needed to unblock TTL deletions as we would have already dropped all + // imported donor collections immediately on transitioning to `kAborted`). + + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .removeAccessBlockersForMigration( + migrationId, TenantMigrationAccessBlocker::BlockerType::kRecipient); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kMergeRecipient, + migrationId); + }); + } +} +} // namespace + +void ShardMergeRecipientOpObserver::onCreateCollection(OperationContext* opCtx, + const CollectionPtr& coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime, + bool fromMigrate) { + if (!shard_merge_utils::isDonatedFilesCollection(collectionName) || + tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + return; + } + + auto collString = collectionName.coll().toString(); + auto migrationUUID = uassertStatusOK(UUID::parse(collString.substr(collString.find('.') + 1))); + auto fileClonerTempDirPath = shard_merge_utils::fileClonerTempDir(migrationUUID); + + boost::system::error_code ec; + bool createdNewDir = boost::filesystem::create_directory(fileClonerTempDirPath, ec); + uassert(7339767, + str::stream() << "Failed to create WT temp directory:: " + << fileClonerTempDirPath.generic_string() << ", Error:: " << ec.message(), + !ec); + uassert(7339768, str::stream() << "WT temp directory already exists", !createdNewDir); +} + +void ShardMergeRecipientOpObserver::onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + bool fromMigrate) { + if (coll->ns() == NamespaceString::kShardMergeRecipientsNamespace) { + onShardMergeRecipientsNssInsert(opCtx, first, last); + return; + } + + if 
(shard_merge_utils::isDonatedFilesCollection(coll->ns())) { + onDonatedFilesCollNssInsert(opCtx, first, last); + return; + } +} + +void ShardMergeRecipientOpObserver::onUpdate(OperationContext* opCtx, + const OplogUpdateEntryArgs& args) { + + if (args.coll->ns() != NamespaceString::kShardMergeRecipientsNamespace || + tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + return; + } + + auto prevState = ShardMergeRecipientState_parse( + IDLParserContext("preImageRecipientStateDoc"), + args.updateArgs->preImageDoc[ShardMergeRecipientDocument::kStateFieldName] + .valueStringData()); + auto recipientStateDoc = ShardMergeRecipientDocument::parse( + IDLParserContext("recipientStateDoc"), args.updateArgs->updatedDoc); + auto nextState = recipientStateDoc.getState(); + + assertStateTransitionIsValid(prevState, nextState); + + switch (nextState) { + case ShardMergeRecipientStateEnum::kStarted: + break; + case ShardMergeRecipientStateEnum::kLearnedFilenames: + onTransitioningToLearnedFilenames(opCtx, recipientStateDoc); + break; + case ShardMergeRecipientStateEnum::kConsistent: + onTransitioningToConsistent(opCtx, recipientStateDoc); + break; + case ShardMergeRecipientStateEnum::kCommitted: + onTransitioningToCommitted(opCtx, recipientStateDoc); + break; + case ShardMergeRecipientStateEnum::kAborted: + onTransitioningToAborted(opCtx, recipientStateDoc); + break; + default: + MONGO_UNREACHABLE; + } +} + +void ShardMergeRecipientOpObserver::aboutToDelete(OperationContext* opCtx, + const CollectionPtr& coll, + BSONObj const& doc) { + if (coll->ns() != NamespaceString::kShardMergeRecipientsNamespace || + tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + return; + } + + auto recipientStateDoc = + ShardMergeRecipientDocument::parse(IDLParserContext("recipientStateDoc"), doc); + + bool isDocMarkedGarbageCollectable = [&] { + auto state = recipientStateDoc.getState(); + auto expireAtIsSet = recipientStateDoc.getExpireAt().has_value(); + invariant(!expireAtIsSet || state == ShardMergeRecipientStateEnum::kCommitted || + state == ShardMergeRecipientStateEnum::kAborted); + return expireAtIsSet; + }(); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot delete the recipient state document " + << " since it has not been marked as garbage collectable: " + << tenant_migration_util::redactStateDoc(recipientStateDoc.toBSON()), + isDocMarkedGarbageCollectable); + + tenantMigrationInfo(opCtx) = TenantMigrationInfo(recipientStateDoc.getId()); +} + +void ShardMergeRecipientOpObserver::onDelete(OperationContext* opCtx, + const CollectionPtr& coll, + StmtId stmtId, + const OplogDeleteEntryArgs& args) { + if (coll->ns() != NamespaceString::kShardMergeRecipientsNamespace || + tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + return; + } + + if (auto tmi = tenantMigrationInfo(opCtx)) { + opCtx->recoveryUnit()->onCommit([migrationId = tmi->uuid](OperationContext* opCtx, auto _) { + LOGV2_INFO(7339765, + "Removing expired recipient access blocker", + "migrationId"_attr = migrationId); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .removeAccessBlockersForMigration( + migrationId, TenantMigrationAccessBlocker::BlockerType::kRecipient); + }); + } +} + +repl::OpTime ShardMergeRecipientOpObserver::onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + const UUID& uuid, + std::uint64_t numRecords, + const CollectionDropType dropType) { + if (collectionName == NamespaceString::kShardMergeRecipientsNamespace && + 
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot drop " + << NamespaceString::kShardMergeRecipientsNamespace.ns() + << " collection as it is not empty", + !numRecords); + } + return OpTime(); +} + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/shard_merge_recipient_op_observer.h b/src/mongo/db/repl/shard_merge_recipient_op_observer.h new file mode 100644 index 00000000000..9adeca87d05 --- /dev/null +++ b/src/mongo/db/repl/shard_merge_recipient_op_observer.h @@ -0,0 +1,252 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/op_observer/op_observer.h" + +namespace mongo::repl { + +/** + * OpObserver for shard merge recipient. 
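+ * Observes writes to 'config.shardMergeRecipients' and to the per-migration donated-files
+ * collections, and drives the TenantFileImporterService, recipient access blockers, and
+ * serverless operation locks as the recipient state document changes.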
+ */ +class ShardMergeRecipientOpObserver final : public OpObserver { + ShardMergeRecipientOpObserver(const ShardMergeRecipientOpObserver&) = delete; + ShardMergeRecipientOpObserver& operator=(const ShardMergeRecipientOpObserver&) = delete; + +public: + ShardMergeRecipientOpObserver() = default; + ~ShardMergeRecipientOpObserver() = default; + + void onModifyCollectionShardingIndexCatalog(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + BSONObj indexDoc) final {} + + void onCreateGlobalIndex(OperationContext* opCtx, + const NamespaceString& globalIndexNss, + const UUID& globalIndexUUID) final{}; + + void onDropGlobalIndex(OperationContext* opCtx, + const NamespaceString& globalIndexNss, + const UUID& globalIndexUUID, + long long numKeys) final{}; + + void onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + BSONObj indexDoc, + bool fromMigrate) final {} + + void onStartIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + bool fromMigrate) final {} + + void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} + + void onAbortIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {} + + void onCommitIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + bool fromMigrate) final {} + + void onAbortIndexBuild(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& collUUID, + const UUID& indexBuildUUID, + const std::vector<BSONObj>& indexes, + const Status& cause, + bool fromMigrate) final {} + + void onInserts(OperationContext* opCtx, + const CollectionPtr& coll, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + bool fromMigrate) final; + + void onInsertGlobalIndexKey(OperationContext* opCtx, + const NamespaceString& globalIndexNss, + const UUID& globalIndexUuid, + const BSONObj& key, + const BSONObj& docKey) final{}; + + void onDeleteGlobalIndexKey(OperationContext* opCtx, + const NamespaceString& globalIndexNss, + const UUID& globalIndexUuid, + const BSONObj& key, + const BSONObj& docKey) final {} + + void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) final; + + void aboutToDelete(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& doc) final; + + void onDelete(OperationContext* opCtx, + const CollectionPtr& coll, + StmtId stmtId, + const OplogDeleteEntryArgs& args) final; + + void onInternalOpMessage(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID>& uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj, + const boost::optional<repl::OpTime> preImageOpTime, + const boost::optional<repl::OpTime> postImageOpTime, + const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction, + const boost::optional<OplogSlot> slot) final {} + + void onCreateCollection(OperationContext* opCtx, + const CollectionPtr& coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime, + bool fromMigrate) final; + + void onCollMod(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + const BSONObj& collModCmd, + const CollectionOptions& oldCollOptions, + boost::optional<IndexCollModInfo> indexInfo) final {} + + 
void onDropDatabase(OperationContext* opCtx, const DatabaseName& dbName) final {} + + using OpObserver::onDropCollection; + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + const UUID& uuid, + std::uint64_t numRecords, + CollectionDropType dropType) final; + + void onDropIndex(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + const std::string& indexName, + const BSONObj& indexInfo) final {} + + using OpObserver::onRenameCollection; + void onRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + const UUID& uuid, + const boost::optional<UUID>& dropTargetUUID, + std::uint64_t numRecords, + bool stayTemp) final {} + + void onImportCollection(OperationContext* opCtx, + const UUID& importUUID, + const NamespaceString& nss, + long long numRecords, + long long dataSize, + const BSONObj& catalogEntry, + const BSONObj& storageMetadata, + bool isDryRun) final {} + + using OpObserver::preRenameCollection; + repl::OpTime preRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + const UUID& uuid, + const boost::optional<UUID>& dropTargetUUID, + std::uint64_t numRecords, + bool stayTemp) final { + return repl::OpTime(); + } + void postRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + const UUID& uuid, + const boost::optional<UUID>& dropTargetUUID, + bool stayTemp) final {} + void onApplyOps(OperationContext* opCtx, + const DatabaseName& dbName, + const BSONObj& applyOpCmd) final {} + + void onEmptyCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + const UUID& uuid) final {} + + void onTransactionStart(OperationContext* opCtx) final {} + + void onUnpreparedTransactionCommit(OperationContext* opCtx, + const TransactionOperations& transactionOperations) final {} + + void onPreparedTransactionCommit( + OperationContext* opCtx, + OplogSlot commitOplogEntryOpTime, + Timestamp commitTimestamp, + const std::vector<repl::ReplOperation>& statements) noexcept final {} + + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + const TransactionOperations& transactionOperations, + Date_t wallClockTime) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + const TransactionOperations& transactionOperations, + const ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} + + void onTransactionPrepareNonPrimary(OperationContext* opCtx, + const std::vector<repl::OplogEntry>& statements, + const repl::OpTime& prepareOpTime) final {} + + void onTransactionAbort(OperationContext* opCtx, + boost::optional<OplogSlot> abortOplogEntryOpTime) final {} + + void onBatchedWriteStart(OperationContext* opCtx) final {} + + void onBatchedWriteCommit(OperationContext* opCtx) final {} + + void onBatchedWriteAbort(OperationContext* opCtx) final {} + + void onMajorityCommitPointUpdate(ServiceContext* service, + const repl::OpTime& newCommitPoint) final {} + +private: + void _onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final { + } +}; + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/shard_merge_recipient_service.cpp 
b/src/mongo/db/repl/shard_merge_recipient_service.cpp new file mode 100644 index 00000000000..8e362697ee3 --- /dev/null +++ b/src/mongo/db/repl/shard_merge_recipient_service.cpp @@ -0,0 +1,2535 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#include "mongo/platform/basic.h" + +#include <boost/filesystem/operations.hpp> +#include <fmt/format.h> + +#include "mongo/base/checked_cast.h" +#include "mongo/client/dbclient_connection.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/client/replica_set_monitor_manager.h" +#include "mongo/config.h" +#include "mongo/db/catalog/collection_write_path.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/client.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/concurrency/replication_state_transition_lock_guard.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer/op_observer.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" +#include "mongo/db/repl/cloner_utils.h" +#include "mongo/db/repl/data_replicator_external_state.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/oplog_buffer_collection.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/replication_auth.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/shard_merge_recipient_service.h" +#include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_migration_shard_merge_util.h" +#include "mongo/db/repl/tenant_migration_statistics.h" +#include 
"mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/session/session_catalog_mongod.h" +#include "mongo/db/session/session_txn_record_gen.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_import.h" +#include "mongo/db/transaction/transaction_participant.h" +#include "mongo/db/vector_clock_mutable.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/executor/task_executor.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/cancellation.h" +#include "mongo/util/future_util.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration + + +namespace mongo { +namespace repl { +namespace { +using namespace fmt; +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); +constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; +constexpr int kBackupCursorFileFetcherRetryAttempts = 10; +constexpr int kCheckpointTsBackupCursorErrorCode = 6929900; +constexpr int kCloseCursorBeforeOpenErrorCode = 50886; + +NamespaceString getOplogBufferNs(const UUID& migrationUUID) { + return NamespaceString(DatabaseName::kConfig, kOplogBufferPrefix + migrationUUID.toString()); +} + +boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) { + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + + // Add kTenantMigrationOplogView, kSessionTransactionsTableNamespace, and kRsOplogNamespace + // to resolvedNamespaces since they are all used during different pipeline stages. + resolvedNamespaces[NamespaceString::kTenantMigrationOplogView.coll()] = { + NamespaceString::kTenantMigrationOplogView, std::vector<BSONObj>()}; + + resolvedNamespaces[NamespaceString::kSessionTransactionsTableNamespace.coll()] = { + NamespaceString::kSessionTransactionsTableNamespace, std::vector<BSONObj>()}; + + resolvedNamespaces[NamespaceString::kRsOplogNamespace.coll()] = { + NamespaceString::kRsOplogNamespace, std::vector<BSONObj>()}; + + return make_intrusive<ExpressionContext>(opCtx, + boost::none, /* explain */ + false, /* fromMongos */ + false, /* needsMerge */ + true, /* allowDiskUse */ + true, /* bypassDocumentValidation */ + false, /* isMapReduceCommand */ + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, /* runtimeConstants */ + nullptr, /* collator */ + MongoProcessInterface::create(opCtx), + std::move(resolvedNamespaces), + boost::none); /* collUUID */ +} + +// We allow retrying on the following oplog fetcher errors: +// 1) InvalidSyncSource - we cannot sync from the chosen sync source, potentially because the sync +// source is too stale or there was a network error when connecting to the sync source. +// 2) ShudownInProgress - the current sync source is shutting down +bool isRetriableOplogFetcherError(Status oplogFetcherStatus) { + return oplogFetcherStatus == ErrorCodes::InvalidSyncSource || + oplogFetcherStatus == ErrorCodes::ShutdownInProgress; +} + +// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine +// and recover from there. So the restart decision is always "no". 
+class OplogFetcherRestartDecisionTenantMigration + : public OplogFetcher::OplogFetcherRestartDecision { +public: + ~OplogFetcherRestartDecisionTenantMigration(){}; + bool shouldContinue(OplogFetcher* fetcher, Status status) final { + return false; + } + void fetchSuccessful(OplogFetcher* fetcher) final {} +}; + +// The oplog fetcher requires some of the methods in DataReplicatorExternalState to operate. +class DataReplicatorExternalStateTenantMigration : public DataReplicatorExternalState { +public: + // The oplog fetcher is passed its executor directly and does not use the one from the + // DataReplicatorExternalState. + executor::TaskExecutor* getTaskExecutor() const final { + MONGO_UNREACHABLE; + } + std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const final { + MONGO_UNREACHABLE; + } + + // The oplog fetcher uses the current term and opTime to inform the sync source of term changes. + // As the term on the donor and the term on the recipient have nothing to do with each other, + // we do not want to do that. + OpTimeWithTerm getCurrentTermAndLastCommittedOpTime() final { + return {OpTime::kUninitializedTerm, OpTime()}; + } + + // Tenant migration does not require the metadata from the oplog query. + void processMetadata(const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata) final {} + + // Tenant migration does not change sync source depending on metadata. + ChangeSyncSourceAction shouldStopFetching(const HostAndPort& source, + const rpc::ReplSetMetadata& replMetadata, + const rpc::OplogQueryMetadata& oqMetadata, + const OpTime& previousOpTimeFetched, + const OpTime& lastOpTimeFetched) const final { + return ChangeSyncSourceAction::kContinueSyncing; + } + + // Tenant migration does not re-evaluate sync source on error. + ChangeSyncSourceAction shouldStopFetchingOnError(const HostAndPort& source, + const OpTime& lastOpTimeFetched) const final { + return ChangeSyncSourceAction::kContinueSyncing; + } + + // The oplog fetcher should never call the rest of the methods. + std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + std::unique_ptr<OplogApplier> makeOplogApplier( + OplogBuffer* oplogBuffer, + OplogApplier::Observer* observer, + ReplicationConsistencyMarkers* consistencyMarkers, + StorageInterface* storageInterface, + const OplogApplier::Options& options, + ThreadPool* writerPool) final { + MONGO_UNREACHABLE; + }; + + virtual StatusWith<ReplSetConfig> getCurrentConfig() const final { + MONGO_UNREACHABLE; + } + + StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + Status storeLocalConfigDocument(OperationContext* opCtx, const BSONObj& config) final { + MONGO_UNREACHABLE; + } + + StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + JournalListener* getReplicationJournalListener() final { + MONGO_UNREACHABLE; + } +}; + +/* + * Acceptable classes for the 'Target' are AbstractAsyncComponent and RandomAccessOplogBuffer. 
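+ * (For example, the donor OplogFetcher is an AbstractAsyncComponent, and the migration's
+ * OplogBufferCollection is a RandomAccessOplogBuffer.)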
+ */ +template <class Target> +void shutdownTarget(WithLock lk, Target& target) { + if (target) + target->shutdown(); +} + +template <class Target> +void shutdownTargetWithOpCtx(WithLock lk, Target& target, OperationContext* opCtx) { + if (target) + target->shutdown(opCtx); +} + +template <class Target> +void joinTarget(Target& target) { + if (target) + target->join(); +} + +template <class Promise> +void setPromiseErrorifNotReady(WithLock lk, Promise& promise, Status status) { + if (promise.getFuture().isReady()) { + return; + } + + promise.setError(status); +} + +template <class Promise> +void setPromiseOkifNotReady(WithLock lk, Promise& promise) { + if (promise.getFuture().isReady()) { + return; + } + + promise.emplaceValue(); +} + +template <class Promise, class Value> +void setPromiseValueIfNotReady(WithLock lk, Promise& promise, Value& value) { + if (promise.getFuture().isReady()) { + return; + } + + promise.emplaceValue(value); +} + +Timestamp selectRejectReadsBeforeTimestamp(OperationContext* opCtx, + const Timestamp& returnAfterReachingTimestamp, + const OpTime& oplogApplierOpTime) { + // Don't allow reading before the opTime timestamp of the final write on the recipient + // associated with cloning the donor's data so the client can't see an inconsistent state. The + // oplog applier timestamp may be null if no oplog entries were copied, but data may still have + // been cloned, so use the last applied opTime in that case. + // + // Note the cloning writes happen on a separate thread, but the last applied opTime in the + // replication coordinator is guaranteed to be inclusive of those writes because this function + // is called after waiting for the _dataConsistentPromise to resolve, which happens after the + // last write for cloning completes (and all of its WUOW onCommit() handlers). + auto finalRecipientWriteTimestamp = oplogApplierOpTime.getTimestamp().isNull() + ? ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime().getTimestamp() + : oplogApplierOpTime.getTimestamp(); + + // Also don't allow reading before the returnAfterReachingTimestamp (aka the blockTimestamp) to + // prevent readers from possibly seeing data in a point in time snapshot on the recipient that + // would not have been seen at the same point in time on the donor if the donor's cluster time + // is ahead of the recipient's. + return std::max(finalRecipientWriteTimestamp, returnAfterReachingTimestamp); +} + + +/** + * Converts migration errors, such as, network errors and cancellation errors to interrupt + * error status. + * + * On migration interrupt, async components will fail with generic network/cancellation + * errors rather than interrupt error status. When sending the migration command response to + * donor, we should convert those into real errors so that donor can decide if they need to + * retry migration commands. + */ +Status overrideMigrationErrToInterruptStatusIfNeeded( + const UUID& migrationUUID, + Status status, + boost::optional<SharedSemiFuture<void>> interruptFuture = boost::none) { + if (status.isOK()) + return status; + + // Network and cancellation errors can be caused due to migration interrupt so replace those + // error status with interrupt error status, if set. 
+ if (ErrorCodes::isCancellationError(status) || ErrorCodes::isNetworkError(status)) { + boost::optional<Status> newErrStatus; + if (interruptFuture && interruptFuture->isReady() && + !interruptFuture->getNoThrow().isOK()) { + newErrStatus = interruptFuture->getNoThrow(); + } else if (status == ErrorCodes::CallbackCanceled) { + // All of our async components don't exit with CallbackCanceled normally unless + // they are shut down by the instance itself via interrupt. If we get a + // CallbackCanceled error without an interrupt, it is coming from the service's + // cancellation token or scoped task executor shutdown on failovers. It is possible + // for the token to get canceled or scope task executor to shutdown + // before the instance is interrupted. So we replace the CallbackCanceled error + // with InterruptedDueToReplStateChange and treat it as a retryable error. + newErrStatus = + Status{ErrorCodes::InterruptedDueToReplStateChange, "operation was interrupted"}; + } + + if (newErrStatus) { + LOGV2(7339701, + "Override migration error with interrupt status", + "migrationId"_attr = migrationUUID, + "error"_attr = status, + "interruptStatus"_attr = newErrStatus); + return *newErrStatus; + } + } + return status; +} +} // namespace + +MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigrationAbort); +MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingStateDocAsGarbageCollectable); + +ShardMergeRecipientService::ShardMergeRecipientService(ServiceContext* const serviceContext) + : PrimaryOnlyService(serviceContext), _serviceContext(serviceContext) {} + +StringData ShardMergeRecipientService::getServiceName() const { + return kShardMergeRecipientServiceName; +} + +NamespaceString ShardMergeRecipientService::getStateDocumentsNS() const { + return NamespaceString::kShardMergeRecipientsNamespace; +} + +ThreadPool::Limits ShardMergeRecipientService::getThreadPoolLimits() const { + ThreadPool::Limits limits; + limits.maxThreads = maxTenantMigrationRecipientThreadPoolSize; + limits.minThreads = minTenantMigrationRecipientThreadPoolSize; + return limits; +} + +void ShardMergeRecipientService::abortAllMigrations(OperationContext* opCtx) { + LOGV2(7339700, "Aborting all active shard merge recipient instances."); + auto instances = getAllInstances(opCtx); + for (auto& instance : instances) { + auto typedInstance = checked_pointer_cast<ShardMergeRecipientService::Instance>(instance); + auto status = + Status(ErrorCodes::TenantMigrationAborted, "Shard merge recipient service interrupted"); + typedInstance->interruptConditionally(status); + } +} + +ExecutorFuture<void> ShardMergeRecipientService::_rebuildService( + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) { + return AsyncTry([this] { + AllowOpCtxWhenServiceRebuildingBlock allowOpCtxBlock(Client::getCurrent()); + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + auto status = StorageInterface::get(opCtx)->createCollection( + opCtx, getStateDocumentsNS(), CollectionOptions()); + if (!status.isOK() && status != ErrorCodes::NamespaceExists) { + uassertStatusOK(status); + } + return Status::OK(); + }) + .until([token](Status status) { return status.isOK() || token.isCanceled(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, CancellationToken::uncancelable()); +} + +void ShardMergeRecipientService::checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialStateDoc, + const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) { + + 
auto recipientStateDoc =
+        ShardMergeRecipientDocument::parse(IDLParserContext("recipientStateDoc"), initialStateDoc);
+
+    // We don't start migration if `startGarbageCollect` is true. So it's safe not to check for
+    // conflicts with other instances.
+    //
+    // We need this to avoid races, like a delayed 'recipientForgetMigration' with migration
+    // decision 'committed' received after the corresponding migration state doc was deleted and
+    // another conflicting migration was started.
+    if (recipientStateDoc.getStartGarbageCollect()) {
+        return;
+    }
+
+    for (auto& instance : existingInstances) {
+        auto existingTypedInstance =
+            checked_cast<const ShardMergeRecipientService::Instance*>(instance);
+        auto existingStateDoc = existingTypedInstance->getStateDoc();
+
+        uassert(ErrorCodes::ConflictingOperationInProgress,
+                "An existing shard merge is in progress",
+                existingStateDoc.getStartGarbageCollect() || existingStateDoc.getExpireAt());
+    }
+}
+
+std::shared_ptr<PrimaryOnlyService::Instance> ShardMergeRecipientService::constructInstance(
+    BSONObj initialStateDoc) {
+    return std::make_shared<ShardMergeRecipientService::Instance>(
+        _serviceContext, this, initialStateDoc);
+}
+
+ShardMergeRecipientService::Instance::Instance(ServiceContext* const serviceContext,
+                                               const ShardMergeRecipientService* recipientService,
+                                               BSONObj stateDoc)
+    : PrimaryOnlyService::TypedInstance<Instance>(),
+      _serviceContext(serviceContext),
+      _recipientService(recipientService),
+      _stateDoc(
+          ShardMergeRecipientDocument::parse(IDLParserContext("mergeRecipientStateDoc"), stateDoc)),
+      _tenantIds(_stateDoc.getTenantIds()),
+      _migrationUuid(_stateDoc.getId()),
+      _donorConnectionString(_stateDoc.getDonorConnectionString().toString()),
+      _donorUri(uassertStatusOK(MongoURI::parse(_stateDoc.getDonorConnectionString().toString()))),
+      _readPreference(_stateDoc.getReadPreference()),
+      _recipientCertificateForDonor(_stateDoc.getRecipientCertificateForDonor()),
+      _transientSSLParams([&]() -> boost::optional<TransientSSLParams> {
+          if (auto recipientCertificate = _stateDoc.getRecipientCertificateForDonor()) {
+              invariant(!repl::tenantMigrationDisableX509Auth);
+#ifdef MONGO_CONFIG_SSL
+              uassert(ErrorCodes::IllegalOperation,
+                      "Cannot run shard merge with x509 authentication as SSL is not enabled",
+                      getSSLGlobalParams().sslMode.load() != SSLParams::SSLMode_disabled);
+              auto recipientSSLClusterPEMPayload =
+                  recipientCertificate->getCertificate().toString() + "\n" +
+                  recipientCertificate->getPrivateKey().toString();
+              return TransientSSLParams{_donorUri.connectionString(),
+                                        std::move(recipientSSLClusterPEMPayload)};
+#else
+              // If SSL is not supported, the recipientSyncData command should have failed
+              // certificate field validation.
+ MONGO_UNREACHABLE; +#endif + } else { + invariant(repl::tenantMigrationDisableX509Auth); + return boost::none; + } + }()) { +} + +boost::optional<BSONObj> ShardMergeRecipientService::Instance::reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept { + + BSONObjBuilder bob; + + stdx::lock_guard lk(_mutex); + bob.append("desc", "shard merge recipient"); + _migrationUuid.appendToBuilder(&bob, "instanceID"_sd); + { + BSONArrayBuilder arrayBuilder(bob.subarrayStart("tenantIds")); + for (const auto& tenantId : _stateDoc.getTenantIds()) { + tenantId.serializeToBSON(&arrayBuilder); + } + } + + bob.append("donorConnectionString", _stateDoc.getDonorConnectionString()); + bob.append("readPreference", _stateDoc.getReadPreference().toInnerBSON()); + bob.append("state", ShardMergeRecipientState_serializer(_stateDoc.getState())); + bob.appendBool("migrationStarted", !_stateDoc.getStartGarbageCollect()); + bob.append("migrationCompleted", _migrationCompletionPromise.getFuture().isReady()); + bob.append("garbageCollectable", _forgetMigrationDurablePromise.getFuture().isReady()); + + if (_stateDoc.getStartFetchingDonorOpTime()) + _stateDoc.getStartFetchingDonorOpTime()->append(&bob, "startFetchingDonorOpTime"); + if (_stateDoc.getStartApplyingDonorOpTime()) + _stateDoc.getStartApplyingDonorOpTime()->append(&bob, "startApplyingDonorOpTime"); + if (_stateDoc.getCloneFinishedRecipientOpTime()) + _stateDoc.getCloneFinishedRecipientOpTime()->append(&bob, "cloneFinishedRecipientOpTime"); + + if (_stateDoc.getExpireAt()) + bob.append("expireAt", *_stateDoc.getExpireAt()); + + if (_client) { + bob.append("donorSyncSource", _client->getServerAddress()); + } + + if (_stateDoc.getStartAt()) { + bob.append("receiveStart", *_stateDoc.getStartAt()); + } + + if (_tenantOplogApplier) { + bob.appendNumber("numOpsApplied", + static_cast<long long>(_tenantOplogApplier->getNumOpsApplied())); + } + + return bob.obj(); +} + +void ShardMergeRecipientService::Instance::checkIfOptionsConflict(const BSONObj& options) const { + auto stateDoc = + ShardMergeRecipientDocument::parse(IDLParserContext("recipientStateDoc"), options); + + invariant(stateDoc.getId() == _migrationUuid); + + if (stateDoc.getTenantIds() != _tenantIds || + stateDoc.getDonorConnectionString() != _donorConnectionString || + !stateDoc.getReadPreference().equals(_readPreference) || + stateDoc.getRecipientCertificateForDonor() != _recipientCertificateForDonor) { + uasserted(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Found active migration for migrationId \"" + << _migrationUuid.toBSON() << "\" with different options " + << tenant_migration_util::redactStateDoc(getStateDoc().toBSON())); + } +} + +OpTime ShardMergeRecipientService::Instance::waitUntilMigrationReachesConsistentState( + OperationContext* opCtx) const { + return _dataConsistentPromise.getFuture().get(opCtx); +} + +OpTime ShardMergeRecipientService::Instance::waitUntilMigrationReachesReturnAfterReachingTimestamp( + OperationContext* opCtx, const Timestamp& returnAfterReachingTimestamp) { + // This gives assurance that _tenantOplogApplier pointer won't be empty, and that it has been + // started. Additionally, we must have finished processing the recipientSyncData command that + // waits on _dataConsistentPromise. 
+ _dataConsistentPromise.getFuture().get(opCtx); + + auto getWaitOpTimeFuture = [&]() { + stdx::unique_lock lk(_mutex); + // We start tenant oplog applier after recipient informs donor, + // the data is in consistent state. So, there is a possibility, recipient might receive + // recipientSyncData cmd with `returnAfterReachingDonorTimestamp` from donor before the + // recipient has started the tenant oplog applier. + opCtx->waitForConditionOrInterrupt(_oplogApplierReadyCondVar, lk, [&] { + return _oplogApplierReady || _migrationCompletionPromise.getFuture().isReady(); + }); + if (_migrationCompletionPromise.getFuture().isReady()) { + // When the data sync is done, we reset _tenantOplogApplier, so just throw the data sync + // completion future result. + _migrationCompletionPromise.getFuture().get(); + MONGO_UNREACHABLE; + } + + // Sanity checks. + invariant(_tenantOplogApplier); + auto state = _stateDoc.getState(); + uassert( + ErrorCodes::IllegalOperation, + str::stream() + << "Failed to wait for the donor timestamp to be majority committed due to" + "conflicting tenant migration state, migration uuid: " + << getMigrationUUID() << " , current state: " + << ShardMergeRecipientState_serializer(state) << " , expected state: " + << ShardMergeRecipientState_serializer(ShardMergeRecipientStateEnum::kConsistent) + << ".", + state == ShardMergeRecipientStateEnum::kConsistent); + + return _tenantOplogApplier->getNotificationForOpTime( + OpTime(returnAfterReachingTimestamp, OpTime::kUninitializedTerm)); + }; + + auto waitOpTimeFuture = getWaitOpTimeFuture(); + fpWaitUntilTimestampMajorityCommitted.pauseWhileSet(); + auto swDonorRecipientOpTimePair = waitOpTimeFuture.getNoThrow(); + + auto status = swDonorRecipientOpTimePair.getStatus(); + + // A cancellation error may occur due to an interrupt. If that is the case, replace the error + // code with the interrupt code, the true reason for interruption. + status = overrideMigrationErrToInterruptStatusIfNeeded( + _migrationUuid, status, _interruptPromise.getFuture()); + uassertStatusOK(status); + + auto& donorRecipientOpTimePair = swDonorRecipientOpTimePair.getValue(); + + // Make sure that the recipient logical clock has advanced to at least the donor timestamp + // before returning success for recipientSyncData. + // Note: tickClusterTimeTo() will not tick the recipient clock backwards in time. + VectorClockMutable::get(opCtx)->tickClusterTimeTo(LogicalTime(returnAfterReachingTimestamp)); + + stdx::unique_lock lk(_mutex); + _stateDoc.setRejectReadsBeforeTimestamp(selectRejectReadsBeforeTimestamp( + opCtx, returnAfterReachingTimestamp, donorRecipientOpTimePair.recipientOpTime)); + const auto stateDoc = _stateDoc; + lk.unlock(); + _stopOrHangOnFailPoint(&fpBeforePersistingRejectReadsBeforeTimestamp, opCtx); + + auto lastOpBeforeUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + _writeStateDoc(opCtx, stateDoc, OpType::kUpdate); + auto lastOpAfterUpdate = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + auto replCoord = repl::ReplicationCoordinator::get(_serviceContext); + if (lastOpBeforeUpdate == lastOpAfterUpdate) { + // updateStateDoc was a no-op, but we still must ensure it's all-replicated. 
+ lastOpAfterUpdate = uassertStatusOK(replCoord->getLatestWriteOpTime(opCtx)); + LOGV2(7339702, + "Fixed write timestamp for recording rejectReadsBeforeTimestamp", + "newWriteOpTime"_attr = lastOpAfterUpdate); + } + + WriteConcernOptions writeConcern(repl::ReplSetConfig::kConfigAllWriteConcernName, + WriteConcernOptions::SyncMode::NONE, + opCtx->getWriteConcern().wTimeout); + uassertStatusOK(replCoord->awaitReplication(opCtx, lastOpAfterUpdate, writeConcern).status); + + _stopOrHangOnFailPoint(&fpAfterWaitForRejectReadsBeforeTimestamp, opCtx); + + return donorRecipientOpTimePair.donorOpTime; +} + +std::unique_ptr<DBClientConnection> ShardMergeRecipientService::Instance::_connectAndAuth( + const HostAndPort& serverAddress, StringData applicationName) { + auto swClientBase = ConnectionString(serverAddress) + .connect(applicationName, + 0 /* socketTimeout */, + nullptr /* uri */, + nullptr /* apiParameters */, + _transientSSLParams ? &_transientSSLParams.value() : nullptr); + if (!swClientBase.isOK()) { + LOGV2_ERROR(7339719, + "Failed to connect to migration donor", + "migrationId"_attr = getMigrationUUID(), + "serverAddress"_attr = serverAddress, + "applicationName"_attr = applicationName, + "error"_attr = swClientBase.getStatus()); + uassertStatusOK(swClientBase.getStatus()); + } + + auto clientBase = swClientBase.getValue().release(); + + // ConnectionString::connect() always returns a DBClientConnection in a unique_ptr of + // DBClientBase type. + std::unique_ptr<DBClientConnection> client(checked_cast<DBClientConnection*>(clientBase)); + + // Authenticate connection to the donor. + if (!_transientSSLParams) { + uassertStatusOK(replAuthenticate(clientBase) + .withContext(str::stream() + << "ShardMergeRecipientService failed to authenticate to " + << serverAddress)); + } else if (MONGO_likely(!skipTenantMigrationRecipientAuth.shouldFail())) { + client->auth(auth::createInternalX509AuthDocument()); + } + + return client; +} + +OpTime ShardMergeRecipientService::Instance::_getDonorMajorityOpTime( + std::unique_ptr<mongo::DBClientConnection>& client) { + auto oplogOpTimeFields = + BSON(OplogEntry::kTimestampFieldName << 1 << OplogEntry::kTermFieldName << 1); + FindCommandRequest findCmd{NamespaceString::kRsOplogNamespace}; + findCmd.setSort(BSON("$natural" << -1)); + findCmd.setProjection(oplogOpTimeFields); + findCmd.setReadConcern(ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + auto majorityOpTimeBson = client->findOne( + std::move(findCmd), ReadPreferenceSetting{ReadPreference::SecondaryPreferred}); + uassert(7339780, "Found no entries in the remote oplog", !majorityOpTimeBson.isEmpty()); + + auto majorityOpTime = uassertStatusOK(OpTime::parseFromOplogEntry(majorityOpTimeBson)); + return majorityOpTime; +} + +SemiFuture<ShardMergeRecipientService::Instance::ConnectionPair> +ShardMergeRecipientService::Instance::_createAndConnectClients() { + LOGV2_DEBUG(7339720, + 1, + "Recipient migration instance connecting clients", + "migrationId"_attr = getMigrationUUID(), + "connectionString"_attr = _donorConnectionString, + "readPreference"_attr = _readPreference); + const auto& servers = _donorUri.getServers(); + stdx::lock_guard lk(_mutex); + _donorReplicaSetMonitor = ReplicaSetMonitor::createIfNeeded( + _donorUri.getSetName(), std::set<HostAndPort>(servers.begin(), servers.end())); + + // Only ever used to cancel when the setTenantMigrationRecipientInstanceHostTimeout failpoint is + // set. 
+ CancellationSource getHostCancelSource; + setTenantMigrationRecipientInstanceHostTimeout.execute([&](const BSONObj& data) { + auto exec = **_scopedExecutor; + const auto deadline = + exec->now() + Milliseconds(data["findHostTimeoutMillis"].safeNumberLong()); + // Cancel the find host request after a timeout. Ignore callback handle. + exec->sleepUntil(deadline, CancellationToken::uncancelable()) + .getAsync([getHostCancelSource](auto) mutable { getHostCancelSource.cancel(); }); + }); + + if (MONGO_unlikely(hangAfterCreatingRSM.shouldFail())) { + LOGV2(7339703, "hangAfterCreatingRSM failpoint enabled"); + hangAfterCreatingRSM.pauseWhileSet(); + } + + const auto kDelayedMajorityOpTimeErrorCode = 5272000; + return AsyncTry([this, + self = shared_from_this(), + getHostCancelSource, + kDelayedMajorityOpTimeErrorCode] { + stdx::lock_guard lk(_mutex); + + // Get all donor hosts that we have excluded. + const auto& excludedHosts = _getExcludedDonorHosts(lk); + return _donorReplicaSetMonitor + ->getHostOrRefresh(_readPreference, excludedHosts, getHostCancelSource.token()) + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode]( + const HostAndPort& serverAddress) { + LOGV2(7339704, + "Attempting to connect to donor host", + "donorHost"_attr = serverAddress, + "migrationId"_attr = getMigrationUUID()); + auto applicationName = "TenantMigration_" + getMigrationUUID().toString(); + + auto client = _connectAndAuth(serverAddress, applicationName); + + boost::optional<repl::OpTime> startApplyingOpTime; + Timestamp startMigrationDonorTimestamp; + { + stdx::lock_guard lk(_mutex); + startApplyingOpTime = _stateDoc.getStartApplyingDonorOpTime(); + startMigrationDonorTimestamp = + _stateDoc.getStartMigrationDonorTimestamp(); + } + + auto majoritySnapshotOpTime = _getDonorMajorityOpTime(client); + if (majoritySnapshotOpTime.getTimestamp() < startMigrationDonorTimestamp) { + stdx::lock_guard lk(_mutex); + const auto now = _serviceContext->getFastClockSource()->now(); + _excludeDonorHost( + lk, + serverAddress, + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + uasserted( + kDelayedMajorityOpTimeErrorCode, + str::stream() + << "majoritySnapshotOpTime on donor host must not be behind " + "startMigrationDonorTimestamp, majoritySnapshotOpTime: " + << majoritySnapshotOpTime.toString() + << "; startMigrationDonorTimestamp: " + << startMigrationDonorTimestamp.toString()); + } + if (startApplyingOpTime && majoritySnapshotOpTime < *startApplyingOpTime) { + stdx::lock_guard lk(_mutex); + const auto now = _serviceContext->getFastClockSource()->now(); + _excludeDonorHost( + lk, + serverAddress, + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + uasserted( + kDelayedMajorityOpTimeErrorCode, + str::stream() + << "majoritySnapshotOpTime on donor host must not be behind " + "startApplyingDonorOpTime, majoritySnapshotOpTime: " + << majoritySnapshotOpTime.toString() + << "; startApplyingDonorOpTime: " + << (*startApplyingOpTime).toString()); + } + + applicationName += "_oplogFetcher"; + auto oplogFetcherClient = _connectAndAuth(serverAddress, applicationName); + return ConnectionPair(std::move(client), std::move(oplogFetcherClient)); + }); + }) + .until([this, self = shared_from_this(), kDelayedMajorityOpTimeErrorCode]( + const StatusWith<ConnectionPair>& statusWith) { + auto status = statusWith.getStatus(); + + if (status.isOK()) { + return true; + } + + LOGV2_ERROR(7339721, + "Connecting to donor failed", + "migrationId"_attr = 
getMigrationUUID(), + "error"_attr = status); + + // If the future chain has been interrupted, stop retrying. + if (!_getInterruptStatus().isOK()) { + return true; + } + + if (MONGO_unlikely(skipRetriesWhenConnectingToDonorHost.shouldFail())) { + LOGV2(7339705, + "skipRetriesWhenConnectingToDonorHost failpoint enabled, migration " + "proceeding with error from connecting to sync source"); + return true; + } + + /* + * Retry sync source selection if we encountered any of the following errors: + * 1) The RSM couldn't find a suitable donor host + * 2) The majority snapshot OpTime on the donor host was not ahead of our stored + * 'startApplyingDonorOpTime' + * 3) Some other retriable error + */ + if (status == ErrorCodes::FailedToSatisfyReadPreference || + status == ErrorCodes::Error(kDelayedMajorityOpTimeErrorCode) || + ErrorCodes::isRetriableError(status)) { + return false; + } + + return true; + }) + .on(**_scopedExecutor, CancellationToken::uncancelable()) + .semi(); +} + +void ShardMergeRecipientService::Instance::_excludeDonorHost(WithLock, + const HostAndPort& host, + Date_t until) { + LOGV2_DEBUG(7339722, + 2, + "Excluding donor host", + "donorHost"_attr = host, + "until"_attr = until.toString()); + + _excludedDonorHosts.emplace_back(std::make_pair(host, until)); +} + +std::vector<HostAndPort> ShardMergeRecipientService::Instance::_getExcludedDonorHosts(WithLock lk) { + const auto now = _serviceContext->getFastClockSource()->now(); + + // Clean up any hosts that have had their exclusion duration expired. + auto itr = std::remove_if( + _excludedDonorHosts.begin(), + _excludedDonorHosts.end(), + [now](const std::pair<HostAndPort, Date_t>& pair) { return pair.second < now; }); + _excludedDonorHosts.erase(itr, _excludedDonorHosts.end()); + + // Return the list of currently excluded donor hosts. + std::vector<HostAndPort> excludedHosts; + std::transform(_excludedDonorHosts.begin(), + _excludedDonorHosts.end(), + std::back_inserter(excludedHosts), + [](const std::pair<HostAndPort, Date_t>& pair) { return pair.first; }); + return excludedHosts; +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_initializeAndDurablyPersistStateDoc() { + stdx::unique_lock lk(_mutex); + uassert(ErrorCodes::TenantMigrationForgotten, + str::stream() << "Migration " << getMigrationUUID() + << " already marked for garbage collection", + !(_isCommitOrAbortState(lk) || _stateDoc.getStartGarbageCollect())); + + uassert(ErrorCodes::TenantMigrationAborted, + str::stream() << "Failover happened during migration :: migrationId: " + << getMigrationUUID(), + !_stateDoc.getStartAt()); + + LOGV2_DEBUG(7339723, + 2, + "Recipient migration instance initializing state document", + "migrationId"_attr = getMigrationUUID(), + "connectionString"_attr = _donorConnectionString, + "readPreference"_attr = _readPreference); + + // Record the time at which the state doc is initialized. 
+ _stateDoc.setStartAt(_serviceContext->getFastClockSource()->now()); + + if (MONGO_unlikely(failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) { + LOGV2(7339706, "Persisting state doc failed due to fail point enabled."); + uasserted(ErrorCodes::NotWritablePrimary, + "Persisting state doc failed for shard merge recipient service - " + "'failWhilePersistingTenantMigrationRecipientInstanceStateDoc' fail point " + "active"); + } + + auto failToInitializeChangeCbk = [&](OperationContext* opCtx) { + opCtx->recoveryUnit()->onRollback([&](auto _) { + stdx::unique_lock lk(_mutex); + _stateDoc.setStartGarbageCollect(true); + }); + }; + + return _insertStateDocForMajority(lk, failToInitializeChangeCbk); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_killBackupCursor() { + stdx::lock_guard lk(_mutex); + + auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk); + if (donorBackupCursorInfo.cursorId <= 0) { + return SemiFuture<void>::makeReady(); + } + + if (_backupCursorKeepAliveFuture) { + _backupCursorKeepAliveCancellation.cancel(); + } + + return std::exchange(_backupCursorKeepAliveFuture, {}) + .value_or(SemiFuture<void>::makeReady()) + .thenRunOn(_recipientService->getInstanceCleanupExecutor()) + .then([this, self = shared_from_this(), donorBackupCursorInfo] { + LOGV2_INFO(7339724, + "Killing backup cursor", + "migrationId"_attr = getMigrationUUID(), + "cursorId"_attr = donorBackupCursorInfo.cursorId); + + stdx::lock_guard lk(_mutex); + executor::RemoteCommandRequest request( + _client->getServerHostAndPort(), + donorBackupCursorInfo.nss.db().toString(), + BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors" + << BSON_ARRAY(donorBackupCursorInfo.cursorId)), + nullptr); + request.sslMode = _donorUri.getSSLMode(); + + const auto scheduleResult = _scheduleKillBackupCursorWithLock( + lk, _recipientService->getInstanceCleanupExecutor()); + if (!scheduleResult.isOK()) { + LOGV2_WARNING(7339725, + "Failed to run killCursors command on backup cursor", + "status"_attr = scheduleResult.getStatus()); + } + }) + .semi(); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_openBackupCursor( + const CancellationToken& token) { + + const auto aggregateCommandRequestObj = [] { + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(DatabaseName::kAdmin), + {BSON("$backupCursor" << BSONObj())}); + // We must set a writeConcern on internal commands. 
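+ // For illustration, the request built here serializes to roughly
+ // { aggregate: 1, pipeline: [ { $backupCursor: {} } ], ... } and is run against the
+ // admin database on the donor.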
+ aggRequest.setWriteConcern(WriteConcernOptions()); + return aggRequest.toBSON(BSONObj()); + }(); + + stdx::lock_guard lk(_mutex); + uassertStatusOK(_getInterruptStatus()); + + LOGV2_DEBUG(7339726, + 1, + "Trying to open backup cursor on donor primary", + "migrationId"_attr = _stateDoc.getId(), + "donorConnectionString"_attr = _stateDoc.getDonorConnectionString()); + + const auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp(); + + auto fetchStatus = std::make_shared<boost::optional<Status>>(); + auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>(); + const auto fetcherCallback = [this, + self = shared_from_this(), + fetchStatus, + metadataInfoPtr = uniqueMetadataInfo.get(), + token, + startMigrationDonorTimestamp]( + const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) noexcept { + try { + uassertStatusOK(dataStatus); + uassert(ErrorCodes::CallbackCanceled, "backup cursor interrupted", !token.isCanceled()); + + const auto uniqueOpCtx = cc().makeOperationContext(); + const auto opCtx = uniqueOpCtx.get(); + + const auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + if (doc["metadata"]) { + // First batch must contain the metadata. + const auto& metadata = doc["metadata"].Obj(); + auto checkpointTimestamp = metadata["checkpointTimestamp"].timestamp(); + + LOGV2_INFO(7339727, + "Opened backup cursor on donor", + "migrationId"_attr = getMigrationUUID(), + "backupCursorId"_attr = data.cursorId, + "backupCursorCheckpointTimestamp"_attr = checkpointTimestamp); + + { + stdx::lock_guard lk(_mutex); + stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); + _sharedData->setDonorBackupCursorInfo( + sharedDatalk, + BackupCursorInfo{data.cursorId, data.nss, checkpointTimestamp}); + } + + // This ensures that the recipient won’t receive any 2 phase index build donor + // oplog entries during the migration. We also have a check in the tenant oplog + // applier to detect such oplog entries. Adding a check here helps us to detect + // the problem earlier. + uassert(kCheckpointTsBackupCursorErrorCode, + "backupCursorCheckpointTimestamp should be greater than or equal to " + "startMigrationDonorTimestamp", + checkpointTimestamp >= startMigrationDonorTimestamp); + + invariant(metadataInfoPtr && !*metadataInfoPtr); + (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo( + getMigrationUUID(), _client->getServerAddress(), metadata); + } else { + LOGV2_DEBUG(7339728, + 1, + "Backup cursor entry", + "migrationId"_attr = getMigrationUUID(), + "filename"_attr = doc["filename"].String(), + "backupCursorId"_attr = data.cursorId); + + invariant(metadataInfoPtr && *metadataInfoPtr); + auto docs = + std::vector<mongo::BSONObj>{(*metadataInfoPtr)->toBSON(doc).getOwned()}; + + // Disabling internal document validation because the fetcher batch size + // can exceed the max data size limit BSONObjMaxUserSize with the + // additional fields we add to documents. 
+ DisableDocumentValidation documentValidationDisabler( + opCtx, DocumentValidationSettings::kDisableInternalValidation); + + write_ops::InsertCommandRequest insertOp( + shard_merge_utils::getDonatedFilesNs(getMigrationUUID())); + insertOp.setDocuments(std::move(docs)); + insertOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(true); + return wcb; + }()); + + auto writeResult = write_ops_exec::performInserts(opCtx, insertOp); + invariant(!writeResult.results.empty()); + // Writes are ordered, check only the last writeOp result. + uassertStatusOK(writeResult.results.back()); + } + } + + *fetchStatus = Status::OK(); + if (!getMoreBob || data.documents.empty()) { + // Exit fetcher but keep the backupCursor alive to prevent WT on Donor from + // modifying file bytes. backupCursor can be closed after all Recipient nodes + // have copied files from Donor primary. + *nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive; + return; + } + + getMoreBob->append("getMore", data.cursorId); + getMoreBob->append("collection", data.nss.coll()); + } catch (DBException& ex) { + LOGV2_ERROR(7339729, + "Error fetching backup cursor entries", + "migrationId"_attr = getMigrationUUID(), + "error"_attr = ex.toString()); + *fetchStatus = ex.toStatus(); + } + }; + + _donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>( + _backupCursorExecutor.get(), + _client->getServerHostAndPort(), + DatabaseName::kAdmin.toString(), + aggregateCommandRequestObj, + fetcherCallback, + ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(), + executor::RemoteCommandRequest::kNoTimeout, /* aggregateTimeout */ + executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */ + RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>( + kBackupCursorFileFetcherRetryAttempts, executor::RemoteCommandRequest::kNoTimeout), + transport::kGlobalSSLMode); + + uassertStatusOK(_donorFilenameBackupCursorFileFetcher->schedule()); + + return _donorFilenameBackupCursorFileFetcher->onCompletion() + .thenRunOn(**_scopedExecutor) + .then([fetchStatus, uniqueMetadataInfo = std::move(uniqueMetadataInfo)] { + if (!*fetchStatus) { + // the callback was never invoked + uasserted(7339781, "Internal error running cursor callback in command"); + } + + uassertStatusOK(fetchStatus->get()); + }) + .semi(); +} + +StatusWith<executor::TaskExecutor::CallbackHandle> +ShardMergeRecipientService::Instance::_scheduleKillBackupCursorWithLock( + WithLock lk, std::shared_ptr<executor::TaskExecutor> executor) { + auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk); + executor::RemoteCommandRequest killCursorsRequest( + _client->getServerHostAndPort(), + donorBackupCursorInfo.nss.db().toString(), + BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors" + << BSON_ARRAY(donorBackupCursorInfo.cursorId)), + nullptr); + killCursorsRequest.sslMode = _donorUri.getSSLMode(); + + return executor->scheduleRemoteCommand( + killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + if (!args.response.isOK()) { + LOGV2_WARNING(7339730, + "killCursors command task failed", + "error"_attr = redact(args.response.status)); + return; + } + auto status = getStatusFromCommandResult(args.response.data); + if (status.isOK()) { + LOGV2_INFO(7339731, "Killed backup cursor"); + } else { + LOGV2_WARNING(7339732, "killCursors command failed", "error"_attr = redact(status)); + } + }); +} + +SemiFuture<void> 
ShardMergeRecipientService::Instance::_openBackupCursorWithRetry( + const CancellationToken& token) { + return AsyncTry([this, self = shared_from_this(), token] { return _openBackupCursor(token); }) + .until([this, self = shared_from_this()](Status status) { + if (status == ErrorCodes::BackupCursorOpenConflictWithCheckpoint) { + LOGV2_INFO(7339733, + "Retrying backup cursor creation after transient error", + "migrationId"_attr = getMigrationUUID(), + "status"_attr = status); + + return false; + } else if (status.code() == kCheckpointTsBackupCursorErrorCode || + status.code() == kCloseCursorBeforeOpenErrorCode) { + LOGV2_INFO(7339734, + "Closing backup cursor and retrying after getting retryable error", + "migrationId"_attr = getMigrationUUID(), + "status"_attr = status); + + stdx::lock_guard lk(_mutex); + const auto scheduleResult = + _scheduleKillBackupCursorWithLock(lk, _backupCursorExecutor); + uassertStatusOK(scheduleResult); + + return false; + } + + return true; + }) + .on(**_scopedExecutor, token) + .semi(); +} + +const BackupCursorInfo& ShardMergeRecipientService::Instance::_getDonorBackupCursorInfo( + WithLock) const { + stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData); + return _sharedData->getDonorBackupCursorInfo(sharedDatalk); +} + +void ShardMergeRecipientService::Instance::_keepBackupCursorAlive(const CancellationToken& token) { + LOGV2_DEBUG(7339735, + 1, + "Starting periodic 'getMore' requests to keep " + "backup cursor alive.", + "migrationId"_attr = getMigrationUUID()); + + stdx::lock_guard lk(_mutex); + _backupCursorKeepAliveCancellation = CancellationSource(token); + auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk); + _backupCursorKeepAliveFuture = + shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation, + _backupCursorExecutor, + _client->getServerHostAndPort(), + donorBackupCursorInfo.cursorId, + donorBackupCursorInfo.nss); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_enterLearnedFilenamesState() { + stdx::lock_guard lk(_mutex); + _stateDoc.setState(ShardMergeRecipientStateEnum::kLearnedFilenames); + return _updateStateDocForMajority(lk); +} + +boost::optional<OpTime> ShardMergeRecipientService::Instance::_getOldestActiveTransactionAt( + Timestamp ReadTimestamp) { + const auto preparedState = DurableTxnState_serializer(DurableTxnStateEnum::kPrepared); + const auto inProgressState = DurableTxnState_serializer(DurableTxnStateEnum::kInProgress); + auto transactionTableOpTimeFields = BSON(SessionTxnRecord::kStartOpTimeFieldName << 1); + + FindCommandRequest findCmd{NamespaceString::kSessionTransactionsTableNamespace}; + findCmd.setFilter(BSON("state" << BSON("$in" << BSON_ARRAY(preparedState << inProgressState)))); + findCmd.setSort(BSON(SessionTxnRecord::kStartOpTimeFieldName.toString() << 1)); + findCmd.setProjection(transactionTableOpTimeFields); + // Generally, snapshot reads on config.transactions table have some risks. + // But for this case, it is safe because we query only for multi-statement transaction entries + // (and "state" field is set only for multi-statement transaction transactions) and writes to + // config.transactions collection aren't coalesced for multi-statement transactions during + // secondary oplog application, unlike the retryable writes where updates to config.transactions + // collection are coalesced on secondaries. 
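+ // The find below therefore runs against the donor's config.transactions with a read concern
+ // of roughly {level: "snapshot", atClusterTime: <ReadTimestamp>,
+ // allowTransactionTableSnapshot: true}.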
+ findCmd.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName + << repl::readConcernLevels::kSnapshotName + << repl::ReadConcernArgs::kAtClusterTimeFieldName << ReadTimestamp + << repl::ReadConcernArgs::kAllowTransactionTableSnapshot << true)); + + auto earliestOpenTransactionBson = _client->findOne(std::move(findCmd), _readPreference); + LOGV2_DEBUG(7339736, + 2, + "Transaction table entry for earliest transaction that was open at the read " + "concern majority optime on remote node (may be empty)", + "migrationId"_attr = getMigrationUUID(), + "earliestOpenTransaction"_attr = earliestOpenTransactionBson); + + boost::optional<OpTime> startOpTime; + if (!earliestOpenTransactionBson.isEmpty()) { + auto startOpTimeField = + earliestOpenTransactionBson[SessionTxnRecord::kStartOpTimeFieldName]; + if (startOpTimeField.isABSONObj()) { + startOpTime = OpTime::parse(startOpTimeField.Obj()); + } + } + return startOpTime; +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_getStartOpTimesFromDonor() { + OpTime startApplyingDonorOpTime; + + stdx::unique_lock lk(_mutex); + + startApplyingDonorOpTime = + OpTime(_getDonorBackupCursorInfo(lk).checkpointTimestamp, OpTime::kUninitializedTerm); + + // Unlock the mutex before doing network reads + lk.unlock(); + + auto oldestActiveTxnOpTime = + _getOldestActiveTransactionAt(startApplyingDonorOpTime.getTimestamp()); + auto startFetchingDonorOpTime = + oldestActiveTxnOpTime ? oldestActiveTxnOpTime : startApplyingDonorOpTime; + + pauseAfterRetrievingLastTxnMigrationRecipientInstance.pauseWhileSet(); + + lk.lock(); + _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); + _stateDoc.setStartFetchingDonorOpTime(startFetchingDonorOpTime); + + return _updateStateDocForMajority(lk); +} + +void ShardMergeRecipientService::Instance::_processCommittedTransactionEntry(const BSONObj& entry) { + auto sessionTxnRecord = SessionTxnRecord::parse(IDLParserContext("SessionTxnRecord"), entry); + auto sessionId = sessionTxnRecord.getSessionId(); + auto txnNumber = sessionTxnRecord.getTxnNum(); + auto optTxnRetryCounter = sessionTxnRecord.getTxnRetryCounter(); + uassert(ErrorCodes::InvalidOptions, + "txnRetryCounter is only supported in sharded clusters", + !optTxnRetryCounter.has_value()); + + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + // If the tenantMigrationInfo is set on the opCtx, we will set the + // 'fromTenantMigration' field when writing oplog entries. That field is used to help recipient + // secondaries determine if a no-op entry is related to a transaction entry. 
+ tenantMigrationInfo(opCtx) = boost::make_optional<TenantMigrationInfo>(getMigrationUUID());
+ {
+ auto lk = stdx::lock_guard(*opCtx->getClient());
+ opCtx->setLogicalSessionId(sessionId);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ }
+ auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
+ auto ocs = mongoDSessionCatalog->checkOutSession(opCtx);
+
+ LOGV2_DEBUG(7339737,
+ 1,
+ "Migration attempting to commit transaction",
+ "sessionId"_attr = sessionId,
+ "txnNumber"_attr = txnNumber,
+ "txnRetryCounter"_attr = optTxnRetryCounter,
+ "migrationId"_attr = getMigrationUUID(),
+ "entry"_attr = entry);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ uassert(7339782,
+ str::stream() << "Migration failed to get transaction participant for transaction "
+ << txnNumber << " on session " << sessionId,
+ txnParticipant);
+
+ // The in-memory transaction state may have been updated past the on-disk transaction state. For
+ // instance, this might happen in an unprepared read-only transaction, which updates the
+ // in-memory state but not the on-disk state. To prevent potential errors, we use the on-disk
+ // state for the following transaction number checks.
+ txnParticipant.invalidate(opCtx);
+ txnParticipant.refreshFromStorageIfNeeded(opCtx);
+
+ // If the entry's transaction number is stale/older than the current active transaction number
+ // on the participant, fail the migration.
+ uassert(ErrorCodes::TransactionTooOld,
+ str::stream() << "Migration cannot apply transaction with transaction number "
+ << txnNumber << " and transaction retry counter " << optTxnRetryCounter
+ << " on session " << sessionId
+ << " because a newer transaction with txnNumberAndRetryCounter: "
+ << txnParticipant.getActiveTxnNumberAndRetryCounter().toBSON()
+ << " has already started",
+ txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() <= txnNumber);
+ if (txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() == txnNumber) {
+ // If the txn numbers are equal, move on to the next entry.
+ return;
+ }
+
+ txnParticipant.beginOrContinueTransactionUnconditionally(opCtx,
+ {txnNumber, optTxnRetryCounter});
+
+ MutableOplogEntry noopEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+
+ // Shard merge copies all tenants from the donor. This means that merge does
+ // not need to filter prefetched committed transactions by tenantId. As a result, setting
+ // an nss containing the tenantId for the no-op entry isn't necessary.
+ noopEntry.setNss({});
+ noopEntry.setObject({});
+
+ noopEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+ noopEntry.setSessionId(sessionId);
+ noopEntry.setTxnNumber(txnNumber);
+ noopEntry.getOperationSessionInfo().setTxnRetryCounter(optTxnRetryCounter);
+
+ sessionTxnRecord.setStartOpTime(boost::none);
+ // Use the same wallclock time as the noop entry.
+ sessionTxnRecord.setLastWriteDate(noopEntry.getWallClockTime());
+
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+ writeConflictRetry(
+ opCtx, "writeDonorCommittedTxnEntry", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ WriteUnitOfWork wuow(opCtx);
+
+ // Write the no-op entry and update 'config.transactions'.
+ auto opTime = repl::logOp(opCtx, &noopEntry); + sessionTxnRecord.setLastWriteOpTime(std::move(opTime)); + TransactionParticipant::get(opCtx).onWriteOpCompletedOnPrimary( + opCtx, {}, sessionTxnRecord); + + wuow.commit(); + }); + + // Invalidate in-memory state so that the next time the session is checked out, it would reload + // the transaction state from 'config.transactions'. + txnParticipant.invalidate(opCtx); + + hangAfterUpdatingTransactionEntry.execute([&](const BSONObj& data) { + LOGV2(7339707, "hangAfterUpdatingTransactionEntry failpoint enabled"); + hangAfterUpdatingTransactionEntry.pauseWhileSet(); + if (data["failAfterHanging"].trueValue()) { + // Simulate the sync source shutting down/restarting. + uasserted(ErrorCodes::ShutdownInProgress, + "Throwing error due to hangAfterUpdatingTransactionEntry failpoint"); + } + }); +} + +SemiFuture<void> +ShardMergeRecipientService::Instance::_fetchCommittedTransactionsBeforeStartOpTime() { + { + auto opCtx = cc().makeOperationContext(); + _stopOrHangOnFailPoint(&fpBeforeFetchingCommittedTransactions, opCtx.get()); + } + + if (MONGO_unlikely(skipFetchingCommittedTransactions.shouldFail())) { // Test-only. + return SemiFuture<void>::makeReady(); + } + + { + stdx::lock_guard lk(_mutex); + if (_stateDoc.getCompletedUpdatingTransactionsBeforeStartOpTime()) { + LOGV2_DEBUG( + 7339738, + 2, + "Already completed fetching committed transactions from donor, skipping stage", + "migrationId"_attr = getMigrationUUID()); + return SemiFuture<void>::makeReady(); + } + } + + std::unique_ptr<DBClientCursor> cursor; + cursor = _openCommittedTransactionsFindCursor(); + + while (cursor->more()) { + auto transactionEntry = cursor->next(); + _processCommittedTransactionEntry(transactionEntry); + + uassertStatusOK(_getInterruptStatus()); + } + + stdx::lock_guard lk(_mutex); + _stateDoc.setCompletedUpdatingTransactionsBeforeStartOpTime(true); + return _updateStateDocForMajority(lk); +} + +std::unique_ptr<DBClientCursor> +ShardMergeRecipientService::Instance::_openCommittedTransactionsFindCursor() { + Timestamp startApplyingDonorTimestamp; + { + stdx::lock_guard lk(_mutex); + invariant(_stateDoc.getStartApplyingDonorOpTime()); + startApplyingDonorTimestamp = _stateDoc.getStartApplyingDonorOpTime()->getTimestamp(); + } + + FindCommandRequest findCommandRequest{NamespaceString::kSessionTransactionsTableNamespace}; + findCommandRequest.setFilter(BSON("state" + << "committed" + << "lastWriteOpTime.ts" + << BSON("$lte" << startApplyingDonorTimestamp))); + findCommandRequest.setReadConcern( + ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + findCommandRequest.setHint(BSON("$natural" << 1)); + + return _client->find(std::move(findCommandRequest), _readPreference, ExhaustMode::kOn); +} + +void ShardMergeRecipientService::Instance::_createOplogBuffer(WithLock, OperationContext* opCtx) { + OplogBufferCollection::Options options; + options.peekCacheSize = static_cast<size_t>(tenantMigrationOplogBufferPeekCacheSize); + options.dropCollectionAtStartup = false; + options.dropCollectionAtShutdown = false; + options.useTemporaryCollection = false; + + auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); + if (!_donorOplogBuffer) { + + auto bufferCollection = std::make_unique<OplogBufferCollection>( + StorageInterface::get(opCtx), oplogBufferNS, options); + _donorOplogBuffer = std::move(bufferCollection); + } +} + +SemiFuture<void> +ShardMergeRecipientService::Instance::_fetchRetryableWritesOplogBeforeStartOpTime() { + 
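+ // Overview of this stage: if the oplog buffer already has entries (the recipient failed over
+ // mid-stage), clear it and redo the work; run an aggregation on the donor's
+ // config.transactions with local read concern to pre-fetch the oplog chains of retryable
+ // writes that occurred before 'startFetchingTimestamp'; buffer those entries; then wait for
+ // the donor's operationTime to be majority committed before marking this stage complete.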
_stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance);
+ if (MONGO_unlikely(
+ skipFetchingRetryableWritesEntriesBeforeStartOpTime.shouldFail())) { // Test-only.
+ return SemiFuture<void>::makeReady();
+ }
+
+ {
+ stdx::lock_guard lk(_mutex);
+ if (_stateDoc.getCompletedFetchingRetryableWritesBeforeStartOpTime()) {
+ LOGV2_DEBUG(7339739,
+ 2,
+ "Already completed fetching retryable writes oplog entries from donor, "
+ "skipping stage",
+ "migrationId"_attr = getMigrationUUID());
+ return SemiFuture<void>::makeReady();
+ }
+ }
+
+ auto opCtx = cc().makeOperationContext();
+ auto expCtx = makeExpressionContext(opCtx.get());
+ // If the oplog buffer contains entries at this point, it indicates that the recipient went
+ // through failover before it finished writing all oplog entries to the buffer. Clear it and
+ // redo the work.
+ auto oplogBufferNS = getOplogBufferNs(getMigrationUUID());
+ if (_donorOplogBuffer->getCount() > 0) {
+ // Ensure we are primary when trying to clear the oplog buffer since it will drop and
+ // re-create the collection.
+ auto coordinator = repl::ReplicationCoordinator::get(opCtx.get());
+ Lock::GlobalLock globalLock(opCtx.get(), MODE_IX);
+ if (!coordinator->canAcceptWritesForDatabase(opCtx.get(), oplogBufferNS.db())) {
+ uassertStatusOK(
+ Status(ErrorCodes::NotWritablePrimary,
+ "Recipient node is not primary, cannot clear oplog buffer collection."));
+ }
+ _donorOplogBuffer->clear(opCtx.get());
+ }
+
+ Timestamp startFetchingTimestamp;
+ {
+ stdx::lock_guard lk(_mutex);
+ invariant(_stateDoc.getStartFetchingDonorOpTime());
+ startFetchingTimestamp = _stateDoc.getStartFetchingDonorOpTime().value().getTimestamp();
+ }
+
+ LOGV2_DEBUG(7339740,
+ 1,
+ "Pre-fetching retryable oplog entries before startFetchingTimestamp",
+ "startFetchingTimestamp"_attr = startFetchingTimestamp,
+ "migrationId"_attr = getMigrationUUID());
+
+ // Fetch the oplog chains of all retryable writes that occurred before startFetchingTimestamp.
+ std::vector<BSONObj> serializedPipeline =
+ tenant_migration_util::createRetryableWritesOplogFetchingPipelineForAllTenants(
+ expCtx, startFetchingTimestamp)
+ ->serializeToBson();
+
+ AggregateCommandRequest aggRequest(NamespaceString::kSessionTransactionsTableNamespace,
+ std::move(serializedPipeline));
+
+ // Use local read concern. This is because secondary oplog application coalesces multiple
+ // updates to the same config.transactions record into a single update of the most recent
+ // retryable write statement, and since after SERVER-47844, the committed snapshot of a
+ // secondary can be in the middle of a batch, the combination of these two makes secondary
+ // majority reads on config.transactions not always reflect committed retryable writes at
+ // that majority commit point. So we need to do a local read to fetch the retryable writes,
+ // ensuring we don't miss any config.transactions records, and later do a majority wait on the
+ // donor's last applied operationTime to make sure the fetched results are majority committed.
+ auto readConcernArgs = repl::ReadConcernArgs(
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kLocalReadConcern));
+ aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+ // We must set a writeConcern on internal commands.
+ aggRequest.setWriteConcern(WriteConcernOptions());
+ // Allow aggregation to write to temporary files in case it reaches the memory limit.
+ aggRequest.setAllowDiskUse(true); + + // Failpoint to set a small batch size on the aggregation request. + if (MONGO_unlikely(fpSetSmallAggregationBatchSize.shouldFail())) { + SimpleCursorOptions cursor; + cursor.setBatchSize(1); + aggRequest.setCursor(cursor); + } + + std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + _client.get(), std::move(aggRequest), true /* secondaryOk */, false /* useExhaust */)); + + // cursor->more() will automatically request more from the server if necessary. + while (cursor->more()) { + // Similar to the OplogFetcher, we keep track of each oplog entry to apply and the number of + // the bytes of the documents read off the network. + std::vector<BSONObj> retryableWritesEntries; + retryableWritesEntries.reserve(cursor->objsLeftInBatch()); + auto toApplyDocumentBytes = 0; + + while (cursor->moreInCurrentBatch()) { + // Gather entries from current batch. + BSONObj doc = cursor->next(); + toApplyDocumentBytes += doc.objsize(); + retryableWritesEntries.push_back(doc); + } + + if (retryableWritesEntries.size() != 0) { + // Wait for enough space. + _donorOplogBuffer->waitForSpace(opCtx.get(), toApplyDocumentBytes); + // Buffer retryable writes entries. + _donorOplogBuffer->preload( + opCtx.get(), retryableWritesEntries.begin(), retryableWritesEntries.end()); + } + + pauseAfterRetrievingRetryableWritesBatch.pauseWhileSet(); + + // In between batches, check for recipient failover. + uassertStatusOK(_getInterruptStatus()); + } + + // Do a majority read on the sync source to make sure the pre-fetch result exists on a + // majority of nodes in the set. The timestamp we wait on is the donor's last applied + // operationTime, which is guaranteed to be at batch boundary if the sync source is a + // secondary. We do not check the rollbackId - rollback would lead to the sync source + // closing connections so the migration would fail and retry. + auto operationTime = cursor->getOperationTime(); + uassert(7339783, + "Donor operationTime not available in retryable write pre-fetch result.", + operationTime); + LOGV2_DEBUG(7339741, + 1, + "Waiting for retryable write pre-fetch result to be majority committed.", + "operationTime"_attr = operationTime); + + fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted.pauseWhileSet(); + + BSONObj readResult; + BSONObj cmd = ClonerUtils::buildMajorityWaitRequest(*operationTime); + _client.get()->runCommand( + DatabaseName(boost::none, "admin"), cmd, readResult, QueryOption_SecondaryOk); + uassertStatusOKWithContext( + getStatusFromCommandResult(readResult), + "Failed to wait for retryable writes pre-fetch result majority committed"); + + // Update _stateDoc to indicate that we've finished the retryable writes oplog entry fetching + // stage. + stdx::lock_guard lk(_mutex); + _stateDoc.setCompletedFetchingRetryableWritesBeforeStartOpTime(true); + return _updateStateDocForMajority(lk); +} + +void ShardMergeRecipientService::Instance::_startOplogBuffer(OperationContext* opCtx) try { + // It is illegal to start the replicated donor buffer when the node is not primary. + // So ensure we are primary before trying to startup the oplog buffer. 
+ repl::ReplicationStateTransitionLockGuard rstl(opCtx, MODE_IX); + + auto oplogBufferNS = getOplogBufferNs(getMigrationUUID()); + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, + oplogBufferNS.db())) { + uassertStatusOK( + Status(ErrorCodes::NotWritablePrimary, "Recipient node is no longer a primary.")); + } + + _donorOplogBuffer->startup(opCtx); + +} catch (DBException& ex) { + ex.addContext("Failed to create oplog buffer collection."); + throw; +} + +void ShardMergeRecipientService::Instance::_startOplogFetcher() { + _stopOrHangOnFailPoint(&fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); + + auto opCtx = cc().makeOperationContext(); + // Start the oplog buffer outside the mutex to avoid deadlock on a concurrent stepdown. + _startOplogBuffer(opCtx.get()); + + const auto donorMajorityOpTime = _getDonorMajorityOpTime(_oplogFetcherClient); + + stdx::lock_guard lk(_mutex); + + auto startFetchOpTime = _stateDoc.getStartFetchingDonorOpTime(); + invariant(startFetchOpTime && !startFetchOpTime->isNull()); + + if (donorMajorityOpTime < *startFetchOpTime) { + LOGV2_ERROR(7339742, + "Donor sync source's majority OpTime is behind our startFetchOpTime", + "migrationId"_attr = getMigrationUUID(), + "donorMajorityOpTime"_attr = donorMajorityOpTime, + "startFetchOpTime"_attr = *startFetchOpTime); + const auto now = _serviceContext->getFastClockSource()->now(); + _excludeDonorHost(lk, + _oplogFetcherClient->getServerHostAndPort(), + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + uasserted(ErrorCodes::InvalidSyncSource, + "Donor sync source's majority OpTime is behind our startFetchOpTime, retrying " + "sync source selection"); + } + + OplogFetcher::Config oplogFetcherConfig( + *startFetchOpTime, + _oplogFetcherClient->getServerHostAndPort(), + // The config is only used for setting the awaitData timeout; the defaults are fine. + ReplSetConfig::parse(BSON("_id" + << "dummy" + << "version" << 1 << "members" << BSONArray(BSONObj()))), + // We do not need to check the rollback ID. + ReplicationProcess::kUninitializedRollbackId, + tenantMigrationOplogFetcherBatchSize, + OplogFetcher::RequireFresherSyncSource::kDontRequireFresherSyncSource, + true /* forTenantMigration */); + + oplogFetcherConfig.queryReadConcern = + ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); + oplogFetcherConfig.name = "TenantOplogFetcher_" + getMigrationUUID().toString(); + oplogFetcherConfig.startingPoint = OplogFetcher::StartingPoint::kEnqueueFirstDoc; + + _dataReplicatorExternalState = std::make_unique<DataReplicatorExternalStateTenantMigration>(); + + // Starting oplog fetcher after migration interrupt would cause the fetcher to fail + // due to closed _oplogFetcherClient connection. 
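+ // The two callbacks passed to the factory below enqueue fetched documents into the donor
+ // oplog buffer (_enqueueDocuments) and report fetcher termination (_oplogFetcherCallback).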
+ _donorOplogFetcher = (*_createOplogFetcherFn)( + (**_scopedExecutor).get(), + std::make_unique<OplogFetcherRestartDecisionTenantMigration>(), + _dataReplicatorExternalState.get(), + [this, self = shared_from_this()](OplogFetcher::Documents::const_iterator first, + OplogFetcher::Documents::const_iterator last, + const OplogFetcher::DocumentsInfo& info) { + return _enqueueDocuments(first, last, info); + }, + [this, self = shared_from_this()](const Status& s, int rbid) { _oplogFetcherCallback(s); }, + std::move(oplogFetcherConfig)); + _donorOplogFetcher->setConnection(std::move(_oplogFetcherClient)); + uassertStatusOK(_donorOplogFetcher->startup()); +} + +Status ShardMergeRecipientService::Instance::_enqueueDocuments( + OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info) { + + invariant(_donorOplogBuffer); + + if (info.toApplyDocumentCount != 0) { + auto opCtx = cc().makeOperationContext(); + // Wait for enough space. + _donorOplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes); + // Buffer docs for later application. + _donorOplogBuffer->push(opCtx.get(), begin, end); + } + + return Status::OK(); +} + +void ShardMergeRecipientService::Instance::_oplogFetcherCallback(Status oplogFetcherStatus) { + // The oplog fetcher is normally canceled when migration is done; any other error + // indicates failure. + if (oplogFetcherStatus.isOK()) { + // Oplog fetcher status of "OK" means the stopReplProducer failpoint is set. Migration + // cannot continue in this state so force a failure. + LOGV2_ERROR( + 7339743, + "Recipient migration instance oplog fetcher stopped due to stopReplProducer failpoint", + "migrationId"_attr = getMigrationUUID()); + interruptConditionally( + {ErrorCodes::Error(7339793), + "Recipient migration instance oplog fetcher stopped due to stopReplProducer " + "failpoint"}); + } else if (oplogFetcherStatus.code() != ErrorCodes::CallbackCanceled) { + LOGV2_ERROR(7339744, + "Recipient migration instance oplog fetcher failed", + "migrationId"_attr = getMigrationUUID(), + "error"_attr = oplogFetcherStatus); + if (isRetriableOplogFetcherError(oplogFetcherStatus)) { + LOGV2_DEBUG(7339745, + 1, + "Recipient migration instance oplog fetcher received retriable error, " + "excluding donor host as sync source and retrying", + "migrationId"_attr = getMigrationUUID(), + "error"_attr = oplogFetcherStatus); + + stdx::lock_guard lk(_mutex); + const auto now = _serviceContext->getFastClockSource()->now(); + _excludeDonorHost(lk, + _client->getServerHostAndPort(), + now + Milliseconds(tenantMigrationExcludeDonorHostTimeoutMS)); + } + interruptConditionally(oplogFetcherStatus); + } +} + +void ShardMergeRecipientService::Instance::_stopOrHangOnFailPoint(FailPoint* fp, + OperationContext* opCtx) { + auto shouldHang = false; + fp->executeIf( + [&](const BSONObj& data) { + LOGV2(7339708, + "Shard merge recipient instance: failpoint enabled", + "migrationId"_attr = getMigrationUUID(), + "name"_attr = fp->getName(), + "args"_attr = data); + if (data["action"].str() == "hang") { + // fp is locked. If we call pauseWhileSet here, another thread can't disable fp. 
+ shouldHang = true; + } else { + uasserted(data["stopErrorCode"].numberInt(), + "Skipping remaining processing due to fail point"); + } + }, + [&](const BSONObj& data) { + auto action = data["action"].str(); + return (action == "hang" || action == "stop"); + }); + + if (shouldHang) { + if (opCtx) { + fp->pauseWhileSet(opCtx); + } else { + fp->pauseWhileSet(); + } + } +} + +SemiFuture<void> +ShardMergeRecipientService::Instance::_advanceMajorityCommitTsToBkpCursorCheckpointTs( + const CancellationToken& token) { + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + Timestamp donorBkpCursorCkptTs; + { + stdx::lock_guard lk(_mutex); + donorBkpCursorCkptTs = _getDonorBackupCursorInfo(lk).checkpointTimestamp; + } + + if (opCtx->getServiceContext()->getStorageEngine()->getStableTimestamp() >= + donorBkpCursorCkptTs) { + return SemiFuture<void>::makeReady(); + } + + LOGV2( + 7339709, + "Advancing recipient's majority commit timestamp to be at least the donor's backup cursor " + "checkpoint timestamp", + "migrationId"_attr = getMigrationUUID(), + "donorBackupCursorCheckpointTimestamp"_attr = donorBkpCursorCkptTs); + + _stopOrHangOnFailPoint(&fpBeforeAdvancingStableTimestamp, opCtx); + + // Advance the cluster time to the donorBkpCursorCkptTs so that we ensure we + // write the no-op entry below at ts > donorBkpCursorCkptTs. + VectorClockMutable::get(_serviceContext)->tickClusterTimeTo(LogicalTime(donorBkpCursorCkptTs)); + + writeConflictRetry(opCtx, + "mergeRecipientWriteNoopToAdvanceStableTimestamp", + NamespaceString::kRsOplogNamespace.ns(), + [&] { + if (token.isCanceled()) { + return; + } + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + WriteUnitOfWork wuow(opCtx); + const std::string msg = str::stream() + << "Merge recipient advancing stable timestamp"; + opCtx->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + opCtx, BSON("msg" << msg)); + wuow.commit(); + }); + + // Get the timestamp of the no-op. This will have ts > donorBkpCursorCkptTs. + auto noOpTs = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()).waitUntilMajority(noOpTs, token); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_durablyPersistConsistentState() { + _stopOrHangOnFailPoint(&fpBeforeMarkingCloneSuccess); + + stdx::lock_guard lk(_mutex); + _stateDoc.setCloneFinishedRecipientOpTime( + repl::ReplicationCoordinator::get(cc().getServiceContext())->getMyLastAppliedOpTime()); + // Mark the migration has reached consistent state. 
+ _stateDoc.setState(ShardMergeRecipientStateEnum::kConsistent); + return _updateStateDocForMajority(lk); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_enterConsistentState() { + return _durablyPersistConsistentState() + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this()]() { + _stopOrHangOnFailPoint(&fpBeforeFulfillingDataConsistentPromise); + stdx::lock_guard lk(_mutex); + + auto donorConsistentOpTime = _stateDoc.getStartApplyingDonorOpTime(); + invariant(donorConsistentOpTime && !donorConsistentOpTime->isNull()); + + LOGV2_DEBUG(7339746, + 1, + "Recipient migration instance is in consistent state", + "migrationId"_attr = getMigrationUUID(), + "donorConsistentOpTime"_attr = *donorConsistentOpTime); + setPromiseValueIfNotReady(lk, _dataConsistentPromise, *donorConsistentOpTime); + }) + .semi(); +} + +void ShardMergeRecipientService::Instance::onMemberImportedFiles( + const HostAndPort& host, bool success, const boost::optional<StringData>& reason) { + stdx::lock_guard lk(_mutex); + if (!_waitingForMembersToImportFiles) { + LOGV2_WARNING(7339747, + "Ignoring delayed recipientVoteImportedFiles", + "host"_attr = host.toString(), + "migrationId"_attr = _migrationUuid); + return; + } + + auto state = _stateDoc.getState(); + uassert(7339785, + "The migration is at the wrong stage for recipientVoteImportedFiles: {}"_format( + ShardMergeRecipientState_serializer(state)), + state == ShardMergeRecipientStateEnum::kLearnedFilenames); + + if (!success) { + _importedFilesPromise.setError( + {ErrorCodes::OperationFailed, + "Migration failed on {}, error: {}"_format(host, reason.value_or("null"))}); + _waitingForMembersToImportFiles = false; + return; + } + + _membersWhoHaveImportedFiles.insert(host); + // Not reconfig-safe, we must not do a reconfig concurrent with a migration. + if (static_cast<int>(_membersWhoHaveImportedFiles.size()) == + repl::ReplicationCoordinator::get(_serviceContext) + ->getConfig() + .getNumDataBearingMembers()) { + LOGV2_INFO(7339748, + "All members finished importing donated files", + "migrationId"_attr = _migrationUuid); + _importedFilesPromise.emplaceValue(); + _waitingForMembersToImportFiles = false; + } +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_markStateDocAsGarbageCollectable() { + _stopOrHangOnFailPoint(&fpBeforeMarkingStateDocAsGarbageCollectable); + + stdx::lock_guard lk(_mutex); + if (_stateDoc.getExpireAt()) { + // Nothing to do if the state doc already has the expireAt set. + return SemiFuture<void>::makeReady(); + } + + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); + + return _updateStateDocForMajority(lk); +} + +Status ShardMergeRecipientService::Instance::_getInterruptStatus() const { + if (auto future = _interruptPromise.getFuture(); future.isReady()) { + return future.getNoThrow(); + } + return Status::OK(); +} + +void ShardMergeRecipientService::Instance::_cancelRemainingWork(WithLock lk, Status status) { + setPromiseErrorifNotReady(lk, _interruptPromise, status); + + if (_client) { + _client->shutdownAndDisallowReconnect(); + } + + if (_oplogFetcherClient) { + // Closing this connection will be cause tenant oplog fetcher to fail. 
+ _oplogFetcherClient->shutdownAndDisallowReconnect(); + } + + shutdownTarget(lk, _donorFilenameBackupCursorFileFetcher); + shutdownTarget(lk, _tenantOplogApplier); +} + +void ShardMergeRecipientService::Instance::interrupt(Status status) { + stdx::lock_guard lk(_mutex); + setPromiseErrorifNotReady(lk, _receivedRecipientForgetMigrationPromise, status); + _cancelRemainingWork(lk, status); +} + +void ShardMergeRecipientService::Instance::interruptConditionally(Status status) { + stdx::lock_guard lk(_mutex); + _cancelRemainingWork(lk, status); +} + +void ShardMergeRecipientService::Instance::onReceiveRecipientForgetMigration( + OperationContext* opCtx, const MigrationDecisionEnum& decision) { + LOGV2(7339710, + "Forgetting migration due to recipientForgetMigration command", + "migrationId"_attr = getMigrationUUID()); + + stdx::lock_guard lk(_mutex); + setPromiseValueIfNotReady(lk, _receivedRecipientForgetMigrationPromise, decision); + _cancelRemainingWork(lk, + Status(ErrorCodes::TenantMigrationForgotten, + str::stream() << "recipientForgetMigration received for migration " + << getMigrationUUID())); +} + +void ShardMergeRecipientService::Instance::_cleanupOnMigrationCompletion(Status status) { + auto opCtx = cc().makeOperationContext(); + + std::unique_ptr<OplogFetcher> savedDonorOplogFetcher; + std::shared_ptr<TenantOplogApplier> savedTenantOplogApplier; + std::unique_ptr<ThreadPool> savedWriterPool; + std::unique_ptr<Fetcher> savedDonorFilenameBackupCursorFileFetcher; + { + stdx::lock_guard lk(_mutex); + _cancelRemainingWork(lk, status); + + _backupCursorKeepAliveCancellation = {}; + _backupCursorKeepAliveFuture = boost::none; + + shutdownTarget(lk, _donorOplogFetcher); + shutdownTargetWithOpCtx(lk, _donorOplogBuffer, opCtx.get()); + + _donorReplicaSetMonitor = nullptr; + + invariant(!status.isOK()); + setPromiseErrorifNotReady(lk, _dataConsistentPromise, status); + setPromiseErrorifNotReady(lk, _migrationCompletionPromise, status); + + _oplogApplierReady = false; + _oplogApplierReadyCondVar.notify_all(); + + // Save them to join() with it outside of _mutex. + using std::swap; + swap(savedDonorOplogFetcher, _donorOplogFetcher); + swap(savedTenantOplogApplier, _tenantOplogApplier); + swap(savedWriterPool, _writerPool); + swap(savedDonorFilenameBackupCursorFileFetcher, _donorFilenameBackupCursorFileFetcher); + } + + // Perform join outside the lock to avoid deadlocks. 
+ if (savedDonorFilenameBackupCursorFileFetcher) { + invariantStatusOK( + savedDonorFilenameBackupCursorFileFetcher->join(Interruptible::notInterruptible())); + } + joinTarget(savedDonorOplogFetcher); + joinTarget(savedTenantOplogApplier); + if (savedWriterPool) { + savedWriterPool->shutdown(); + savedWriterPool->join(); + } +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_updateStateDocForMajority( + WithLock lk, const RegisterChangeCbk& registerChange) { + return _writeStateDocForMajority(lk, OpType::kUpdate, registerChange); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_insertStateDocForMajority( + WithLock lk, const RegisterChangeCbk& registerChange) { + return _writeStateDocForMajority(lk, OpType::kInsert, registerChange); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_writeStateDocForMajority( + WithLock, OpType opType, const RegisterChangeCbk& registerChange) { + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = _stateDoc, opType, registerChange] { + auto opCtx = cc().makeOperationContext(); + _writeStateDoc(opCtx.get(), stateDoc, opType, registerChange); + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime, CancellationToken::uncancelable()); + }) + .semi(); +} + +void ShardMergeRecipientService::Instance::_writeStateDoc( + OperationContext* opCtx, + const ShardMergeRecipientDocument& stateDoc, + OpType opType, + const RegisterChangeCbk& registerChange) { + const auto& nss = NamespaceString::kShardMergeRecipientsNamespace; + AutoGetCollection collection(opCtx, nss, MODE_IX); + + uassert( + ErrorCodes::NamespaceNotFound, str::stream() << nss.ns() << " does not exist", collection); + + writeConflictRetry(opCtx, "writeShardMergeRecipientStateDoc", nss.ns(), [&]() { + WriteUnitOfWork wunit(opCtx); + + if (registerChange) + registerChange(opCtx); + + const auto filter = + BSON(TenantMigrationRecipientDocument::kIdFieldName << stateDoc.getId()); + auto updateResult = Helpers::upsert(opCtx, + nss, + filter, + stateDoc.toBSON(), + /*fromMigrate=*/false); + switch (opType) { + case OpType::kInsert: + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Failed to insert the shard merge recipient state doc: " + << tenant_migration_util::redactStateDoc(stateDoc.toBSON()) + << "; Found active migration for migrationId: " << stateDoc.getId(), + !updateResult.upsertedId.isEmpty()); + break; + case OpType::kUpdate: + // Intentionally not checking `updateResult.numDocsModified` to handle no-op + // updates. + uassert(ErrorCodes::NoSuchKey, + str::stream() + << "Failed to update shard merge recipient state document due to " + "missing state document for migrationId: " + << stateDoc.getId(), + updateResult.numMatched); + break; + default: + MONGO_UNREACHABLE + } + + wunit.commit(); + }); +} + +void ShardMergeRecipientService::Instance::_assertIfMigrationIsSafeToRunWithCurrentFcv() { + //(Generic FCV reference): This FCV check should exist across LTS binary versions. 
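+ // The donor's FCV is read below from its admin.system.version collection; the document has
+ // roughly the shape { _id: "featureCompatibilityVersion", version: "<FCV>" }.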
+ auto recipientFCV = serverGlobalParams.featureCompatibility.getVersion(); + if (serverGlobalParams.featureCompatibility.isUpgradingOrDowngrading(recipientFCV)) { + LOGV2(7339711, "Must abort shard merge as recipient is upgrading or downgrading"); + uasserted(ErrorCodes::TenantMigrationAborted, + "Can't run shard merge when FCV is downgrading or upgrading"); + } + + _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); + if (skipComparingRecipientAndDonorFCV.shouldFail()) { // Test-only. + return; + } + + FindCommandRequest findCmd{NamespaceString::kServerConfigurationNamespace}; + findCmd.setFilter(BSON("_id" << multiversion::kParameterName)); + findCmd.setReadConcern(ReadConcernArgs(ReadConcernLevel::kMajorityReadConcern).toBSONInner()); + auto donorFCVbson = _client->findOne(std::move(findCmd), + ReadPreferenceSetting{ReadPreference::SecondaryPreferred}); + + uassert(7339755, "FCV on donor not set", !donorFCVbson.isEmpty()); + + auto swDonorFCV = FeatureCompatibilityVersionParser::parse(donorFCVbson); + uassertStatusOK(swDonorFCV.getStatus()); + + auto donorFCV = swDonorFCV.getValue(); + if (donorFCV != recipientFCV) { + LOGV2_ERROR(7339749, + "Donor and recipient FCV mismatch", + "migrationId"_attr = getMigrationUUID(), + "donorConnString"_attr = _donorConnectionString, + "donorFCV"_attr = donorFCV, + "recipientFCV"_attr = recipientFCV); + uasserted(7339756, "Mismatch between donor and recipient FCVs"); + } + + _stopOrHangOnFailPoint(&fpAfterComparingRecipientAndDonorFCV); +} + +void ShardMergeRecipientService::Instance::_startOplogApplier() { + _stopOrHangOnFailPoint(&fpAfterFetchingCommittedTransactions); + + stdx::unique_lock lk(_mutex); + // Don't start the tenant oplog applier if the migration is interrupted. + uassertStatusOK(_getInterruptStatus()); + + const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime(); + invariant(startApplyingDonorOpTime); + const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime(); + invariant(cloneFinishedRecipientOpTime); + + _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid, + MigrationProtocolEnum::kShardMerge, + boost::none, + *startApplyingDonorOpTime, + _donorOplogBuffer.get(), + **_scopedExecutor, + _writerPool.get()); + _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime); + + LOGV2_DEBUG(7339750, + 1, + "Recipient migration instance starting oplog applier", + "migrationId"_attr = getMigrationUUID(), + "startApplyingAfterDonorOpTime"_attr = + _tenantOplogApplier->getStartApplyingAfterOpTime()); + + uassertStatusOK(_tenantOplogApplier->startup()); + _oplogApplierReady = true; + _oplogApplierReadyCondVar.notify_all(); + + lk.unlock(); + _stopOrHangOnFailPoint(&fpAfterStartingOplogApplierMigrationRecipientInstance); +} + +void ShardMergeRecipientService::Instance::_setup(ConnectionPair connectionPair) { + auto uniqueOpCtx = cc().makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + + stdx::lock_guard lk(_mutex); + // Do not set the internal states if the migration is already interrupted. 
+ uassertStatusOK(_getInterruptStatus()); + + _client = std::move(connectionPair.first); + _oplogFetcherClient = std::move(connectionPair.second); + + _writerPool = makeTenantMigrationWriterPool(); + + _sharedData = std::make_unique<TenantMigrationSharedData>(_serviceContext->getFastClockSource(), + getMigrationUUID()); + + _createOplogBuffer(lk, opCtx); +} + +void ShardMergeRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs( + const CancellationToken& token) { + std::vector<ExternalKeysCollectionDocument> keyDocs; + FindCommandRequest findRequest{NamespaceString::kKeysCollectionNamespace}; + auto cursor = _client->find(std::move(findRequest), _readPreference); + while (cursor->more()) { + const auto doc = cursor->nextSafe().getOwned(); + keyDocs.push_back( + tenant_migration_util::makeExternalClusterTimeKeyDoc(_migrationUuid, doc)); + } + + tenant_migration_util::storeExternalClusterTimeKeyDocs(std::move(keyDocs)); +} + +bool ShardMergeRecipientService::Instance::_isCommitOrAbortState(WithLock) const { + auto state = _stateDoc.getState(); + return state == ShardMergeRecipientStateEnum::kAborted || + state == ShardMergeRecipientStateEnum::kCommitted; +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_prepareForMigration( + const CancellationToken& token) { + _stopOrHangOnFailPoint(&fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); + + return _createAndConnectClients() + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this(), token](ConnectionPair connectionPair) { + _stopOrHangOnFailPoint(&fpAfterConnectingTenantMigrationRecipientInstance); + _setup(std::move(connectionPair)); + + _stopOrHangOnFailPoint(&fpBeforeFetchingDonorClusterTimeKeys); + _fetchAndStoreDonorClusterTimeKeyDocs(token); + }) + .semi(); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_waitForAllNodesToFinishImport() { + _stopOrHangOnFailPoint(&fpAfterStartingOplogFetcherMigrationRecipientInstance); + LOGV2_INFO(7339751, "Waiting for all nodes to call recipientVoteImportedFiles"); + return _importedFilesPromise.getFuture().semi(); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_startMigrationIfSafeToRunwithCurrentFCV( + const CancellationToken& token) { + _assertIfMigrationIsSafeToRunWithCurrentFcv(); + return _openBackupCursorWithRetry(token) + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this(), token] { _keepBackupCursorAlive(token); }) + .then([this, self = shared_from_this(), token] { + return _advanceMajorityCommitTsToBkpCursorCheckpointTs(token); + }) + .then([this, self = shared_from_this()] { return _enterLearnedFilenamesState(); }) + .then([this, self = shared_from_this()]() { return _getStartOpTimesFromDonor(); }) + .then([this, self = shared_from_this()] { + return _fetchRetryableWritesOplogBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { _startOplogFetcher(); }) + .then([this, self = shared_from_this()] { return _waitForAllNodesToFinishImport(); }) + .then([this, self = shared_from_this()] { return _killBackupCursor(); }) + .then([this, self = shared_from_this()] { return _enterConsistentState(); }) + .then([this, self = shared_from_this()] { + return _fetchCommittedTransactionsBeforeStartOpTime(); + }) + .then([this, self = shared_from_this()] { return _startOplogApplier(); }) + .semi(); +} + +SemiFuture<TenantOplogApplier::OpTimePair> +ShardMergeRecipientService::Instance::_waitForMigrationToComplete() { + _stopOrHangOnFailPoint(&fpAfterDataConsistentMigrationRecipientInstance); + + 
stdx::lock_guard lk(_mutex); + // wait for oplog applier to complete/stop. + // The oplog applier does not exit normally; it must be shut down externally, + // e.g. by recipientForgetMigration. + return _tenantOplogApplier->getNotificationForOpTime(OpTime::max()); +} + +void ShardMergeRecipientService::Instance::_dropTempCollections() { + _stopOrHangOnFailPoint(&fpBeforeDroppingTempCollections); + + auto opCtx = cc().makeOperationContext(); + auto storageInterface = StorageInterface::get(opCtx.get()); + + // The donated files and oplog buffer collections can be safely dropped at this + // point. In case either collection does not exist, dropping will be a no-op. + // It isn't necessary that a given drop is majority-committed. A new primary will + // attempt to drop the collection anyway. + uassertStatusOK(storageInterface->dropCollection( + opCtx.get(), shard_merge_utils::getDonatedFilesNs(getMigrationUUID()))); + + uassertStatusOK( + storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID()))); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_durablyPersistCommitAbortDecision( + MigrationDecisionEnum decision) { + _stopOrHangOnFailPoint(&fpAfterReceivingRecipientForgetMigration); + { + stdx::lock_guard<Latch> lk(_mutex); + if (_isCommitOrAbortState(lk)) { + return SemiFuture<void>::makeReady(); + } + } + + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + const auto& nss = NamespaceString::kShardMergeRecipientsNamespace; + + AutoGetCollection collection(opCtx, nss, MODE_IX); + uassert( + ErrorCodes::NamespaceNotFound, str::stream() << nss.ns() << " does not exist", collection); + + writeConflictRetry(opCtx, "markShardMergeStateDocGarbageCollectable", nss.ns(), [&]() { + WriteUnitOfWork wuow(opCtx); + auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; + const auto originalRecordId = + Helpers::findById(opCtx, collection.getCollection(), BSON("_id" << _migrationUuid)); + + auto stateDoc = [&]() { + stdx::lock_guard<Latch> lg(_mutex); + switch (decision) { + case MigrationDecisionEnum::kCommitted: + LOGV2_DEBUG(7339760, + 2, + "Marking recipient migration instance as committed ", + "migrationId"_attr = _migrationUuid); + _stateDoc.setState(ShardMergeRecipientStateEnum::kCommitted); + break; + case MigrationDecisionEnum::kAborted: + LOGV2_DEBUG(7339791, + 2, + "Marking recipient migration instance as aborted ", + "migrationId"_attr = _migrationUuid, + "abortOpTime"_attr = oplogSlot); + _stateDoc.setState(ShardMergeRecipientStateEnum::kAborted); + _stateDoc.setAbortOpTime(oplogSlot); + break; + default: + MONGO_UNREACHABLE; + } + if (originalRecordId.isNull()) { + // It's possible to get here only for following cases. + // 1) The migration was forgotten before receiving a 'recipientSyncData'. + // 2) A delayed 'recipientForgetMigration' was received after the state doc was + // deleted. + // 3) Fail to initialize the state document. 
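+                    // In all of these cases the decision is recorded by inserting a brand-new
+                    // state document rather than updating an existing one (see the insert branch
+                    // below).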
+ invariant(_stateDoc.getStartGarbageCollect()); + _stateDoc.setStartAt(_serviceContext->getFastClockSource()->now()); + } + return _stateDoc.toBSON(); + }(); + + if (originalRecordId.isNull()) { + uassertStatusOK(collection_internal::insertDocument( + opCtx, + *collection, + InsertStatement(kUninitializedStmtId, stateDoc, oplogSlot), + nullptr)); + + } else { + auto preImageDoc = collection->docFor(opCtx, originalRecordId); + CollectionUpdateArgs args{preImageDoc.value()}; + args.criteria = BSON("_id" << _migrationUuid); + args.oplogSlots = {oplogSlot}; + args.update = stateDoc; + + collection_internal::updateDocument(opCtx, + *collection, + originalRecordId, + preImageDoc, + stateDoc, + collection_internal::kUpdateAllIndexes, + nullptr /* OpDebug* */, + &args); + } + + wuow.commit(); + }); + + auto waitOptime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(_serviceContext) + .waitUntilMajority(waitOptime, CancellationToken::uncancelable()); +} + +SemiFuture<void> +ShardMergeRecipientService::Instance::_waitForForgetMigrationThenMarkMigrationGarbageCollectable( + const CancellationToken& token) { + auto decision = [&]() -> boost::optional<MigrationDecisionEnum> { + { + stdx::lock_guard lk(_mutex); + if (_isCommitOrAbortState(lk)) { + return (_stateDoc.getState() == ShardMergeRecipientStateEnum::kCommitted) + ? MigrationDecisionEnum::kCommitted + : MigrationDecisionEnum::kAborted; + } + } + if (MONGO_unlikely(autoRecipientForgetMigrationAbort.shouldFail())) { + return MigrationDecisionEnum::kAborted; + } + return boost::none; + }(); + + if (decision) { + auto opCtx = cc().makeOperationContext(); + onReceiveRecipientForgetMigration(opCtx.get(), *decision); + } + + LOGV2_DEBUG(7339759, + 2, + "Waiting to receive 'recipientForgetMigration' command.", + "migrationId"_attr = _migrationUuid); + + return _receivedRecipientForgetMigrationPromise.getFuture() + .semi() + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this()](MigrationDecisionEnum decision) { + return _durablyPersistCommitAbortDecision(decision); + }) + .then([this, self = shared_from_this()] { _dropTempCollections(); }) + .then([this, self = shared_from_this(), token] { + // Note marking the keys as garbage collectable is not atomic with marking the + // state document garbage collectable, so an interleaved failover can lead the + // keys to be deleted before the state document has an expiration date. This is + // acceptable because the decision to forget a migration is not reversible. 
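+            // markExternalKeysAsGarbageCollectable below stamps an expiration on this migration's
+            // config.external_validation_keys entries so that they can be cleaned up later.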
+            return tenant_migration_util::markExternalKeysAsGarbageCollectable(
+                _serviceContext,
+                _scopedExecutor,
+                _recipientService->getInstanceCleanupExecutor(),
+                _migrationUuid,
+                token);
+        })
+        .then([this, self = shared_from_this()] { return _markStateDocAsGarbageCollectable(); })
+        .then([this, self = shared_from_this()] {
+            stdx::lock_guard lk(_mutex);
+            setPromiseOkifNotReady(lk, _forgetMigrationDurablePromise);
+        })
+        .semi();
+}
+
+SemiFuture<void> ShardMergeRecipientService::Instance::run(
+    std::shared_ptr<executor::ScopedTaskExecutor> executor,
+    const CancellationToken& token) noexcept {
+    _scopedExecutor = executor;
+    _backupCursorExecutor = **_scopedExecutor;
+    auto scopedOutstandingMigrationCounter =
+        TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingReceivingCount();
+
+    LOGV2(7339712,
+          "Starting shard merge recipient instance: ",
+          "migrationId"_attr = getMigrationUUID(),
+          "connectionString"_attr = _donorConnectionString,
+          "readPreference"_attr = _readPreference);
+
+    pauseBeforeRunTenantMigrationRecipientInstance.pauseWhileSet();
+
+    return ExecutorFuture(**executor)
+        .then([this, self = shared_from_this()] {
+            pauseAfterRunTenantMigrationRecipientInstance.pauseWhileSet();
+            return _initializeAndDurablyPersistStateDoc();
+        })
+        .then([this, self = shared_from_this(), token] { return _prepareForMigration(token); })
+        .then([this, self = shared_from_this(), token] {
+            return _startMigrationIfSafeToRunwithCurrentFCV(token);
+        })
+        .then([this, self = shared_from_this()] { return _waitForMigrationToComplete(); })
+        .thenRunOn(_recipientService->getInstanceCleanupExecutor())
+        .onCompletion([this, self = shared_from_this()](
+                          StatusOrStatusWith<TenantOplogApplier::OpTimePair> applierStatus) {
+            // Note: The tenant oplog applier does not normally stop by itself on success. It
+            // completes only on errors or on external interruption (e.g. by shutDown/stepDown or
+            // by the recipientForgetMigration command), so an errored completion status doesn't
+            // necessarily mean that the migration failed.
+            auto status = overrideMigrationErrToInterruptStatusIfNeeded(
+                _migrationUuid, applierStatus.getStatus(), _interruptPromise.getFuture());
+
+            LOGV2(7339713,
+                  "Shard merge recipient instance: Migration completed.",
+                  "migrationId"_attr = getMigrationUUID(),
+                  "completionStatus"_attr = status);
+
+            if (MONGO_unlikely(hangBeforeTaskCompletion.shouldFail())) {
+                LOGV2(7339714,
+                      "Shard merge recipient instance: hangBeforeTaskCompletion failpoint "
+                      "enabled");
+                hangBeforeTaskCompletion.pauseWhileSet();
+            }
+
+            _cleanupOnMigrationCompletion(status);
+        })
+        .thenRunOn(**_scopedExecutor)
+        .then([this, self = shared_from_this(), token] {
+            return _waitForForgetMigrationThenMarkMigrationGarbageCollectable(token);
+        })
+        .then([this, self = shared_from_this(), token] {
+            return _waitForGarbageCollectionDelayThenDeleteStateDoc(token);
+        })
+        .thenRunOn(_recipientService->getInstanceCleanupExecutor())
+        .onCompletion([this,
+                       self = shared_from_this(),
+                       scopedCounter{std::move(scopedOutstandingMigrationCounter)}](Status status) {
+            // We don't want errors that happened in the garbage collection stage to be replaced
+            // with interrupt errors caused by receiving the 'recipientForgetMigration' command,
+            // but we do still want to replace them with failover/shutdown interrupt errors.
+ status = overrideMigrationErrToInterruptStatusIfNeeded(_migrationUuid, status); + if (status.isOK()) + return; + + LOGV2(7339715, + "Shard merge recipient instance not marked to be garbage collectable", + "migrationId"_attr = getMigrationUUID(), + "status"_attr = status); + + // We should only hit here on a stepdown or shudDown errors. + invariant(ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status)); + + stdx::lock_guard lk(_mutex); + setPromiseErrorifNotReady(lk, _forgetMigrationDurablePromise, status); + }) + .semi(); +} + +SemiFuture<void> ShardMergeRecipientService::Instance::_removeStateDoc( + const CancellationToken& token) { + return AsyncTry([this, self = shared_from_this()] { + auto opCtxHolder = cc().makeOperationContext(); + auto opCtx = opCtxHolder.get(); + + pauseTenantMigrationRecipientBeforeDeletingStateDoc.pauseWhileSet(opCtx); + + PersistentTaskStore<ShardMergeRecipientDocument> store(_stateDocumentsNS); + store.remove( + opCtx, + BSON(ShardMergeRecipientDocument::kIdFieldName << _migrationUuid), + WriteConcernOptions(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0))); + LOGV2(7339716, + "shard merge recipient state document is deleted", + "migrationId"_attr = _migrationUuid); + }) + .until([](Status status) { return status.isOK(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**_scopedExecutor, token) + .semi(); +} + +SemiFuture<void> +ShardMergeRecipientService::Instance::_waitForGarbageCollectionDelayThenDeleteStateDoc( + const CancellationToken& token) { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(7339717, + "Waiting for garbage collection delay before deleting state document", + "migrationId"_attr = _migrationUuid, + "expireAt"_attr = *_stateDoc.getExpireAt()); + + return (**_scopedExecutor) + ->sleepUntil(*_stateDoc.getExpireAt(), token) + .then([this, self = shared_from_this(), token]() { + LOGV2(7339718, + "Deleting shard merge recipient state document", + "migrationId"_attr = _migrationUuid); + return _removeStateDoc(token); + }) + .semi(); +} + +const UUID& ShardMergeRecipientService::Instance::getMigrationUUID() const { + return _migrationUuid; +} + +ShardMergeRecipientDocument ShardMergeRecipientService::Instance::getStateDoc() const { + stdx::lock_guard lk(_mutex); + return _stateDoc; +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/shard_merge_recipient_service.h b/src/mongo/db/repl/shard_merge_recipient_service.h new file mode 100644 index 00000000000..c863e862678 --- /dev/null +++ b/src/mongo/db/repl/shard_merge_recipient_service.h @@ -0,0 +1,634 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <memory> + +#include "mongo/client/fetcher.h" +#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h" +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/repl/oplog_fetcher.h" +#include "mongo/db/repl/primary_only_service.h" +#include "mongo/db/repl/tenant_migration_shared_data.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/repl/tenant_oplog_applier.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class DBClientConnection; +class OperationContext; +class ReplicaSetMonitor; +class ServiceContext; + +namespace repl { +class OplogBufferCollection; + +/** + * ShardMergeRecipientService is a primary only service which orchestrates the + * data migration on the recipient side for shard merge protocol. + */ +class ShardMergeRecipientService final : public PrimaryOnlyService { + // Disallows copying. + ShardMergeRecipientService(const ShardMergeRecipientService&) = delete; + ShardMergeRecipientService& operator=(const ShardMergeRecipientService&) = delete; + +public: + static constexpr StringData kShardMergeRecipientServiceName = "ShardMergeRecipientService"_sd; + + explicit ShardMergeRecipientService(ServiceContext* serviceContext); + ~ShardMergeRecipientService() = default; + + StringData getServiceName() const final; + + NamespaceString getStateDocumentsNS() const final; + + ThreadPool::Limits getThreadPoolLimits() const final; + + void checkIfConflictsWithOtherInstances( + OperationContext* opCtx, + BSONObj initialStateDoc, + const std::vector<const PrimaryOnlyService::Instance*>& existingInstances) final; + + std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialStateDoc) final; + + /** + * Interrupts all shard merge recipient service instances. + */ + void abortAllMigrations(OperationContext* opCtx); + + class Instance final : public PrimaryOnlyService::TypedInstance<Instance> { + public: + explicit Instance(ServiceContext* serviceContext, + const ShardMergeRecipientService* recipientService, + BSONObj stateDoc); + + SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) noexcept final; + + /** + * Unconditional migration interrupt called on node's stepdown/shutdown event. + * Make the instance to not wait for `recipientForgetMigration` command. + */ + void interrupt(Status status) override; + + /** + * Conditional migration interrupt called on fcv change or due to oplog fetcher error. + * Make the instance to wait for `recipientForgetMigration` command. 
+ */ + void interruptConditionally(Status status); + + /** + * Interrupts the migration for garbage collection. + */ + void onReceiveRecipientForgetMigration(OperationContext* opCtx, + const MigrationDecisionEnum& decision); + + /** + * Returns a Future that will be resolved when migration is completed. + */ + SharedSemiFuture<void> getMigrationCompletionFuture() const { + return _migrationCompletionPromise.getFuture(); + } + + /** + * Returns a Future that will be resolved when the instance has been durably marked garbage + * collectable. + */ + SharedSemiFuture<void> getForgetMigrationDurableFuture() const { + return _forgetMigrationDurablePromise.getFuture(); + } + + /** + * Returns the instance id. + */ + const UUID& getMigrationUUID() const; + + /** + * Returns the instance state document. + */ + ShardMergeRecipientDocument getStateDoc() const; + + boost::optional<BSONObj> reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept final; + + void checkIfOptionsConflict(const BSONObj& stateDoc) const final; + + /** + * Blocks the thread until the migration reaches consistent state in an interruptible + * mode. + * + * Returns the donor OpTime at which the migration reached consistent state. Throws + * exception on error. + */ + OpTime waitUntilMigrationReachesConsistentState(OperationContext* opCtx) const; + + /** + * Blocks the thread until the tenant oplog applier applied data past the + * 'returnAfterReachingTimestamp' in an interruptible mode. If the recipient's logical clock + * has not yet reached the 'returnAfterReachingTimestamp', advances the recipient's logical + * clock to 'returnAfterReachingTimestamp'. Finally, stores the + * 'returnAfterReachingTimestamp' as 'rejectReadsBeforeTimestamp' in the state + * document and waits for the write to be replicated to every node (i.e. wait for + * 'rejectReadsBeforeTimestamp' to be set on the TenantMigrationRecipientAccessBlocker of + * every node) to guarantee that no reads will be incorrectly accepted. + */ + OpTime waitUntilMigrationReachesReturnAfterReachingTimestamp( + OperationContext* opCtx, const Timestamp& returnAfterReachingTimestamp); + + /** + * Called when a replica set member (self, or a secondary) finishes importing donated files. + */ + void onMemberImportedFiles(const HostAndPort& host, + bool success, + const boost::optional<StringData>& reason = boost::none); + + /** + * Set the oplog creator functor, to allow use of a mock oplog fetcher. + */ + void setCreateOplogFetcherFn_forTest( + std::unique_ptr<OplogFetcherFactory>&& createOplogFetcherFn) { + _createOplogFetcherFn = std::move(createOplogFetcherFn); + } + + /** + * Stops the oplog applier without going through recipientForgetMigration. + */ + void stopOplogApplier_forTest() { + stdx::lock_guard lk(_mutex); + _tenantOplogApplier->shutdown(); + } + + /** + * Suppresses selecting 'host' as the donor sync source, until 'until'. + */ + void excludeDonorHost_forTest(const HostAndPort& host, Date_t until) { + stdx::lock_guard lk(_mutex); + _excludeDonorHost(lk, host, until); + } + + const auto& getExcludedDonorHosts_forTest() { + return _excludedDonorHosts; + } + + private: + friend class ShardMergeRecipientServiceTest; + friend class ShardMergeRecipientServiceShardMergeTest; + + /** + * Only used for testing. Allows setting a custom task executor for backup cursor fetcher. 
+         */
+        void setBackupCursorFetcherExecutor_forTest(
+            std::shared_ptr<executor::TaskExecutor> taskExecutor) {
+            _backupCursorExecutor = taskExecutor;
+        }
+
+        const NamespaceString _stateDocumentsNS = NamespaceString::kShardMergeRecipientsNamespace;
+
+        using ConnectionPair =
+            std::pair<std::unique_ptr<DBClientConnection>, std::unique_ptr<DBClientConnection>>;
+
+        /**
+         * Transitions the instance state to 'kStarted' if the state is uninitialized.
+         */
+        SemiFuture<void> _initializeAndDurablyPersistStateDoc();
+
+        /**
+         * Executes the steps necessary to start a migration, such as establishing the donor
+         * client connections, setting up internal state, and fetching the donor cluster keys.
+         */
+        SemiFuture<void> _prepareForMigration(const CancellationToken& token);
+
+        /**
+         * Sets up internal state to begin migration.
+         */
+        void _setup(ConnectionPair connectionPair);
+
+        /**
+         * Starts the migration only if the following FCV checks pass:
+         * a) The recipient is not in the middle of an FCV upgrade/downgrade.
+         * b) The donor and recipient FCVs match.
+         */
+        SemiFuture<void> _startMigrationIfSafeToRunwithCurrentFCV(const CancellationToken& token);
+
+        /**
+         * Helper to run FCV sanity checks at the start of migration.
+         */
+        void _assertIfMigrationIsSafeToRunWithCurrentFcv();
+
+        /**
+         * Waits for all data-bearing nodes to complete the import.
+         */
+        SemiFuture<void> _waitForAllNodesToFinishImport();
+
+        /**
+         * Tells whether the migration is committed or aborted.
+         */
+        bool _isCommitOrAbortState(WithLock) const;
+
+        /**
+         * Waits for the 'recipientForgetMigration' command to supply the migration decision and
+         * then marks the external keys doc and the instance state doc as garbage collectable.
+         */
+        SemiFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
+            const CancellationToken& token);
+
+        /**
+         * Durably persists the migration decision in the state doc.
+         */
+        SemiFuture<void> _durablyPersistCommitAbortDecision(MigrationDecisionEnum decision);
+
+        /*
+         * Drops ephemeral collections used for migrations after the migration decision is durably
+         * persisted.
+         */
+        void _dropTempCollections();
+
+        /**
+         * Sets the `expireAt` field in the state doc.
+         */
+        SemiFuture<void> _markStateDocAsGarbageCollectable();
+
+        /**
+         * Deletes the state document. Does not return the opTime for the delete, since it's not
+         * necessary to wait for this delete to be majority committed (this is one of the last
+         * steps in the chain, and if the delete rolls back, the new primary will re-do the
+         * delete).
+         */
+        SemiFuture<void> _removeStateDoc(const CancellationToken& token);
+
+        SemiFuture<void> _waitForGarbageCollectionDelayThenDeleteStateDoc(
+            const CancellationToken& token);
+
+        /**
+         * Creates a client and connects it to the donor. If '_transientSSLParams' is not none,
+         * uses the migration certificate to do SSL authentication. Otherwise, uses the default
+         * authentication mode. Throws a user assertion on failure.
+         */
+        std::unique_ptr<DBClientConnection> _connectAndAuth(const HostAndPort& serverAddress,
+                                                            StringData applicationName);
+
+        /**
+         * Creates and connects both the oplog fetcher client and the client used for other
+         * operations.
+         */
+        SemiFuture<ConnectionPair> _createAndConnectClients();
+
+        /**
+         * Fetches all key documents from the donor's admin.system.keys collection, stores them in
+         * config.external_validation_keys, and refreshes the keys cache.
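+         * Each donor key document (illustratively of the form { _id: <NumberLong>, purpose:
+         * "HMAC", key: <BinData>, expiresAt: <Timestamp> }) is rewrapped with this migration's
+         * id before being stored on the recipient.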
+ */ + void _fetchAndStoreDonorClusterTimeKeyDocs(const CancellationToken& token); + + /** + * Opens a backup cursor on the donor primary and fetches the + * list of donor files to be cloned. + */ + SemiFuture<void> _openBackupCursor(const CancellationToken& token); + SemiFuture<void> _openBackupCursorWithRetry(const CancellationToken& token); + + /** + * Keeps the donor backup cursor alive. + */ + void _keepBackupCursorAlive(const CancellationToken& token); + + /** + * Kills the Donor backup cursor. + */ + SemiFuture<void> _killBackupCursor(); + + /** + * Gets the backup cursor metadata info. + */ + const BackupCursorInfo& _getDonorBackupCursorInfo(WithLock) const; + + /** + * Get the oldest active multi-statement transaction optime by reading + * config.transactions collection at given ReadTimestamp (i.e, equal to + * startApplyingDonorOpTime) snapshot. + */ + boost::optional<OpTime> _getOldestActiveTransactionAt(Timestamp ReadTimestamp); + + /** + * Retrieves the start/fetch optimes from the donor and updates the in-memory/on-disk states + * accordingly. + */ + SemiFuture<void> _getStartOpTimesFromDonor(); + + /** + * Pushes documents from oplog fetcher to oplog buffer. + * + * Returns a status even though it always returns OK, to conform the interface OplogFetcher + * expects for the EnqueueDocumentsFn. + */ + Status _enqueueDocuments(OplogFetcher::Documents::const_iterator begin, + OplogFetcher::Documents::const_iterator end, + const OplogFetcher::DocumentsInfo& info); + + /** + * Creates the oplog buffer that will be populated by donor oplog entries from the retryable + * writes fetching stage and oplog fetching stage. + */ + void _createOplogBuffer(WithLock, OperationContext* opCtx); + + /** + * Runs an aggregation that gets the entire oplog chain for every retryable write entry in + * `config.transactions`. Only returns oplog entries in the chain where + * `ts` < `startFetchingOpTime.ts` and adds them to the oplog buffer. + */ + SemiFuture<void> _fetchRetryableWritesOplogBeforeStartOpTime(); + + /** + * Migrates committed transactions entries into 'config.transactions'. + */ + SemiFuture<void> _fetchCommittedTransactionsBeforeStartOpTime(); + + /** + * Opens and returns a cursor for all entries with 'lastWriteOpTime' <= + * 'startApplyingDonorOpTime' and state 'committed'. + */ + std::unique_ptr<DBClientCursor> _openCommittedTransactionsFindCursor(); + + /** + * Opens and returns a cursor for entries from '_makeCommittedTransactionsAggregation()'. + */ + std::unique_ptr<DBClientCursor> _openCommittedTransactionsAggregationCursor(); + + /** + * Creates an aggregation pipeline to fetch transaction entries with 'lastWriteOpTime' < + * 'startFetchingDonorOpTime' and 'state: committed'. + */ + AggregateCommandRequest _makeCommittedTransactionsAggregation() const; + + /** + * Processes a committed transaction entry from the donor. Updates the recipient's + * 'config.transactions' collection with the entry and writes a no-op entry for the + * recipient secondaries to replicate the entry. + */ + void _processCommittedTransactionEntry(const BSONObj& entry); + + /** + * Starts the oplog buffer only if the node is primary. Otherwise, throw error. + */ + void _startOplogBuffer(OperationContext* opCtx); + + /** + * Starts the tenant oplog fetcher. + */ + void _startOplogFetcher(); + + /** + * Called when the oplog fetcher finishes. Usually the oplog fetcher finishes only when + * cancelled or on error. 
+         */
+        void _oplogFetcherCallback(Status oplogFetcherStatus);
+
+        /**
+         * Starts the tenant oplog applier.
+         */
+        void _startOplogApplier();
+
+        /**
+         * Waits for the tenant oplog applier to stop.
+         */
+        SemiFuture<TenantOplogApplier::OpTimePair> _waitForMigrationToComplete();
+
+        /**
+         * Advances the majority commit timestamp to be >= the donor's backup cursor checkpoint
+         * timestamp (CkptTs) by:
+         * 1. Advancing the clusterTime to CkptTs.
+         * 2. Writing a no-op oplog entry with ts > CkptTs.
+         * 3. Waiting for the majority commit timestamp to be the time of the no-op write.
+         *
+         * Note: This method should be called before transitioning the instance state to
+         * 'kLearnedFilenames', which causes donor collections to get imported. The current import
+         * rule is that the imported table's checkpoint timestamp can't be later than the
+         * recipient's stable timestamp. Because there is no mechanism to wait for a specific
+         * stable timestamp on a given node (or set of nodes) in the replica set, and because the
+         * majority commit point and stable timestamp aren't updated atomically, advancing the
+         * majority commit point on the recipient before the collection import stage is a
+         * best-effort attempt to prevent import retries caused by violations of that timestamp
+         * rule.
+         */
+        SemiFuture<void> _advanceMajorityCommitTsToBkpCursorCheckpointTs(
+            const CancellationToken& token);
+
+        /**
+         * Returns a future that will be fulfilled when the tenant migration reaches the
+         * consistent state.
+         */
+        SemiFuture<void> _getDataConsistentFuture();
+
+        /**
+         * Transitions the instance state to 'kLearnedFilenames' after learning all filenames to
+         * be imported.
+         */
+        SemiFuture<void> _enterLearnedFilenamesState();
+
+        /**
+         * Durably persists that the migration has reached the consistent state and signals
+         * waiters.
+         */
+        SemiFuture<void> _enterConsistentState();
+        SemiFuture<void> _durablyPersistConsistentState();
+
+        /**
+         * Gets the migration interrupt status. The answer may change after this call, as it reads
+         * the interrupt status without holding the mutex; it's the caller's responsibility to
+         * decide whether to hold the mutex before calling this method.
+         */
+        Status _getInterruptStatus() const;
+
+        /**
+         * Cancels all remaining work in the migration.
+         */
+        void _cancelRemainingWork(WithLock lk, Status status);
+
+        /**
+         * Performs cleanup work on migration completion, such as shutting down components and
+         * fulfilling any instance promises.
+         */
+        void _cleanupOnMigrationCompletion(Status status);
+
+        /**
+         * Suppresses selecting 'host' as the donor sync source, until 'until'.
+         */
+        void _excludeDonorHost(WithLock, const HostAndPort& host, Date_t until);
+
+        /**
+         * Returns a vector of currently excluded donor hosts. Also removes hosts from the list of
+         * excluded donor nodes, if the exclude duration has expired.
+         */
+        std::vector<HostAndPort> _getExcludedDonorHosts(WithLock);
+
+        /**
+         * Makes the failpoint stop or hang the migration based on the failpoint data "action"
+         * field. If "action" is "hang" and 'opCtx' is not null, the failpoint will be
+         * interruptible.
+         */
+        void _stopOrHangOnFailPoint(FailPoint* fp, OperationContext* opCtx = nullptr);
+
+        enum class OpType { kInsert, kUpdate };
+        using RegisterChangeCbk = std::function<void(OperationContext* opCtx)>;
+        /**
+         * Inserts/updates the shard merge recipient state doc and waits for that change to be
+         * propagated to a majority.
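+         * The returned future resolves only after the write has been majority committed; callers
+         * are expected to hold '_mutex' (expressed by the 'WithLock' parameter) when calling.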
+ */ + SemiFuture<void> _insertStateDocForMajority( + WithLock lk, const RegisterChangeCbk& registerChange = nullptr); + SemiFuture<void> _updateStateDocForMajority( + WithLock lk, const RegisterChangeCbk& registerChange = nullptr); + + /** + * Helper to persist state doc. + */ + SemiFuture<void> _writeStateDocForMajority( + WithLock, OpType opType, const RegisterChangeCbk& registerChange = nullptr); + + /** + * Insert/updates the shard merge recipient state doc. Throws error if it fails to + * perform the operation opType. + */ + void _writeStateDoc(OperationContext* opCtx, + const ShardMergeRecipientDocument& stateDoc, + OpType opType, + const RegisterChangeCbk& registerChange = nullptr); + + /** + * Returns the majority OpTime on the donor node that 'client' is connected to. + */ + OpTime _getDonorMajorityOpTime(std::unique_ptr<mongo::DBClientConnection>& client); + + /** + * Send the killBackupCursor command to the remote in order to close the backup cursor + * connection on the donor. + */ + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleKillBackupCursorWithLock( + WithLock lk, std::shared_ptr<executor::TaskExecutor> executor); + + mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardMergeRecipientService::_mutex"); + + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access according to class's own rules. + // (M) Reads and writes guarded by _mutex. + // (W) Synchronization required only for writes. + + ServiceContext* const _serviceContext; + const ShardMergeRecipientService* const _recipientService; // (R) (not owned) + std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; // (M) + std::shared_ptr<executor::TaskExecutor> _backupCursorExecutor; // (M) + ShardMergeRecipientDocument _stateDoc; // (M) + + // This data is provided in the initial state doc and never changes. We keep copies to + // avoid having to obtain the mutex to access them. + const std::vector<TenantId> _tenantIds; // (R) + const UUID _migrationUuid; // (R) + const std::string _donorConnectionString; // (R) + const MongoURI _donorUri; // (R) + const ReadPreferenceSetting _readPreference; // (R) + const boost::optional<TenantMigrationPEMPayload> _recipientCertificateForDonor; // (R) + // TODO (SERVER-54085): Remove server parameter tenantMigrationDisableX509Auth. + // Transient SSL params created based on the state doc if the server parameter + // 'tenantMigrationDisableX509Auth' is false. + const boost::optional<TransientSSLParams> _transientSSLParams = boost::none; // (R) + + std::shared_ptr<ReplicaSetMonitor> _donorReplicaSetMonitor; // (M) + + // Members of the donor replica set that we have excluded as a potential sync source for + // some period of time. + std::vector<std::pair<HostAndPort, Date_t>> _excludedDonorHosts; // (M) + + // The '_client' will be used for other operations such as fetching + // optimes while the '_oplogFetcherClient' will be reserved for the oplog fetcher only. + // Because the oplog fetcher uses exhaust, we need a dedicated connection for oplog fetcher. + // + // Follow DBClientCursor synchonization rules. 
+ std::unique_ptr<DBClientConnection> _client; // (S) + std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S) + + std::unique_ptr<Fetcher> _donorFilenameBackupCursorFileFetcher; // (M) + CancellationSource _backupCursorKeepAliveCancellation = {}; // (X) + boost::optional<SemiFuture<void>> _backupCursorKeepAliveFuture; // (M) + + std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn = + std::make_unique<CreateOplogFetcherFn>(); // (M) + std::unique_ptr<OplogBufferCollection> _donorOplogBuffer; // (M) + std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (M) + std::unique_ptr<OplogFetcher> _donorOplogFetcher; // (M) + std::shared_ptr<TenantOplogApplier> _tenantOplogApplier; // (M) + + // Writer pool to do storage write operation. Used by tenant collection cloner and by + // tenant oplog applier. + std::unique_ptr<ThreadPool> _writerPool; //(M) + // Data shared by cloners. Follow TenantMigrationSharedData synchronization rules. + std::unique_ptr<TenantMigrationSharedData> _sharedData; // (S) + + // Promise that is resolved when all recipient nodes have imported all donor files. + SharedPromise<void> _importedFilesPromise; // (W) + // Whether we are waiting for members to import donor files. + bool _waitingForMembersToImportFiles = true; + // Which members have imported all donor files. + stdx::unordered_set<HostAndPort> _membersWhoHaveImportedFiles; + + // Promise that is resolved when the migration reached consistent point. + SharedPromise<OpTime> _dataConsistentPromise; // (W) + // Promise that is resolved when migration is completed. + SharedPromise<void> _migrationCompletionPromise; // (W) + // Promise that is resolved when the recipientForgetMigration command is received or on + // stepDown/shutDown with errors. + SharedPromise<MigrationDecisionEnum> _receivedRecipientForgetMigrationPromise; // (W) + // Promise that is resolved when the instance has been durably marked garbage collectable. + SharedPromise<void> _forgetMigrationDurablePromise; // (W) + // Promise that is resolved with when the instance is interrupted, and holds interrupt error + // status. + SharedPromise<void> _interruptPromise; // (M) + + // Waiters are notified when 'tenantOplogApplier' is valid on restart. + stdx::condition_variable _restartOplogApplierCondVar; // (M) + // Waiters are notified when 'tenantOplogApplier' is ready to use. + stdx::condition_variable _oplogApplierReadyCondVar; // (M) + // Indicates whether 'tenantOplogApplier' is ready to use or not. + bool _oplogApplierReady = false; // (M) + }; + +private: + /** + * Creates the state document collection. 
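+     * Invoked when the primary only service is rebuilt on stepup, so that the state document
+     * collection exists before any instance tries to persist its state.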
+ */ + ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& token) override; + + ServiceContext* const _serviceContext; +}; +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 484c0161989..80f4a561f64 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -106,6 +106,10 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic return &_TenantFileImporterService(serviceContext); } +TenantFileImporterService* TenantFileImporterService::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + void TenantFileImporterService::startMigration(const UUID& migrationId) { _reset(); @@ -186,11 +190,11 @@ void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) { void TenantFileImporterService::interrupt(const UUID& migrationId) { stdx::lock_guard lk(_mutex); if (migrationId != _migrationId) { - LOGV2_WARNING( - 6378901, - "Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active", - "migrationId"_attr = migrationId.toString(), - "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration"); + LOGV2_WARNING(6378901, + "TenantFileImporterService interrupted", + "migrationId"_attr = migrationId.toString(), + "activeMigrationId"_attr = + _migrationId ? _migrationId->toString() : "no migration"); return; } _interrupt(lk); diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h index 6741aa11519..4a6650da72f 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.h +++ b/src/mongo/db/repl/tenant_file_importer_service.h @@ -48,6 +48,7 @@ class TenantFileImporterService : public ReplicaSetAwareService<TenantFileImport public: static constexpr StringData kTenantFileImporterServiceName = "TenantFileImporterService"_sd; static TenantFileImporterService* get(ServiceContext* serviceContext); + static TenantFileImporterService* get(OperationContext* opCtx); TenantFileImporterService() = default; /** diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 72b38c21551..ce250ffe301 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -72,8 +72,8 @@ bool noDataHasBeenCopiedByRecipient(const TenantMigrationRecipientDocument& doc) return !doc.getRecipientPrimaryStartingFCV(); } -bool recoverTenantMigrationRecipientAccessBlockers(const TenantMigrationRecipientDocument& doc, - OperationContext* opCtx) { +bool recoverTenantMigrationRecipientAccessBlockers(OperationContext* opCtx, + const TenantMigrationRecipientDocument& doc) { // Do not create the mtab when: // 1) The migration was forgotten before receiving a 'recipientSyncData'. // 2) A delayed 'recipientForgetMigration' was received after the state doc was deleted. @@ -122,6 +122,48 @@ bool recoverTenantMigrationRecipientAccessBlockers(const TenantMigrationRecipien return true; } + +bool recoverShardMergeRecipientAccessBlockers(OperationContext* opCtx, + const ShardMergeRecipientDocument& doc) { + // Do not create mtab for following cases. Otherwise, we can get into potential race + // causing recovery procedure to fail with `ErrorCodes::ConflictingServerlessOperation`. 
+ // 1) The migration was skipped. + if (doc.getStartGarbageCollect()) { + invariant(doc.getState() == ShardMergeRecipientStateEnum::kAborted || + doc.getState() == ShardMergeRecipientStateEnum::kCommitted); + return true; + } + // 2) Aborted state doc marked as garbage collectable. + if (doc.getState() == ShardMergeRecipientStateEnum::kAborted && doc.getExpireAt()) { + return true; + } + + auto mtab = std::make_shared<TenantMigrationRecipientAccessBlocker>(opCtx->getServiceContext(), + doc.getId()); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .add(doc.getTenantIds(), mtab); + + switch (doc.getState()) { + case ShardMergeRecipientStateEnum::kStarted: + case ShardMergeRecipientStateEnum::kLearnedFilenames: + break; + case ShardMergeRecipientStateEnum::kCommitted: + if (doc.getExpireAt()) { + mtab->stopBlockingTTL(); + } + FMT_FALLTHROUGH; + case ShardMergeRecipientStateEnum::kConsistent: + case ShardMergeRecipientStateEnum::kAborted: + if (auto rejectTs = doc.getRejectReadsBeforeTimestamp()) { + mtab->startRejectingReadsBefore(*rejectTs); + } + break; + default: + MONGO_UNREACHABLE; + } + + return true; +} } // namespace std::shared_ptr<TenantMigrationDonorAccessBlocker> getDonorAccessBlockerForMigration( @@ -487,7 +529,14 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { NamespaceString::kTenantMigrationRecipientsNamespace); recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) { - return recoverTenantMigrationRecipientAccessBlockers(doc, opCtx); + return recoverTenantMigrationRecipientAccessBlockers(opCtx, doc); + }); + + PersistentTaskStore<ShardMergeRecipientDocument> mergeRecipientStore( + NamespaceString::kShardMergeRecipientsNamespace); + + mergeRecipientStore.forEach(opCtx, {}, [&](const ShardMergeRecipientDocument& doc) { + return recoverShardMergeRecipientAccessBlockers(opCtx, doc); }); // Recover TenantMigrationDonorAccessBlockers for ShardSplit. diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 2997b64c957..60d5ce1f266 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -1522,7 +1522,7 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG // If the abortReason is ConflictingServerlessOperation, it means there are no // document on the recipient. Do not send the forget command. 
stdx::lock_guard<Latch> lg(_mutex); - if (_abortReason && + if (_protocol == MigrationProtocolEnum::kMultitenantMigrations && _abortReason && _abortReason->code() == ErrorCodes::ConflictingServerlessOperation) { return ExecutorFuture(**executor); } diff --git a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp index 5b0c4172c17..83690fe0f5a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_entry_helpers.cpp @@ -45,7 +45,7 @@ #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/util/str.h" -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration namespace mongo { diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 63a3efa2b31..1e01d46a9be 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -149,47 +149,6 @@ bool isRetriableOplogFetcherError(Status oplogFetcherStatus) { } // namespace -// A convenient place to set test-specific parameters. -MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth); -MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV); -MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); -MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions); -MONGO_FAIL_POINT_DEFINE(skipFetchingRetryableWritesEntriesBeforeStartOpTime); -MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationRecipientBeforeDeletingStateDoc); - -// Fails before waiting for the state doc to be majority replicated. 
-MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); -MONGO_FAIL_POINT_DEFINE(fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); -MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingDonorClusterTimeKeys); -MONGO_FAIL_POINT_DEFINE(fpAfterConnectingTenantMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpAfterRecordingRecipientPrimaryStartingFCV); -MONGO_FAIL_POINT_DEFINE(fpAfterComparingRecipientAndDonorFCV); -MONGO_FAIL_POINT_DEFINE(fpAfterRetrievingStartOpTimesMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpSetSmallAggregationBatchSize); -MONGO_FAIL_POINT_DEFINE(fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted); -MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingRetryableWritesBatch); -MONGO_FAIL_POINT_DEFINE(fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); -MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); -MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCloneSuccess); -MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingCommittedTransactions); -MONGO_FAIL_POINT_DEFINE(fpAfterFetchingCommittedTransactions); -MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpBeforeFulfillingDataConsistentPromise); -MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); -MONGO_FAIL_POINT_DEFINE(fpBeforePersistingRejectReadsBeforeTimestamp); -MONGO_FAIL_POINT_DEFINE(fpAfterWaitForRejectReadsBeforeTimestamp); -MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); -MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration); -MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM); -MONGO_FAIL_POINT_DEFINE(skipRetriesWhenConnectingToDonorHost); -MONGO_FAIL_POINT_DEFINE(fpBeforeDroppingTempCollections); -MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted); -MONGO_FAIL_POINT_DEFINE(hangAfterUpdatingTransactionEntry); -MONGO_FAIL_POINT_DEFINE(fpBeforeAdvancingStableTimestamp); MONGO_FAIL_POINT_DEFINE(hangMigrationBeforeRetryCheck); MONGO_FAIL_POINT_DEFINE(skipCreatingIndexDuringRebuildService); MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationRecipientInstanceBeforeDeletingOldStateDoc); diff --git a/src/mongo/db/repl/tenant_migration_state_machine.idl b/src/mongo/db/repl/tenant_migration_state_machine.idl index 5d7054da435..1ec1314d49c 100644 --- a/src/mongo/db/repl/tenant_migration_state_machine.idl +++ b/src/mongo/db/repl/tenant_migration_state_machine.idl @@ -64,6 +64,16 @@ enums: kCommitted: "committed" kAborted: "aborted" + ShardMergeRecipientState: + description: "The state of shard merge on recipient side." + type: string + values: + kStarted: "started" + kLearnedFilenames: "learned filenames" + kConsistent: "consistent" + kCommitted: "committed" + kAborted: "aborted" + structs: tenantMigrationDonorDocument: description: "Represents an in-progress tenant migration on the migration donor." @@ -282,3 +292,108 @@ structs: The opTime at which the donor's state document was set to 'aborted'. Only set for 'shard merge' when it goes into 'aborted' state. optional: true + + ShardMergeRecipientDocument: + description: "Represents an in-progress shard merge on the merge recipient." + strict: true + fields: + _id: + type: uuid + description: "Unique identifier for the shard merge." 
+ cpp_name: id + donorConnectionString: + type: string + description: >- + The URI string that the recipient will utilize to create a connection + with the donor. + validator: + callback: "tenant_migration_util::validateConnectionString" + tenantIds: + type: array<tenant_id> + description: "List of tenants to be migrated." + startMigrationDonorTimestamp: + type: timestamp + description: >- + Timestamp after all index builds for tenant are complete or aborted. Recipient + must not start cloning/fetching oplog entries from the donor until this + timestamp is majority committed. + validator: + callback: "tenant_migration_util::validateTimestampNotNull" + readPreference: + type: readPreference + description: >- + The read preference setting that the recipient will use to determine + which node in the donor replica set to clone from. + state: + type: ShardMergeRecipientState + description: "The state of the shard merge recipient" + default: kStarted + expireAt: + type: date + description: >- + The wall-clock time at which the state machine document should be + removed. + optional: true + startApplyingDonorOpTime: + description: >- + Populated during migration; the donor's operation time when the data + cloning starts. + type: optime + optional: true + startFetchingDonorOpTime: + description: >- + Populated during migration; the donor's operation time of the last open + transaction when the data cloning started. + type: optime + optional: true + rejectReadsBeforeTimestamp: + description: >- + Populated during migration when the recipientSyncData with the + returnAfterReachingTimestamp is received after data is consistent; the earliest + timestamp at which reads are allowed on the recipient (corresponds to the + donor's blockTimestamp). + type: timestamp + optional: true + cloneFinishedRecipientOpTime: + description: >- + Populated during migration; the recipient operation time when the data + cloning finishes. + type: optime + optional: true + recipientCertificateForDonor: + description: >- + The SSL certificate and private key that the recipient should use to + authenticate to the donor. + type: TenantMigrationPEMPayload + # TODO (SERVER-54085): Remove server parameter tenantMigrationDisableX509Auth. + optional: true + completedUpdatingTransactionsBeforeStartOpTime: + description: >- + Indicates if the recipient has finished updating transaction entries that were + committed before 'startFetchingDonorOpTime'. If true, the recipient can skip + the fetching transactions stage. + type: bool + default: false + completedFetchingRetryableWritesBeforeStartOpTime: + description: >- + Indicates if the recipient has finished fetching retryable writes oplog entries + before 'startFetchingDonorOpTime' for each retryable writes entry in + 'config.transactions' + type: bool + default: false + startAt: + type: date + description: >- + The wall-clock time at which the state machine document is initialized. + optional: true + startGarbageCollect: + type: bool + description: >- + Indicates if migration needs to be skipped. If true, the recipient state machine + will skip migration and waits for migration decision directly. + default: false + abortOpTime: + type: optime + description: + The opTime at which the donor's state document was set to 'aborted'. 
+ optional: true diff --git a/src/mongo/db/repl/tenant_migration_util.cpp b/src/mongo/db/repl/tenant_migration_util.cpp index 824ad84a039..9f1109b7b72 100644 --- a/src/mongo/db/repl/tenant_migration_util.cpp +++ b/src/mongo/db/repl/tenant_migration_util.cpp @@ -54,6 +54,49 @@ namespace mongo { +namespace repl { + +MONGO_FAIL_POINT_DEFINE(pauseBeforeRunTenantMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(pauseAfterRunTenantMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(skipTenantMigrationRecipientAuth); +MONGO_FAIL_POINT_DEFINE(skipComparingRecipientAndDonorFCV); +MONGO_FAIL_POINT_DEFINE(autoRecipientForgetMigration); +MONGO_FAIL_POINT_DEFINE(skipFetchingCommittedTransactions); +MONGO_FAIL_POINT_DEFINE(skipFetchingRetryableWritesEntriesBeforeStartOpTime); +MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationRecipientBeforeDeletingStateDoc); +MONGO_FAIL_POINT_DEFINE(failWhilePersistingTenantMigrationRecipientInstanceStateDoc); +MONGO_FAIL_POINT_DEFINE(fpAfterPersistingTenantMigrationRecipientInstanceStateDoc); +MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingDonorClusterTimeKeys); +MONGO_FAIL_POINT_DEFINE(fpAfterConnectingTenantMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpAfterRecordingRecipientPrimaryStartingFCV); +MONGO_FAIL_POINT_DEFINE(fpAfterComparingRecipientAndDonorFCV); +MONGO_FAIL_POINT_DEFINE(fpAfterRetrievingStartOpTimesMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpSetSmallAggregationBatchSize); +MONGO_FAIL_POINT_DEFINE(fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted); +MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingRetryableWritesBatch); +MONGO_FAIL_POINT_DEFINE(fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime); +MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogFetcherMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(setTenantMigrationRecipientInstanceHostTimeout); +MONGO_FAIL_POINT_DEFINE(pauseAfterRetrievingLastTxnMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpBeforeMarkingCloneSuccess); +MONGO_FAIL_POINT_DEFINE(fpBeforeFetchingCommittedTransactions); +MONGO_FAIL_POINT_DEFINE(fpAfterFetchingCommittedTransactions); +MONGO_FAIL_POINT_DEFINE(fpAfterStartingOplogApplierMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpBeforeFulfillingDataConsistentPromise); +MONGO_FAIL_POINT_DEFINE(fpAfterDataConsistentMigrationRecipientInstance); +MONGO_FAIL_POINT_DEFINE(fpBeforePersistingRejectReadsBeforeTimestamp); +MONGO_FAIL_POINT_DEFINE(fpAfterWaitForRejectReadsBeforeTimestamp); +MONGO_FAIL_POINT_DEFINE(hangBeforeTaskCompletion); +MONGO_FAIL_POINT_DEFINE(fpAfterReceivingRecipientForgetMigration); +MONGO_FAIL_POINT_DEFINE(hangAfterCreatingRSM); +MONGO_FAIL_POINT_DEFINE(skipRetriesWhenConnectingToDonorHost); +MONGO_FAIL_POINT_DEFINE(fpBeforeDroppingTempCollections); +MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted); +MONGO_FAIL_POINT_DEFINE(hangAfterUpdatingTransactionEntry); +MONGO_FAIL_POINT_DEFINE(fpBeforeAdvancingStableTimestamp); + +} // namespace repl + namespace tenant_migration_util { namespace { diff --git a/src/mongo/db/repl/tenant_migration_util.h b/src/mongo/db/repl/tenant_migration_util.h index 78426ab0ea7..518a93c5879 100644 --- a/src/mongo/db/repl/tenant_migration_util.h +++ b/src/mongo/db/repl/tenant_migration_util.h @@ -55,6 +55,50 @@ const std::set<std::string> kUnsupportedTenantIds{"", "admin", "local", "config" } // namespace +namespace repl { + +// Common failpoints between ShardMergeRecipientService and TenantMigrationRecipientService. 
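+// The corresponding MONGO_FAIL_POINT_DEFINE definitions live in tenant_migration_util.cpp so that
+// both recipient services share a single set of failpoint instances.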
+extern FailPoint pauseBeforeRunTenantMigrationRecipientInstance;
+extern FailPoint pauseAfterRunTenantMigrationRecipientInstance;
+extern FailPoint skipTenantMigrationRecipientAuth;
+extern FailPoint skipComparingRecipientAndDonorFCV;
+extern FailPoint autoRecipientForgetMigration;
+extern FailPoint skipFetchingCommittedTransactions;
+extern FailPoint skipFetchingRetryableWritesEntriesBeforeStartOpTime;
+extern FailPoint pauseTenantMigrationRecipientBeforeDeletingStateDoc;
+extern FailPoint failWhilePersistingTenantMigrationRecipientInstanceStateDoc;
+extern FailPoint fpAfterPersistingTenantMigrationRecipientInstanceStateDoc;
+extern FailPoint fpBeforeFetchingDonorClusterTimeKeys;
+extern FailPoint fpAfterConnectingTenantMigrationRecipientInstance;
+extern FailPoint fpAfterRecordingRecipientPrimaryStartingFCV;
+extern FailPoint fpAfterComparingRecipientAndDonorFCV;
+extern FailPoint fpAfterRetrievingStartOpTimesMigrationRecipientInstance;
+extern FailPoint fpSetSmallAggregationBatchSize;
+extern FailPoint fpBeforeWaitingForRetryableWritePreFetchMajorityCommitted;
+extern FailPoint pauseAfterRetrievingRetryableWritesBatch;
+extern FailPoint fpAfterFetchingRetryableWritesEntriesBeforeStartOpTime;
+extern FailPoint fpAfterStartingOplogFetcherMigrationRecipientInstance;
+extern FailPoint setTenantMigrationRecipientInstanceHostTimeout;
+extern FailPoint pauseAfterRetrievingLastTxnMigrationRecipientInstance;
+extern FailPoint fpBeforeMarkingCloneSuccess;
+extern FailPoint fpBeforeFetchingCommittedTransactions;
+extern FailPoint fpAfterFetchingCommittedTransactions;
+extern FailPoint fpAfterStartingOplogApplierMigrationRecipientInstance;
+extern FailPoint fpBeforeFulfillingDataConsistentPromise;
+extern FailPoint fpAfterDataConsistentMigrationRecipientInstance;
+extern FailPoint fpBeforePersistingRejectReadsBeforeTimestamp;
+extern FailPoint fpAfterWaitForRejectReadsBeforeTimestamp;
+extern FailPoint hangBeforeTaskCompletion;
+extern FailPoint fpAfterReceivingRecipientForgetMigration;
+extern FailPoint hangAfterCreatingRSM;
+extern FailPoint skipRetriesWhenConnectingToDonorHost;
+extern FailPoint fpBeforeDroppingTempCollections;
+extern FailPoint fpWaitUntilTimestampMajorityCommitted;
+extern FailPoint hangAfterUpdatingTransactionEntry;
+extern FailPoint fpBeforeAdvancingStableTimestamp;
+
+} // namespace repl
+
 namespace tenant_migration_util {
 
 inline Status validateDatabasePrefix(const std::string& tenantId) {
@@ -249,6 +293,26 @@ inline void protocolReadPreferenceCompatibilityCheck(OperationContext* opCtx,
                                         !readPreference.canRunOnSecondary());
 }
 
+inline void protocolCheckRecipientForgetDecision(
+    const MigrationProtocolEnum protocol, const boost::optional<MigrationDecisionEnum>& decision) {
+    switch (protocol) {
+        case MigrationProtocolEnum::kShardMerge:
+            uassert(ErrorCodes::InvalidOptions,
+                    str::stream() << "'decision' is required for protocol '"
+                                  << MigrationProtocol_serializer(protocol) << "'",
+                    decision.has_value());
+            break;
+        case MigrationProtocolEnum::kMultitenantMigrations:
+            uassert(ErrorCodes::InvalidOptions,
+                    str::stream() << "'decision' must be empty for protocol '"
+                                  << MigrationProtocol_serializer(protocol) << "'",
+                    !decision.has_value());
+            break;
+        default:
+            MONGO_UNREACHABLE;
+    }
+}
+
 /*
 * Creates an ExternalKeysCollectionDocument representing a config.external_validation_keys
 * document from the given admin.system.keys document BSONObj.
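Because the failpoints above are now declared once under the repl namespace and shared by both TenantMigrationRecipientService and the new ShardMergeRecipientService, an existing jstest can pause either protocol's recipient instance at the same point. A minimal sketch, assuming the usual jstests/libs/fail_point_util.js helpers, a hypothetical 'recipientPrimary' handle to the recipient replica set primary, and that this failpoint honors an {action: "hang"} payload; none of this scaffolding is part of the patch itself:

load("jstests/libs/fail_point_util.js");

// Hypothetical scaffolding: 'recipientPrimary' is the primary of the recipient replica set.
const fp = configureFailPoint(recipientPrimary,
                              "fpAfterPersistingTenantMigrationRecipientInstanceStateDoc",
                              {action: "hang"});
// ... start the migration here ...
fp.wait();  // The recipient instance is parked right after persisting its state document.
// ... inspect the recipient's state document or currentOp output ...
fp.off();

The same call works whether the instance belongs to a shard merge or a multitenant migration, which is the point of hoisting the definitions out of the individual services.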
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp index 20a02c6cd15..050e160ac8e 100644
--- a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
@@ -121,7 +121,7 @@ void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
     PersistentTaskStore<TenantMigrationDonorDocument> donorStore(
         NamespaceString::kTenantMigrationDonorsNamespace);
     donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) {
-        // Do not acquire a lock for garbage-collectable documents
+        // Do not acquire a lock for garbage-collectable documents.
         if (doc.getExpireAt()) {
             return true;
         }
@@ -134,7 +134,7 @@ void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
     PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore(
         NamespaceString::kTenantMigrationRecipientsNamespace);
     recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) {
-        // Do not acquire a lock for garbage-collectable documents
+        // Do not acquire a lock for garbage-collectable documents.
         if (doc.getExpireAt()) {
             return true;
         }
@@ -145,10 +145,32 @@ void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
         return true;
     });
 
+    PersistentTaskStore<ShardMergeRecipientDocument> mergeRecipientStore(
+        NamespaceString::kShardMergeRecipientsNamespace);
+    mergeRecipientStore.forEach(opCtx, {}, [&](const ShardMergeRecipientDocument& doc) {
+        // Do not acquire locks for the following cases. Otherwise, we can get into a potential
+        // race that causes the recovery procedure to fail with `ErrorCodes::ConflictingServerlessOperation`.
+        // 1) The migration was skipped.
+        if (doc.getStartGarbageCollect()) {
+            invariant(doc.getState() == ShardMergeRecipientStateEnum::kAborted ||
+                      doc.getState() == ShardMergeRecipientStateEnum::kCommitted);
+            return true;
+        }
+        // 2) The state doc was marked as garbage collectable.
+        if (doc.getExpireAt()) {
+            return true;
+        }
+
+        registry.acquireLock(ServerlessOperationLockRegistry::LockType::kMergeRecipient,
+                             doc.getId());
+
+        return true;
+    });
+
     PersistentTaskStore<ShardSplitDonorDocument> splitStore(
         NamespaceString::kShardSplitDonorsNamespace);
     splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
-        // Do not acquire a lock for garbage-collectable documents
+        // Do not acquire a lock for garbage-collectable documents.
if (doc.getExpireAt()) { return true; } @@ -178,6 +200,9 @@ void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* case ServerlessOperationLockRegistry::LockType::kTenantRecipient: builder->append(kOperationLockFieldName, 3); break; + case ServerlessOperationLockRegistry::LockType::kMergeRecipient: + builder->append(kOperationLockFieldName, 4); + break; } } diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h index cb122d702f8..578214d7564 100644 --- a/src/mongo/db/serverless/serverless_operation_lock_registry.h +++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h @@ -51,7 +51,7 @@ public: static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get; - enum LockType { kShardSplit, kTenantDonor, kTenantRecipient }; + enum LockType { kShardSplit, kTenantDonor, kTenantRecipient, kMergeRecipient }; /** * Acquire the serverless lock for LockType and adds operationId to the set of diff --git a/src/mongo/db/serverless/serverless_types.idl b/src/mongo/db/serverless/serverless_types.idl index f67af9c24d2..1aadbd35c41 100644 --- a/src/mongo/db/serverless/serverless_types.idl +++ b/src/mongo/db/serverless/serverless_types.idl @@ -38,3 +38,10 @@ enums: values: kMultitenantMigrations: "multitenant migrations" kShardMerge: "shard merge" + + MigrationDecision: + description: "Whether the migration committed or aborted." + type: string + values: + kCommitted: "committed" + kAborted: "aborted" |
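The recovery logic above also pins down which shard merge state documents re-acquire a serverless operation lock at startup: documents that were skipped (startGarbageCollect is true, which the invariant ties to a committed or aborted state) or already marked garbage-collectable (expireAt set) are ignored, and every other document triggers acquireLock(LockType::kMergeRecipient, doc.getId()). A rough shell sketch of the equivalent predicate follows; it assumes the state documents live in a config.shardMergeRecipients collection, and the query itself is illustrative rather than part of the patch:

// Documents that would still hold the kMergeRecipient serverless operation lock after recovery.
const activeMerges = db.getSiblingDB("config")
                         .shardMergeRecipients
                         .find({expireAt: {$exists: false}, startGarbageCollect: {$ne: true}})
                         .toArray();
// Per appendInfoForServerStatus() above, a held kMergeRecipient lock is reported as value 4.

Keeping the skip conditions identical between recoverLocks() and the state machine's own garbage-collection path is what prevents a restarted node from spuriously reporting ConflictingServerlessOperation for a merge that has already reached its decision.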