summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Broadstone <mbroadst@mongodb.com>2022-09-14 23:05:50 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 00:19:47 +0000
commit1a13031f7cdfb6cffdcff212edef0790fe084df2 (patch)
tree8d6ba207b94133d0775d398c3384380dfcc8343a
parentd1da149077ea312efc80f8836bc79f2b1b10c1ad (diff)
downloadmongo-1a13031f7cdfb6cffdcff212edef0790fe084df2.tar.gz
SERVER-65315 Enforce mutual exclusion between serverless operations
-rw-r--r--buildscripts/resmokeconfig/suites/serverless.yml3
-rw-r--r--jstests/replsets/libs/tenant_migration_test.js4
-rw-r--r--jstests/replsets/libs/tenant_migration_util.js15
-rw-r--r--jstests/replsets/tenant_migration_donor_initial_sync_recovery.js13
-rw-r--r--jstests/replsets/tenant_migration_donor_startup_recovery.js12
-rw-r--r--jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js12
-rw-r--r--jstests/serverless/libs/basic_serverless_test.js72
-rw-r--r--jstests/serverless/serverless_reject_multiple_ops.js474
-rw-r--r--jstests/serverless/shard_split_recipient_removes_access_blockers.js2
-rw-r--r--jstests/serverless/shard_split_recipient_removes_serverless_lock.js48
-rw-r--r--jstests/serverless/shard_split_startup_recovery_initially_aborted.js10
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/repl/SConscript5
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp2
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.cpp18
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp7
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp20
-rw-r--r--src/mongo/db/serverless/SConscript18
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.cpp192
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.h96
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp154
-rw-r--r--src/mongo/db/serverless/serverless_server_status.cpp57
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp32
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp7
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp17
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp34
31 files changed, 1301 insertions, 48 deletions
diff --git a/buildscripts/resmokeconfig/suites/serverless.yml b/buildscripts/resmokeconfig/suites/serverless.yml
index eeef77194a2..30c0d5758df 100644
--- a/buildscripts/resmokeconfig/suites/serverless.yml
+++ b/buildscripts/resmokeconfig/suites/serverless.yml
@@ -4,6 +4,9 @@ selector:
roots:
- jstests/serverless/*.js
- jstests/serverless/change_streams/**/*.js
+ exclude_files:
+ # TODO(SERVER-69676): Remove this excluded file once the test runtime is reasonable.
+ - jstests/serverless/serverless_reject_multiple_ops.js
executor:
config:
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index 04619679e33..bd79c51b6bb 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -136,7 +136,7 @@ function TenantMigrationTest({
nodeOptions["setParameter"] = setParameterOpts;
const rstName = `${name}_${(isDonor ? "donor" : "recipient")}`;
- const rst = new ReplSetTest({name: rstName, nodes, nodeOptions});
+ const rst = new ReplSetTest({name: rstName, nodes, serverless: true, nodeOptions});
rst.startSet();
if (initiateRstWithHighElectionTimeout) {
rst.initiateWithHighElectionTimeout();
@@ -231,6 +231,7 @@ function TenantMigrationTest({
this.runDonorStartMigration = function({
migrationIdString,
tenantId,
+ protocol,
recipientConnectionString = recipientRst.getURL(),
readPreference = {mode: "primary"},
donorCertificateForRecipient = migrationCertificates.donorCertificateForRecipient,
@@ -251,6 +252,7 @@ function TenantMigrationTest({
readPreference,
donorCertificateForRecipient,
recipientCertificateForDonor,
+ protocol
};
const stateRes = TenantMigrationUtil.runTenantMigrationCommand(cmdObj, this.getDonorRst(), {
diff --git a/jstests/replsets/libs/tenant_migration_util.js b/jstests/replsets/libs/tenant_migration_util.js
index d0e9354f1c1..bfad63c2cd0 100644
--- a/jstests/replsets/libs/tenant_migration_util.js
+++ b/jstests/replsets/libs/tenant_migration_util.js
@@ -262,6 +262,19 @@ var TenantMigrationUtil = (function() {
return res;
}
+ const ServerlessLockType = {
+ ShardSplitDonor: 'shard split donor',
+ TenantMigrationDonor: 'tenant migration donor',
+ TenantMigrationRecipient: 'tenant migration recipient'
+ };
+
+ /**
+ * Return the active serverless operation lock, if one is acquired.
+ */
+ function getServerlessOperationLock(node) {
+ return assert.commandWorked(node.adminCommand({serverStatus: 1})).serverless.operationLock;
+ }
+
/**
* Returns the TenantMigrationAccessBlocker serverStatus output for the multi-tenant migration
* or shard merge for the given node.
@@ -561,6 +574,8 @@ var TenantMigrationUtil = (function() {
makeMigrationCertificatesForTest,
makeX509OptionsForTest,
isMigrationCompleted,
+ ServerlessLockType,
+ getServerlessOperationLock,
getTenantMigrationAccessBlocker,
getTenantMigrationAccessBlockers,
getNumBlockedReads,
diff --git a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
index 85429af3ddd..8209ee570e2 100644
--- a/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_initial_sync_recovery.js
@@ -5,6 +5,7 @@
* @tags: [
* incompatible_with_macos,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -19,6 +20,7 @@ load("jstests/libs/uuid_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
@@ -47,6 +49,7 @@ const migrationOpts = {
migrationIdString: extractUUIDFromObject(UUID()),
tenantId: kTenantId
};
+
assert.commandWorked(tenantMigrationTest.startMigration(migrationOpts));
// We must wait for the migration to have finished replicating the recipient keys on the donor set
// before starting initial sync, otherwise the migration will hang while waiting for initial sync to
@@ -83,7 +86,8 @@ donorRst.awaitReplication();
stopServerReplication(initialSyncNode);
let configDonorsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigDonorsNS);
-let donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
+assert.lte(configDonorsColl.count(), 1);
+let donorDoc = configDonorsColl.findOne();
if (donorDoc) {
jsTestLog("Initial sync completed while migration was in state: " + donorDoc.state);
switch (donorDoc.state) {
@@ -148,6 +152,13 @@ if (donorDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(initialSyncNode);
+if (donorDoc && !donorDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor);
+} else {
+ assert.eq(activeServerlessLock, null);
+}
+
if (fp) {
fp.off();
}
diff --git a/jstests/replsets/tenant_migration_donor_startup_recovery.js b/jstests/replsets/tenant_migration_donor_startup_recovery.js
index 7a564e416a5..f10c1c90d45 100644
--- a/jstests/replsets/tenant_migration_donor_startup_recovery.js
+++ b/jstests/replsets/tenant_migration_donor_startup_recovery.js
@@ -8,6 +8,7 @@
* incompatible_with_macos,
* incompatible_with_shard_merge,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -20,6 +21,7 @@
load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const donorRst = new ReplSetTest({
nodes: 1,
@@ -70,7 +72,8 @@ donorRst.startSet({
donorPrimary = donorRst.getPrimary();
const configDonorsColl = donorPrimary.getCollection(TenantMigrationTest.kConfigDonorsNS);
-const donorDoc = configDonorsColl.findOne({tenantId: kTenantId});
+assert.lte(configDonorsColl.count(), 1);
+const donorDoc = configDonorsColl.findOne();
if (donorDoc) {
switch (donorDoc.state) {
case TenantMigrationTest.DonorState.kAbortingIndexBuilds:
@@ -133,6 +136,13 @@ if (donorDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(donorPrimary);
+if (donorDoc && !donorDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationDonor);
+} else {
+ assert.eq(activeServerlessLock, null);
+}
+
tenantMigrationTest.stop();
donorRst.stopSet();
})();
diff --git a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
index 404bf0fa765..a0fc1bfacb3 100644
--- a/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
+++ b/jstests/replsets/tenant_migration_recipient_initial_sync_recovery.js
@@ -6,6 +6,7 @@
* incompatible_with_macos,
* incompatible_with_shard_merge,
* incompatible_with_windows_tls,
+ * requires_fcv_62,
* requires_majority_read_concern,
* requires_persistence,
* serverless,
@@ -19,6 +20,7 @@ load("jstests/libs/fail_point_util.js");
load("jstests/libs/uuid_util.js");
load("jstests/libs/write_concern_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
@@ -59,7 +61,8 @@ recipientRst.awaitReplication();
stopServerReplication(initialSyncNode);
const configRecipientsColl = initialSyncNode.getCollection(TenantMigrationTest.kConfigRecipientsNS);
-const recipientDoc = configRecipientsColl.findOne({tenantId: kTenantId});
+assert.lte(configRecipientsColl.count(), 1);
+const recipientDoc = configRecipientsColl.findOne();
if (recipientDoc) {
switch (recipientDoc.state) {
case TenantMigrationTest.RecipientState.kStarted:
@@ -97,6 +100,13 @@ if (recipientDoc) {
}
}
+const activeServerlessLock = getServerlessOperationLock(initialSyncNode);
+if (recipientDoc && !recipientDoc.expireAt) {
+ assert.eq(activeServerlessLock, ServerlessLockType.TenantMigrationRecipient);
+} else {
+ assert.eq(activeServerlessLock, null);
+}
+
restartServerReplication(initialSyncNode);
tenantMigrationTest.stop();
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js
index 69aeab56616..727827429db 100644
--- a/jstests/serverless/libs/basic_serverless_test.js
+++ b/jstests/serverless/libs/basic_serverless_test.js
@@ -41,6 +41,45 @@ const runCommitSplitThreadWrapper = function(rstArgs,
donorRst, commitShardSplitCmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync);
};
+/*
+ * Wait for state document garbage collection by polling for when the document has been removed
+ * from the 'shardSplitDonors' namespace, and all access blockers have been removed.
+ * @param {migrationId} id that was used for the commitShardSplit command.
+ * @param {tenantIds} tenant ids of the shard split.
+ */
+const waitForGarbageCollectionForSplit = function(donorNodes, migrationId, tenantIds) {
+ jsTestLog("Wait for garbage collection");
+ assert.soon(() => donorNodes.every(node => {
+ const donorDocumentDeleted =
+ node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
+ _id: migrationId
+ }) === 0;
+ const allAccessBlockersRemoved = tenantIds.every(
+ id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null);
+
+ const result = donorDocumentDeleted && allAccessBlockersRemoved;
+ if (!result) {
+ const status = [];
+ if (!donorDocumentDeleted) {
+ status.push(`donor document to be deleted (docCount=${
+ node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
+ _id: migrationId
+ })})`);
+ }
+
+ if (!allAccessBlockersRemoved) {
+ const tenantsWithBlockers = tenantIds.filter(
+ id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) != null);
+ status.push(`access blockers to be removed (${tenantsWithBlockers})`);
+ }
+ }
+ return donorDocumentDeleted && allAccessBlockersRemoved;
+ }),
+ "tenant access blockers weren't removed",
+ 60 * 1000,
+ 1 * 1000);
+};
+
const runShardSplitCommand = function(
replicaSet, cmdObj, retryOnRetryableErrors, enableDonorStartMigrationFsync) {
let res;
@@ -355,38 +394,7 @@ class BasicServerlessTest {
* @param {tenantIds} tenant ids of the shard split.
*/
waitForGarbageCollection(migrationId, tenantIds) {
- jsTestLog("Wait for garbage collection");
- const donorNodes = this.donor.nodes;
- assert.soon(() => donorNodes.every(node => {
- const donorDocumentDeleted =
- node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
- _id: migrationId
- }) === 0;
- const allAccessBlockersRemoved = tenantIds.every(
- id => BasicServerlessTest.getTenantMigrationAccessBlocker({node, id}) == null);
-
- const result = donorDocumentDeleted && allAccessBlockersRemoved;
- if (!result) {
- const status = [];
- if (!donorDocumentDeleted) {
- status.push(`donor document to be deleted (docCount=${
- node.getCollection(BasicServerlessTest.kConfigSplitDonorsNS).count({
- _id: migrationId
- })})`);
- }
-
- if (!allAccessBlockersRemoved) {
- const tenantsWithBlockers =
- tenantIds.filter(id => BasicServerlessTest.getTenantMigrationAccessBlocker(
- {node, id}) != null);
- status.push(`access blockers to be removed (${tenantsWithBlockers})`);
- }
- }
- return donorDocumentDeleted && allAccessBlockersRemoved;
- }),
- "tenant access blockers weren't removed",
- 60 * 1000,
- 1 * 1000);
+ return waitForGarbageCollectionForSplit(this.donor.nodes, migrationId, tenantIds);
}
/**
diff --git a/jstests/serverless/serverless_reject_multiple_ops.js b/jstests/serverless/serverless_reject_multiple_ops.js
new file mode 100644
index 00000000000..4f24f8ba40f
--- /dev/null
+++ b/jstests/serverless/serverless_reject_multiple_ops.js
@@ -0,0 +1,474 @@
+/**
+ * Tests that shard split operations cannot be started simultaneously with a tenant migration or
+ * shard merge.
+ *
+ * @tags: [
+ * serverless,
+ * requires_fcv_52,
+ * featureFlagShardSplit,
+ * featureFlagShardMerge
+ * ]
+ */
+
+load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+load("jstests/replsets/rslib.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/libs/parallelTester.js");
+load("jstests/libs/uuid_util.js");
+
+function waitForMergeToComplete(migrationOpts, migrationId, test) {
+ // Assert that the migration has already been started.
+ assert(test.getDonorPrimary().getCollection(TenantMigrationTest.kConfigDonorsNS).findOne({
+ _id: migrationId
+ }));
+
+ const donorStartReply = test.runDonorStartMigration(
+ migrationOpts, {waitForMigrationToComplete: true, retryOnRetryableErrors: false});
+
+ return donorStartReply;
+}
+
+function commitSplitAsync(rst, tenantIds, recipientTagName, recipientSetName, migrationId) {
+ jsTestLog("Running commitAsync command");
+
+ const rstArgs = createRstArgs(rst);
+ const migrationIdString = extractUUIDFromObject(migrationId);
+
+ const thread = new Thread(runCommitSplitThreadWrapper,
+ rstArgs,
+ migrationIdString,
+ tenantIds,
+ recipientTagName,
+ recipientSetName,
+ false /* enableDonorStartMigrationFsync */);
+ thread.start();
+
+ return thread;
+}
+
+function addRecipientNodes(rst, recipientTagName) {
+ const numNodes = 3; // default to three nodes
+ let recipientNodes = [];
+ const options = TenantMigrationUtil.makeX509OptionsForTest();
+ jsTestLog(`Adding ${numNodes} non-voting recipient nodes to donor`);
+ for (let i = 0; i < numNodes; ++i) {
+ recipientNodes.push(rst.add(options.donor));
+ }
+
+ const primary = rst.getPrimary();
+ const admin = primary.getDB('admin');
+ const config = rst.getReplSetConfigFromNode();
+ config.version++;
+
+ // ensure recipient nodes are added as non-voting members
+ recipientNodes.forEach(node => {
+ config.members.push({
+ host: node.host,
+ votes: 0,
+ priority: 0,
+ hidden: true,
+ tags: {[recipientTagName]: ObjectId().valueOf()}
+ });
+ });
+
+ // reindex all members from 0
+ config.members = config.members.map((member, idx) => {
+ member._id = idx;
+ return member;
+ });
+
+ assert.commandWorked(admin.runCommand({replSetReconfig: config}));
+ recipientNodes.forEach(node => rst.waitForState(node, ReplSetTest.State.SECONDARY));
+
+ return recipientNodes;
+}
+
+function cannotStartShardSplitWithMigrationInProgress({protocol, runOnRecipient}) {
+ // Test that we cannot start a shard split while a migration is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const shardSplitRst = runOnRecipient ? test.getRecipientRst() : test.getDonorRst();
+
+ let recipientNodes = addRecipientNodes(shardSplitRst, recipientTagName);
+
+ let fp = configureFailPoint(test.getDonorRst().getPrimary(),
+ "pauseTenantMigrationBeforeLeavingDataSyncState");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ fp.wait();
+
+ const commitThread = commitSplitAsync(
+ shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandFailed(commitThread.returnData());
+
+ fp.off();
+
+ TenantMigrationTest.assertCommitted(
+ waitForMergeToComplete(migrationOpts, tenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("cannotStartShardSplitWithMigrationInProgress test completed");
+}
+
+function canStartShardSplitWithAbortedMigration({protocol, runOnRecipient}) {
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: false, sharedOptions});
+
+ const shardSplitRst = runOnRecipient ? test.getRecipientRst() : test.getDonorRst();
+
+ let recipientNodes = addRecipientNodes(shardSplitRst, recipientTagName);
+
+ let fp = configureFailPoint(test.getDonorRst().getPrimary(),
+ "abortTenantMigrationBeforeLeavingBlockingState");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ TenantMigrationTest.assertAborted(
+ waitForMergeToComplete(migrationOpts, tenantMigrationId, test));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ const commitThread = commitSplitAsync(
+ shardSplitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("cannotStartShardSplitWithMigrationInProgress test completed");
+}
+
+function cannotStartMigrationWhileShardSplitIsInProgress(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getDonorRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandFailedWithCode(test.startMigration(migrationOpts),
+ ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed");
+}
+
+function cannotStartMigrationWhileShardSplitIsInProgressOnRecipient(protocol) {
+ // Test that we cannot start a migration while a shard split is in progress.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ const splitRst = test.getRecipientRst();
+
+ let splitRecipientNodes = addRecipientNodes(splitRst, recipientTagName);
+
+ let fp = configureFailPoint(splitRst.getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread =
+ commitSplitAsync(splitRst, tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ const result = assert.commandWorked(test.waitForMigrationToComplete(migrationOpts));
+ assert.eq(result.state, "aborted");
+ assert.eq(result.abortReason.code, ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ splitRst.nodes = splitRst.nodes.filter(node => !splitRecipientNodes.includes(node));
+ splitRst.ports =
+ splitRst.ports.filter(port => !splitRecipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(
+ splitRst.getPrimary().adminCommand({forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ splitRecipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(splitRst.nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhileShardSplitIsInProgress test completed");
+}
+
+function cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress(protocol) {
+ // Test that we cannot start a tenant migration while a shard split is in progress. Use a
+ // tenantId uninvolved in the split.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ let fp =
+ configureFailPoint(test.getDonorRst().getPrimary(), "pauseShardSplitBeforeBlockingState");
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ fp.wait();
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = "otherTenantToMove";
+ }
+ jsTestLog("Starting tenant migration");
+ assert.commandFailedWithCode(test.startMigration(migrationOpts),
+ ErrorCodes.ConflictingServerlessOperation);
+
+ fp.off();
+
+ assert.commandWorked(commitThread.returnData());
+
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds);
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress test completed");
+}
+
+function cannotStartMigrationWhenThereIsAnExistingAccessBlocker(protocol) {
+ // Test that we cannot start a tenant migration for a tenant that already has an access blocker.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ // Ensure a high enough delay so the shard split document is not deleted before tenant migration
+ // is started.
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {
+ shardSplitGarbageCollectionDelayMS: 36000000,
+ ttlMonitorSleepSecs: 1
+ };
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ // Remove recipient nodes
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ assert.commandFailed(test.startMigration(migrationOpts));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("cannotStartMigrationWhenThereIsAnExistingAccessBlocker test completed");
+}
+
+function canStartMigrationAfterSplitGarbageCollection(protocol) {
+ // Test that we can start a migration after a shard split has been garbage collected.
+ const recipientTagName = "recipientTag";
+ const recipientSetName = "recipient";
+ const tenantIds = ["tenant1", "tenant2"];
+ const splitMigrationId = UUID();
+ const tenantMigrationId = UUID();
+
+ sharedOptions = {};
+ sharedOptions["setParameter"] = {shardSplitGarbageCollectionDelayMS: 0, ttlMonitorSleepSecs: 1};
+
+ const test = new TenantMigrationTest({quickGarbageCollection: true, sharedOptions});
+
+ let recipientNodes = addRecipientNodes(test.getDonorRst(), recipientTagName);
+
+ const commitThread = commitSplitAsync(
+ test.getDonorRst(), tenantIds, recipientTagName, recipientSetName, splitMigrationId);
+ assert.commandWorked(commitThread.returnData());
+
+ // Remove recipient nodes
+ test.getDonorRst().nodes =
+ test.getDonorRst().nodes.filter(node => !recipientNodes.includes(node));
+ test.getDonorRst().ports =
+ test.getDonorRst().ports.filter(port => !recipientNodes.some(node => node.port === port));
+
+ assert.commandWorked(test.getDonorRst().getPrimary().adminCommand(
+ {forgetShardSplit: 1, migrationId: splitMigrationId}));
+
+ waitForGarbageCollectionForSplit(test.getDonorRst().nodes, splitMigrationId, tenantIds);
+
+ jsTestLog("Starting tenant migration");
+ const migrationOpts = {
+ migrationIdString: extractUUIDFromObject(tenantMigrationId),
+ protocol,
+ };
+ if (protocol != "shard merge") {
+ migrationOpts["tenantId"] = tenantIds[0];
+ }
+ assert.commandWorked(test.startMigration(migrationOpts));
+
+ TenantMigrationTest.assertCommitted(test.waitForMigrationToComplete(migrationOpts));
+ assert.commandWorked(test.forgetMigration(migrationOpts.migrationIdString));
+
+ recipientNodes.forEach(node => {
+ MongoRunner.stopMongod(node);
+ });
+
+ test.stop();
+ jsTestLog("canStartMigrationAfterSplitGarbageCollection test completed");
+}
+
+canStartShardSplitWithAbortedMigration({protocol: "multitenant migrations", runOnRecipient: false});
+canStartShardSplitWithAbortedMigration({protocol: "shard merge", runOnRecipient: false});
+
+cannotStartShardSplitWithMigrationInProgress(
+ {protocol: "multitenant migrations", runOnRecipient: false});
+cannotStartShardSplitWithMigrationInProgress({protocol: "shard merge", runOnRecipient: false});
+
+cannotStartShardSplitWithMigrationInProgress(
+ {protocol: "multitenant migrations", runOnRecipient: true});
+cannotStartShardSplitWithMigrationInProgress({protocol: "shard merge", runOnRecipient: true});
+
+cannotStartMigrationWhileShardSplitIsInProgress("multitenant migrations");
+cannotStartMigrationWhileShardSplitIsInProgress("shard merge");
+
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("multitenant migrations");
+cannotStartMigrationWhileShardSplitIsInProgressOnRecipient("shard merge");
+
+cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("multitenant migrations");
+cannotStartMigrationWithDifferentTenantWhileShardSplitIsInProgress("shard merge");
+
+cannotStartMigrationWhenThereIsAnExistingAccessBlocker("multitenant migrations");
+cannotStartMigrationWhenThereIsAnExistingAccessBlocker("shard merge");
+
+canStartMigrationAfterSplitGarbageCollection("multitenant migrations");
+canStartMigrationAfterSplitGarbageCollection("shard merge");
diff --git a/jstests/serverless/shard_split_recipient_removes_access_blockers.js b/jstests/serverless/shard_split_recipient_removes_access_blockers.js
index 546b44c4d08..5398c8aba05 100644
--- a/jstests/serverless/shard_split_recipient_removes_access_blockers.js
+++ b/jstests/serverless/shard_split_recipient_removes_access_blockers.js
@@ -1,5 +1,5 @@
/*
- * Test that tenant access blockers are removed from recipients when applying the recipient config.
+ * Test that tenant access blockers are removed when applying the recipient config
*
* @tags: [requires_fcv_52, featureFlagShardSplit, serverless]
*/
diff --git a/jstests/serverless/shard_split_recipient_removes_serverless_lock.js b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js
new file mode 100644
index 00000000000..c4f143677c8
--- /dev/null
+++ b/jstests/serverless/shard_split_recipient_removes_serverless_lock.js
@@ -0,0 +1,48 @@
+/*
+ * Test the serverless operation lock is released from recipients when the state document is
+ * removed.
+ *
+ * @tags: [requires_fcv_62, featureFlagShardSplit, serverless]
+ */
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/serverless/libs/basic_serverless_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+const {ServerlessLockType, getServerlessOperationLock} = TenantMigrationUtil;
+
+(function() {
+"use strict";
+
+// Skip db hash check because secondary is left with a different config.
+TestData.skipCheckDBHashes = true;
+
+// quickGarbageCollection makes the split state document expire soon after commit, which is
+// what should trigger the lock release checked at the end of this test.
+const test = new BasicServerlessTest({
+ recipientTagName: "recipientNode",
+ recipientSetName: "recipient",
+ quickGarbageCollection: true
+});
+test.addRecipientNodes();
+
+const donorPrimary = test.donor.getPrimary();
+const tenantIds = ["tenant1", "tenant2"];
+const operation = test.createSplitOperation(tenantIds);
+
+// Pause the split after it enters the blocking state so the recipient nodes can be
+// inspected while the operation (and therefore the lock) is still active.
+const donorAfterBlockingFailpoint =
+ configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking");
+
+const commitOp = operation.commitAsync();
+donorAfterBlockingFailpoint.wait();
+
+jsTestLog("Asserting recipient nodes have installed the serverless lock");
+assert.soon(() => test.recipientNodes.every(node => getServerlessOperationLock(node) ===
+ ServerlessLockType.ShardSplitDonor));
+donorAfterBlockingFailpoint.off();
+
+commitOp.join();
+assert.commandWorked(commitOp.returnData());
+
+// After commit + garbage collection the state document is removed; every recipient node
+// must drop the lock ("== null" deliberately matches both null and undefined).
+jsTestLog("Asserting the serverless exclusion lock has been released");
+assert.soon(() => test.recipientNodes.every(node => getServerlessOperationLock(node) == null));
+
+test.stop();
+})();
diff --git a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
index f6301fa7f9a..2e4ddefade5 100644
--- a/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
+++ b/jstests/serverless/shard_split_startup_recovery_initially_aborted.js
@@ -1,7 +1,9 @@
/**
- * Starts a shard split througt `abortShardSplit` and assert that no tenant access blockers are
+ * Starts a shard split through `abortShardSplit` and assert that no tenant access blockers are
* recovered since we do not recover access blockers for aborted split marked garbage collectable.
- * @tags: [requires_fcv_52, featureFlagShardSplit]
+ * Also verifies the serverless operation lock is not acquired when starting a split in aborted
+ * state.
+ * @tags: [requires_fcv_62, featureFlagShardSplit]
*/
load("jstests/libs/fail_point_util.js"); // for "configureFailPoint"
@@ -9,6 +11,7 @@ load('jstests/libs/parallel_shell_helpers.js'); // for "startPa
load("jstests/noPassthrough/libs/server_parameter_helpers.js"); // for "setParameter"
load("jstests/serverless/libs/basic_serverless_test.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+const {getServerlessOperationLock} = TenantMigrationUtil;
(function() {
"use strict";
@@ -59,5 +62,8 @@ tenantIds.every(tenantId => {
{node: donorPrimary, tenantId: tenantId}));
});
+// We do not acquire the lock for documents marked for garbage collection.
+assert.eq(getServerlessOperationLock(donorPrimary), null);
+
test.stop();
})();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index d45344827f5..8cc122bfbe1 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -490,7 +490,7 @@ error_codes:
- {code: 378, name: NonConformantBSON, categories: [ValidationError]}
- {code: 379, name: DatabaseMetadataRefreshCanceled, categories: [InternalOnly]}
- {code: 380, name: RequestAlreadyFulfilled}
-
+ - {code: 381, name: ConflictingServerlessOperation}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f27a9f26e57..7185e7e77d6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -546,6 +546,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/kill_sessions_local',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
@@ -760,6 +761,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/storage/journal_flusher',
'delayable_timeout_callback',
@@ -1243,6 +1245,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
@@ -1411,6 +1414,7 @@ env.Library(
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import',
'$BUILD_DIR/mongo/db/transaction/transaction',
@@ -1483,6 +1487,7 @@ env.Library(
"$BUILD_DIR/mongo/db/catalog/local_oplog_info",
"$BUILD_DIR/mongo/db/concurrency/exception_util",
"$BUILD_DIR/mongo/db/index_builds_coordinator_interface",
+ "$BUILD_DIR/mongo/db/serverless/serverless_lock",
],
)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 2550235f095..4c28e28fd56 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/transaction_oplog_application.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
_storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
_replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 7bc8da73f7a..32946412f68 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -2049,6 +2049,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Start the real work.
ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
@@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2270,6 +2274,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError)
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
doSuccessfulInitialSyncWithOneBatch();
}
@@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
@@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a943637c2e5..2feb1ed6b7b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -88,6 +88,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
@@ -534,6 +535,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
}
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
LOGV2(4280506, "Reconstructing prepared transactions");
reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 101ddcf0bb3..9f7f2e5863d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -192,6 +192,8 @@ void ReplCoordTest::start() {
// Skip recovering user writes critical sections for the same reason as the above.
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ // Skip recovering of serverless mutual exclusion locks for the same reason as the above.
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
invariant(!_callShutdown);
// if we haven't initialized yet, do that first.
if (!_repl) {
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 5c0f74b225c..f2bf4fb78da 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_recovery.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/session/session_txn_record_gen.h"
@@ -651,6 +652,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
_correctRecordStoreCounts(opCtx);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
// Reconstruct prepared transactions after counts have been adjusted. Since prepared
// transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
index 6f0fde3e5e6..c581366d9c2 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
@@ -85,7 +85,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId,
if (it != _tenantMigrationAccessBlockers.end()) {
auto existingMtab = it->second.getAccessBlocker(mtabType);
if (existingMtab) {
- tasserted(ErrorCodes::ConflictingOperationInProgress,
+ uasserted(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "This node is already a "
<< (mtabType == MtabType::kDonor ? "donor" : "recipient")
<< " for tenantId \"" << tenantId << "\" with migrationId \""
@@ -117,7 +117,7 @@ void TenantMigrationAccessBlockerRegistry::addShardMergeDonorAccessBlocker(
_tenantMigrationAccessBlockers.begin(),
_tenantMigrationAccessBlockers.end(),
[](const auto& pair) { return pair.second.getAccessBlocker(MtabType::kDonor).get(); });
- tassert(6114105,
+ uassert(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "Adding shard merge donor blocker when this node has another donor "
"blocker with migrationId \""
<< foundAccessBlocker->second.getAccessBlocker(MtabType::kDonor)
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 264099e24cf..29ec527b2a9 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_donor_op_observer.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -54,6 +55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
+
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
donorStateDoc.getId());
if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) ==
@@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(donorStateDoc.getTenantId(),
TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
} else {
@@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc] {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeShardMergeDonorAccessBlocker(donorStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
}
@@ -156,6 +167,10 @@ public:
void commit(boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ _donorStateDoc.getId());
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
_opCtx->getServiceContext(), _donorStateDoc.getTenantId());
@@ -339,6 +354,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext*
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 1d3f4cdf1e2..d9c512059ff 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -515,7 +515,12 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index e2a047876bf..19f5f6ab71c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts(
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
+ if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ !tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
+ for (auto it = first; it != last; it++) {
+ auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
+ IDLParserContext("recipientStateDoc"), it->doc);
+ if (!recipientStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+ }
+ }
+ }
if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) {
return;
@@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(recipientStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+
std::vector<std::string> tenantIdsToRemove;
auto cleanUpBlockerIfGarbage =
[&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) {
@@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient);
});
}
return {};
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript
index 3ccfd8ea7f7..82143897663 100644
--- a/src/mongo/db/serverless/SConscript
+++ b/src/mongo/db/serverless/SConscript
@@ -57,6 +57,21 @@ env.Library(
)
env.Library(
+ target='serverless_lock',
+ source=[
+ 'serverless_operation_lock_registry.cpp',
+ 'serverless_server_status.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_utils',
+ '$BUILD_DIR/mongo/db/server_base',
+ 'shard_split_state_machine',
+ ],
+)
+
+env.Library(
target='shard_split_donor_service',
source=[
'shard_split_donor_service.cpp',
@@ -77,6 +92,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/shard_role',
+ 'serverless_lock',
'shard_split_utils',
],
)
@@ -84,6 +100,7 @@ env.Library(
env.CppUnitTest(
target='db_serverless_test',
source=[
+ 'serverless_operation_lock_registry_test.cpp',
'shard_split_donor_op_observer_test.cpp',
'shard_split_donor_service_test.cpp',
'shard_split_utils_test.cpp',
@@ -97,6 +114,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ 'serverless_lock',
'shard_split_donor_service',
'shard_split_utils',
],
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
new file mode 100644
index 00000000000..2623320764a
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration
+
+// Failpoint that will cause recoverLocks to return early.
+// NOTE(review): this definition sits before the `namespace mongo {` block opens below;
+// MONGO_FAIL_POINT_DEFINE is conventionally placed inside namespace mongo — confirm this
+// placement compiles and the unqualified uses in recoverLocks resolve as intended.
+MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock);
+namespace mongo {
+
+const ServiceContext::Decoration<ServerlessOperationLockRegistry>
+ ServerlessOperationLockRegistry::get =
+ ServiceContext::declareDecoration<ServerlessOperationLockRegistry>();
+
+// Registers 'operationId' as an active serverless operation of type 'lockType'.
+// Multiple concurrent operations of the SAME type are allowed; an operation of a
+// different type fails with ConflictingServerlessOperation (a uassert, so callers such
+// as the donor insert loop can surface it to the user rather than crash the server).
+void ServerlessOperationLockRegistry::acquireLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ // Verify there is no serverless operation in progress or it is the same type as the one
+ // acquiring the lock.
+ uasser<t></t>(ErrorCodes::ConflictingServerlessOperation,
+ "Conflicting serverless operation in progress",
+ !_activeLockType || _activeLockType.get() == lockType);
+ // Double-acquisition by the same operation is a programming error, not a user error.
+ invariant(_activeOperations.find(operationId) == _activeOperations.end(),
+ "Cannot acquire the serverless lock twice for the same operationId.");
+ _activeLockType = lockType;
+
+ _activeOperations.emplace(operationId);
+
+ LOGV2(6531500,
+ "Acquired serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+// Removes 'operationId' from the active set. The lock type only resets once the last
+// active operation of that type has been released, at which point a different type of
+// serverless operation may acquire the registry.
+void ServerlessOperationLockRegistry::releaseLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ // Releasing under the wrong type or from a non-owning operation is a programming error.
+ invariant(_activeLockType && *_activeLockType == lockType,
+ "Cannot release a serverless lock that is not owned by the given lock type.");
+
+ invariant(_activeOperations.find(operationId) != _activeOperations.end(),
+ "Cannot release a serverless lock if the given operationId does not own the lock.");
+ _activeOperations.erase(operationId);
+
+ if (_activeOperations.empty()) {
+ _activeLockType.reset();
+ }
+
+ LOGV2(6531501,
+ "Released serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+// Releases every lock of 'lockType' at once. Called from the op observers' onDropCollection
+// hooks when a serverless state collection is dropped, since the per-document release path
+// will never run for dropped documents. A drop of a collection for a non-active type is a
+// no-op.
+void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType || *_activeLockType != lockType) {
+ return;
+ }
+
+ LOGV2(6531505,
+ "Released all serverless locks due to state collection drop",
+ "type"_attr = lockType);
+
+ _activeLockType.reset();
+ _activeOperations.clear();
+}
+
+// Drops all in-memory lock state unconditionally. Called by recoverLocks() before the
+// lock set is re-derived from the persisted state documents (startup recovery, rollback,
+// initial sync teardown).
+void ServerlessOperationLockRegistry::clear() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ // Fix: the attribute previously used "ns"_attr, which suggests a namespace; it holds
+ // the active LockType, so label it "type" consistently with the other LOGV2 calls in
+ // this file. The message also claimed "on shutdown" although recoverLocks() invokes
+ // this during recovery as well.
+ LOGV2(6531504,
+ "Clearing serverless operation lock registry",
+ "type"_attr = _activeLockType);
+
+ _activeOperations.clear();
+ _activeLockType.reset();
+}
+
+// Rebuilds the in-memory lock state from the persisted tenant migration donor, tenant
+// migration recipient and shard split donor state documents. Invoked during startup
+// recovery, rollback, and initial sync teardown (see the callers added in this patch).
+// Documents with 'expireAt' set are garbage-collectable and take no lock. The
+// skipRecoverServerlessOperationLock failpoint lets unit-test fixtures bypass recovery.
+// NOTE(review): acquireLock uasserts on a type conflict, so non-garbage-collectable
+// documents of mixed types on disk would make recovery throw — presumably prevented by
+// the mutual exclusion enforced at acquisition time; confirm.
+void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
+ if (skipRecoverServerlessOperationLock.shouldFail()) {
+ return;
+ }
+
+ // Start from a clean slate; any previously held in-memory locks are stale here.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.clear();
+
+ PersistentTaskStore<TenantMigrationDonorDocument> donorStore(
+ NamespaceString::kTenantMigrationDonorsNamespace);
+ donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore(
+ NamespaceString::kTenantMigrationRecipientsNamespace);
+ recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<ShardSplitDonorDocument> splitStore(
+ NamespaceString::kShardSplitDonorsNamespace);
+ splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId());
+
+ return true;
+ });
+}
+
+// Field name reported in serverStatus output (null when no lock is held).
+// NOTE(review): has external linkage as written; consider `static` or an anonymous
+// namespace to keep it file-local.
+const std::string kOperationLockFieldName = "operationLock";
+// Reports the currently held lock type (or null) for serverStatus, as consumed by
+// getServerlessOperationLock in jstests/replsets/libs/tenant_migration_util.js.
+void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType) {
+ builder->appendNull(kOperationLockFieldName);
+ return;
+ }
+
+ switch (_activeLockType.value()) {
+ case ServerlessOperationLockRegistry::LockType::kShardSplit:
+ builder->append(kOperationLockFieldName, "shard split donor");
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantDonor:
+ builder->append(kOperationLockFieldName, "tenant migration donor");
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantRecipient:
+ builder->append(kOperationLockFieldName, "tenant migration recipient");
+ break;
+ }
+}
+
+// Test-only accessor for the active lock type; takes the mutex so tests observe a
+// consistent snapshot.
+boost::optional<ServerlessOperationLockRegistry::LockType>
+ServerlessOperationLockRegistry::getActiveOperationType_forTest() {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ return _activeLockType;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h
new file mode 100644
index 00000000000..d9ac07393f4
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/uuid.h"
+
+#include <set>
+
+namespace mongo {
+
+/**
+ * Registry to allow only one type of active serverless operation at a time. It allows multiple
+ * simultaneous operations of the same type.
+ */
+class ServerlessOperationLockRegistry {
+ ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete;
+ ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete;
+
+public:
+ ServerlessOperationLockRegistry() = default;
+
+ static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get;
+
+ enum LockType { kShardSplit, kTenantDonor, kTenantRecipient };
+
+ /**
+ * Acquire the serverless lock for LockType and adds operationId to the set of
+ * instances tracked. Throws ConflictingOperationInProgress error if there is already an
+ * activeServerlessOperation in progress with a different namespace than operationNamespace.
+ */
+ void acquireLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * If _activeOpSeverlessOperation matches LockType, removes the given operationId from
+ * the set of active instances and releases the lock if the set becomes empty. Invariant if
+ * lockType or operationId does not own the lock.
+ */
+ void releaseLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * Called when a state document collection is dropped. If the collection's lockType currently
+ * holds the lock, it releases the lock. If it does not own the lock, the function does nothing.
+ */
+ void onDropStateCollection(LockType lockType);
+
+ void clear();
+
+ /**
+ * Scan serverless state documents and acquire the serverless mutual exclusion lock if needed.
+ */
+ static void recoverLocks(OperationContext* opCtx);
+
+ /**
+ * Appends the exclusion status to the BSONObjBuilder.
+ */
+ void appendInfoForServerStatus(BSONObjBuilder* builder) const;
+
+ boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest();
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex");
+ boost::optional<LockType> _activeLockType;
+ std::set<UUID> _activeOperations;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
new file mode 100644
index 00000000000..9d95b3b7bc7
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ InsertSameIdTwice,
+ "Cannot acquire the serverless lock twice for the same operationId.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+}
+
+TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_THROWS_CODE(
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, UUID::gen()),
+ DBException,
+ ErrorCodes::ConflictingServerlessOperation);
+}
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentNsTriggersInvariant,
+ "Cannot release a serverless lock that is not owned by the given lock type.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, id);
+}
+
+
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentIdTriggersInvariant,
+ "Cannot release a serverless lock if the given operationId does not own the lock.") {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+}
+
+TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+ registry.clear();
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock is held;
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+ // Add an additional id;
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+ // Verify the lock is held;
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock is held;
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp
new file mode 100644
index 00000000000..18bf376c6bb
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_server_status.cpp
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+
+namespace mongo {
+namespace {
+
+class ServerlessServerStatus final : public ServerStatusSection {
+public:
+ ServerlessServerStatus() : ServerStatusSection("serverless") {}
+
+ bool includeByDefault() const override {
+ return true;
+ }
+
+ BSONObj generateSection(OperationContext* opCtx,
+ const BSONElement& configElement) const override {
+ BSONObjBuilder result;
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .appendInfoForServerStatus(&result);
+ return result.obj();
+ }
+} serverlessServerStatus;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 0e25639559f..0f62f732b9a 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_utils.h"
@@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) {
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
+const auto shardSplitIdToDeleteDecoration =
+ OperationContext::declareDecoration<boost::optional<UUID>>();
ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc);
@@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
invariant(donorStateDoc.getTenantIds());
invariant(donorStateDoc.getRecipientConnectionString());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId());
+
auto tenantIds = *donorStateDoc.getTenantIds();
for (const auto& tenantId : tenantIds) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
@@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
if (isPrimary(opCtx)) {
// onRollback is not registered on secondaries since secondaries should not fail to
// apply the write.
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] {
+ opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, id = donorStateDoc.getId()] {
for (const auto& tenantId : tenantIds) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
});
}
}
@@ -251,6 +259,10 @@ public:
void commit(boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ _donorStateDoc.getId());
+
if (_donorStateDoc.getTenantIds()) {
auto tenantIds = _donorStateDoc.getTenantIds().value();
for (auto&& tenantId : tenantIds) {
@@ -378,12 +390,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+ const bool shouldRemoveOnRecipient =
+ serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc);
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
<< " since it has not been marked as garbage collectable and is not a"
<< " recipient garbage collectable.",
- donorStateDoc.getExpireAt() ||
- serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc));
+ donorStateDoc.getExpireAt() || shouldRemoveOnRecipient);
// To support back-to-back split retries, when a split is aborted, we remove its
// TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage
@@ -399,6 +412,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result);
}
+
+ if (shouldRemoveOnRecipient) {
+ shardSplitIdToDeleteDecoration(opCtx) = boost::make_optional(donorStateDoc.getId());
+ }
}
void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
@@ -421,6 +438,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) {
registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+
+ const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx);
+ if (idToDelete) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete);
+ }
});
}
@@ -433,6 +456,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit);
});
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index 97c923524a3..f30ad593951 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
@@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) {
ASSERT_FALSE(mtab);
};
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid);
+
runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
+
+ ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .getActiveOperationType_forTest());
}
TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) {
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 4c3fdabb39f..97eecdae6d1 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -1005,7 +1005,14 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1094,6 +1101,11 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
"Entering 'aborted' state.",
"id"_attr = _migrationId,
"abortReason"_attr = _abortReason.value());
+
+ if (_abortReason->code() == ErrorCodes::ConflictingServerlessOperation) {
+ return ExecutorFuture(**executor,
+ DurableState{ShardSplitDonorStateEnum::kAborted, _abortReason});
+ }
}
return ExecutorFuture<void>(**executor)
@@ -1113,7 +1125,8 @@ ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getExpireAt()) {
+ if (_stateDoc.getExpireAt() ||
+ (_abortReason && _abortReason->code() == ErrorCodes::ConflictingServerlessOperation)) {
return ExecutorFuture(**executor);
}
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 743f64669c8..646a11244dd 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
@@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
waitForReplSetStepUp(Status(ErrorCodes::OK, ""));
waitForRecipientPrimaryMajorityWrite();
+ // Verify the serverless lock has been acquired for split.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
auto result = serviceInstance->decisionFuture().get();
ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds));
ASSERT(!result.abortReason);
@@ -520,10 +526,34 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
auto completionFuture = serviceInstance->completionFuture();
completionFuture.wait();
+ // The lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) {
+ auto opCtx = makeOperationContext();
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen());
+
+ // Create and start the instance.
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, defaultStateDocument().toBSON());
+ ASSERT(serviceInstance.get());
+
+ auto decisionFuture = serviceInstance->decisionFuture();
+
+ auto result = decisionFuture.get();
+ ASSERT_EQ(result.state, ShardSplitDonorStateEnum::kAborted);
+ ASSERT(result.abortReason);
+ ASSERT_EQ(result.abortReason->code(), ErrorCodes::ConflictingServerlessOperation);
+}
+
TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -1015,6 +1045,10 @@ public:
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setRecipientConnectionString(ConnectionString::forLocal());
+ ServerlessOperationLockRegistry::get(getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ stateDocument.getId());
+
return stateDocument;
}
};