author     Didier Nadeau <didier.nadeau@mongodb.com>    2022-09-29 18:17:50 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-09-29 19:37:36 +0000
commit     be197bff5315f3990a598dff20a89d32a24b1e5e (patch)
tree       8e5aa83b42dfaee2869bc2e49e2246e275aa1abd
parent     5f40051ccd887859a43bf0e456773759f5d93ddc (diff)
download   mongo-be197bff5315f3990a598dff20a89d32a24b1e5e.tar.gz
SERVER-65315 Enforce mutual exclusion between serverless operations
-rw-r--r--  jstests/replsets/libs/tenant_migration_test.js  4
-rw-r--r--  jstests/replsets/libs/tenant_migration_util.js  13
-rw-r--r--  jstests/replsets/tenant_migration_donor_initial_sync_recovery.js  13
-rw-r--r--  jstests/replsets/tenant_migration_donor_startup_recovery.js  12
-rw-r--r--  jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js  12
-rw-r--r--  jstests/serverless/libs/basic_serverless_test.js  72
-rw-r--r--  jstests/serverless/libs/serverless_reject_multiple_ops_utils.js  80
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_access_blocker.js  67
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js  69
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js  73
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_donor.js  69
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js  86
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_fail.js  72
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js  78
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js  90
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_split.js  82
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_split_retry.js  127
-rw-r--r--  jstests/serverless/serverless_reject_multiple_ops_split_success.js  61
-rw-r--r--  jstests/serverless/shard_split_recipient_removes_access_blockers.js  2
-rw-r--r--  jstests/serverless/shard_split_recipient_removes_serverless_lock.js  49
-rw-r--r--  jstests/serverless/shard_split_startup_recovery_initially_aborted.js  11
-rw-r--r--  src/mongo/base/error_codes.yml  3
-rw-r--r--  src/mongo/db/repl/SConscript  5
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp  2
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp  13
-rw-r--r--  src/mongo/db/repl/primary_only_service.cpp  10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp  2
-rw-r--r--  src/mongo/db/repl/rollback_impl.cpp  2
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp  4
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_op_observer.cpp  18
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp  65
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp  20
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp  16
-rw-r--r--  src/mongo/db/serverless/SConscript  18
-rw-r--r--  src/mongo/db/serverless/serverless_operation_lock_registry.cpp  192
-rw-r--r--  src/mongo/db/serverless/serverless_operation_lock_registry.h  96
-rw-r--r--  src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp  154
-rw-r--r--  src/mongo/db/serverless/serverless_server_status.cpp  57
-rw-r--r--  src/mongo/db/serverless/shard_split_commands.cpp  2
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer.cpp  32
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp  7
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp  16
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  32
44 files changed, 1856 insertions, 54 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index 04619679e33..bd79c51b6bb 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -136,7 +136,7 @@ function TenantMigrationTest({
nodeOptions["setParameter"] = setParameterOpts;
const rstName = `${name}_${(isDonor ? "donor" : "recipient")}`;
- const rst = new ReplSetTest({name: rstName, nodes, nodeOptions});
+ const rst = new ReplSetTest({name: rstName, nodes, serverless: true, nodeOptions});
rst.startSet();
if (initiateRstWithHighElectionTimeout) {
rst.initiateWithHighElectionTimeout();
@@ -231,6 +231,7 @@ function TenantMigrationTest({
this.runDonorStartMigration = function({
migrationIdString,
tenantId,
+ protocol,
recipientConnectionString = recipientRst.getURL(),
readPreference = {mode: "primary"},
donorCertificateForRecipient = migrationCertificates.donorCertificateForRecipient,
@@ -251,6 +252,7 @@ function TenantMigrationTest({
readPreference,
donorCertificateForRecipient,
recipientCertificateForDonor,
+ protocol
};
const stateRes = TenantMigrationUtil.runTenantMigrationCommand(cmdObj, this.getDonorRst(), {
diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js
index d0e9354f1c1..381dc3855bd 100644
--- a/jstests/replsets/libs/tenant_migration_util.js
+++ b/jstests/replsets/libs/tenant_migration_util.js
@@ -262,6 +262,17 @@ var TenantMigrationUtil = (function() {
return res;
}
+ const ServerlessLockType =
+ {None: 0, ShardSplitDonor: 1, TenantMigrationDonor: 2, TenantMigrationRecipient: 3};
+
+ /**
+ * Return the active serverless operation lock, if one is acquired.
+ */
+ function getServerlessOperationLock(node) {
+ return assert.commandWorked(node.adminCommand({serverStatus: 1, serverless: 1}))
+ .serverless.operationLock;
+ }
+
/**
* Returns the TenantMigrationAccessBlocker serverStatus output for the multi-tenant migration
* or shard merge for the given node.
@@ -561,6 +572,8 @@ var TenantMigrationUtil = (function() {
makeMigrationCertificatesForTest,
makeX509OptionsForTest,
isMigrationCompleted,
+ ServerlessLockType,
+ getServerlessOperationLock,
getTenantMigrationAccessBlocker,
getTenantMigrationAccessBlockers,
getNumBlockedReads,
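
A minimal sketch of how the new helper and enum are meant to be consumed by tests, assuming tenant_migration_util.js is loaded; the single-node serverless replica set below is illustrative only and not part of this change:

load("jstests/replsets/libs/tenant_migration_util.js");
const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;

// Illustrative single-node serverless set, used only to query serverStatus.
const rst = new ReplSetTest({nodes: 1, serverless: true});
rst.startSet();
rst.initiate();

// getServerlessOperationLock() runs {serverStatus: 1, serverless: 1} and returns
// serverless.operationLock, which maps onto the ServerlessLockType values.
assert.eq(getServerlessOperationLock(rst.getPrimary()), ServerlessLockType.None);

rst.stopSet();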
diff --git a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
index 85429af3ddd..8afc2d2fcf9 100644
--- a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
@@ -5,6 +5,7 @@
* @tags: [
* incompatible_with_macos,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -19,6 +20,7 @@ load("jstests/libs/uuid_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
@@ -47,6 +49,7 @@ const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
tenantId: kTenantId
};
+
assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// We must wait for the migration to have finished replicating the recipient keys on the donor set
// before starting initial sync, otherwise the migration will hang while waiting for initial sync to
@@ -83,7 +86,8 @@ donorRst.awaitReplication();
stopServerReplication(initialSyncNode);
let configDonorsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigDonorsNS);
-let donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
+assert.lte(configDonorsColl.count(), 1);
+let donorDoc = configDonorsColl.findOne();
if (donorDoc) {
jsTestLog("Initial sync completed while migration was in state: " + donorDoc.state);
switch (donorDoc.state) {
@@ -148,6 +152,13 @@ if (donorDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(initialSyncNode);
+if (donorDoc && !donorDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor);
+} else {
+ assert.eq(activeServerlessLock, ServerlessLockType.None);
+}
+
if (fp) {
fp.off();
}
diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js
index 7a564e416a5..7dbbcb56f8c 100644
--- a/jstests/replsets/tenant_migration_donor_startup_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js
@@ -8,6 +8,7 @@
* incompatible_with_macos,
* incompatible_with_shard_merge,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -20,6 +21,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const donorRst = new ReplSetTest({
nodes: 1,
@@ -70,7 +72,8 @@ donorRst.startSet({
donorPrimary = donorRst.getPrimary();
const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
-const donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
+assert.lte(configDonorsColl.count(), 1);
+const donorDoc = configDonorsColl.findOne();
if (donorDoc) {
switch (donorDoc.state) {
case TenantMigrationTest.DonorState.kAbortingIndexBuilds:
@@ -133,6 +136,13 @@ if (donorDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(donorPrimary);
+if (donorDoc && !donorDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor);
+} else {
+ assert.eq(activeServerlessLock, ServerlessLockType.None);
+}
+
tenantMigrationTest.stop();
donorRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
index 404bf0fa765..1012be2670c 100644
--- a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
+++ b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
@@ -6,6 +6,7 @@
* incompatible_with_macos,
* incompatible_with_shard_merge,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -19,6 +20,7 @@ load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
@@ -59,7 +61,8 @@ recipientRst.awaitReplication();
stopServerReplication(initialSyncNode);
const configRecipientsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigRecipientsNS);
-const recipientDoc = configRecipientsColl.findOne({tenantId: kTenantId});
+assert.lte(configRecipientsColl.count(), 1);
+const recipientDoc = configRecipientsColl.findOne();
if (recipientDoc) {
switch (recipientDoc.state) {
case TenantMigrationTest.RecipientState.kStarted:
@@ -97,6 +100,13 @@ if (recipientDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(initialSyncNode);
+if (recipientDoc && !recipientDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationRecipient);
+} else {
+ assert.eq(activeServerlessLock, ServerlessLockType.None);
+}
+
restartServerReplication(initialSyncNode);
tenantMigrationTest.stop();
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js
index 69aeab56616..727827429db 100644
--- a/jstests/serverless/libs/basic_serverless_test.js
+++ b/jstests/serverless/libs/basic_serverless_test.js
@@ -41,6 +41,45 @@ const runCommitSplitThreadWrapper = function(rstArgs,
donorRst, commitShardSplitCmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync);
};
+/*
+ * Wait for state document garbage collection by polling for when the document has been removed
+ * from the 'shardSplitDonors' namespace, and all access blockers have been removed.
+ * @param {migrationId} id that was used for the commitShardSplit command.
+ * @param {tenantIds} tenant ids of the shard split.
+ */
+const waitForGarbageCollectionForSplit = function(donorNodes, migrationId, tenantIds) {
+ jsTestLog("Wait for garbage collection");
+ assert.soon(() => donorNodes.every(node => {
+ const donorDocumentDeleted =
+ node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
+ _id: migrationId
+ }) === 0;
+ const allAccessBlockersRemoved = tenantIds.every(
+ id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null);
+
+ const result = donorDocumentDeleted && allAccessBlockersRemoved;
+ if (!result) {
+ const status = [];
+ if (!donorDocumentDeleted) {
+ status.push(`donor document to be deleted (docCount=${
+ node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
+ _id: migrationId
+ })})`);
+ }
+
+ if (!allAccessBlockersRemoved) {
+ const tenantsWithBlockers = tenantIds.filter(
+ id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) != null);
+ status.push(`access blockers to be removed (${tenantsWithBlockers})`);
+ }
+ }
+ return donorDocumentDeleted && allAccessBlockersRemoved;
+ }),
+ "tenant access blockers weren't removed",
+ 60 * 1000,
+ 1 * 1000);
+};
+
const runShardSplitCommand = function(
replicaSet, cmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync) {
let res;
@@ -355,38 +394,7 @@ class BasicServerlessTest {
* @param {tenantIds} tenant ids of the shard split.
*/
waitForGarbageCollection(migrationId, tenantIds) {
- jsTestLog("Wait for garbage collection");
- const donorNodes = this.donor.nodes;
- assert.soon(() => donorNodes.every(node => {
- const donorDocumentDeleted =
- node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
- _id: migrationId
- }) === 0;
- const allAccessBlockersRemoved = tenantIds.every(
- id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null);
-
- const result = donorDocumentDeleted && allAccessBlockersRemoved;
- if (!result) {
- const status = [];
- if (!donorDocumentDeleted) {
- status.push(`donor document to be deleted (docCount=${
- node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
- _id: migrationId
- })})`);
- }
-
- if (!allAccessBlockersRemoved) {
- const tenantsWithBlockers =
- tenantIds.filter(id => BasicServerlessTest.getTenantMigrationAccessBlocker(
- {node, id}) != null);
- status.push(`access blockers to be removed (${tenantsWithBlockers})`);
- }
- }
- return donorDocumentDeleted && allAccessBlockersRemoved;
- }),
- "tenant access blockers weren't removed",
- 60 * 1000,
- 1 * 1000);
+ return waitForGarbageCollectionForSplit(this.donor.nodes, migrationId, tenantIds);
}
/**
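
With the polling loop hoisted out of the class, tests that manage their own donor nodes can call it directly; a short sketch, assuming a split identified by splitMigrationId has already been committed on donorRst (variable names are illustrative):

// Poll every donor node until the shard split state document is removed from
// config.shardSplitDonors and the tenants' access blockers are cleared.
assert.commandWorked(donorRst.getPrimary().adminCommand(
    {forgetShardSplit: 1, migrationId: splitMigrationId}));
waitForGarbageCollectionForSplit(donorRst.nodes, splitMigrationId, ["tenant1", "tenant2"]);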
diff --git a/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js b/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js
new file mode 100644
index 00000000000..ca79b44778e
--- /dev/null
+++ b/jstests/serverless/libs/serverless_reject_multiple_ops_utils.js
@@ -0,0 +1,80 @@
+/**
+ * Utility functions for serverless_reject_multiple_ops tests
+ *
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/rslib.js");
+load("jstests/libs/parallelTester.js");
+
+function waitForMergeToComplete(migrationOpts, migrationId, test) {
+ // Assert that the migration has already been started.
+ assert(test.getDonorPrimary().getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ _id: migrationId
+ }));
+
+ const donorStartReply = test.runDonorStartMigration(
+ migrationOpts, {waitForMigrationToComplete: true, retryOnRetryableErrors: false});
+
+ return donorStartReply;
+}
+
+function commitSplitAsync(rst, tenantIds, recipientTagName, recipientSetName, migrationId) {
+ jsTestLog("Running commitAsync command");
+
+ const rstArgs = createRstArgs(rst);
+ const migrationIdString = extractUUIDFromObject(migrationId);
+
+ const thread = new Thread(runCommitSplitThreadWrapper,
+ rstArgs,
+ migrationIdString,
+ tenantIds,
+ recipientTagName,
+ recipientSetName,
+ false /* enableDonorStartMigrationFsync */);
+ thread.start();
+
+ return thread;
+}
+
+function addRecipientNodes(rst, recipientTagName) {
+ const numNodes = 3; // default to three nodes
+ let recipientNodes = [];
+ const options = TenantMigrationUtil.makeX509OptionsForTest();
+ jsTestLog(`Adding ${numNodes} non-voting recipient nodes to donor`);
+ for (let i = 0; i < numNodes; ++i) {
+ recipientNodes.push(rst.add(options.donor));
+ }
+
+ const primary = rst.getPrimary();
+ const admin = primary.getDB('admin');
+ const config = rst.getReplSetConfigFromNode();
+ config.version++;
+
+ // ensure recipient nodes are added as non-voting members
+ recipientNodes.forEach(node => {
+ config.members.push({
+ host: node.host,
+ votes: 0,
+ priority: 0,
+ hidden: true,
+ tags: {[recipientTagName]: ObjectId().valueOf()}
+ });
+ });
+
+ // reindex all members from 0
+ config.members = config.members.map((member, idx) => {
+ member._id = idx;
+ return member;
+ });
+
+ assert.commandWorked(admin.runCommand({replSetReconfig: config}));
+ recipientNodes.forEach(node => rst.waitForState(node, ReplSetTest.State.SECONDARY));
+
+ return recipientNodes;
+}
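
The tests below compose these helpers in the same way; a condensed sketch of the shared pattern, assuming the usual test libraries are loaded (names illustrative):

// Add hidden, non-voting recipient nodes to the donor set, then run a shard split
// on a background thread and wait for its result.
const test = new TenantMigrationTest({quickGarbageCollection: true});
const recipientNodes = addRecipientNodes(test.getDonorRst(), "recipientTag");

const splitThread = commitSplitAsync(
    test.getDonorRst(), ["tenant1"], "recipientTag", "recipient", UUID());
assert.commandWorked(splitThread.returnData());  // blocks until the split decision

recipientNodes.forEach(node => MongoRunner.stopMongod(node));
test.stop();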
diff --git a/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js b/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js
new file mode 100644
index 00000000000..0c7107f9682
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_access_blocker.js
@@ -0,0 +1,67 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartMigrationWhenThereIsAnExistingAccessBlocker(protocol) {
+ // Test that we cannot start a tenant migration for a tenant that already has an access blocker.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ // Ensure a high enough delay so the shard split document is not deleted before tenant migration
+ // is started.
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {
+ shardSplitGarbageCollectionDelayMS: 36000000,
+ ttlMonitorSleepSecs: 1
+ };
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ // Remove recipient nodes
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ assert.commandFailed(test.startMigration(migrationOpts));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhenThereIsAnExistingAccessBlocker test completed");
+}
+
+cannotStartMigrationWhenThereIsAnExistingAccessBlocker("multitenant migrations");
+cannotStartMigrationWhenThereIsAnExistingAccessBlocker("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js b/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js
new file mode 100644
index 00000000000..f237d507262
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_after_garbage_collection.js
@@ -0,0 +1,69 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+
+function canStartMigrationAfterSplitGarbageCollection(protocol) {
+ // Test that we can start a migration after a shard split has been garbage collected.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ // Remove recipient nodes
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds);
+
+ jsTestLog("Starting tenant migration");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ TenantMigrationTest.assertCommitted(test.waitForMigrationToComplete(migrationOpts));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("canStartMigrationAfterSplitGarbageCollection test completed");
+}
+
+canStartMigrationAfterSplitGarbageCollection("multitenant migrations");
+canStartMigrationAfterSplitGarbageCollection("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js b/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js
new file mode 100644
index 00000000000..55309f25710
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_different_tenant.js
@@ -0,0 +1,73 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress(protocol) {
+ // Test that we cannot start a tenant migration while a shard split is in progress. Use a
+ // tenantId uninvolved in the split.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ let fp =
+ configureFailPoint(test.getDonorRst().getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = "otherTenantToMove";
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandFailedWithCode(test.startMigration(migrationOpts),
+ ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress test completed");
+}
+
+cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("multitenant migrations");
+cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js b/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js
new file mode 100644
index 00000000000..f237d507262
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_donor.js
@@ -0,0 +1,69 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+
+function canStartMigrationAfterSplitGarbageCollection(protocol) {
+ // Test that we can start a migration after a shard split has been garbage collected.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ // Remove recipient nodes
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds);
+
+ jsTestLog("Starting tenant migration");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ TenantMigrationTest.assertCommitted(test.waitForMigrationToComplete(migrationOpts));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("canStartMigrationAfterSplitGarbageCollection test completed");
+}
+
+canStartMigrationAfterSplitGarbageCollection("multitenant migrations");
+canStartMigrationAfterSplitGarbageCollection("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js b/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js
new file mode 100644
index 00000000000..1ea49a99f06
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_donor_retry.js
@@ -0,0 +1,86 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function retryMigrationAfterSplitCompletes(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const firstTenantMigrationId = UUID();
+ const secondTenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getDonorRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const firstMigrationOpts = {
+ migrationIdString: extractUUIDFromObject(firstTenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ firstMigrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandFailedWithCode(test.startMigration(firstMigrationOpts),
+ ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ const secondMigrationOpts = {
+ migrationIdString: extractUUIDFromObject(secondTenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ secondMigrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(secondMigrationOpts));
+ TenantMigrationTest.assertCommitted(
+ waitForMergeToComplete(secondMigrationOpts, secondTenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(secondMigrationOpts.migrationIdString));
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed");
+}
+
+retryMigrationAfterSplitCompletes("multitenant migrations");
+retryMigrationAfterSplitCompletes("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js b/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js
new file mode 100644
index 00000000000..2bf713c1cb4
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_fail.js
@@ -0,0 +1,72 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartMigrationWhileShardSplitIsInProgress(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getDonorRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandFailedWithCode(test.startMigration(migrationOpts),
+ ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed");
+}
+
+cannotStartMigrationWhileShardSplitIsInProgress("multitenant migrations");
+cannotStartMigrationWhileShardSplitIsInProgress("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js b/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js
new file mode 100644
index 00000000000..70447f44323
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_fail_on_recipient.js
@@ -0,0 +1,78 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartMigrationWhileShardSplitIsInProgressOnRecipient(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getRecipientRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ const result = assert.commandWorked(test.waitForMigrationToComplete(migrationOpts));
+ assert.eq(result.state, "aborted");
+ assert.eq(result.abortReason.code, ErrorCodes.ConflictingServerlessOperation);
+
+ assert.commandWorked(
+ test.forgetMigration(migrationOpts.migrationIdString, false /* retryOnRetryableErrors */));
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgressOnRecipient test completed");
+}
+
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("multitenant migrations");
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js b/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js
new file mode 100644
index 00000000000..f3bd83a5243
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_migration_recipient_retry.js
@@ -0,0 +1,90 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartMigrationWhileShardSplitIsInProgressOnRecipient(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+ const secondTenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getRecipientRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ const result = assert.commandWorked(test.waitForMigrationToComplete(migrationOpts));
+ assert.eq(result.state, "aborted");
+ assert.eq(result.abortReason.code, ErrorCodes.ConflictingServerlessOperation);
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ const secondMigrationOpts = {
+ migrationIdString: extractUUIDFromObject(secondTenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ secondMigrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(secondMigrationOpts));
+ TenantMigrationTest.assertCommitted(
+ waitForMergeToComplete(secondMigrationOpts, secondTenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(secondMigrationOpts.migrationIdString));
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgressOnRecipient test completed");
+}
+
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("multitenant migrations");
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("shard merge");
diff --git a/jstests/serverless/serverless_reject_multiple_ops_split.js b/jstests/serverless/serverless_reject_multiple_ops_split.js
new file mode 100644
index 00000000000..71ea388a9a7
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_split.js
@@ -0,0 +1,82 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function cannotStartShardSplitWithMigrationInProgress(
+ {recipientTagName, protocol, shardSplitRst, test}) {
+ // Test that we cannot start a shard split while a migration is in progress.
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ let fp = configureFailPoint(test.getDonorRst().getPrimary(),
+ "pauseTenantMigrationBeforeLeavingDataSyncState");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ fp.wait();
+
+ const commitThread = commitSplitAsync(
+ shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandFailed(commitThread.returnData());
+
+ fp.off();
+
+ TenantMigrationTest.assertCommitted(
+ waitForMergeToComplete(migrationOpts, tenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ jsTestLog("cannotStartShardSplitWithMigrationInProgress test completed");
+}
+
+sharedOptions = {};
+sharedOptions["setParameter"] = {
+ shardSplitGarbageCollectionDelayMS: 0,
+ tenantMigrationGarbageCollectionDelayMS: 0,
+ ttlMonitorSleepSecs: 1
+};
+
+const recipientTagName = "recipientTag";
+
+const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+addRecipientNodes(test.getDonorRst(), recipientTagName);
+addRecipientNodes(test.getRecipientRst(), recipientTagName);
+
+cannotStartShardSplitWithMigrationInProgress({
+ recipientTagName,
+ protocol: "multitenant migrations",
+ shardSplitRst: test.getDonorRst(),
+ test
+});
+cannotStartShardSplitWithMigrationInProgress(
+ {recipientTagName, protocol: "shard merge", shardSplitRst: test.getDonorRst(), test});
+
+cannotStartShardSplitWithMigrationInProgress({
+ recipientTagName,
+ protocol: "multitenant migrations",
+ shardSplitRst: test.getRecipientRst(),
+ test
+});
+cannotStartShardSplitWithMigrationInProgress(
+ {recipientTagName, protocol: "shard merge", shardSplitRst: test.getRecipientRst(), test});
+
+test.stop();
diff --git a/jstests/serverless/serverless_reject_multiple_ops_split_retry.js b/jstests/serverless/serverless_reject_multiple_ops_split_retry.js
new file mode 100644
index 00000000000..b4716494af6
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_split_retry.js
@@ -0,0 +1,127 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function retrySplit({protocol, recipientTagName, recipientSetName, tenantIds, test, splitRst}) {
+ const tenantMigrationId = UUID();
+ const firstSplitMigrationId = UUID();
+ const secondSplitMigrationId = UUID();
+
+ let recipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(test.getDonorRst().getPrimary(),
+ "pauseTenantMigrationBeforeLeavingDataSyncState");
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ fp.wait();
+
+ const commitThread = commitSplitAsync(
+ splitRst, tenantIds, recipientTagName, recipientSetName, firstSplitMigrationId);
+ assert.commandFailed(commitThread.returnData());
+
+ fp.off();
+
+ TenantMigrationTest.assertCommitted(
+ waitForMergeToComplete(migrationOpts, tenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ // Potential race condition as we do not know how quickly the future continuation in
+ // PrimaryOnlyService will remove the instance from its map.
+ sleep(1000);
+ const secondCommitThread = commitSplitAsync(
+ splitRst, tenantIds, recipientTagName, recipientSetName, secondSplitMigrationId);
+ assert.commandWorked(secondCommitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !recipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(splitRst.getPrimary().getDB("admin").runCommand(
+ {forgetShardSplit: 1, migrationId: secondSplitMigrationId}));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+}
+
+// Test that we cannot start a shard split while a migration is in progress.
+const recipientTagName = "recipientTag";
+const recipientSetName = "recipient";
+const tenantIds = ["tenant1", "tenant2"];
+
+sharedOptions = {
+ setParameter: {
+ shardSplitGarbageCollectionDelayMS: 0,
+ tenantMigrationGarbageCollectionDelayMS: 0,
+ ttlMonitorSleepSecs: 1
+ }
+};
+
+const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+// "multitenant migration" with shard split on donor
+retrySplit({
+ protocol: "multitenant migrations",
+ recipientTagName,
+ recipientSetName,
+ tenantIds,
+ test,
+ splitRst: test.getDonorRst()
+});
+
+// "multitenant migration" with shard split on recipient
+retrySplit({
+ protocol: "multitenant migrations",
+ recipientTagName,
+ recipientSetName,
+ tenantIds,
+ test,
+ splitRst: test.getRecipientRst()
+});
+
+// "shard merge" with shard split on donor
+retrySplit({
+ protocol: "shard merge",
+ recipientTagName,
+ recipientSetName,
+ tenantIds,
+ test,
+ splitRst: test.getDonorRst()
+});
+
+test.stop();
+
+// We need a new test for the next shard merge as adding nodes will cause a crash.
+const test2 = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+// "shard merge" with shard split on recipient
+retrySplit({
+ protocol: "multitenant migrations",
+ recipientTagName,
+ recipientSetName,
+ tenantIds,
+ test: test2,
+ splitRst: test2.getDonorRst()
+});
+
+test2.stop();
diff --git a/jstests/serverless/serverless_reject_multiple_ops_split_success.js b/jstests/serverless/serverless_reject_multiple_ops_split_success.js
new file mode 100644
index 00000000000..4f77bae0293
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops_split_success.js
@@ -0,0 +1,61 @@
+/**
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/serverless/libs/serverless_reject_multiple_ops_utils.js");
+load("jstests/libs/uuid_util.js");
+
+function canStartShardSplitWithAbortedMigration({protocol, runOnRecipient}) {
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: false, sharedOptions});
+
+ const shardSplitRst = runOnRecipient ? test.getRecipientRst() : test.getDonorRst();
+
+ let recipientNodes = addRecipientNodes(shardSplitRst, recipientTagName);
+
+ let fp = configureFailPoint(test.getDonorRst().getPrimary(),
+ "abortTenantMigrationBeforeLeavingBlockingState");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ TenantMigrationTest.assertAborted(
+ waitForMergeToComplete(migrationOpts, tenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ const commitThread = commitSplitAsync(
+ shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("canStartShardSplitWithAbortedMigration test completed");
+}
+
+canStartShardSplitWithAbortedMigration({protocol: "multitenant migrations", runOnRecipient: false});
+canStartShardSplitWithAbortedMigration({protocol: "shard merge", runOnRecipient: false});
diff --git a/jstests/serverless/shard_split_recipient_removes_access_blockers.js b/jstests/serverless/shard_split_recipient_removes_access_blockers.js
index 546b44c4d08..5398c8aba05 100644
--- a/jstests/serverless/shard_split_recipient_removes_access_blockers.js
+++ b/jstests/serverless/shard_split_recipient_removes_access_blockers.js
@@ -1,5 +1,5 @@
/*
- * Test that tenant access blockers are removed from recipients when applying the recipient config.
+ * Test that tenant access blockers are removed when applying the recipient config
*
* @tags: [requires_fcv_52, featureFlagShardSplit, serverless]
*/
diff --git a/jstests/serverless/shard_split_recipient_removes_serverless_lock.js b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js
new file mode 100644
index 00000000000..adb55c8753e
--- /dev/null
+++ b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js
@@ -0,0 +1,49 @@
+/*
+ * Test the serverless operation lock is released from recipients when the state document is
+ * removed.
+ *
+ * @tags: [requires_fcv_62, featureFlagShardSplit, serverless]
+ */
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
+
+(function() {
+"use strict";
+
+// Skip db hash check because secondary is left with a different config.
+TestData.skipCheckDBHashes = true;
+
+const test = new BasicServerlessTest({
+ recipientTagName: "recipientNode",
+ recipientSetName: "recipient",
+ quickGarbageCollection: true
+});
+test.addRecipientNodes();
+
+const donorPrimary = test.donor.getPrimary();
+const tenantIds = ["tenant1", "tenant2"];
+const operation = test.createSplitOperation(tenantIds);
+
+const donorAfterBlockingFailpoint =
+ configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking");
+
+const commitOp = operation.commitAsync();
+donorAfterBlockingFailpoint.wait();
+
+jsTestLog("Asserting recipient nodes have installed the serverless lock");
+assert.soon(() => test.recipientNodes.every(node => getServerlessOperationLock(node) ===
+ ServerlessLockType.ShardSplitDonor));
+donorAfterBlockingFailpoint.off();
+
+commitOp.join();
+assert.commandWorked(commitOp.returnData());
+
+jsTestLog("Asserting the serverless exclusion lock has been released");
+assert.soon(() => test.recipientNodes.every(node => getServerlessOperationLock(node) ==
+ ServerlessLockType.None));
+
+test.stop();
+})();
diff --git a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
index f6301fa7f9a..faf0245b41a 100644
--- a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
+++ b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
@@ -1,7 +1,9 @@
/**
- * Starts a shard split througt `abortShardSplit` and assert that no tenant access blockers are
+ * Starts a shard split through `abortShardSplit` and assert that no tenant access blockers are
* recovered since we do not recover access blockers for aborted split marked garbage collectable.
- * @tags: [requires_fcv_52, featureFlagShardSplit]
+ * Also verifies the serverless operation lock is not acquired when starting a split in aborted
+ * state.
+ * @tags: [requires_fcv_62, featureFlagShardSplit]
*/
load("jstests/libs/fail_point_util.js"); // for "configureFailPoint"
@@ -9,6 +11,8 @@ load('jstests/libs/parallel_shell_helpers.js'); // for "startPa
load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // for "setParameter"
load("jstests/serverless/libs/basic_serverless_test.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
(function() {
"use strict";
@@ -59,5 +63,8 @@ tenantIds.every(tenantId => {
{node: donorPrimary, tenantId: tenantId}));
});
+// We do not acquire the lock for document marked for garbage collection
+assert.eq(getServerlessOperationLock(donorPrimary), ServerlessLockType.None);
+
test.stop();
})();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 216b40d2a9e..755ae596893 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -494,13 +494,14 @@ error_codes:
- {code: 381, name: ReshardingCoordinatorServiceConflictingOperationInProgress, extra: ReshardingCoordinatorServiceConflictingOperationInProgressInfo, categories: [InternalOnly]}
-
- {code: 382, name: RemoteCommandExecutionError, extra: RemoteCommandExecutionErrorInfo, categories: [InternalOnly]}
- {code: 383, name: CollectionIsEmptyLocally, categories: [InternalOnly]}
- {code: 384, name: ConnectionError, categories: [NetworkError,RetriableError,InternalOnly]}
+ - {code: 385, name: ConflictingServerlessOperation}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 54bb5718fa1..f4c2d2d9447 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -548,6 +548,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/kill_sessions_local',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
@@ -763,6 +764,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/storage/journal_flusher',
'delayable_timeout_callback',
@@ -1245,6 +1247,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
@@ -1413,6 +1416,7 @@ env.Library(
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import',
'$BUILD_DIR/mongo/db/transaction/transaction',
@@ -1485,6 +1489,7 @@ env.Library(
"$BUILD_DIR/mongo/db/catalog/local_oplog_info",
"$BUILD_DIR/mongo/db/concurrency/exception_util",
"$BUILD_DIR/mongo/db/index_builds_coordinator_interface",
+ "$BUILD_DIR/mongo/db/serverless/serverless_lock",
],
)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 361e6aabe1f..cd562d19af5 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/transaction_oplog_application.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
_storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
_replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index e98b65b9d76..8fde4801165 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -2049,6 +2049,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Start the real work.
ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
@@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2270,6 +2274,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError)
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
doSuccessfulInitialSyncWithOneBatch();
}
@@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
@@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index c15f88c3df0..da06257acdf 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -777,6 +777,16 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::_insertNewInst
return instance->run(std::move(scopedExecutor), std::move(token));
})
+    // TODO SERVER-61717 remove this error handler once instances are automatically released
+ // at the end of run()
+ .onError<ErrorCodes::ConflictingServerlessOperation>([this, instanceID](Status status) {
+ LOGV2(6531507,
+ "Removing instance due to ConflictingServerlessOperation error",
+ "instanceID"_attr = instanceID);
+ releaseInstance(instanceID, Status::OK());
+
+ return status;
+ })
.semi();
auto [it, inserted] = _activeInstances.try_emplace(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a943637c2e5..2feb1ed6b7b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -88,6 +88,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
@@ -534,6 +535,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
}
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
LOGV2(4280506, "Reconstructing prepared transactions");
reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 101ddcf0bb3..9f7f2e5863d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -192,6 +192,8 @@ void ReplCoordTest::start() {
// Skip recovering user writes critical sections for the same reason as the above.
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ // Skip recovering of serverless mutual exclusion locks for the same reason as the above.
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
invariant(!_callShutdown);
// if we haven't initialized yet, do that first.
if (!_repl) {
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index fa3d51489a8..2fff5e05a99 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_recovery.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/session/session_txn_record_gen.h"
@@ -652,6 +653,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
_correctRecordStoreCounts(opCtx);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
// Reconstruct prepared transactions after counts have been adjusted. Since prepared
// transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
index 85e3f012bf0..360c0599db6 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
@@ -88,7 +88,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId,
if (it != _tenantMigrationAccessBlockers.end()) {
auto existingMtab = it->second.getAccessBlocker(mtabType);
if (existingMtab) {
- tasserted(ErrorCodes::ConflictingOperationInProgress,
+ uasserted(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "This node is already a "
<< (mtabType == MtabType::kDonor ? "donor" : "recipient")
<< " for tenantId \"" << tenantId << "\" with migrationId \""
@@ -121,7 +121,7 @@ void TenantMigrationAccessBlockerRegistry::add(std::shared_ptr<TenantMigrationAc
std::find_if(_tenantMigrationAccessBlockers.begin(),
_tenantMigrationAccessBlockers.end(),
[](const auto& pair) { return pair.second.getDonorAccessBlocker().get(); });
- tassert(6114105,
+ uassert(ErrorCodes::ConflictingServerlessOperation,
str::stream()
<< "Trying to add donor blocker for all tenants when this node already has a donor "
"blocker for \""
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 491f6e15753..65f765cf94e 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_donor_op_observer.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -54,6 +55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
+
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
donorStateDoc.getId());
if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) ==
@@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(donorStateDoc.getTenantId(),
TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
} else {
@@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAccessBlockersForMigration(
donorStateDoc.getId(), TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
}
@@ -155,6 +166,10 @@ public:
void commit(OperationContext* opCtx, boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ _donorStateDoc.getId());
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
opCtx->getServiceContext(), _donorStateDoc.getTenantId());
@@ -338,6 +353,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext*
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 1d3f4cdf1e2..0da1de32c4e 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -123,7 +123,6 @@ void checkForTokenInterrupt(const CancellationToken& token) {
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
-
template <class Promise>
void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) {
if (promise.getFuture().isReady()) {
@@ -155,6 +154,17 @@ void setPromiseOkIfNotReady(WithLock lk, Promise& promise) {
promise.emplaceValue();
}
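+
+// Returns true when the given promise has already been fulfilled with a
+// ConflictingServerlessOperation error, meaning the state document never became durable because
+// another serverless operation held the lock.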
+bool isNotDurableAndServerlessConflict(WithLock lk, SharedPromise<void>& promise) {
+ auto future = promise.getFuture();
+
+ if (!future.isReady() ||
+ future.getNoThrow().code() != ErrorCodes::ConflictingServerlessOperation) {
+ return false;
+ }
+
+ return true;
+}
+
} // namespace
void TenantMigrationDonorService::checkIfConflictsWithOtherInstances(
@@ -515,7 +525,16 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
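+            // A ConflictingServerlessOperation is not retriable here: rethrow it so the retry
+            // loop ends and the error propagates to the caller.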
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531508,
+ "Tenant migration completed due to serverless lock error",
+ "id"_attr = _migrationUuid,
+ "status"_attr = swOpTime.getStatus());
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -950,6 +969,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
->incTotalMigrationDonationsCommitted();
}
}
+
+ return Status::OK();
})
.then([this, self = shared_from_this(), executor, token, recipientTargeterRS] {
return _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
@@ -977,6 +998,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"tenantId"_attr = _tenantId,
"status"_attr = status,
"abortReason"_attr = _abortReason);
+
+        // If a ConflictingServerlessOperation was thrown during the initial insertion, we do not
+        // have a state document. In that case, return the error to PrimaryOnlyService so it
+ // frees the instance from its map.
+ if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ uassertStatusOK(_initialDonorStateDurablePromise.getFuture().getNoThrow());
+ }
})
.semi();
}
@@ -1363,7 +1391,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
checkForTokenInterrupt(token);
{
- stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) {
// The migration was resumed on stepup and it was already aborted.
return ExecutorFuture(**executor);
@@ -1420,6 +1447,21 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancellationToken& token) {
+ const bool skipWaitingForForget = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ return false;
+ }
+ setPromiseErrorIfNotReady(lg,
+ _receiveDonorForgetMigrationPromise,
+ _initialDonorStateDurablePromise.getFuture().getNoThrow());
+ return true;
+ }();
+
+ if (skipWaitingForForget) {
+ return ExecutorFuture(**executor);
+ }
+
LOGV2(6104909,
"Waiting to receive 'donorForgetMigration' command.",
"migrationId"_attr = _migrationUuid,
@@ -1445,6 +1487,16 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
return std::move(_receiveDonorForgetMigrationPromise.getFuture())
.thenRunOn(**executor)
.then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ {
+                // If the abortReason is ConflictingServerlessOperation, it means there is no
+ // document on the recipient. Do not send the forget command.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_abortReason &&
+ _abortReason->code() == ErrorCodes::ConflictingServerlessOperation) {
+ return ExecutorFuture(**executor);
+ }
+ }
+
LOGV2(6104910,
"Waiting for recipientForgetMigration response.",
"migrationId"_attr = _migrationUuid,
@@ -1487,6 +1539,12 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteStateDoc(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) {
+    // If the state document was never inserted because of a conflicting serverless operation, do
+    // not try to delete it.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ return ExecutorFuture(**executor);
+ }
LOGV2(8423362,
"Waiting for garbage collection delay before deleting state document",
@@ -1494,7 +1552,6 @@ TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteS
"tenantId"_attr = _tenantId,
"expireAt"_attr = *_stateDoc.getExpireAt());
- stdx::lock_guard<Latch> lg(_mutex);
return (*executor)
->sleepUntil(*_stateDoc.getExpireAt(), token)
.then([this, self = shared_from_this(), executor, token]() {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index e2a047876bf..19f5f6ab71c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts(
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
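+    // When a recipient state document that is not yet marked garbage collectable is inserted
+    // outside of recovery, acquire the serverless operation lock so conflicting serverless
+    // operations are rejected.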
+ if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ !tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
+ for (auto it = first; it != last; it++) {
+ auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
+ IDLParserContext("recipientStateDoc"), it->doc);
+ if (!recipientStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+ }
+ }
+ }
if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) {
return;
@@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(recipientStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+
std::vector<std::string> tenantIdsToRemove;
auto cleanUpBlockerIfGarbage =
[&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) {
@@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 869670c6ca3..ab7e7ef12e1 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -2968,7 +2968,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// Handle recipientForgetMigration.
stdx::lock_guard lk(_mutex);
if (_stateDoc.getExpireAt() ||
- MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) {
+ MONGO_unlikely(autoRecipientForgetMigration.shouldFail()) ||
+ status.code() == ErrorCodes::ConflictingServerlessOperation) {
// Skip waiting for the recipientForgetMigration command.
setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise);
}
@@ -3018,7 +3019,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// is safe even on shutDown/stepDown.
stdx::lock_guard lk(_mutex);
invariant(_dataSyncCompletionPromise.getFuture().isReady());
- if (!status.isOK()) {
+
+ if (status.code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531506,
+ "Migration failed as another serverless operation was in progress",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "status"_attr = status);
+ setPromiseOkifNotReady(lk, _forgetMigrationDurablePromise);
+ return status;
+ } else if (!status.isOK()) {
// We should only hit here on a stepDown/shutDown, or a 'conflicting migration'
// error.
LOGV2(4881402,
@@ -3029,6 +3039,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
setPromiseErrorifNotReady(lk, _forgetMigrationDurablePromise, status);
}
_taskState.setState(TaskState::kDone);
+
+ return Status::OK();
})
.semi();
}
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript
index 3ccfd8ea7f7..82143897663 100644
--- a/src/mongo/db/serverless/SConscript
+++ b/src/mongo/db/serverless/SConscript
@@ -57,6 +57,21 @@ env.Library(
)
env.Library(
+ target='serverless_lock',
+ source=[
+ 'serverless_operation_lock_registry.cpp',
+ 'serverless_server_status.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_utils',
+ '$BUILD_DIR/mongo/db/server_base',
+ 'shard_split_state_machine',
+ ],
+)
+
+env.Library(
target='shard_split_donor_service',
source=[
'shard_split_donor_service.cpp',
@@ -77,6 +92,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/shard_role',
+ 'serverless_lock',
'shard_split_utils',
],
)
@@ -84,6 +100,7 @@ env.Library(
env.CppUnitTest(
target='db_serverless_test',
source=[
+ 'serverless_operation_lock_registry_test.cpp',
'shard_split_donor_op_observer_test.cpp',
'shard_split_donor_service_test.cpp',
'shard_split_utils_test.cpp',
@@ -97,6 +114,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ 'serverless_lock',
'shard_split_donor_service',
'shard_split_utils',
],
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
new file mode 100644
index 00000000000..20a02c6cd15
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration
+
+// Failpoint that will cause recoverLocks to return early.
+MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock);
+namespace mongo {
+
+const ServiceContext::Decoration<ServerlessOperationLockRegistry>
+ ServerlessOperationLockRegistry::get =
+ ServiceContext::declareDecoration<ServerlessOperationLockRegistry>();
+
+void ServerlessOperationLockRegistry::acquireLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+    // Verify that either no serverless operation is in progress or the one in progress has the
+    // same type as the one acquiring the lock.
+ uassert(ErrorCodes::ConflictingServerlessOperation,
+ "Conflicting serverless operation in progress",
+ !_activeLockType || _activeLockType.get() == lockType);
+ invariant(_activeOperations.find(operationId) == _activeOperations.end(),
+ "Cannot acquire the serverless lock twice for the same operationId.");
+ _activeLockType = lockType;
+
+ _activeOperations.emplace(operationId);
+
+ LOGV2(6531500,
+ "Acquired serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+void ServerlessOperationLockRegistry::releaseLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ invariant(_activeLockType && *_activeLockType == lockType,
+ "Cannot release a serverless lock that is not owned by the given lock type.");
+
+ invariant(_activeOperations.find(operationId) != _activeOperations.end(),
+ "Cannot release a serverless lock if the given operationId does not own the lock.");
+ _activeOperations.erase(operationId);
+
+ if (_activeOperations.empty()) {
+ _activeLockType.reset();
+ }
+
+ LOGV2(6531501,
+ "Released serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType || *_activeLockType != lockType) {
+ return;
+ }
+
+ LOGV2(6531505,
+ "Released all serverless locks due to state collection drop",
+ "type"_attr = lockType);
+
+ _activeLockType.reset();
+ _activeOperations.clear();
+}
+
+void ServerlessOperationLockRegistry::clear() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(6531504,
+ "Clearing serverless operation lock registry on shutdown",
+ "ns"_attr = _activeLockType);
+
+ _activeOperations.clear();
+ _activeLockType.reset();
+}
+
+void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
+ if (skipRecoverServerlessOperationLock.shouldFail()) {
+ return;
+ }
+
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.clear();
+
+ PersistentTaskStore<TenantMigrationDonorDocument> donorStore(
+ NamespaceString::kTenantMigrationDonorsNamespace);
+ donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore(
+ NamespaceString::kTenantMigrationRecipientsNamespace);
+ recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<ShardSplitDonorDocument> splitStore(
+ NamespaceString::kShardSplitDonorsNamespace);
+ splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId());
+
+ return true;
+ });
+}
+
+const std::string kOperationLockFieldName = "operationLock";
+void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType) {
+ builder->append(kOperationLockFieldName, 0);
+ return;
+ }
+
+ switch (_activeLockType.value()) {
+ case ServerlessOperationLockRegistry::LockType::kShardSplit:
+ builder->append(kOperationLockFieldName, 1);
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantDonor:
+ builder->append(kOperationLockFieldName, 2);
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantRecipient:
+ builder->append(kOperationLockFieldName, 3);
+ break;
+ }
+}
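+
+// Example "serverless" serverStatus section when a shard split holds the lock (illustrative):
+//   {"operationLock": 1}
+// A value of 0 means no serverless operation lock is currently held.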
+
+boost::optional<ServerlessOperationLockRegistry::LockType>
+ServerlessOperationLockRegistry::getActiveOperationType_forTest() {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ return _activeLockType;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h
new file mode 100644
index 00000000000..d9ac07393f4
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/uuid.h"
+
+#include <set>
+
+namespace mongo {
+
+/**
+ * Registry to allow only one type of active serverless operation at a time. It allows multiple
+ * simultaneous operations of the same type.
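+ *
+ * Illustrative usage, assuming an OperationContext* opCtx is available (sketch only):
+ *   auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ *   registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, operationId);
+ *   // ... the shard split runs ...
+ *   registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, operationId);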
+ */
+class ServerlessOperationLockRegistry {
+ ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete;
+ ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete;
+
+public:
+ ServerlessOperationLockRegistry() = default;
+
+ static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get;
+
+ enum LockType { kShardSplit, kTenantDonor, kTenantRecipient };
+
+ /**
+     * Acquires the serverless lock for the given lockType and adds operationId to the set of
+     * tracked instances. Throws a ConflictingServerlessOperation error if a serverless operation
+     * of a different type is already in progress.
+ */
+ void acquireLock(LockType lockType, const UUID& operationId);
+
+ /**
+     * If the active lock type matches lockType, removes the given operationId from the set of
+     * active instances and releases the lock when the set becomes empty. Invariants if lockType
+     * or operationId does not currently own the lock.
+ */
+ void releaseLock(LockType lockType, const UUID& operationId);
+
+ /**
+     * Called when a state document collection is dropped. Releases the lock if it is currently
+     * held by the given lockType; otherwise does nothing.
+ */
+ void onDropStateCollection(LockType lockType);
+
+ void clear();
+
+ /**
+     * Scans the serverless state documents and acquires the serverless mutual exclusion lock if
+     * needed.
+ */
+ static void recoverLocks(OperationContext* opCtx);
+
+ /**
+ * Appends the exclusion status to the BSONObjBuilder.
+ */
+ void appendInfoForServerStatus(BSONObjBuilder* builder) const;
+
+ boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest();
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex");
+ boost::optional<LockType> _activeLockType;
+ std::set<UUID> _activeOperations;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
new file mode 100644
index 00000000000..9d95b3b7bc7
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ InsertSameIdTwice,
+ "Cannot acquire the serverless lock twice for the same operationId.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+}
+
+TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_THROWS_CODE(
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, UUID::gen()),
+ DBException,
+ ErrorCodes::ConflictingServerlessOperation);
+}
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentNsTriggersInvariant,
+ "Cannot release a serverless lock that is not owned by the given lock type.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, id);
+}
+
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentIdTriggersInvariant,
+ "Cannot release a serverless lock if the given operationId does not own the lock.") {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+}
+
+TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+ registry.clear();
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+    // Verify the lock is held.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+    // Add an additional id.
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+    // Verify the lock is held.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+    // Verify the lock is still held.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp
new file mode 100644
index 00000000000..8d0d4658dc3
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_server_status.cpp
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+
+namespace mongo {
+namespace {
+
+class ServerlessServerStatus final : public ServerStatusSection {
+public:
+ ServerlessServerStatus() : ServerStatusSection("serverless") {}
+
+ bool includeByDefault() const override {
+ return false;
+ }
+
+ BSONObj generateSection(OperationContext* opCtx,
+ const BSONElement& configElement) const override {
+ BSONObjBuilder result;
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .appendInfoForServerStatus(&result);
+ return result.obj();
+ }
+} serverlessServerStatus;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp
index b0f0e6c6cab..5ce6c9c0307 100644
--- a/src/mongo/db/serverless/shard_split_commands.cpp
+++ b/src/mongo/db/serverless/shard_split_commands.cpp
@@ -114,7 +114,7 @@ public:
};
std::string help() const {
- return "Start an opereation to split a shard into its own slice.";
+ return "Start an operation to split a shard into its own slice.";
}
bool adminOnly() const override {
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 2d67495c431..b2470d07854 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_utils.h"
@@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) {
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
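+// Set in aboutToDelete() when a split state document is removed on a recipient so that onDelete()
+// can release the corresponding serverless operation lock.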
+const auto shardSplitIdToDeleteDecoration =
+ OperationContext::declareDecoration<boost::optional<UUID>>();
ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc);
@@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
invariant(donorStateDoc.getTenantIds());
invariant(donorStateDoc.getRecipientConnectionString());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId());
+
auto tenantIds = *donorStateDoc.getTenantIds();
for (const auto& tenantId : tenantIds) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
@@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
if (isPrimary(opCtx)) {
// onRollback is not registered on secondaries since secondaries should not fail to
// apply the write.
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] {
+ opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, migrationId = donorStateDoc.getId()] {
for (const auto& tenantId : tenantIds) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, migrationId);
});
}
}
@@ -250,6 +258,10 @@ public:
void commit(OperationContext* opCtx, boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ _donorStateDoc.getId());
+
if (_donorStateDoc.getTenantIds()) {
auto tenantIds = _donorStateDoc.getTenantIds().value();
for (auto&& tenantId : tenantIds) {
@@ -376,12 +388,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+ const bool shouldRemoveOnRecipient =
+ serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc);
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
<< " since it has not been marked as garbage collectable and is not a"
<< " recipient garbage collectable.",
- donorStateDoc.getExpireAt() ||
- serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc));
+ donorStateDoc.getExpireAt() || shouldRemoveOnRecipient);
// To support back-to-back split retries, when a split is aborted, we remove its
// TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage
@@ -397,6 +410,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result);
}
+
+ if (shouldRemoveOnRecipient) {
+ shardSplitIdToDeleteDecoration(opCtx) = boost::make_optional(donorStateDoc.getId());
+ }
}
void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
@@ -419,6 +436,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) {
registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+
+ const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx);
+ if (idToDelete) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete);
+ }
});
}
@@ -431,6 +454,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit);
});
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index 97c923524a3..f30ad593951 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
@@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) {
ASSERT_FALSE(mtab);
};
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid);
+
runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
+
+ ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .getActiveOperationType_forTest());
}
TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) {
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 4c3fdabb39f..fa977ae0c7d 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -1005,7 +1005,18 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531509,
+ "Shard split completed due to serverless lock error",
+ "id"_attr = _migrationId,
+ "status"_attr = swOpTime.getStatus());
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1067,7 +1078,8 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
}
}
- if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) {
+ if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status) ||
+ status.code() == ErrorCodes::ConflictingServerlessOperation) {
// Don't abort the split on retriable errors that may have been generated by the local
// server shutting/stepping down because it can be resumed when the client retries.
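+        // A ConflictingServerlessOperation is likewise returned to the caller without entering
+        // the aborted state.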
return ExecutorFuture(**executor, StatusWith<DurableState>{status});
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 463031b66bc..f46c908059b 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
@@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
waitForReplSetStepUp(Status(ErrorCodes::OK, ""));
waitForRecipientPrimaryMajorityWrite();
+ // Verify the serverless lock has been acquired for split.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
auto result = serviceInstance->decisionFuture().get();
ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds));
ASSERT(!result.abortReason);
@@ -520,10 +526,32 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
auto completionFuture = serviceInstance->completionFuture();
completionFuture.wait();
+ // The lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) {
+ auto opCtx = makeOperationContext();
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen());
+
+ // Create and start the instance.
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, defaultStateDocument().toBSON());
+ ASSERT(serviceInstance.get());
+
+ auto decisionFuture = serviceInstance->decisionFuture();
+
+ auto result = decisionFuture.getNoThrow();
+ ASSERT_EQ(result.getStatus().code(), ErrorCodes::ConflictingServerlessOperation);
+}
+
TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -1015,6 +1043,10 @@ public:
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setRecipientConnectionString(ConnectionString::forLocal());
+ ServerlessOperationLockRegistry::get(getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ stateDocument.getId());
+
return stateDocument;
}
};