diff options
44 files changed, 1856 insertions, 54 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 04619679e33..bd79c51b6bb 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -136,7 +136,7 @@ function TenantMigrationTest({ nodeOptions["setParameter"] = setParameterOpts; const rstName = `${name}_${(isDonor ? "donor" : "recipient")}`; - const rst = new ReplSetTest({name: rstName, nodes, nodeOptions}); + const rst = new ReplSetTest({name: rstName, nodes, serverless: true, nodeOptions}); rst.startSet(); if (initiateRstWithHighElectionTimeout) { rst.initiateWithHighElectionTimeout(); @@ -231,6 +231,7 @@ function TenantMigrationTest({ this.runDonorStartMigration = function({ migrationIdString, tenantId, + protocol, recipientConnectionString = recipientRst.getURL(), readPreference = {mode: "primary"}, donorCertificateForRecipient = migrationCertificates.donorCertificateForRecipient, @@ -251,6 +252,7 @@ function TenantMigrationTest({ readPreference, donorCertificateForRecipient, recipientCertificateForDonor, + protocol }; const stateRes = TenantMigrationUtil.runTenantMigrationCommand(cmdObj, this.getDonorRst(), { diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js index d0e9354f1c1..381dc3855bd 100644 --- a/jstests/replsets/libs/tenant_migration_util.js +++ b/jstests/replsets/libs/tenant_migration_util.js @@ -262,6 +262,17 @@ var TenantMigrationUtil = (function() { return res; } + const ServerlessLockType = + {None: 0, ShardSplitDonor: 1, TenantMigrationDonor: 2, TenantMigrationRecipient: 3}; + + /** + * Return the active serverless operation lock, if one is acquired. 
+ */ + function getServerlessOperationLock(node) { + return assert.commandWorked(node.adminCommand({serverStatus: 1, serverless: 1})) + .serverless.operationLock; + } + /** * Returns the TenantMigrationAccessBlocker serverStatus output for the multi-tenant migration * or shard merge for the given node. @@ -561,6 +572,8 @@ var TenantMigrationUtil = (function() { makeMigrationCertificatesForTest, makeX509OptionsForTest, isMigrationCompleted, + ServerlessLockType, + getServerlessOperationLock, getTenantMigrationAccessBlocker, getTenantMigrationAccessBlockers, getNumBlockedReads, diff --git a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js index 85429af3ddd..8afc2d2fcf9 100644 --- a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js +++ b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js @@ -5,6 +5,7 @@ * @tags: [ * incompatible_with_macos, * incompatible_with_windows_tls, + * requires_fcv_62, * requires_majority_read_concern, * requires_persistence, * serverless, @@ -19,6 +20,7 @@ load("jstests/libs/uuid_util.js"); load("jstests/libs/parallelTester.js"); load("jstests/libs/write_concern_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); +const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil; const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); @@ -47,6 +49,7 @@ const migrationOpts = { migrationIdString: extractUUIDFromObject(UUID()), tenantId: kTenantId }; + assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts)); // We must wait for the migration to have finished replicating the recipient keys on the donor set // before starting initial sync, otherwise the migration will hang while waiting for initial sync to @@ -83,7 +86,8 @@ donorRst.awaitReplication(); stopServerReplication(initialSyncNode); let configDonorsColl = 
initialSyncNode.getCollection(TenantMigrationTest.kConfigDonorsNS); -let donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); +assert.lte(configDonorsColl.count(), 1); +let donorDoc = configDonorsColl.findOne(); if (donorDoc) { jsTestLog("Initial sync completed while migration was in state: " + donorDoc.state); switch (donorDoc.state) { @@ -148,6 +152,13 @@ if (donorDoc) { } } +const activeServerlessLock = getServerlessOperationLock(initialSyncNode); +if (donorDoc && !donorDoc.expireAt) { + assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor); +} else { + assert.eq(activeServerlessLock, ServerlessLockType.None); +} + if (fp) { fp.off(); } diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js index 7a564e416a5..7dbbcb56f8c 100644 --- a/jstests/replsets/tenant_migration_donor_startup_recovery.js +++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js @@ -8,6 +8,7 @@ * incompatible_with_macos, * incompatible_with_shard_merge, * incompatible_with_windows_tls, + * requires_fcv_62, * requires_majority_read_concern, * requires_persistence, * serverless, @@ -20,6 +21,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); +const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil; const donorRst = new ReplSetTest({ nodes: 1, @@ -70,7 +72,8 @@ donorRst.startSet({ donorPrimary = donorRst.getPrimary(); const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS); -const donorDoc = configDonorsColl.findOne({tenantId: kTenantId}); +assert.lte(configDonorsColl.count(), 1); +const donorDoc = configDonorsColl.findOne(); if (donorDoc) { switch (donorDoc.state) { case TenantMigrationTest.DonorState.kAbortingIndexBuilds: @@ -133,6 +136,13 @@ if (donorDoc) { } } +const activeServerlessLock = getServerlessOperationLock(donorPrimary); +if 
(donorDoc && !donorDoc.expireAt) { + assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor); +} else { + assert.eq(activeServerlessLock, ServerlessLockType.None); +} + tenantMigrationTest.stop(); donorRst.stopSet(); })(); diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js index 404bf0fa765..1012be2670c 100644 --- a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js +++ b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js @@ -6,6 +6,7 @@ * incompatible_with_macos, * incompatible_with_shard_merge, * incompatible_with_windows_tls, + * requires_fcv_62, * requires_majority_read_concern, * requires_persistence, * serverless, @@ -19,6 +20,7 @@ load("jstests/libs/fail_point_util.js"); load("jstests/libs/uuid_util.js"); load("jstests/libs/write_concern_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); +const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil; const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); @@ -59,7 +61,8 @@ recipientRst.awaitReplication(); stopServerReplication(initialSyncNode); const configRecipientsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigRecipientsNS); -const recipientDoc = configRecipientsColl.findOne({tenantId: kTenantId}); +assert.lte(configRecipientsColl.count(), 1); +const recipientDoc = configRecipientsColl.findOne(); if (recipientDoc) { switch (recipientDoc.state) { case TenantMigrationTest.RecipientState.kStarted: @@ -97,6 +100,13 @@ if (recipientDoc) { } } +const activeServerlessLock = getServerlessOperationLock(initialSyncNode); +if (recipientDoc && !recipientDoc.expireAt) { + assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationRecipient); +} else { + assert.eq(activeServerlessLock, ServerlessLockType.None); +} + restartServerReplication(initialSyncNode); tenantMigrationTest.stop(); diff 
--git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index 69aeab56616..727827429db 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -41,6 +41,45 @@ const runCommitSplitThreadWrapper = function(rstArgs, donorRst, commitShardSplitCmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync); }; +/* + * Wait for state document garbage collection by polling for when the document has been removed + * from the 'shardSplitDonors' namespace, and all access blockers have been removed. + * @param {migrationId} id that was used for the commitShardSplit command. + * @param {tenantIds} tenant ids of the shard split. + */ +const waitForGarbageCollectionForSplit = function(donorNodes, migrationId, tenantIds) { + jsTestLog("Wait for garbage collection"); + assert.soon(() => donorNodes.every(node => { + const donorDocumentDeleted = + node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({ + _id: migrationId + }) === 0; + const allAccessBlockersRemoved = tenantIds.every( + id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null); + + const result = donorDocumentDeleted && allAccessBlockersRemoved; + if (!result) { + const status = []; + if (!donorDocumentDeleted) { + status.push(`donor document to be deleted (docCount=${ + node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({ + _id: migrationId + })})`); + } + + if (!allAccessBlockersRemoved) { + const tenantsWithBlockers = tenantIds.filter( + id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) != null); + status.push(`access blockers to be removed (${tenantsWithBlockers})`); + } + } + return donorDocumentDeleted && allAccessBlockersRemoved; + }), + "tenant access blockers weren't removed", + 60 * 1000, + 1 * 1000); +}; + const runShardSplitCommand = function( replicaSet, cmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync) { 
let res; @@ -355,38 +394,7 @@ class BasicServerlessTest { * @param {tenantIds} tenant ids of the shard split. */ waitForGarbageCollection(migrationId, tenantIds) { - jsTestLog("Wait for garbage collection"); - const donorNodes = this.donor.nodes; - assert.soon(() => donorNodes.every(node => { - const donorDocumentDeleted = - node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({ - _id: migrationId - }) === 0; - const allAccessBlockersRemoved = tenantIds.every( - id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null); - - const result = donorDocumentDeleted && allAccessBlockersRemoved; - if (!result) { - const status = []; - if (!donorDocumentDeleted) { - status.push(`donor document to be deleted (docCount=${ - node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({ - _id: migrationId - })})`); - } - - if (!allAccessBlockersRemoved) { - const tenantsWithBlockers = - tenantIds.filter(id => BasicServerlessTest.getTenantMigrationAccessBlocker( - {node, id}) != null); - status.push(`access blockers to be removed (${tenantsWithBlockers})`); - } - } - return donorDocumentDeleted && allAccessBlockersRemoved; - }), - "tenant access blockers weren't removed", - 60 * 1000, - 1 * 1000); + return waitForGarbageCollectionForSplit(this.donor.nodes, migrationId, tenantIds); } /** diff --git a/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js b/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js new file mode 100644 index 00000000000..ca79b44778e --- /dev/null +++ b/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js @@ -0,0 +1,80 @@ +/** + * Utility functions for serverless_reject_multiple_ops tests + * + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/rslib.js"); +load("jstests/libs/parallelTester.js"); + +function waitForMergeToComplete(migrationOpts, migrationId, test) { + // Assert that the migration 
has already been started. + assert(test.getDonorPrimary().getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({ + _id: migrationId + })); + + const donorStartReply = test.runDonorStartMigration( + migrationOpts, {waitForMigrationToComplete: true, retryOnRetryableErrors: false}); + + return donorStartReply; +} + +function commitSplitAsync(rst, tenantIds, recipientTagName, recipientSetName, migrationId) { + jsTestLog("Running commitAsync command"); + + const rstArgs = createRstArgs(rst); + const migrationIdString = extractUUIDFromObject(migrationId); + + const thread = new Thread(runCommitSplitThreadWrapper, + rstArgs, + migrationIdString, + tenantIds, + recipientTagName, + recipientSetName, + false /* enableDonorStartMigrationFsync */); + thread.start(); + + return thread; +} + +function addRecipientNodes(rst, recipientTagName) { + const numNodes = 3; // default to three nodes + let recipientNodes = []; + const options = TenantMigrationUtil.makeX509OptionsForTest(); + jsTestLog(`Adding ${numNodes} non-voting recipient nodes to donor`); + for (let i = 0; i < numNodes; ++i) { + recipientNodes.push(rst.add(options.donor)); + } + + const primary = rst.getPrimary(); + const admin = primary.getDB('admin'); + const config = rst.getReplSetConfigFromNode(); + config.version++; + + // ensure recipient nodes are added as non-voting members + recipientNodes.forEach(node => { + config.members.push({ + host: node.host, + votes: 0, + priority: 0, + hidden: true, + tags: {[recipientTagName]: ObjectId().valueOf()} + }); + }); + + // reindex all members from 0 + config.members = config.members.map((member, idx) => { + member._id = idx; + return member; + }); + + assert.commandWorked(admin.runCommand({replSetReconfig: config})); + recipientNodes.forEach(node => rst.waitForState(node, ReplSetTest.State.SECONDARY)); + + return recipientNodes; +} diff --git a/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js 
b/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js new file mode 100644 index 00000000000..0c7107f9682 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js @@ -0,0 +1,67 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartMigrationWhenThereIsAnExistingAccessBlocker(protocol) { + // Test that we cannot start a tenant migration for a tenant that already has an access blocker. + const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + // Ensure a high enough delay so the shard split document is not deleted before tenant migration + // is started. 
+ sharedOptions = {}; + sharedOptions["setParameter"] = { + shardSplitGarbageCollectionDelayMS: 36000000, + ttlMonitorSleepSecs: 1 + }; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName); + + const commitThread = commitSplitAsync( + test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId); + assert.commandWorked(commitThread.returnData()); + + // Remove recipient nodes + test.getDonorRst().nodes = + test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node)); + test.getDonorRst().ports = + test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port)); + + assert.commandWorked(test.getDonorRst().getPrimary().adminCommand( + {forgetShardSplit: 1, migrationId: splitMigrationId})); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + assert.commandFailed(test.startMigration(migrationOpts)); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + test.stop(); + jsTestLog("cannotStartMigrationWhenThereIsAnExistingAccessBlocker test completed"); +} + +cannotStartMigrationWhenThereIsAnExistingAccessBlocker("multitenant migrations"); +cannotStartMigrationWhenThereIsAnExistingAccessBlocker("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js b/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js new file mode 100644 index 00000000000..f237d507262 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js @@ -0,0 +1,69 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + 
+load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); + +function canStartMigrationAfterSplitGarbageCollection(protocol) { + // Test that we can start a migration after a shard split has been garbage collected. + const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName); + + const commitThread = commitSplitAsync( + test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId); + assert.commandWorked(commitThread.returnData()); + + // Remove recipient nodes + test.getDonorRst().nodes = + test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node)); + test.getDonorRst().ports = + test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port)); + + assert.commandWorked(test.getDonorRst().getPrimary().adminCommand( + {forgetShardSplit: 1, migrationId: splitMigrationId})); + + waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds); + + jsTestLog("Starting tenant migration"); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + assert.commandWorked(test.startMigration(migrationOpts)); + + 
TenantMigrationTest.assertCommitted(test.waitForMigrationToComplete(migrationOpts)); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + test.stop(); + jsTestLog("canStartMigrationAfterSplitGarbageCollection test completed"); +} + +canStartMigrationAfterSplitGarbageCollection("multitenant migrations"); +canStartMigrationAfterSplitGarbageCollection("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js b/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js new file mode 100644 index 00000000000..55309f25710 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js @@ -0,0 +1,73 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress(protocol) { + // Test that we cannot start a tenant migration while a shard split is in progress. Use a + // tenantId uninvolved in the split. 
+ const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName); + + let fp = + configureFailPoint(test.getDonorRst().getPrimary(), "pauseShardSplitBeforeBlockingState"); + + const commitThread = commitSplitAsync( + test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId); + fp.wait(); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = "otherTenantToMove"; + } + jsTestLog("Starting tenant migration"); + assert.commandFailedWithCode(test.startMigration(migrationOpts), + ErrorCodes.ConflictingServerlessOperation); + + fp.off(); + + assert.commandWorked(commitThread.returnData()); + + test.getDonorRst().nodes = + test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node)); + test.getDonorRst().ports = + test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port)); + + assert.commandWorked(test.getDonorRst().getPrimary().adminCommand( + {forgetShardSplit: 1, migrationId: splitMigrationId})); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds); + + test.stop(); + jsTestLog("cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress test completed"); +} + +cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("multitenant migrations"); +cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("shard merge"); diff --git 
a/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js b/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js new file mode 100644 index 00000000000..f237d507262 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js @@ -0,0 +1,69 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/parallelTester.js"); +load("jstests/libs/uuid_util.js"); + +function canStartMigrationAfterSplitGarbageCollection(protocol) { + // Test that we can start a migration after a shard split has been garbage collected. + const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName); + + const commitThread = commitSplitAsync( + test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId); + assert.commandWorked(commitThread.returnData()); + + // Remove recipient nodes + test.getDonorRst().nodes = + test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node)); + test.getDonorRst().ports = + test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port)); + + assert.commandWorked(test.getDonorRst().getPrimary().adminCommand( + {forgetShardSplit: 1, migrationId: splitMigrationId})); + + 
waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds); + + jsTestLog("Starting tenant migration"); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + assert.commandWorked(test.startMigration(migrationOpts)); + + TenantMigrationTest.assertCommitted(test.waitForMigrationToComplete(migrationOpts)); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + test.stop(); + jsTestLog("canStartMigrationAfterSplitGarbageCollection test completed"); +} + +canStartMigrationAfterSplitGarbageCollection("multitenant migrations"); +canStartMigrationAfterSplitGarbageCollection("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js b/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js new file mode 100644 index 00000000000..1ea49a99f06 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js @@ -0,0 +1,86 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function retryMigrationAfterSplitCompletes(protocol) { + // Test that we cannot start a migration while a shard split is in progress. 
+ const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const firstTenantMigrationId = UUID(); + const secondTenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + const splitRst = test.getDonorRst(); + + let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName); + + let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState"); + + const commitThread = + commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + fp.wait(); + + const firstMigrationOpts = { + migrationIdString: extractUUIDFromObject(firstTenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + firstMigrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandFailedWithCode(test.startMigration(firstMigrationOpts), + ErrorCodes.ConflictingServerlessOperation); + + fp.off(); + + assert.commandWorked(commitThread.returnData()); + + splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node)); + splitRst.ports = + splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port)); + + assert.commandWorked( + splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId})); + + splitRecipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + const secondMigrationOpts = { + migrationIdString: extractUUIDFromObject(secondTenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + secondMigrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(secondMigrationOpts)); + TenantMigrationTest.assertCommitted( + 
waitForMergeToComplete(secondMigrationOpts, secondTenantMigrationId, test)); + assert.commandWorked(test.forgetMigration(secondMigrationOpts.migrationIdString)); + + waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds); + + test.stop(); + jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed"); +} + +retryMigrationAfterSplitCompletes("multitenant migrations"); +retryMigrationAfterSplitCompletes("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js b/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js new file mode 100644 index 00000000000..2bf713c1cb4 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js @@ -0,0 +1,72 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartMigrationWhileShardSplitIsInProgress(protocol) { + // Test that we cannot start a migration while a shard split is in progress. 
+ const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + const splitRst = test.getDonorRst(); + + let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName); + + let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState"); + + const commitThread = + commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + fp.wait(); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandFailedWithCode(test.startMigration(migrationOpts), + ErrorCodes.ConflictingServerlessOperation); + + fp.off(); + + assert.commandWorked(commitThread.returnData()); + + splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node)); + splitRst.ports = + splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port)); + + assert.commandWorked( + splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId})); + + splitRecipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds); + + test.stop(); + jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed"); +} + +cannotStartMigrationWhileShardSplitIsInProgress("multitenant migrations"); +cannotStartMigrationWhileShardSplitIsInProgress("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js 
b/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js new file mode 100644 index 00000000000..70447f44323 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js @@ -0,0 +1,78 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartMigrationWhileShardSplitIsInProgressOnRecipient(protocol) { + // Test that we cannot start a migration while a shard split is in progress. + const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + const splitRst = test.getRecipientRst(); + + let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName); + + let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState"); + + const commitThread = + commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + fp.wait(); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(migrationOpts)); + + const result = assert.commandWorked(test.waitForMigrationToComplete(migrationOpts)); + assert.eq(result.state, "aborted"); + 
assert.eq(result.abortReason.code, ErrorCodes.ConflictingServerlessOperation); + + assert.commandWorked( + test.forgetMigration(migrationOpts.migrationIdString, false /* retryOnRetryableErrors */)); + + fp.off(); + + assert.commandWorked(commitThread.returnData()); + + splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node)); + splitRst.ports = + splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port)); + + assert.commandWorked( + splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId})); + + splitRecipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds); + + test.stop(); + jsTestLog("cannotStartMigrationWhileShardSplitIsInProgressOnRecipient test completed"); +} + +cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("multitenant migrations"); +cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js b/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js new file mode 100644 index 00000000000..f3bd83a5243 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js @@ -0,0 +1,90 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartMigrationWhileShardSplitIsInProgressOnRecipient(protocol) { + // Test that we cannot start a migration while a shard split is in progress. 
+ const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + const secondTenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + + const splitRst = test.getRecipientRst(); + + let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName); + + let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState"); + + const commitThread = + commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + fp.wait(); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(migrationOpts)); + + const result = assert.commandWorked(test.waitForMigrationToComplete(migrationOpts)); + assert.eq(result.state, "aborted"); + assert.eq(result.abortReason.code, ErrorCodes.ConflictingServerlessOperation); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + fp.off(); + + assert.commandWorked(commitThread.returnData()); + + splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node)); + splitRst.ports = + splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port)); + + assert.commandWorked( + splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId})); + + splitRecipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + const secondMigrationOpts = { + migrationIdString: extractUUIDFromObject(secondTenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + 
secondMigrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(secondMigrationOpts)); + TenantMigrationTest.assertCommitted( + waitForMergeToComplete(secondMigrationOpts, secondTenantMigrationId, test)); + assert.commandWorked(test.forgetMigration(secondMigrationOpts.migrationIdString)); + + waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds); + + test.stop(); + jsTestLog("cannotStartMigrationWhileShardSplitIsInProgressOnRecipient test completed"); +} + +cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("multitenant migrations"); +cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("shard merge"); diff --git a/jstests/serverless/serverless_reject_multiple_ops_split.js b/jstests/serverless/serverless_reject_multiple_ops_split.js new file mode 100644 index 00000000000..71ea388a9a7 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_split.js @@ -0,0 +1,82 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function cannotStartShardSplitWithMigrationInProgress( + {recipientTagName, protocol, shardSplitRst, test}) { + // Test that we cannot start a shard split while a migration is in progress. 
+ const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + let fp = configureFailPoint(test.getDonorRst().getPrimary(), + "pauseTenantMigrationBeforeLeavingDataSyncState"); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(migrationOpts)); + + fp.wait(); + + const commitThread = commitSplitAsync( + shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + assert.commandFailed(commitThread.returnData()); + + fp.off(); + + TenantMigrationTest.assertCommitted( + waitForMergeToComplete(migrationOpts, tenantMigrationId, test)); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + jsTestLog("cannotStartShardSplitWithMigrationInProgress test completed"); +} + +sharedOptions = {}; +sharedOptions["setParameter"] = { + shardSplitGarbageCollectionDelayMS: 0, + tenantMigrationGarbageCollectionDelayMS: 0, + ttlMonitorSleepSecs: 1 +}; + +const recipientTagName = "recipientTag"; + +const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); +addRecipientNodes(test.getDonorRst(), recipientTagName); +addRecipientNodes(test.getRecipientRst(), recipientTagName); + +cannotStartShardSplitWithMigrationInProgress({ + recipientTagName, + protocol: "multitenant migrations", + shardSplitRst: test.getDonorRst(), + test +}); +cannotStartShardSplitWithMigrationInProgress( + {recipientTagName, protocol: "shard merge", shardSplitRst: test.getDonorRst(), test}); + +cannotStartShardSplitWithMigrationInProgress({ + recipientTagName, + protocol: "multitenant migrations", + shardSplitRst: test.getRecipientRst(), + test +}); +cannotStartShardSplitWithMigrationInProgress( + {recipientTagName, protocol: 
"shard merge", shardSplitRst: test.getRecipientRst(), test}); + +test.stop(); diff --git a/jstests/serverless/serverless_reject_multiple_ops_split_retry.js b/jstests/serverless/serverless_reject_multiple_ops_split_retry.js new file mode 100644 index 00000000000..b4716494af6 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_split_retry.js @@ -0,0 +1,127 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function retrySplit({protocol, recipientTagName, recipientSetName, tenantIds, test, splitRst}) { + const tenantMigrationId = UUID(); + const firstSplitMigrationId = UUID(); + const secondSplitMigrationId = UUID(); + + let recipientNodes = addRecipientNodes(splitRst, recipientTagName); + + let fp = configureFailPoint(test.getDonorRst().getPrimary(), + "pauseTenantMigrationBeforeLeavingDataSyncState"); + + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(migrationOpts)); + + fp.wait(); + + const commitThread = commitSplitAsync( + splitRst, tenantIds, recipientTagName, recipientSetName, firstSplitMigrationId); + assert.commandFailed(commitThread.returnData()); + + fp.off(); + + TenantMigrationTest.assertCommitted( + waitForMergeToComplete(migrationOpts, tenantMigrationId, test)); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + // Potential race condition as we do not know how quickly the future continuation in + // PrimaryOnlyService will remove 
the instance from its map. + sleep(1000); + const secondCommitThread = commitSplitAsync( + splitRst, tenantIds, recipientTagName, recipientSetName, secondSplitMigrationId); + assert.commandWorked(secondCommitThread.returnData()); + + splitRst.nodes = splitRst.nodes.filter(node => !recipientNodes.includes(node)); + splitRst.ports = + splitRst.ports.filter(port => !recipientNodes.some(node => node.port === port)); + + assert.commandWorked(splitRst.getPrimary().getDB("admin").runCommand( + {forgetShardSplit: 1, migrationId: secondSplitMigrationId})); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); +} + +// Test that we cannot start a shard split while a migration is in progress. +const recipientTagName = "recipientTag"; +const recipientSetName = "recipient"; +const tenantIds = ["tenant1", "tenant2"]; + +sharedOptions = { + setParameter: { + shardSplitGarbageCollectionDelayMS: 0, + tenantMigrationGarbageCollectionDelayMS: 0, + ttlMonitorSleepSecs: 1 + } +}; + +const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + +// "multitenant migration" with shard split on donor +retrySplit({ + protocol: "multitenant migrations", + recipientTagName, + recipientSetName, + tenantIds, + test, + splitRst: test.getDonorRst() +}); + +// "multitenant migration" with shard split on recipient +retrySplit({ + protocol: "multitenant migrations", + recipientTagName, + recipientSetName, + tenantIds, + test, + splitRst: test.getRecipientRst() +}); + +// "shard merge" with shard split on donor +retrySplit({ + protocol: "shard merge", + recipientTagName, + recipientSetName, + tenantIds, + test, + splitRst: test.getDonorRst() +}); + +test.stop(); + +// We need a new test for the next shard merge as adding nodes will cause a crash. 
+const test2 = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions}); + +// "shard merge" with shard split on recipient +retrySplit({ + protocol: "multitenant migrations", + recipientTagName, + recipientSetName, + tenantIds, + test: test2, + splitRst: test2.getDonorRst() +}); + +test2.stop(); diff --git a/jstests/serverless/serverless_reject_multiple_ops_split_success.js b/jstests/serverless/serverless_reject_multiple_ops_split_success.js new file mode 100644 index 00000000000..4f77bae0293 --- /dev/null +++ b/jstests/serverless/serverless_reject_multiple_ops_split_success.js @@ -0,0 +1,61 @@ +/** + * @tags: [ + * serverless, + * requires_fcv_52, + * featureFlagShardSplit, + * featureFlagShardMerge + * ] + */ + +load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js"); +load("jstests/libs/uuid_util.js"); + +function canStartShardSplitWithAbortedMigration({protocol, runOnRecipient}) { + const recipientTagName = "recipientTag"; + const recipientSetName = "recipient"; + const tenantIds = ["tenant1", "tenant2"]; + const splitMigrationId = UUID(); + const tenantMigrationId = UUID(); + + sharedOptions = {}; + sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1}; + + const test = new TenantMigrationTest({quickGarbageCollection: false, sharedOptions}); + + const shardSplitRst = runOnRecipient ? 
test.getRecipientRst() : test.getDonorRst(); + + let recipientNodes = addRecipientNodes(shardSplitRst, recipientTagName); + + let fp = configureFailPoint(test.getDonorRst().getPrimary(), + "abortTenantMigrationBeforeLeavingBlockingState"); + const migrationOpts = { + migrationIdString: extractUUIDFromObject(tenantMigrationId), + protocol, + }; + if (protocol != "shard merge") { + migrationOpts["tenantId"] = tenantIds[0]; + } + jsTestLog("Starting tenant migration"); + assert.commandWorked(test.startMigration(migrationOpts)); + + TenantMigrationTest.assertAborted( + waitForMergeToComplete(migrationOpts, tenantMigrationId, test)); + assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString)); + + const commitThread = commitSplitAsync( + shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId); + assert.commandWorked(commitThread.returnData()); + + recipientNodes.forEach(node => { + MongoRunner.stopMongod(node); + }); + + test.stop(); + jsTestLog("canStartShardSplitWithAbortedMigration test completed"); +} + +canStartShardSplitWithAbortedMigration({protocol: "multitenant migrations", runOnRecipient: false}); +canStartShardSplitWithAbortedMigration({protocol: "shard merge", runOnRecipient: false}); diff --git a/jstests/serverless/shard_split_recipient_removes_access_blockers.js b/jstests/serverless/shard_split_recipient_removes_access_blockers.js index 546b44c4d08..5398c8aba05 100644 --- a/jstests/serverless/shard_split_recipient_removes_access_blockers.js +++ b/jstests/serverless/shard_split_recipient_removes_access_blockers.js @@ -1,5 +1,5 @@ /* - * Test that tenant access blockers are removed from recipients when applying the recipient config. 
+ * Test that tenant access blockers are removed when applying the recipient config * * @tags: [requires_fcv_52, featureFlagShardSplit, serverless] */ diff --git a/jstests/serverless/shard_split_recipient_removes_serverless_lock.js b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js new file mode 100644 index 00000000000..adb55c8753e --- /dev/null +++ b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js @@ -0,0 +1,49 @@ +/* + * Test the serverless operation lock is released from recipients when the state document is + * removed. + * + * @tags: [requires_fcv_62, featureFlagShardSplit, serverless] + */ + +load("jstests/libs/fail_point_util.js"); +load("jstests/serverless/libs/basic_serverless_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil; + +(function() { +"use strict"; + +// Skip db hash check because secondary is left with a different config. +TestData.skipCheckDBHashes = true; + +const test = new BasicServerlessTest({ + recipientTagName: "recipientNode", + recipientSetName: "recipient", + quickGarbageCollection: true +}); +test.addRecipientNodes(); + +const donorPrimary = test.donor.getPrimary(); +const tenantIds = ["tenant1", "tenant2"]; +const operation = test.createSplitOperation(tenantIds); + +const donorAfterBlockingFailpoint = + configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking"); + +const commitOp = operation.commitAsync(); +donorAfterBlockingFailpoint.wait(); + +jsTestLog("Asserting recipient nodes have installed the serverless lock"); +assert.soon(() => test.recipientNodes.every(node => getServerlessOperationLock(node) === + ServerlessLockType.ShardSplitDonor)); +donorAfterBlockingFailpoint.off(); + +commitOp.join(); +assert.commandWorked(commitOp.returnData()); + +jsTestLog("Asserting the serverless exclusion lock has been released"); +assert.soon(() => test.recipientNodes.every(node => 
getServerlessOperationLock(node) == + ServerlessLockType.None)); + +test.stop(); +})(); diff --git a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js index f6301fa7f9a..faf0245b41a 100644 --- a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js +++ b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js @@ -1,7 +1,9 @@ /** - * Starts a shard split througt `abortShardSplit` and assert that no tenant access blockers are + * Starts a shard split through `abortShardSplit` and assert that no tenant access blockers are * recovered since we do not recover access blockers for aborted split marked garbage collectable. - * @tags: [requires_fcv_52, featureFlagShardSplit] + * Also verifies the serverless operation lock is not acquired when starting a split in aborted + * state. + * @tags: [requires_fcv_62, featureFlagShardSplit] */ load("jstests/libs/fail_point_util.js"); // for "configureFailPoint" @@ -9,6 +11,8 @@ load('jstests/libs/parallel_shell_helpers.js'); // for "startPa load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // for "setParameter" load("jstests/serverless/libs/basic_serverless_test.js"); load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); +const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil; (function() { "use strict"; @@ -59,5 +63,8 @@ tenantIds.every(tenantId => { {node: donorPrimary, tenantId: tenantId})); }); +// We do not acquire the lock for document marked for garbage collection +assert.eq(getServerlessOperationLock(donorPrimary), ServerlessLockType.None); + test.stop(); })(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 216b40d2a9e..755ae596893 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -494,13 +494,14 @@ error_codes: - {code: 381, name: 
ReshardingCoordinatorServiceConflictingOperationInProgress, extra: ReshardingCoordinatorServiceConflictingOperationInProgressInfo, categories: [InternalOnly]} - - {code: 382, name: RemoteCommandExecutionError, extra: RemoteCommandExecutionErrorInfo, categories: [InternalOnly]} - {code: 383, name: CollectionIsEmptyLocally, categories: [InternalOnly]} - {code: 384, name: ConnectionError, categories: [NetworkError,RetriableError,InternalOnly]} + - {code: 385, name: ConflictingServerlessOperation} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 54bb5718fa1..f4c2d2d9447 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -548,6 +548,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/s/sharding_runtime_d', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/kill_sessions_local', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/storage/historical_ident_tracker', @@ -763,6 +764,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/storage/journal_flusher', 'delayable_timeout_callback', @@ -1245,6 +1247,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/executor/scoped_task_executor', 'repl_server_parameters', @@ -1413,6 +1416,7 @@ env.Library( '$BUILD_DIR/mongo/db/multitenancy', '$BUILD_DIR/mongo/db/ops/write_ops_exec', 
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import', '$BUILD_DIR/mongo/db/transaction/transaction', @@ -1485,6 +1489,7 @@ env.Library( "$BUILD_DIR/mongo/db/catalog/local_oplog_info", "$BUILD_DIR/mongo/db/concurrency/exception_util", "$BUILD_DIR/mongo/db/index_builds_coordinator_interface", + "$BUILD_DIR/mongo/db/serverless/serverless_lock", ], ) diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 361e6aabe1f..cd562d19af5 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -63,6 +63,7 @@ #include "mongo/db/repl/sync_source_selector.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/transaction_oplog_application.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/session_txn_record_gen.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, _storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit); tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index e98b65b9d76..8fde4801165 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -2049,6 +2049,7 @@ TEST_F( "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( 
"skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Start the real work. ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts)); @@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); + { executor::NetworkInterfaceMock::InNetworkGuard guard(net); @@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -2270,6 +2274,7 @@ TEST_F( "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError) "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock 
skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); + auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); doSuccessfulInitialSyncWithOneBatch(); } @@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Skip clearing initial sync progress so that we can check initialSyncStatus fields after // initial sync is complete. 
@@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Skip clearing initial sync progress so that we can check initialSyncStatus fields after // initial sync is complete. diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index c15f88c3df0..da06257acdf 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -777,6 +777,16 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::_insertNewInst return instance->run(std::move(scopedExecutor), std::move(token)); }) + // TODO SERVER-61717 remove this error handler once instance are automatically released + // at the end of run() + .onError<ErrorCodes::ConflictingServerlessOperation>([this, instanceID](Status status) { + LOGV2(6531507, + "Removing instance due to ConflictingServerlessOperation error", + "instanceID"_attr = instanceID); + releaseInstance(instanceID, Status::OK()); + + return status; + }) .semi(); auto [it, inserted] = _activeInstances.try_emplace( diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a943637c2e5..2feb1ed6b7b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -88,6 +88,7 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/server_options.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/kill_sessions_local.h" #include "mongo/db/session/session_catalog.h" #include "mongo/db/shutdown_in_progress_quiesce_info.h" @@ -534,6 +535,7 @@ bool 
ReplicationCoordinatorImpl::_startLoadLocalConfig( } tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); LOGV2(4280506, "Reconstructing prepared transactions"); reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 101ddcf0bb3..9f7f2e5863d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -192,6 +192,8 @@ void ReplCoordTest::start() { // Skip recovering user writes critical sections for the same reason as the above. FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + // Skip recovering of serverless mutual exclusion locks for the same reason as the above. + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); invariant(!_callShutdown); // if we haven't initialized yet, do that first. 
if (!_repl) { diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index fa3d51489a8..2fff5e05a99 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -58,6 +58,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_recovery.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/kill_sessions_local.h" #include "mongo/db/session/session_catalog_mongod.h" #include "mongo/db/session/session_txn_record_gen.h" @@ -652,6 +653,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns( _correctRecordStoreCounts(opCtx); tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); // Reconstruct prepared transactions after counts have been adjusted. Since prepared // transactions were aborted (i.e. the in-memory counts were rolled-back) before computing diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp index 85e3f012bf0..360c0599db6 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp @@ -88,7 +88,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId, if (it != _tenantMigrationAccessBlockers.end()) { auto existingMtab = it->second.getAccessBlocker(mtabType); if (existingMtab) { - tasserted(ErrorCodes::ConflictingOperationInProgress, + uasserted(ErrorCodes::ConflictingServerlessOperation, str::stream() << "This node is already a " << (mtabType == MtabType::kDonor ? 
"donor" : "recipient") << " for tenantId \"" << tenantId << "\" with migrationId \"" @@ -121,7 +121,7 @@ void TenantMigrationAccessBlockerRegistry::add(std::shared_ptr<TenantMigrationAc std::find_if(_tenantMigrationAccessBlockers.begin(), _tenantMigrationAccessBlockers.end(), [](const auto& pair) { return pair.second.getDonorAccessBlocker().get(); }); - tassert(6114105, + uassert(ErrorCodes::ConflictingServerlessOperation, str::stream() << "Trying to add donor blocker for all tenants when this node already has a donor " "blocker for \"" diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp index 491f6e15753..65f765cf94e 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/tenant_migration_donor_op_observer.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication @@ -54,6 +55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); + auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(), donorStateDoc.getId()); if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) == @@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) 
.remove(donorStateDoc.getTenantId(), TenantMigrationAccessBlocker::BlockerType::kDonor); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); }); } } else { @@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAccessBlockersForMigration( donorStateDoc.getId(), TenantMigrationAccessBlocker::BlockerType::kDonor); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); }); } } @@ -155,6 +166,10 @@ public: void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + _donorStateDoc.getId()); + auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( opCtx->getServiceContext(), _donorStateDoc.getTenantId()); @@ -338,6 +353,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext* opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor); }); } return {}; diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 1d3f4cdf1e2..0da1de32c4e 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -123,7 +123,6 @@ void checkForTokenInterrupt(const CancellationToken& token) { 
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } - template <class Promise> void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) { if (promise.getFuture().isReady()) { @@ -155,6 +154,17 @@ void setPromiseOkIfNotReady(WithLock lk, Promise& promise) { promise.emplaceValue(); } +bool isNotDurableAndServerlessConflict(WithLock lk, SharedPromise<void>& promise) { + auto future = promise.getFuture(); + + if (!future.isReady() || + future.getNoThrow().code() != ErrorCodes::ConflictingServerlessOperation) { + return false; + } + + return true; +} + } // namespace void TenantMigrationDonorService::checkIfConflictsWithOtherInstances( @@ -515,7 +525,16 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) + .until([&](StatusWith<repl::OpTime> swOpTime) { + if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531508, + "Tenant migration completed due to serverless lock error", + "id"_attr = _migrationUuid, + "status"_attr = swOpTime.getStatus()); + uassertStatusOK(swOpTime); + } + return swOpTime.getStatus().isOK(); + }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -950,6 +969,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( ->incTotalMigrationDonationsCommitted(); } } + + return Status::OK(); }) .then([this, self = shared_from_this(), executor, token, recipientTargeterRS] { return _waitForForgetMigrationThenMarkMigrationGarbageCollectable( @@ -977,6 +998,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "tenantId"_attr = _tenantId, "status"_attr = status, "abortReason"_attr = _abortReason); + + // If a ConflictingServerlessOperation was thrown during the initial insertion we do not + // have a state document. 
In that case return the error to PrimaryOnlyService so it + // frees the instance from its map. + if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + uassertStatusOK(_initialDonorStateDurablePromise.getFuture().getNoThrow()); + } }) .semi(); } @@ -1363,7 +1391,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA checkForTokenInterrupt(token); { - stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) { // The migration was resumed on stepup and it was already aborted. return ExecutorFuture(**executor); @@ -1420,6 +1447,21 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancellationToken& token) { + const bool skipWaitingForForget = [&]() { + stdx::lock_guard<Latch> lg(_mutex); + if (!isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + return false; + } + setPromiseErrorIfNotReady(lg, + _receiveDonorForgetMigrationPromise, + _initialDonorStateDurablePromise.getFuture().getNoThrow()); + return true; + }(); + + if (skipWaitingForForget) { + return ExecutorFuture(**executor); + } + LOGV2(6104909, "Waiting to receive 'donorForgetMigration' command.", "migrationId"_attr = _migrationUuid, @@ -1445,6 +1487,16 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG return std::move(_receiveDonorForgetMigrationPromise.getFuture()) .thenRunOn(**executor) .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + { + // If the abortReason is ConflictingServerlessOperation, it means there are no + // document on the recipient. Do not send the forget command. 
+ stdx::lock_guard<Latch> lg(_mutex); + if (_abortReason && + _abortReason->code() == ErrorCodes::ConflictingServerlessOperation) { + return ExecutorFuture(**executor); + } + } + LOGV2(6104910, "Waiting for recipientForgetMigration response.", "migrationId"_attr = _migrationUuid, @@ -1487,6 +1539,12 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteStateDoc( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) { + // If the state document was not inserted due to a conflicting serverless operation, do not + // try to delete it. + stdx::lock_guard<Latch> lg(_mutex); + if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + return ExecutorFuture(**executor); + } LOGV2(8423362, "Waiting for garbage collection delay before deleting state document", @@ -1494,7 +1552,6 @@ TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteS "tenantId"_attr = _tenantId, "expireAt"_attr = *_stateDoc.getExpireAt()); - stdx::lock_guard<Latch> lg(_mutex); return (*executor) ->sleepUntil(*_stateDoc.getExpireAt(), token) .then([this, self = shared_from_this(), executor, token]() { diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index e2a047876bf..19f5f6ab71c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/tenant_migration_shard_merge_util.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_migration_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication @@ -179,6 +180,18 @@ void 
TenantMigrationRecipientOpObserver::onInserts( std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { + if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace && + !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + for (auto it = first; it != last; it++) { + auto recipientStateDoc = TenantMigrationRecipientDocument::parse( + IDLParserContext("recipientStateDoc"), it->doc); + if (!recipientStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + } + } + } if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) { return; @@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, repl::TenantFileImporterService::get(opCtx->getServiceContext()) ->interrupt(recipientStateDoc.getId()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + std::vector<std::string> tenantIdsToRemove; auto cleanUpBlockerIfGarbage = [&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) { @@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection( repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll(); TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient); }); } return {}; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 869670c6ca3..ab7e7ef12e1 100644 --- 
a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -2968,7 +2968,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // Handle recipientForgetMigration. stdx::lock_guard lk(_mutex); if (_stateDoc.getExpireAt() || - MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) { + MONGO_unlikely(autoRecipientForgetMigration.shouldFail()) || + status.code() == ErrorCodes::ConflictingServerlessOperation) { // Skip waiting for the recipientForgetMigration command. setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise); } @@ -3018,7 +3019,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // is safe even on shutDown/stepDown. stdx::lock_guard lk(_mutex); invariant(_dataSyncCompletionPromise.getFuture().isReady()); - if (!status.isOK()) { + + if (status.code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531506, + "Migration failed as another serverless operation was in progress", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "status"_attr = status); + setPromiseOkifNotReady(lk, _forgetMigrationDurablePromise); + return status; + } else if (!status.isOK()) { // We should only hit here on a stepDown/shutDown, or a 'conflicting migration' // error. 
LOGV2(4881402, @@ -3029,6 +3039,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( setPromiseErrorifNotReady(lk, _forgetMigrationDurablePromise, status); } _taskState.setState(TaskState::kDone); + + return Status::OK(); }) .semi(); } diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript index 3ccfd8ea7f7..82143897663 100644 --- a/src/mongo/db/serverless/SConscript +++ b/src/mongo/db/serverless/SConscript @@ -57,6 +57,21 @@ env.Library( ) env.Library( + target='serverless_lock', + source=[ + 'serverless_operation_lock_registry.cpp', + 'serverless_server_status.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl', + '$BUILD_DIR/mongo/db/repl/tenant_migration_utils', + '$BUILD_DIR/mongo/db/server_base', + 'shard_split_state_machine', + ], +) + +env.Library( target='shard_split_donor_service', source=[ 'shard_split_donor_service.cpp', @@ -77,6 +92,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/server_base', '$BUILD_DIR/mongo/db/shard_role', + 'serverless_lock', 'shard_split_utils', ], ) @@ -84,6 +100,7 @@ env.Library( env.CppUnitTest( target='db_serverless_test', source=[ + 'serverless_operation_lock_registry_test.cpp', 'shard_split_donor_op_observer_test.cpp', 'shard_split_donor_service_test.cpp', 'shard_split_utils_test.cpp', @@ -97,6 +114,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/dbtests/mocklib', + 'serverless_lock', 'shard_split_donor_service', 'shard_split_utils', ], diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp new file mode 100644 index 00000000000..20a02c6cd15 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp @@ -0,0 +1,192 @@ +/** + * Copyright (C) 
2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/serverless/shard_split_state_machine_gen.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration + +// Failpoint that will cause recoverLocks to return early. 
+MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock); +namespace mongo { + +const ServiceContext::Decoration<ServerlessOperationLockRegistry> + ServerlessOperationLockRegistry::get = + ServiceContext::declareDecoration<ServerlessOperationLockRegistry>(); + +void ServerlessOperationLockRegistry::acquireLock( + ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) { + stdx::lock_guard<Latch> lg(_mutex); + + // Verify there is no serverless operation in progress or it is the same type as the one + // acquiring the lock. + uassert(ErrorCodes::ConflictingServerlessOperation, + "Conflicting serverless operation in progress", + !_activeLockType || _activeLockType.get() == lockType); + invariant(_activeOperations.find(operationId) == _activeOperations.end(), + "Cannot acquire the serverless lock twice for the same operationId."); + _activeLockType = lockType; + + _activeOperations.emplace(operationId); + + LOGV2(6531500, + "Acquired serverless operation lock", + "type"_attr = lockType, + "id"_attr = operationId); +} + +void ServerlessOperationLockRegistry::releaseLock( + ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) { + stdx::lock_guard<Latch> lg(_mutex); + + invariant(_activeLockType && *_activeLockType == lockType, + "Cannot release a serverless lock that is not owned by the given lock type."); + + invariant(_activeOperations.find(operationId) != _activeOperations.end(), + "Cannot release a serverless lock if the given operationId does not own the lock."); + _activeOperations.erase(operationId); + + if (_activeOperations.empty()) { + _activeLockType.reset(); + } + + LOGV2(6531501, + "Released serverless operation lock", + "type"_attr = lockType, + "id"_attr = operationId); +} + +void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) { + stdx::lock_guard<Latch> lg(_mutex); + + if (!_activeLockType || *_activeLockType != lockType) { + return; + } + + LOGV2(6531505, + "Released all 
serverless locks due to state collection drop", + "type"_attr = lockType); + + _activeLockType.reset(); + _activeOperations.clear(); +} + +void ServerlessOperationLockRegistry::clear() { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(6531504, + "Clearing serverless operation lock registry on shutdown", + "ns"_attr = _activeLockType); + + _activeOperations.clear(); + _activeLockType.reset(); +} + +void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) { + if (skipRecoverServerlessOperationLock.shouldFail()) { + return; + } + + auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + registry.clear(); + + PersistentTaskStore<TenantMigrationDonorDocument> donorStore( + NamespaceString::kTenantMigrationDonorsNamespace); + donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId()); + + return true; + }); + + PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore( + NamespaceString::kTenantMigrationRecipientsNamespace); + recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + doc.getId()); + + return true; + }); + + PersistentTaskStore<ShardSplitDonorDocument> splitStore( + NamespaceString::kShardSplitDonorsNamespace); + splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId()); + + return true; + }); +} + +const std::string kOperationLockFieldName = 
"operationLock"; +void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const { + stdx::lock_guard<Latch> lg(_mutex); + + if (!_activeLockType) { + builder->append(kOperationLockFieldName, 0); + return; + } + + switch (_activeLockType.value()) { + case ServerlessOperationLockRegistry::LockType::kShardSplit: + builder->append(kOperationLockFieldName, 1); + break; + case ServerlessOperationLockRegistry::LockType::kTenantDonor: + builder->append(kOperationLockFieldName, 2); + break; + case ServerlessOperationLockRegistry::LockType::kTenantRecipient: + builder->append(kOperationLockFieldName, 3); + break; + } +} + +boost::optional<ServerlessOperationLockRegistry::LockType> +ServerlessOperationLockRegistry::getActiveOperationType_forTest() { + stdx::lock_guard<Latch> lg(_mutex); + + return _activeLockType; +} + + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h new file mode 100644 index 00000000000..d9ac07393f4 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/uuid.h" + +#include <set> + +namespace mongo { + +/** + * Registry to allow only one type of active serverless operation at a time. It allows multiple + * simultaneous operations of the same type. + */ +class ServerlessOperationLockRegistry { + ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete; + ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete; + +public: + ServerlessOperationLockRegistry() = default; + + static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get; + + enum LockType { kShardSplit, kTenantDonor, kTenantRecipient }; + + /** + * Acquires the serverless lock for LockType and adds operationId to the set of + * instances tracked. Throws ConflictingServerlessOperation error if there is already an + * active serverless operation in progress with a different lock type than lockType.
+ */ + void acquireLock(LockType lockType, const UUID& operationId); + + /** + * If _activeLockType matches LockType, removes the given operationId from + * the set of active instances and releases the lock if the set becomes empty. Invariants if + * lockType or operationId does not own the lock. + */ + void releaseLock(LockType lockType, const UUID& operationId); + + /** + * Called when a state document collection is dropped. If the collection's lockType currently + * holds the lock, it releases the lock. If it does not own the lock, the function does nothing. + */ + void onDropStateCollection(LockType lockType); + + void clear(); + + /** + * Scan serverless state documents and acquire the serverless mutual exclusion lock if needed. + */ + static void recoverLocks(OperationContext* opCtx); + + /** + * Appends the exclusion status to the BSONObjBuilder. + */ + void appendInfoForServerStatus(BSONObjBuilder* builder) const; + + boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest(); + +private: + mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex"); + boost::optional<LockType> _activeLockType; + std::set<UUID> _activeOperations; +}; + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp new file mode 100644 index 00000000000..9d95b3b7bc7 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/log_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + + ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +DEATH_TEST(ServerlessOperationLockRegistryTest, + InsertSameIdTwice, + "Cannot acquire the serverless lock twice for the same operationId.") { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); +} + +TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + + ASSERT_THROWS_CODE( + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, UUID::gen()), + DBException, + ErrorCodes::ConflictingServerlessOperation); +} + +DEATH_TEST(ServerlessOperationLockRegistryTest, + ReleaseDifferentNsTriggersInvariant, + "Cannot release a serverless lock that is not owned by the given lock type.") { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, id); +} + + +DEATH_TEST(ServerlessOperationLockRegistryTest, + ReleaseDifferentIdTriggersInvariant, + "Cannot release a serverless lock if the given operationId does not own the lock.") { + 
ServerlessOperationLockRegistry registry; + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); +} + +TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) { + ServerlessOperationLockRegistry registry; + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + + registry.clear(); + + // Verify the lock has been released. + ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) { + ServerlessOperationLockRegistry registry; + + std::vector<UUID> ids; + for (int i = 0; i < 5; ++i) { + ids.push_back(UUID::gen()); + } + + for (auto& id : ids) { + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + + + for (auto& id : ids) { + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock has been released. 
+ ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) { + ServerlessOperationLockRegistry registry; + + std::vector<UUID> ids; + for (int i = 0; i < 5; ++i) { + ids.push_back(UUID::gen()); + } + + for (auto& id : ids) { + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + // Add an additional id; + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + + for (auto& id : ids) { + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); +} + + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp new file mode 100644 index 00000000000..8d0d4658dc3 --- /dev/null +++ b/src/mongo/db/serverless/serverless_server_status.cpp @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" + +namespace mongo { +namespace { + +class ServerlessServerStatus final : public ServerStatusSection { +public: + ServerlessServerStatus() : ServerStatusSection("serverless") {} + + bool includeByDefault() const override { + return false; + } + + BSONObj generateSection(OperationContext* opCtx, + const BSONElement& configElement) const override { + BSONObjBuilder result; + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .appendInfoForServerStatus(&result); + return result.obj(); + } +} serverlessServerStatus; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp index b0f0e6c6cab..5ce6c9c0307 100644 --- a/src/mongo/db/serverless/shard_split_commands.cpp +++ b/src/mongo/db/serverless/shard_split_commands.cpp @@ -114,7 +114,7 @@ public: }; std::string help() const { - return "Start an opereation to split a shard into its own slice."; + return "Start an operation to split a 
shard into its own slice."; } bool adminOnly() const override { diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 2d67495c431..b2470d07854 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_utils.h" @@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) { const auto tenantIdsToDeleteDecoration = OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>(); +const auto shardSplitIdToDeleteDecoration = + OperationContext::declareDecoration<boost::optional<UUID>>(); ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc); @@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, invariant(donorStateDoc.getTenantIds()); invariant(donorStateDoc.getRecipientConnectionString()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId()); + auto tenantIds = *donorStateDoc.getTenantIds(); for (const auto& tenantId : tenantIds) { auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(), @@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, if (isPrimary(opCtx)) { // onRollback is not registered on secondaries since secondaries should not fail to // apply the write. 
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] { + opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, migrationId = donorStateDoc.getId()] { for (const auto& tenantId : tenantIds) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, migrationId); }); } } @@ -250,6 +258,10 @@ public: void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, + _donorStateDoc.getId()); + if (_donorStateDoc.getTenantIds()) { auto tenantIds = _donorStateDoc.getTenantIds().value(); for (auto&& tenantId : tenantIds) { @@ -376,12 +388,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, } auto donorStateDoc = parseAndValidateDonorDocument(doc); + const bool shouldRemoveOnRecipient = + serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc); uassert(ErrorCodes::IllegalOperation, str::stream() << "cannot delete a donor's state document " << doc << " since it has not been marked as garbage collectable and is not a" << " recipient garbage collectable.", - donorStateDoc.getExpireAt() || - serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc)); + donorStateDoc.getExpireAt() || shouldRemoveOnRecipient); // To support back-to-back split retries, when a split is aborted, we remove its // TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage @@ -397,6 +410,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result); } + + if (shouldRemoveOnRecipient) { + shardSplitIdToDeleteDecoration(opCtx) = 
boost::make_optional(donorStateDoc.getId()); + } } void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, @@ -419,6 +436,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) { registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + + const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx); + if (idToDelete) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete); + } }); } @@ -431,6 +454,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit); }); } diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index 97c923524a3..f30ad593951 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_test_utils.h" @@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) { ASSERT_FALSE(mtab); }; + 
ServerlessOperationLockRegistry::get(_opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); + + ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext()) + .getActiveOperationType_forTest()); } TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) { diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 4c3fdabb39f..fa977ae0c7d 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -1005,7 +1005,18 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) + .until([&](StatusWith<repl::OpTime> swOpTime) { + if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531509, + "Shard split completed due to serverless lock error", + "id"_attr = _migrationId, + "status"_attr = swOpTime.getStatus()); + stdx::lock_guard<Latch> lg(_mutex); + + uassertStatusOK(swOpTime); + } + return swOpTime.getStatus().isOK(); + }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -1067,7 +1078,8 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState( } } - if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) { + if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status) || + status.code() == ErrorCodes::ConflictingServerlessOperation) { // Don't abort the split on retriable errors that may have been generated by the local // server shutting/stepping down because it can be resumed when the client retries. 
return ExecutorFuture(**executor, StatusWith<DurableState>{status}); diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 463031b66bc..f46c908059b 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_donor_access_blocker.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_donor_service.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" @@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) waitForReplSetStepUp(Status(ErrorCodes::OK, "")); waitForRecipientPrimaryMajorityWrite(); + // Verify the serverless lock has been acquired for split. + auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + auto result = serviceInstance->decisionFuture().get(); ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds)); ASSERT(!result.abortReason); @@ -520,10 +526,32 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) auto completionFuture = serviceInstance->completionFuture(); completionFuture.wait(); + // The lock has been released. 
+ ASSERT_FALSE(registry.getActiveOperationType_forTest()); + ASSERT_OK(serviceInstance->completionFuture().getNoThrow()); ASSERT_TRUE(serviceInstance->isGarbageCollectable()); } +TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) { + auto opCtx = makeOperationContext(); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + + auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen()); + + // Create and start the instance. + auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( + opCtx.get(), _service, defaultStateDocument().toBSON()); + ASSERT(serviceInstance.get()); + + auto decisionFuture = serviceInstance->decisionFuture(); + + auto result = decisionFuture.getNoThrow(); + ASSERT_EQ(result.getStatus().code(), ErrorCodes::ConflictingServerlessOperation); +} + TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); @@ -1015,6 +1043,10 @@ public: stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); stateDocument.setRecipientConnectionString(ConnectionString::forLocal()); + ServerlessOperationLockRegistry::get(getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, + stateDocument.getId()); + return stateDocument; } }; |