summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatt Broadstone <mbroadst@mongodb.com>2022-09-14 23:05:50 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 00:19:47 +0000
commit1a13031f7cdfb6cffdcff212edef0790fe084df2 (patch)
tree8d6ba207b94133d0775d398c3384380dfcc8343a /src
parentd1da149077ea312efc80f8836bc79f2b1b10c1ad (diff)
downloadmongo-1a13031f7cdfb6cffdcff212edef0790fe084df2.tar.gz
SERVER-65315 Enforce mutual exclusion between serverless operations
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.yml2
-rw-r--r--src/mongo/db/repl/SConscript5
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp2
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp13
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.cpp18
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp7
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp20
-rw-r--r--src/mongo/db/serverless/SConscript18
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.cpp192
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.h96
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp154
-rw-r--r--src/mongo/db/serverless/serverless_server_status.cpp57
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp32
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp7
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp17
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp34
20 files changed, 675 insertions, 9 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index d45344827f5..8cc122bfbe1 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -490,7 +490,7 @@ error_codes:
- {code: 378, name: NonConformantBSON, categories: [ValidationError]}
- {code: 379, name: DatabaseMetadataRefreshCanceled, categories: [InternalOnly]}
- {code: 380, name: RequestAlreadyFulfilled}
-
+ - {code: 381, name: ConflictingServerlessOperation}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f27a9f26e57..7185e7e77d6 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -546,6 +546,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/kill_sessions_local',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
@@ -760,6 +761,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/storage/journal_flusher',
'delayable_timeout_callback',
@@ -1243,6 +1245,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
@@ -1411,6 +1414,7 @@ env.Library(
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import',
'$BUILD_DIR/mongo/db/transaction/transaction',
@@ -1483,6 +1487,7 @@ env.Library(
"$BUILD_DIR/mongo/db/catalog/local_oplog_info",
"$BUILD_DIR/mongo/db/concurrency/exception_util",
"$BUILD_DIR/mongo/db/index_builds_coordinator_interface",
+ "$BUILD_DIR/mongo/db/serverless/serverless_lock",
],
)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 2550235f095..4c28e28fd56 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/transaction_oplog_application.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
_storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
_replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 7bc8da73f7a..32946412f68 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -2049,6 +2049,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Start the real work.
ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
@@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2270,6 +2274,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError)
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
doSuccessfulInitialSyncWithOneBatch();
}
@@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
@@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a943637c2e5..2feb1ed6b7b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -88,6 +88,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
@@ -534,6 +535,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
}
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
LOGV2(4280506, "Reconstructing prepared transactions");
reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 101ddcf0bb3..9f7f2e5863d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -192,6 +192,8 @@ void ReplCoordTest::start() {
// Skip recovering user writes critical sections for the same reason as the above.
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ // Skip recovering serverless operation locks for the same reason as the above.
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
invariant(!_callShutdown);
// if we haven't initialized yet, do that first.
if (!_repl) {
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 5c0f74b225c..f2bf4fb78da 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_recovery.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/session/session_txn_record_gen.h"
@@ -651,6 +652,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
_correctRecordStoreCounts(opCtx);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
// Reconstruct prepared transactions after counts have been adjusted. Since prepared
// transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
index 6f0fde3e5e6..c581366d9c2 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
@@ -85,7 +85,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId,
if (it != _tenantMigrationAccessBlockers.end()) {
auto existingMtab = it->second.getAccessBlocker(mtabType);
if (existingMtab) {
- tasserted(ErrorCodes::ConflictingOperationInProgress,
+ uasserted(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "This node is already a "
<< (mtabType == MtabType::kDonor ? "donor" : "recipient")
<< " for tenantId \"" << tenantId << "\" with migrationId \""
@@ -117,7 +117,7 @@ void TenantMigrationAccessBlockerRegistry::addShardMergeDonorAccessBlocker(
_tenantMigrationAccessBlockers.begin(),
_tenantMigrationAccessBlockers.end(),
[](const auto& pair) { return pair.second.getAccessBlocker(MtabType::kDonor).get(); });
- tassert(6114105,
+ uassert(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "Adding shard merge donor blocker when this node has another donor "
"blocker with migrationId \""
<< foundAccessBlocker->second.getAccessBlocker(MtabType::kDonor)
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 264099e24cf..29ec527b2a9 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_donor_op_observer.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -54,6 +55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
+
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
donorStateDoc.getId());
if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) ==
@@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(donorStateDoc.getTenantId(),
TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
} else {
@@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc] {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeShardMergeDonorAccessBlocker(donorStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
}
@@ -156,6 +167,10 @@ public:
void commit(boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ _donorStateDoc.getId());
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
_opCtx->getServiceContext(), _donorStateDoc.getTenantId());
@@ -339,6 +354,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext*
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 1d3f4cdf1e2..d9c512059ff 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -515,7 +515,12 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index e2a047876bf..19f5f6ab71c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts(
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
+ if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ !tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
+ for (auto it = first; it != last; it++) {
+ auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
+ IDLParserContext("recipientStateDoc"), it->doc);
+ if (!recipientStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+ }
+ }
+ }
if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) {
return;
@@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(recipientStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+
std::vector<std::string> tenantIdsToRemove;
auto cleanUpBlockerIfGarbage =
[&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) {
@@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient);
});
}
return {};
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript
index 3ccfd8ea7f7..82143897663 100644
--- a/src/mongo/db/serverless/SConscript
+++ b/src/mongo/db/serverless/SConscript
@@ -57,6 +57,21 @@ env.Library(
)
env.Library(
+ target='serverless_lock',
+ source=[
+ 'serverless_operation_lock_registry.cpp',
+ 'serverless_server_status.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_utils',
+ '$BUILD_DIR/mongo/db/server_base',
+ 'shard_split_state_machine',
+ ],
+)
+
+env.Library(
target='shard_split_donor_service',
source=[
'shard_split_donor_service.cpp',
@@ -77,6 +92,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/shard_role',
+ 'serverless_lock',
'shard_split_utils',
],
)
@@ -84,6 +100,7 @@ env.Library(
env.CppUnitTest(
target='db_serverless_test',
source=[
+ 'serverless_operation_lock_registry_test.cpp',
'shard_split_donor_op_observer_test.cpp',
'shard_split_donor_service_test.cpp',
'shard_split_utils_test.cpp',
@@ -97,6 +114,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ 'serverless_lock',
'shard_split_donor_service',
'shard_split_utils',
],
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
new file mode 100644
index 00000000000..2623320764a
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration
+
+// Failpoint that will cause recoverLocks to return early.
+MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock);
+namespace mongo {
+
+const ServiceContext::Decoration<ServerlessOperationLockRegistry>
+ ServerlessOperationLockRegistry::get =
+ ServiceContext::declareDecoration<ServerlessOperationLockRegistry>();
+
+void ServerlessOperationLockRegistry::acquireLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ // Verify there is no serverless operation in progress or it is the same type as the one
+ // acquiring the lock.
+ uassert(ErrorCodes::ConflictingServerlessOperation,
+ "Conflicting serverless operation in progress",
+ !_activeLockType || _activeLockType.get() == lockType);
+ invariant(_activeOperations.find(operationId) == _activeOperations.end(),
+ "Cannot acquire the serverless lock twice for the same operationId.");
+ _activeLockType = lockType;
+
+ _activeOperations.emplace(operationId);
+
+ LOGV2(6531500,
+ "Acquired serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+void ServerlessOperationLockRegistry::releaseLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ invariant(_activeLockType && *_activeLockType == lockType,
+ "Cannot release a serverless lock that is not owned by the given lock type.");
+
+ invariant(_activeOperations.find(operationId) != _activeOperations.end(),
+ "Cannot release a serverless lock if the given operationId does not own the lock.");
+ _activeOperations.erase(operationId);
+
+ if (_activeOperations.empty()) {
+ _activeLockType.reset();
+ }
+
+ LOGV2(6531501,
+ "Released serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+
+void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType || *_activeLockType != lockType) {
+ return;
+ }
+
+ LOGV2(6531505,
+ "Released all serverless locks due to state collection drop",
+ "type"_attr = lockType);
+
+ _activeLockType.reset();
+ _activeOperations.clear();
+}
+
+void ServerlessOperationLockRegistry::clear() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(6531504,
+ "Releasing ServerlessMutualExclusionRegistry lock on shutdown",
+ "ns"_attr = _activeLockType);
+
+ _activeOperations.clear();
+ _activeLockType.reset();
+}
+
+void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
+ if (skipRecoverServerlessOperationLock.shouldFail()) {
+ return;
+ }
+
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.clear();
+
+ PersistentTaskStore<TenantMigrationDonorDocument> donorStore(
+ NamespaceString::kTenantMigrationDonorsNamespace);
+ donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore(
+ NamespaceString::kTenantMigrationRecipientsNamespace);
+ recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<ShardSplitDonorDocument> splitStore(
+ NamespaceString::kShardSplitDonorsNamespace);
+ splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId());
+
+ return true;
+ });
+}
+
+const std::string kOperationLockFieldName = "operationLock";
+void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType) {
+ builder->appendNull(kOperationLockFieldName);
+ return;
+ }
+
+ switch (_activeLockType.value()) {
+ case ServerlessOperationLockRegistry::LockType::kShardSplit:
+ builder->append(kOperationLockFieldName, "shard split donor");
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantDonor:
+ builder->append(kOperationLockFieldName, "tenant migration donor");
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantRecipient:
+ builder->append(kOperationLockFieldName, "tenant migration recipient");
+ break;
+ }
+}
+
+boost::optional<ServerlessOperationLockRegistry::LockType>
+ServerlessOperationLockRegistry::getActiveOperationType_forTest() {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ return _activeLockType;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h
new file mode 100644
index 00000000000..d9ac07393f4
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/uuid.h"
+
+#include <set>
+
+namespace mongo {
+
+/**
+ * Registry to allow only one type of active serverless operation at a time. It allows multiple
+ * simultaneous operations of the same type.
+ */
+class ServerlessOperationLockRegistry {
+ ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete;
+ ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete;
+
+public:
+ ServerlessOperationLockRegistry() = default;
+
+ static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get;
+
+ enum LockType { kShardSplit, kTenantDonor, kTenantRecipient };
+
+ /**
+ * Acquires the serverless lock for lockType and adds operationId to the set of active
+ * operations. Throws ConflictingServerlessOperation if an operation of a different lock type
+ * is already in progress. Invariants if operationId already holds the lock.
+ */
+ void acquireLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * If _activeLockType matches lockType, removes the given operationId from the set of active
+ * operations and releases the lock if the set becomes empty. Invariants if lockType or
+ * operationId does not own the lock.
+ */
+ void releaseLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * Called when a state document collection is dropped. If the collection's lockType currently
+ * holds the lock, it releases the lock. If it does not own the lock, the function does nothing.
+ */
+ void onDropStateCollection(LockType lockType);
+
+ void clear();
+
+ /**
+ * Scan serverless state documents and acquire the serverless mutual exclusion lock if needed.
+ */
+ static void recoverLocks(OperationContext* opCtx);
+
+ /**
+ * Appends the exclusion status to the BSONObjBuilder.
+ */
+ void appendInfoForServerStatus(BSONObjBuilder* builder) const;
+
+ boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest();
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex");
+ boost::optional<LockType> _activeLockType;
+ std::set<UUID> _activeOperations;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
new file mode 100644
index 00000000000..9d95b3b7bc7
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+// Acquiring and then releasing the lock for a single operation leaves no active lock type.
+TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+
+    ServerlessOperationLockRegistry registry;
+    const auto operationId = UUID::gen();
+
+    registry.acquireLock(lockType, operationId);
+    registry.releaseLock(lockType, operationId);
+
+    ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+// Acquiring the lock a second time with an operationId that already holds it must terminate
+// the process with the expected message.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+           InsertSameIdTwice,
+           "Cannot acquire the serverless lock twice for the same operationId.") {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+
+    ServerlessOperationLockRegistry registry;
+    const auto duplicatedId = UUID::gen();
+
+    // The first iteration succeeds; the second one is expected to die.
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        registry.acquireLock(lockType, duplicatedId);
+    }
+}
+
+// While one lock type holds the lock, an acquisition for another lock type is rejected with
+// ConflictingServerlessOperation.
+TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) {
+    const auto holderType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    const auto conflictingType = ServerlessOperationLockRegistry::LockType::kTenantDonor;
+
+    ServerlessOperationLockRegistry registry;
+    const auto holderId = UUID::gen();
+    registry.acquireLock(holderType, holderId);
+
+    ASSERT_THROWS_CODE(registry.acquireLock(conflictingType, UUID::gen()),
+                       DBException,
+                       ErrorCodes::ConflictingServerlessOperation);
+}
+
+// Releasing with a lock type other than the one that owns the lock must terminate the process.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+           ReleaseDifferentNsTriggersInvariant,
+           "Cannot release a serverless lock that is not owned by the given lock type.") {
+    const auto ownerType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    const auto otherType = ServerlessOperationLockRegistry::LockType::kTenantDonor;
+
+    ServerlessOperationLockRegistry registry;
+    const auto operationId = UUID::gen();
+
+    registry.acquireLock(ownerType, operationId);
+    // Same operation id, wrong lock type: expected to die here.
+    registry.releaseLock(otherType, operationId);
+}
+
+
+// Releasing with an operationId that never acquired the lock must terminate the process.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+           ReleaseDifferentIdTriggersInvariant,
+           "Cannot release a serverless lock if the given operationId does not own the lock.") {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    ServerlessOperationLockRegistry registry;
+
+    const auto ownerId = UUID::gen();
+    registry.acquireLock(lockType, ownerId);
+
+    // Right lock type, unknown operation id: expected to die here.
+    const auto strangerId = UUID::gen();
+    registry.releaseLock(lockType, strangerId);
+}
+
+// clear() drops the lock even when several operations are still tracked.
+TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    ServerlessOperationLockRegistry registry;
+
+    // Track two independent operations under the same lock type.
+    for (int i = 0; i < 2; ++i) {
+        registry.acquireLock(lockType, UUID::gen());
+    }
+
+    registry.clear();
+
+    // Verify the lock has been released.
+    ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+// The lock stays held while any tracked operation remains and is released once every
+// operation that acquired it has released it.
+TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    ServerlessOperationLockRegistry registry;
+
+    std::vector<UUID> ids;
+    for (int i = 0; i < 5; ++i) {
+        ids.push_back(UUID::gen());
+    }
+
+    for (const auto& id : ids) {
+        registry.acquireLock(lockType, id);
+    }
+
+    // Verify the lock is held. Check that the optional is engaged before dereferencing it so
+    // that a regression shows up as a test failure instead of a precondition violation.
+    const auto activeType = registry.getActiveOperationType_forTest();
+    ASSERT(activeType);
+    ASSERT_EQ(*activeType, lockType);
+
+    for (const auto& id : ids) {
+        registry.releaseLock(lockType, id);
+    }
+
+    // Verify the lock has been released.
+    ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+
+// Releasing only some of the tracked operations must leave the lock held by the same lock type.
+TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) {
+    const auto lockType = ServerlessOperationLockRegistry::LockType::kShardSplit;
+    ServerlessOperationLockRegistry registry;
+
+    std::vector<UUID> ids;
+    for (int i = 0; i < 5; ++i) {
+        ids.push_back(UUID::gen());
+    }
+
+    for (const auto& id : ids) {
+        registry.acquireLock(lockType, id);
+    }
+    // Add an additional id that is never released below.
+    registry.acquireLock(lockType, UUID::gen());
+
+    // Verify the lock is held. Check that the optional is engaged before dereferencing it so
+    // that a regression shows up as a test failure instead of a precondition violation.
+    const auto typeBeforeRelease = registry.getActiveOperationType_forTest();
+    ASSERT(typeBeforeRelease);
+    ASSERT_EQ(*typeBeforeRelease, lockType);
+
+    for (const auto& id : ids) {
+        registry.releaseLock(lockType, id);
+    }
+
+    // Verify the lock is still held, since one operation remains tracked.
+    const auto typeAfterRelease = registry.getActiveOperationType_forTest();
+    ASSERT(typeAfterRelease);
+    ASSERT_EQ(*typeAfterRelease, lockType);
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp
new file mode 100644
index 00000000000..18bf376c6bb
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_server_status.cpp
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * The "serverless" serverStatus section. It is included by default and its contents are
+ * whatever the ServerlessOperationLockRegistry of the service context appends.
+ */
+class ServerlessServerStatus final : public ServerStatusSection {
+public:
+    ServerlessServerStatus() : ServerStatusSection("serverless") {}
+
+    bool includeByDefault() const override {
+        return true;
+    }
+
+    BSONObj generateSection(OperationContext* opCtx,
+                            const BSONElement& configElement) const override {
+        const auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+
+        BSONObjBuilder sectionBuilder;
+        registry.appendInfoForServerStatus(&sectionBuilder);
+        return sectionBuilder.obj();
+    }
+} serverlessServerStatus;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 0e25639559f..0f62f732b9a 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_utils.h"
@@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) {
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
+const auto shardSplitIdToDeleteDecoration =
+ OperationContext::declareDecoration<boost::optional<UUID>>();
ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc);
@@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
invariant(donorStateDoc.getTenantIds());
invariant(donorStateDoc.getRecipientConnectionString());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId());
+
auto tenantIds = *donorStateDoc.getTenantIds();
for (const auto& tenantId : tenantIds) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
@@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
if (isPrimary(opCtx)) {
// onRollback is not registered on secondaries since secondaries should not fail to
// apply the write.
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] {
+ opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, id = donorStateDoc.getId()] {
for (const auto& tenantId : tenantIds) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
});
}
}
@@ -251,6 +259,10 @@ public:
void commit(boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ _donorStateDoc.getId());
+
if (_donorStateDoc.getTenantIds()) {
auto tenantIds = _donorStateDoc.getTenantIds().value();
for (auto&& tenantId : tenantIds) {
@@ -378,12 +390,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+ const bool shouldRemoveOnRecipient =
+ serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc);
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
<< " since it has not been marked as garbage collectable and is not a"
<< " recipient garbage collectable.",
- donorStateDoc.getExpireAt() ||
- serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc));
+ donorStateDoc.getExpireAt() || shouldRemoveOnRecipient);
// To support back-to-back split retries, when a split is aborted, we remove its
// TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage
@@ -399,6 +412,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result);
}
+
+ if (shouldRemoveOnRecipient) {
+ shardSplitIdToDeleteDecoration(opCtx) = boost::make_optional(donorStateDoc.getId());
+ }
}
void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
@@ -421,6 +438,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) {
registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+
+ const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx);
+ if (idToDelete) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete);
+ }
});
}
@@ -433,6 +456,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit);
});
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index 97c923524a3..f30ad593951 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
@@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) {
ASSERT_FALSE(mtab);
};
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid);
+
runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
+
+ ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .getActiveOperationType_forTest());
}
TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) {
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 4c3fdabb39f..97eecdae6d1 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -1005,7 +1005,14 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1094,6 +1101,11 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
"Entering 'aborted' state.",
"id"_attr = _migrationId,
"abortReason"_attr = _abortReason.value());
+
+ if (_abortReason->code() == ErrorCodes::ConflictingServerlessOperation) {
+ return ExecutorFuture(**executor,
+ DurableState{ShardSplitDonorStateEnum::kAborted, _abortReason});
+ }
}
return ExecutorFuture<void>(**executor)
@@ -1113,7 +1125,8 @@ ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getExpireAt()) {
+ if (_stateDoc.getExpireAt() ||
+ (_abortReason && _abortReason->code() == ErrorCodes::ConflictingServerlessOperation)) {
return ExecutorFuture(**executor);
}
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 743f64669c8..646a11244dd 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
@@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
waitForReplSetStepUp(Status(ErrorCodes::OK, ""));
waitForRecipientPrimaryMajorityWrite();
+ // Verify the serverless lock has been acquired for split.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
auto result = serviceInstance->decisionFuture().get();
ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds));
ASSERT(!result.abortReason);
@@ -520,10 +526,34 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
auto completionFuture = serviceInstance->completionFuture();
completionFuture.wait();
+ // The lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+// A shard split started while another lock type already owns the serverless lock must end in
+// the aborted state with ConflictingServerlessOperation as the abort reason.
+TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) {
+    auto opCtx = makeOperationContext();
+    test::shard_split::reconfigToAddRecipientNodes(
+        getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+    // Take the lock up front with a lock type that conflicts with shard split.
+    ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+        .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen());
+
+    // Create and start the instance.
+    auto donorInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+        opCtx.get(), _service, defaultStateDocument().toBSON());
+    ASSERT(donorInstance.get());
+
+    auto pendingDecision = donorInstance->decisionFuture();
+    auto outcome = pendingDecision.get();
+
+    ASSERT_EQ(outcome.state, ShardSplitDonorStateEnum::kAborted);
+    ASSERT(outcome.abortReason);
+    ASSERT_EQ(outcome.abortReason->code(), ErrorCodes::ConflictingServerlessOperation);
+}
+
TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -1015,6 +1045,10 @@ public:
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setRecipientConnectionString(ConnectionString::forLocal());
+ ServerlessOperationLockRegistry::get(getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ stateDocument.getId());
+
return stateDocument;
}
};