summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript5
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp2
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp13
-rw-r--r--src/mongo/db/repl/primary_only_service.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.cpp18
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp65
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp20
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp16
-rw-r--r--src/mongo/db/serverless/SConscript18
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.cpp192
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry.h96
-rw-r--r--src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp154
-rw-r--r--src/mongo/db/serverless/serverless_server_status.cpp57
-rw-r--r--src/mongo/db/serverless/shard_split_commands.cpp2
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp32
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp7
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp16
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp32
22 files changed, 751 insertions, 14 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 54bb5718fa1..f4c2d2d9447 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -548,6 +548,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/kill_sessions_local',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/historical_ident_tracker',
@@ -763,6 +764,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog',
'$BUILD_DIR/mongo/db/storage/journal_flusher',
'delayable_timeout_callback',
@@ -1245,6 +1247,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/executor/scoped_task_executor',
'repl_server_parameters',
@@ -1413,6 +1416,7 @@ env.Library(
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
+ '$BUILD_DIR/mongo/db/serverless/serverless_lock',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import',
'$BUILD_DIR/mongo/db/transaction/transaction',
@@ -1485,6 +1489,7 @@ env.Library(
"$BUILD_DIR/mongo/db/catalog/local_oplog_info",
"$BUILD_DIR/mongo/db/concurrency/exception_util",
"$BUILD_DIR/mongo/db/index_builds_coordinator_interface",
+ "$BUILD_DIR/mongo/db/serverless/serverless_lock",
],
)
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 361e6aabe1f..cd562d19af5 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/repl/sync_source_selector.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/transaction_oplog_application.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
@@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx,
_storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync);
_replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx);
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index e98b65b9d76..8fde4801165 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -2049,6 +2049,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Start the real work.
ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts));
@@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
{
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
@@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2270,6 +2274,7 @@ TEST_F(
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError)
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
+
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
doSuccessfulInitialSyncWithOneBatch();
}
@@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest,
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
auto initialSyncer = &getInitialSyncer();
auto opCtx = makeOpCtx();
@@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
@@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork
"skipRecoverTenantMigrationAccessBlockers");
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
// Skip clearing initial sync progress so that we can check initialSyncStatus fields after
// initial sync is complete.
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp
index c15f88c3df0..da06257acdf 100644
--- a/src/mongo/db/repl/primary_only_service.cpp
+++ b/src/mongo/db/repl/primary_only_service.cpp
@@ -777,6 +777,16 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::_insertNewInst
return instance->run(std::move(scopedExecutor), std::move(token));
})
+ // TODO SERVER-61717 remove this error handler once instance are automatically released
+ // at the end of run()
+ .onError<ErrorCodes::ConflictingServerlessOperation>([this, instanceID](Status status) {
+ LOGV2(6531507,
+ "Removing instance due to ConflictingServerlessOperation error",
+ "instanceID"_attr = instanceID);
+ releaseInstance(instanceID, Status::OK());
+
+ return status;
+ })
.semi();
auto [it, inserted] = _activeInstances.try_emplace(
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a943637c2e5..2feb1ed6b7b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -88,6 +88,7 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog.h"
#include "mongo/db/shutdown_in_progress_quiesce_info.h"
@@ -534,6 +535,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
}
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
LOGV2(4280506, "Reconstructing prepared transactions");
reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering);
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 101ddcf0bb3..9f7f2e5863d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -192,6 +192,8 @@ void ReplCoordTest::start() {
// Skip recovering user writes critical sections for the same reason as the above.
FailPointEnableBlock skipRecoverUserWriteCriticalSections(
"skipRecoverUserWriteCriticalSections");
+ // Skip recovering of serverless mutual exclusion locks for the same reason as the above.
+ FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock");
invariant(!_callShutdown);
// if we haven't initialized yet, do that first.
if (!_repl) {
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index fa3d51489a8..2fff5e05a99 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/db/server_recovery.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/session/kill_sessions_local.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/session/session_txn_record_gen.h"
@@ -652,6 +653,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
_correctRecordStoreCounts(opCtx);
tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx);
+ ServerlessOperationLockRegistry::recoverLocks(opCtx);
// Reconstruct prepared transactions after counts have been adjusted. Since prepared
// transactions were aborted (i.e. the in-memory counts were rolled-back) before computing
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
index 85e3f012bf0..360c0599db6 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp
@@ -88,7 +88,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId,
if (it != _tenantMigrationAccessBlockers.end()) {
auto existingMtab = it->second.getAccessBlocker(mtabType);
if (existingMtab) {
- tasserted(ErrorCodes::ConflictingOperationInProgress,
+ uasserted(ErrorCodes::ConflictingServerlessOperation,
str::stream() << "This node is already a "
<< (mtabType == MtabType::kDonor ? "donor" : "recipient")
<< " for tenantId \"" << tenantId << "\" with migrationId \""
@@ -121,7 +121,7 @@ void TenantMigrationAccessBlockerRegistry::add(std::shared_ptr<TenantMigrationAc
std::find_if(_tenantMigrationAccessBlockers.begin(),
_tenantMigrationAccessBlockers.end(),
[](const auto& pair) { return pair.second.getDonorAccessBlocker().get(); });
- tassert(6114105,
+ uassert(ErrorCodes::ConflictingServerlessOperation,
str::stream()
<< "Trying to add donor blocker for all tenants when this node already has a donor "
"blocker for \""
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 491f6e15753..65f765cf94e 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_migration_donor_op_observer.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -54,6 +55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDoc) {
invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
+
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
donorStateDoc.getId());
if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) ==
@@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(donorStateDoc.getTenantId(),
TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
} else {
@@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAccessBlockersForMigration(
donorStateDoc.getId(), TenantMigrationAccessBlocker::BlockerType::kDonor);
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ donorStateDoc.getId());
});
}
}
@@ -155,6 +166,10 @@ public:
void commit(OperationContext* opCtx, boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor,
+ _donorStateDoc.getId());
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
opCtx->getServiceContext(), _donorStateDoc.getTenantId());
@@ -338,6 +353,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext*
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 1d3f4cdf1e2..0da1de32c4e 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -123,7 +123,6 @@ void checkForTokenInterrupt(const CancellationToken& token) {
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
-
template <class Promise>
void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) {
if (promise.getFuture().isReady()) {
@@ -155,6 +154,17 @@ void setPromiseOkIfNotReady(WithLock lk, Promise& promise) {
promise.emplaceValue();
}
+bool isNotDurableAndServerlessConflict(WithLock lk, SharedPromise<void>& promise) {
+ auto future = promise.getFuture();
+
+ if (!future.isReady() ||
+ future.getNoThrow().code() != ErrorCodes::ConflictingServerlessOperation) {
+ return false;
+ }
+
+ return true;
+}
+
} // namespace
void TenantMigrationDonorService::checkIfConflictsWithOtherInstances(
@@ -515,7 +525,16 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531508,
+ "Tenant migration completed due to serverless lock error",
+ "id"_attr = _migrationUuid,
+ "status"_attr = swOpTime.getStatus());
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -950,6 +969,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
->incTotalMigrationDonationsCommitted();
}
}
+
+ return Status::OK();
})
.then([this, self = shared_from_this(), executor, token, recipientTargeterRS] {
return _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
@@ -977,6 +998,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"tenantId"_attr = _tenantId,
"status"_attr = status,
"abortReason"_attr = _abortReason);
+
+ // If a ConflictingServerlessOperation was thrown during the initial insertion we do not
+ // have a state document. In that case return the error to PrimaryOnlyService so it
+ // frees the instance from its map.
+ if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ uassertStatusOK(_initialDonorStateDurablePromise.getFuture().getNoThrow());
+ }
})
.semi();
}
@@ -1363,7 +1391,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
checkForTokenInterrupt(token);
{
- stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) {
// The migration was resumed on stepup and it was already aborted.
return ExecutorFuture(**executor);
@@ -1420,6 +1447,21 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
const CancellationToken& token) {
+ const bool skipWaitingForForget = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ return false;
+ }
+ setPromiseErrorIfNotReady(lg,
+ _receiveDonorForgetMigrationPromise,
+ _initialDonorStateDurablePromise.getFuture().getNoThrow());
+ return true;
+ }();
+
+ if (skipWaitingForForget) {
+ return ExecutorFuture(**executor);
+ }
+
LOGV2(6104909,
"Waiting to receive 'donorForgetMigration' command.",
"migrationId"_attr = _migrationUuid,
@@ -1445,6 +1487,16 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
return std::move(_receiveDonorForgetMigrationPromise.getFuture())
.thenRunOn(**executor)
.then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ {
+ // If the abortReason is ConflictingServerlessOperation, it means there are no
+ // document on the recipient. Do not send the forget command.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_abortReason &&
+ _abortReason->code() == ErrorCodes::ConflictingServerlessOperation) {
+ return ExecutorFuture(**executor);
+ }
+ }
+
LOGV2(6104910,
"Waiting for recipientForgetMigration response.",
"migrationId"_attr = _migrationUuid,
@@ -1487,6 +1539,12 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteStateDoc(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) {
+ // If the state document was not inserted due to a conflicting serverless operation, do not
+ // try to delete it.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) {
+ return ExecutorFuture(**executor);
+ }
LOGV2(8423362,
"Waiting for garbage collection delay before deleting state document",
@@ -1494,7 +1552,6 @@ TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteS
"tenantId"_attr = _tenantId,
"expireAt"_attr = *_stateDoc.getExpireAt());
- stdx::lock_guard<Latch> lg(_mutex);
return (*executor)
->sleepUntil(*_stateDoc.getExpireAt(), token)
.then([this, self = shared_from_this(), executor, token]() {
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index e2a047876bf..19f5f6ab71c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts(
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
+ if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ !tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
+ for (auto it = first; it != last; it++) {
+ auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
+ IDLParserContext("recipientStateDoc"), it->doc);
+ if (!recipientStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+ }
+ }
+ }
if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) {
return;
@@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(recipientStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+
std::vector<std::string> tenantIdsToRemove;
auto cleanUpBlockerIfGarbage =
[&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) {
@@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient);
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 869670c6ca3..ab7e7ef12e1 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -2968,7 +2968,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// Handle recipientForgetMigration.
stdx::lock_guard lk(_mutex);
if (_stateDoc.getExpireAt() ||
- MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) {
+ MONGO_unlikely(autoRecipientForgetMigration.shouldFail()) ||
+ status.code() == ErrorCodes::ConflictingServerlessOperation) {
// Skip waiting for the recipientForgetMigration command.
setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise);
}
@@ -3018,7 +3019,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// is safe even on shutDown/stepDown.
stdx::lock_guard lk(_mutex);
invariant(_dataSyncCompletionPromise.getFuture().isReady());
- if (!status.isOK()) {
+
+ if (status.code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531506,
+ "Migration failed as another serverless operation was in progress",
+ "migrationId"_attr = getMigrationUUID(),
+ "tenantId"_attr = getTenantId(),
+ "status"_attr = status);
+ setPromiseOkifNotReady(lk, _forgetMigrationDurablePromise);
+ return status;
+ } else if (!status.isOK()) {
// We should only hit here on a stepDown/shutDown, or a 'conflicting migration'
// error.
LOGV2(4881402,
@@ -3029,6 +3039,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
setPromiseErrorifNotReady(lk, _forgetMigrationDurablePromise, status);
}
_taskState.setState(TaskState::kDone);
+
+ return Status::OK();
})
.semi();
}
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript
index 3ccfd8ea7f7..82143897663 100644
--- a/src/mongo/db/serverless/SConscript
+++ b/src/mongo/db/serverless/SConscript
@@ -57,6 +57,21 @@ env.Library(
)
env.Library(
+ target='serverless_lock',
+ source=[
+ 'serverless_operation_lock_registry.cpp',
+ 'serverless_server_status.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl',
+ '$BUILD_DIR/mongo/db/repl/tenant_migration_utils',
+ '$BUILD_DIR/mongo/db/server_base',
+ 'shard_split_state_machine',
+ ],
+)
+
+env.Library(
target='shard_split_donor_service',
source=[
'shard_split_donor_service.cpp',
@@ -77,6 +92,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/shard_role',
+ 'serverless_lock',
'shard_split_utils',
],
)
@@ -84,6 +100,7 @@ env.Library(
env.CppUnitTest(
target='db_serverless_test',
source=[
+ 'serverless_operation_lock_registry_test.cpp',
'shard_split_donor_op_observer_test.cpp',
'shard_split_donor_service_test.cpp',
'shard_split_utils_test.cpp',
@@ -97,6 +114,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ 'serverless_lock',
'shard_split_donor_service',
'shard_split_utils',
],
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
new file mode 100644
index 00000000000..20a02c6cd15
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp
@@ -0,0 +1,192 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/db/persistent_task_store.h"
+#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration
+
+// Failpoint that will cause recoverLocks to return early.
+MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock);
+namespace mongo {
+// Declared as a ServiceContext decoration, so the registry is looked up with get(serviceContext).
+const ServiceContext::Decoration<ServerlessOperationLockRegistry>
+ ServerlessOperationLockRegistry::get =
+ ServiceContext::declareDecoration<ServerlessOperationLockRegistry>();
+/**
+ * Records operationId as an active operation of type lockType, taking the lock for that type if
+ * it was free. Fails with ConflictingServerlessOperation when a different type currently holds
+ * the lock; acquiring twice with the same operationId trips an invariant.
+ */
+void ServerlessOperationLockRegistry::acquireLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ // Verify there is no serverless operation in progress or it is the same type as the one
+ // acquiring the lock.
+ uassert(ErrorCodes::ConflictingServerlessOperation,
+ "Conflicting serverless operation in progress",
+ !_activeLockType || _activeLockType.get() == lockType);
+ invariant(_activeOperations.find(operationId) == _activeOperations.end(),
+ "Cannot acquire the serverless lock twice for the same operationId.");
+ _activeLockType = lockType;
+
+ _activeOperations.emplace(operationId);
+
+ LOGV2(6531500,
+ "Acquired serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+/**
+ * Removes operationId from the active set; the lock itself is only freed once that set is empty.
+ * Both mismatches (lockType is not the active type, operationId is not tracked) trip an
+ * invariant rather than reporting an error to the caller.
+ */
+void ServerlessOperationLockRegistry::releaseLock(
+ ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ invariant(_activeLockType && *_activeLockType == lockType,
+ "Cannot release a serverless lock that is not owned by the given lock type.");
+
+ invariant(_activeOperations.find(operationId) != _activeOperations.end(),
+ "Cannot release a serverless lock if the given operationId does not own the lock.");
+ _activeOperations.erase(operationId);
+
+ if (_activeOperations.empty()) {
+ _activeLockType.reset();
+ }
+
+ LOGV2(6531501,
+ "Released serverless operation lock",
+ "type"_attr = lockType,
+ "id"_attr = operationId);
+}
+/**
+ * Forgets every tracked operation and frees the lock, but only when lockType is the type that
+ * currently holds it; otherwise returns without changing anything.
+ */
+void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType || *_activeLockType != lockType) {
+ return;
+ }
+
+ LOGV2(6531505,
+ "Released all serverless locks due to state collection drop",
+ "type"_attr = lockType);
+
+ _activeLockType.reset();
+ _activeOperations.clear();
+}
+/**
+ * Unconditionally frees the lock and forgets every tracked operation.
+ * NOTE(review): the log line says "on shutdown", yet recoverLocks() also calls clear(), and the
+ * "ns" attribute actually carries the active lock type -- confirm whether the wording is intended.
+ */
+void ServerlessOperationLockRegistry::clear() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(6531504,
+ "Clearing serverless operation lock registry on shutdown",
+ "ns"_attr = _activeLockType);
+
+ _activeOperations.clear();
+ _activeLockType.reset();
+}
+/**
+ * Rebuilds the registry of opCtx's ServiceContext from the persisted state documents: the
+ * registry is cleared, then the lock is re-acquired for every tenant migration donor, tenant
+ * migration recipient and shard split donor document that has no expireAt set. Does nothing when
+ * the skipRecoverServerlessOperationLock failpoint is active.
+ * NOTE(review): each callback returns true, presumably to keep forEach iterating -- confirm
+ * against PersistentTaskStore::forEach.
+ */
+void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) {
+ if (skipRecoverServerlessOperationLock.shouldFail()) {
+ return;
+ }
+
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.clear();
+
+ PersistentTaskStore<TenantMigrationDonorDocument> donorStore(
+ NamespaceString::kTenantMigrationDonorsNamespace);
+ donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore(
+ NamespaceString::kTenantMigrationRecipientsNamespace);
+ recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ doc.getId());
+
+ return true;
+ });
+
+ PersistentTaskStore<ShardSplitDonorDocument> splitStore(
+ NamespaceString::kShardSplitDonorsNamespace);
+ splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
+ // Do not acquire a lock for garbage-collectable documents
+ if (doc.getExpireAt()) {
+ return true;
+ }
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId());
+
+ return true;
+ });
+}
+/**
+ * appendInfoForServerStatus() reports the lock state under the field named below, as an integer:
+ * 0 = not held, 1 = shard split, 2 = tenant migration donor, 3 = tenant migration recipient.
+ */
+const std::string kOperationLockFieldName = "operationLock";
+void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ if (!_activeLockType) {
+ builder->append(kOperationLockFieldName, 0);
+ return;
+ }
+
+ switch (_activeLockType.value()) {
+ case ServerlessOperationLockRegistry::LockType::kShardSplit:
+ builder->append(kOperationLockFieldName, 1);
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantDonor:
+ builder->append(kOperationLockFieldName, 2);
+ break;
+ case ServerlessOperationLockRegistry::LockType::kTenantRecipient:
+ builder->append(kOperationLockFieldName, 3);
+ break;
+ }
+}
+// Test-only: returns the lock type currently held, or an empty optional when the lock is free.
+boost::optional<ServerlessOperationLockRegistry::LockType>
+ServerlessOperationLockRegistry::getActiveOperationType_forTest() {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ return _activeLockType;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h
new file mode 100644
index 00000000000..d9ac07393f4
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/uuid.h"
+
+#include <set>
+
+namespace mongo {
+
+/**
+ * Registry that allows only one type of serverless operation to be active at a time, while
+ * allowing any number of simultaneous operations of that same type. The lock is held by a
+ * LockType for as long as at least one operation id of that type is tracked.
+ */
+class ServerlessOperationLockRegistry {
+ ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete;
+ ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete;
+
+public:
+ ServerlessOperationLockRegistry() = default;
+ // Accessor for the registry that decorates a ServiceContext.
+ static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get;
+ // The kinds of serverless operation that can hold the lock.
+ enum LockType { kShardSplit, kTenantDonor, kTenantRecipient };
+
+ /**
+ * Acquires the serverless lock for lockType and adds operationId to the set of tracked
+ * operations. Throws ConflictingServerlessOperation if the lock is currently held by a
+ * different LockType. Invariants if operationId is already tracked.
+ */
+ void acquireLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * Removes the given operationId from the set of tracked operations and releases the lock if
+ * the set becomes empty. Invariants if lockType is not the type holding the lock or if
+ * operationId is not tracked.
+ */
+ void releaseLock(LockType lockType, const UUID& operationId);
+
+ /**
+ * Called when a state document collection is dropped. If the collection's lockType currently
+ * holds the lock, it releases the lock. If it does not own the lock, the function does nothing.
+ */
+ void onDropStateCollection(LockType lockType);
+ /**
+ * Releases the lock and forgets every tracked operation, whatever type holds the lock.
+ */
+ void clear();
+
+ /**
+ * Scan serverless state documents and acquire the serverless mutual exclusion lock if needed.
+ */
+ static void recoverLocks(OperationContext* opCtx);
+
+ /**
+ * Appends the exclusion status to the BSONObjBuilder.
+ */
+ void appendInfoForServerStatus(BSONObjBuilder* builder) const;
+ /**
+ * Returns the type currently holding the lock, or an empty optional if the lock is free.
+ */
+ boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest();
+
+private:
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex");
+ boost::optional<LockType> _activeLockType;  // Empty while no operation holds the lock.
+ std::set<UUID> _activeOperations;           // Ids of the operations sharing the lock.
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
new file mode 100644
index 00000000000..9d95b3b7bc7
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/log_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+// A single acquire followed by the matching release leaves the lock free.
+TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+// Acquiring twice with the same operation id must trip the invariant.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ InsertSameIdTwice,
+ "Cannot acquire the serverless lock twice for the same operationId.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+}
+// While shard split holds the lock, a tenant donor acquire is rejected with an error code.
+TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+
+ ASSERT_THROWS_CODE(
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, UUID::gen()),
+ DBException,
+ ErrorCodes::ConflictingServerlessOperation);
+}
+// Releasing with a lock type other than the one holding the lock must trip the invariant.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentNsTriggersInvariant,
+ "Cannot release a serverless lock that is not owned by the given lock type.") {
+ ServerlessOperationLockRegistry registry;
+
+ auto id = UUID::gen();
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, id);
+}
+
+// Releasing an operation id that was never acquired must trip the invariant.
+DEATH_TEST(ServerlessOperationLockRegistryTest,
+ ReleaseDifferentIdTriggersInvariant,
+ "Cannot release a serverless lock if the given operationId does not own the lock.") {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+}
+// clear() frees the lock even while several operations are still registered.
+TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) {
+ ServerlessOperationLockRegistry registry;
+
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+ registry.clear();
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+// The lock stays held while operations are registered and is freed once all are released.
+TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock is held by shard split.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+}
+// Releasing only some of the registered operations must leave the lock held.
+TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) {
+ ServerlessOperationLockRegistry registry;
+
+ std::vector<UUID> ids;
+ for (int i = 0; i < 5; ++i) {
+ ids.push_back(UUID::gen());
+ }
+
+ for (auto& id : ids) {
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+ // Add one more id that is never released below.
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen());
+
+ // Verify the lock is held by shard split.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
+ for (auto& id : ids) {
+ registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id);
+ }
+
+ // Verify the lock is still held: the additional id has not been released.
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp
new file mode 100644
index 00000000000..8d0d4658dc3
--- /dev/null
+++ b/src/mongo/db/serverless/serverless_server_status.cpp
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/commands/server_status.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
+
+namespace mongo {
+namespace {
+/**
+ * serverStatus section named "serverless" whose content is whatever the serverless operation
+ * lock registry appends in appendInfoForServerStatus().
+ */
+class ServerlessServerStatus final : public ServerStatusSection {
+public:
+ ServerlessServerStatus() : ServerStatusSection("serverless") {}
+ // Not part of the default output; presumably must be requested explicitly -- confirm.
+ bool includeByDefault() const override {
+ return false;
+ }
+ // Builds the section from the registry that belongs to opCtx's ServiceContext.
+ BSONObj generateSection(OperationContext* opCtx,
+ const BSONElement& configElement) const override {
+ BSONObjBuilder result;
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .appendInfoForServerStatus(&result);
+ return result.obj();
+ }
+} serverlessServerStatus;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp
index b0f0e6c6cab..5ce6c9c0307 100644
--- a/src/mongo/db/serverless/shard_split_commands.cpp
+++ b/src/mongo/db/serverless/shard_split_commands.cpp
@@ -114,7 +114,7 @@ public:
};
std::string help() const {
- return "Start an opereation to split a shard into its own slice.";
+ return "Start an operation to split a shard into its own slice.";
}
bool adminOnly() const override {
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 2d67495c431..b2470d07854 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_utils.h"
@@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) {
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
+const auto shardSplitIdToDeleteDecoration =
+ OperationContext::declareDecoration<boost::optional<UUID>>();
ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc);
@@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
invariant(donorStateDoc.getTenantIds());
invariant(donorStateDoc.getRecipientConnectionString());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId());
+
auto tenantIds = *donorStateDoc.getTenantIds();
for (const auto& tenantId : tenantIds) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(),
@@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
if (isPrimary(opCtx)) {
// onRollback is not registered on secondaries since secondaries should not fail to
// apply the write.
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] {
+ opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, migrationId = donorStateDoc.getId()] {
for (const auto& tenantId : tenantIds) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, migrationId);
});
}
}
@@ -250,6 +258,10 @@ public:
void commit(OperationContext* opCtx, boost::optional<Timestamp>) override {
if (_donorStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ _donorStateDoc.getId());
+
if (_donorStateDoc.getTenantIds()) {
auto tenantIds = _donorStateDoc.getTenantIds().value();
for (auto&& tenantId : tenantIds) {
@@ -376,12 +388,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+ const bool shouldRemoveOnRecipient =
+ serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc);
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
<< " since it has not been marked as garbage collectable and is not a"
<< " recipient garbage collectable.",
- donorStateDoc.getExpireAt() ||
- serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc));
+ donorStateDoc.getExpireAt() || shouldRemoveOnRecipient);
// To support back-to-back split retries, when a split is aborted, we remove its
// TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage
@@ -397,6 +410,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result);
}
+
+ if (shouldRemoveOnRecipient) {
+ shardSplitIdToDeleteDecoration(opCtx) = boost::make_optional(donorStateDoc.getId());
+ }
}
void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
@@ -419,6 +436,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) {
registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
+
+ const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx);
+ if (idToDelete) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete);
+ }
});
}
@@ -431,6 +454,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit);
});
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index 97c923524a3..f30ad593951 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
@@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) {
ASSERT_FALSE(mtab);
};
+ ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid);
+
runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
+
+ ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext())
+ .getActiveOperationType_forTest());
}
TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) {
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 4c3fdabb39f..fa977ae0c7d 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -1005,7 +1005,18 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .until([&](StatusWith<repl::OpTime> swOpTime) {
+ if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) {
+ LOGV2(6531509,
+ "Shard split completed due to serverless lock error",
+ "id"_attr = _migrationId,
+ "status"_attr = swOpTime.getStatus());
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ uassertStatusOK(swOpTime);
+ }
+ return swOpTime.getStatus().isOK();
+ })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1067,7 +1078,8 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
}
}
- if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) {
+ if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status) ||
+ status.code() == ErrorCodes::ConflictingServerlessOperation) {
// Don't abort the split on retriable errors that may have been generated by the local
// server shutting/stepping down because it can be resumed when the client retries.
return ExecutorFuture(**executor, StatusWith<DurableState>{status});
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 463031b66bc..f46c908059b 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
@@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
waitForReplSetStepUp(Status(ErrorCodes::OK, ""));
waitForRecipientPrimaryMajorityWrite();
+ // Verify the serverless lock has been acquired for split.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ ASSERT_EQ(*registry.getActiveOperationType_forTest(),
+ ServerlessOperationLockRegistry::LockType::kShardSplit);
+
auto result = serviceInstance->decisionFuture().get();
ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds));
ASSERT(!result.abortReason);
@@ -520,10 +526,32 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
auto completionFuture = serviceInstance->completionFuture();
completionFuture.wait();
+ // The lock has been released.
+ ASSERT_FALSE(registry.getActiveOperationType_forTest());
+
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) {
+ auto opCtx = makeOperationContext();
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+ // Hold the lock as a tenant recipient so that the shard split started below conflicts.
+ auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext());
+ registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen());
+
+ // Create and start the instance.
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, defaultStateDocument().toBSON());
+ ASSERT(serviceInstance.get());
+
+ auto decisionFuture = serviceInstance->decisionFuture();
+ // The decision future must resolve with the conflict error code.
+ auto result = decisionFuture.getNoThrow();
+ ASSERT_EQ(result.getStatus().code(), ErrorCodes::ConflictingServerlessOperation);
+}
+
TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) {
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
@@ -1015,6 +1043,10 @@ public:
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setRecipientConnectionString(ConnectionString::forLocal());
+ ServerlessOperationLockRegistry::get(getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit,
+ stateDocument.getId());
+
return stateDocument;
}
};