diff options
Diffstat (limited to 'src/mongo/db')
22 files changed, 751 insertions, 14 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 54bb5718fa1..f4c2d2d9447 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -548,6 +548,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/s/sharding_runtime_d', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/kill_sessions_local', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/storage/historical_ident_tracker', @@ -763,6 +764,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog', '$BUILD_DIR/mongo/db/storage/journal_flusher', 'delayable_timeout_callback', @@ -1245,6 +1247,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/executor/scoped_task_executor', 'repl_server_parameters', @@ -1413,6 +1416,7 @@ env.Library( '$BUILD_DIR/mongo/db/multitenancy', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface', + '$BUILD_DIR/mongo/db/serverless/serverless_lock', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger_import', '$BUILD_DIR/mongo/db/transaction/transaction', @@ -1485,6 +1489,7 @@ env.Library( "$BUILD_DIR/mongo/db/catalog/local_oplog_info", "$BUILD_DIR/mongo/db/concurrency/exception_util", "$BUILD_DIR/mongo/db/index_builds_coordinator_interface", + "$BUILD_DIR/mongo/db/serverless/serverless_lock", ], ) diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp 
index 361e6aabe1f..cd562d19af5 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -63,6 +63,7 @@ #include "mongo/db/repl/sync_source_selector.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/transaction_oplog_application.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/session_txn_record_gen.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" @@ -577,6 +578,7 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, _storage->oplogDiskLocRegister(opCtx, initialDataTimestamp, orderedCommit); tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); reconstructPreparedTransactions(opCtx, repl::OplogApplication::Mode::kInitialSync); _replicationProcess->getConsistencyMarkers()->setInitialSyncIdIfNotSet(opCtx); diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index e98b65b9d76..8fde4801165 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -2049,6 +2049,7 @@ TEST_F( "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Start the real work. 
ASSERT_OK(initialSyncer->startup(opCtx.get(), initialSyncMaxAttempts)); @@ -2091,6 +2092,8 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); + { executor::NetworkInterfaceMock::InNetworkGuard guard(net); @@ -2199,6 +2202,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -2270,6 +2274,7 @@ TEST_F( "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -2581,6 +2586,7 @@ TEST_F(InitialSyncerTest, InitialSyncerRetriesLastOplogEntryFetcherNetworkError) "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -3225,6 +3231,8 @@ TEST_F(InitialSyncerTest, InitialSyncerHandlesNetworkErrorsFromRollbackCheckerAf "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); + auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -3539,6 +3547,7 @@ TEST_F(InitialSyncerTest, 
LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -4204,6 +4213,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); doSuccessfulInitialSyncWithOneBatch(); } @@ -4219,6 +4229,7 @@ TEST_F(InitialSyncerTest, "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -4552,6 +4563,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Skip clearing initial sync progress so that we can check initialSyncStatus fields after // initial sync is complete. @@ -4921,6 +4933,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgressForNetwork "skipRecoverTenantMigrationAccessBlockers"); FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); // Skip clearing initial sync progress so that we can check initialSyncStatus fields after // initial sync is complete. 
diff --git a/src/mongo/db/repl/primary_only_service.cpp b/src/mongo/db/repl/primary_only_service.cpp index c15f88c3df0..da06257acdf 100644 --- a/src/mongo/db/repl/primary_only_service.cpp +++ b/src/mongo/db/repl/primary_only_service.cpp @@ -777,6 +777,16 @@ std::shared_ptr<PrimaryOnlyService::Instance> PrimaryOnlyService::_insertNewInst return instance->run(std::move(scopedExecutor), std::move(token)); }) + // TODO SERVER-61717 remove this error handler once instance are automatically released + // at the end of run() + .onError<ErrorCodes::ConflictingServerlessOperation>([this, instanceID](Status status) { + LOGV2(6531507, + "Removing instance due to ConflictingServerlessOperation error", + "instanceID"_attr = instanceID); + releaseInstance(instanceID, Status::OK()); + + return status; + }) .semi(); auto [it, inserted] = _activeInstances.try_emplace( diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a943637c2e5..2feb1ed6b7b 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -88,6 +88,7 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/server_options.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/kill_sessions_local.h" #include "mongo/db/session/session_catalog.h" #include "mongo/db/shutdown_in_progress_quiesce_info.h" @@ -534,6 +535,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig( } tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); LOGV2(4280506, "Reconstructing prepared transactions"); reconstructPreparedTransactions(opCtx, OplogApplication::Mode::kRecovering); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 
101ddcf0bb3..9f7f2e5863d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -192,6 +192,8 @@ void ReplCoordTest::start() { // Skip recovering user writes critical sections for the same reason as the above. FailPointEnableBlock skipRecoverUserWriteCriticalSections( "skipRecoverUserWriteCriticalSections"); + // Skip recovering of serverless mutual exclusion locks for the same reason as the above. + FailPointEnableBlock skipRecoverServerlessOperationLock("skipRecoverServerlessOperationLock"); invariant(!_callShutdown); // if we haven't initialized yet, do that first. if (!_repl) { diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index fa3d51489a8..2fff5e05a99 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -58,6 +58,7 @@ #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_recovery.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/session/kill_sessions_local.h" #include "mongo/db/session/session_catalog_mongod.h" #include "mongo/db/session/session_txn_record_gen.h" @@ -652,6 +653,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns( _correctRecordStoreCounts(opCtx); tenant_migration_access_blocker::recoverTenantMigrationAccessBlockers(opCtx); + ServerlessOperationLockRegistry::recoverLocks(opCtx); // Reconstruct prepared transactions after counts have been adjusted. Since prepared // transactions were aborted (i.e. 
the in-memory counts were rolled-back) before computing diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp index 85e3f012bf0..360c0599db6 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_registry.cpp @@ -88,7 +88,7 @@ void TenantMigrationAccessBlockerRegistry::add(StringData tenantId, if (it != _tenantMigrationAccessBlockers.end()) { auto existingMtab = it->second.getAccessBlocker(mtabType); if (existingMtab) { - tasserted(ErrorCodes::ConflictingOperationInProgress, + uasserted(ErrorCodes::ConflictingServerlessOperation, str::stream() << "This node is already a " << (mtabType == MtabType::kDonor ? "donor" : "recipient") << " for tenantId \"" << tenantId << "\" with migrationId \"" @@ -121,7 +121,7 @@ void TenantMigrationAccessBlockerRegistry::add(std::shared_ptr<TenantMigrationAc std::find_if(_tenantMigrationAccessBlockers.begin(), _tenantMigrationAccessBlockers.end(), [](const auto& pair) { return pair.second.getDonorAccessBlocker().get(); }); - tassert(6114105, + uassert(ErrorCodes::ConflictingServerlessOperation, str::stream() << "Trying to add donor blocker for all tenants when this node already has a donor " "blocker for \"" diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp index 491f6e15753..65f765cf94e 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/tenant_migration_donor_op_observer.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication @@ -54,6 
+55,10 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, const TenantMigrationDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAbortingIndexBuilds); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); + auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(), donorStateDoc.getId()); if (donorStateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations) == @@ -69,6 +74,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(donorStateDoc.getTenantId(), TenantMigrationAccessBlocker::BlockerType::kDonor); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); }); } } else { @@ -85,6 +93,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAccessBlockersForMigration( donorStateDoc.getId(), TenantMigrationAccessBlocker::BlockerType::kDonor); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + donorStateDoc.getId()); }); } } @@ -155,6 +166,10 @@ public: void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, + _donorStateDoc.getId()); + auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( opCtx->getServiceContext(), _donorStateDoc.getTenantId()); @@ -338,6 +353,9 @@ repl::OpTime TenantMigrationDonorOpObserver::onDropCollection(OperationContext* 
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantDonor); }); } return {}; diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 1d3f4cdf1e2..0da1de32c4e 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -123,7 +123,6 @@ void checkForTokenInterrupt(const CancellationToken& token) { uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } - template <class Promise> void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) { if (promise.getFuture().isReady()) { @@ -155,6 +154,17 @@ void setPromiseOkIfNotReady(WithLock lk, Promise& promise) { promise.emplaceValue(); } +bool isNotDurableAndServerlessConflict(WithLock lk, SharedPromise<void>& promise) { + auto future = promise.getFuture(); + + if (!future.isReady() || + future.getNoThrow().code() != ErrorCodes::ConflictingServerlessOperation) { + return false; + } + + return true; +} + } // namespace void TenantMigrationDonorService::checkIfConflictsWithOtherInstances( @@ -515,7 +525,16 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) + .until([&](StatusWith<repl::OpTime> swOpTime) { + if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531508, + "Tenant migration completed due to serverless lock error", + "id"_attr = _migrationUuid, + "status"_attr = swOpTime.getStatus()); + 
uassertStatusOK(swOpTime); + } + return swOpTime.getStatus().isOK(); + }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -950,6 +969,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( ->incTotalMigrationDonationsCommitted(); } } + + return Status::OK(); }) .then([this, self = shared_from_this(), executor, token, recipientTargeterRS] { return _waitForForgetMigrationThenMarkMigrationGarbageCollectable( @@ -977,6 +998,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "tenantId"_attr = _tenantId, "status"_attr = status, "abortReason"_attr = _abortReason); + + // If a ConflictingServerlessOperation was thrown during the initial insertion we do not + // have a state document. In that case return the error to PrimaryOnlyService so it + // frees the instance from its map. + if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + uassertStatusOK(_initialDonorStateDurablePromise.getFuture().getNoThrow()); + } }) .semi(); } @@ -1363,7 +1391,6 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA checkForTokenInterrupt(token); { - stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) { // The migration was resumed on stepup and it was already aborted. 
return ExecutorFuture(**executor); @@ -1420,6 +1447,21 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, const CancellationToken& token) { + const bool skipWaitingForForget = [&]() { + stdx::lock_guard<Latch> lg(_mutex); + if (!isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + return false; + } + setPromiseErrorIfNotReady(lg, + _receiveDonorForgetMigrationPromise, + _initialDonorStateDurablePromise.getFuture().getNoThrow()); + return true; + }(); + + if (skipWaitingForForget) { + return ExecutorFuture(**executor); + } + LOGV2(6104909, "Waiting to receive 'donorForgetMigration' command.", "migrationId"_attr = _migrationUuid, @@ -1445,6 +1487,16 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG return std::move(_receiveDonorForgetMigrationPromise.getFuture()) .thenRunOn(**executor) .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + { + // If the abortReason is ConflictingServerlessOperation, it means there are no + // document on the recipient. Do not send the forget command. + stdx::lock_guard<Latch> lg(_mutex); + if (_abortReason && + _abortReason->code() == ErrorCodes::ConflictingServerlessOperation) { + return ExecutorFuture(**executor); + } + } + LOGV2(6104910, "Waiting for recipientForgetMigration response.", "migrationId"_attr = _migrationUuid, @@ -1487,6 +1539,12 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteStateDoc( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) { + // If the state document was not inserted due to a conflicting serverless operation, do not + // try to delete it. 
+ stdx::lock_guard<Latch> lg(_mutex); + if (isNotDurableAndServerlessConflict(lg, _initialDonorStateDurablePromise)) { + return ExecutorFuture(**executor); + } LOGV2(8423362, "Waiting for garbage collection delay before deleting state document", @@ -1494,7 +1552,6 @@ TenantMigrationDonorService::Instance::_waitForGarbageCollectionDelayThenDeleteS "tenantId"_attr = _tenantId, "expireAt"_attr = *_stateDoc.getExpireAt()); - stdx::lock_guard<Latch> lg(_mutex); return (*executor) ->sleepUntil(*_stateDoc.getExpireAt(), token) .then([this, self = shared_from_this(), executor, token]() { diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index e2a047876bf..19f5f6ab71c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/tenant_migration_shard_merge_util.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_migration_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication @@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts( std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { + if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace && + !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + for (auto it = first; it != last; it++) { + auto recipientStateDoc = TenantMigrationRecipientDocument::parse( + IDLParserContext("recipientStateDoc"), it->doc); + if (!recipientStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + } + } + } if 
(!shard_merge_utils::isDonatedFilesCollection(coll->ns())) { return; @@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, repl::TenantFileImporterService::get(opCtx->getServiceContext()) ->interrupt(recipientStateDoc.getId()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + std::vector<std::string> tenantIdsToRemove; auto cleanUpBlockerIfGarbage = [&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) { @@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection( repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll(); TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient); }); } return {}; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 869670c6ca3..ab7e7ef12e1 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -2968,7 +2968,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // Handle recipientForgetMigration. stdx::lock_guard lk(_mutex); if (_stateDoc.getExpireAt() || - MONGO_unlikely(autoRecipientForgetMigration.shouldFail())) { + MONGO_unlikely(autoRecipientForgetMigration.shouldFail()) || + status.code() == ErrorCodes::ConflictingServerlessOperation) { // Skip waiting for the recipientForgetMigration command. 
setPromiseOkifNotReady(lk, _receivedRecipientForgetMigrationPromise); } @@ -3018,7 +3019,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // is safe even on shutDown/stepDown. stdx::lock_guard lk(_mutex); invariant(_dataSyncCompletionPromise.getFuture().isReady()); - if (!status.isOK()) { + + if (status.code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531506, + "Migration failed as another serverless operation was in progress", + "migrationId"_attr = getMigrationUUID(), + "tenantId"_attr = getTenantId(), + "status"_attr = status); + setPromiseOkifNotReady(lk, _forgetMigrationDurablePromise); + return status; + } else if (!status.isOK()) { // We should only hit here on a stepDown/shutDown, or a 'conflicting migration' // error. LOGV2(4881402, @@ -3029,6 +3039,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( setPromiseErrorifNotReady(lk, _forgetMigrationDurablePromise, status); } _taskState.setState(TaskState::kDone); + + return Status::OK(); }) .semi(); } diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript index 3ccfd8ea7f7..82143897663 100644 --- a/src/mongo/db/serverless/SConscript +++ b/src/mongo/db/serverless/SConscript @@ -57,6 +57,21 @@ env.Library( ) env.Library( + target='serverless_lock', + source=[ + 'serverless_operation_lock_registry.cpp', + 'serverless_server_status.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/repl/tenant_migration_state_machine_idl', + '$BUILD_DIR/mongo/db/repl/tenant_migration_utils', + '$BUILD_DIR/mongo/db/server_base', + 'shard_split_state_machine', + ], +) + +env.Library( target='shard_split_donor_service', source=[ 'shard_split_donor_service.cpp', @@ -77,6 +92,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/server_base', '$BUILD_DIR/mongo/db/shard_role', + 'serverless_lock', 'shard_split_utils', ], ) @@ -84,6 +100,7 @@ env.Library( 
env.CppUnitTest( target='db_serverless_test', source=[ + 'serverless_operation_lock_registry_test.cpp', 'shard_split_donor_op_observer_test.cpp', 'shard_split_donor_service_test.cpp', 'shard_split_utils_test.cpp', @@ -97,6 +114,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/dbtests/mocklib', + 'serverless_lock', 'shard_split_donor_service', 'shard_split_utils', ], diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp new file mode 100644 index 00000000000..20a02c6cd15 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry.cpp @@ -0,0 +1,192 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/repl/tenant_migration_state_machine_gen.h" +#include "mongo/db/serverless/shard_split_state_machine_gen.h" +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration + +// Failpoint that will cause recoverLocks to return early. +MONGO_FAIL_POINT_DEFINE(skipRecoverServerlessOperationLock); +namespace mongo { + +const ServiceContext::Decoration<ServerlessOperationLockRegistry> + ServerlessOperationLockRegistry::get = + ServiceContext::declareDecoration<ServerlessOperationLockRegistry>(); + +void ServerlessOperationLockRegistry::acquireLock( + ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) { + stdx::lock_guard<Latch> lg(_mutex); + + // Verify there is no serverless operation in progress or it is the same type as the one + // acquiring the lock. 
+ uassert(ErrorCodes::ConflictingServerlessOperation, + "Conflicting serverless operation in progress", + !_activeLockType || _activeLockType.get() == lockType); + invariant(_activeOperations.find(operationId) == _activeOperations.end(), + "Cannot acquire the serverless lock twice for the same operationId."); + _activeLockType = lockType; + + _activeOperations.emplace(operationId); + + LOGV2(6531500, + "Acquired serverless operation lock", + "type"_attr = lockType, + "id"_attr = operationId); +} + +void ServerlessOperationLockRegistry::releaseLock( + ServerlessOperationLockRegistry::LockType lockType, const UUID& operationId) { + stdx::lock_guard<Latch> lg(_mutex); + + invariant(_activeLockType && *_activeLockType == lockType, + "Cannot release a serverless lock that is not owned by the given lock type."); + + invariant(_activeOperations.find(operationId) != _activeOperations.end(), + "Cannot release a serverless lock if the given operationId does not own the lock."); + _activeOperations.erase(operationId); + + if (_activeOperations.empty()) { + _activeLockType.reset(); + } + + LOGV2(6531501, + "Released serverless operation lock", + "type"_attr = lockType, + "id"_attr = operationId); +} + +void ServerlessOperationLockRegistry::onDropStateCollection(LockType lockType) { + stdx::lock_guard<Latch> lg(_mutex); + + if (!_activeLockType || *_activeLockType != lockType) { + return; + } + + LOGV2(6531505, + "Released all serverless locks due to state collection drop", + "type"_attr = lockType); + + _activeLockType.reset(); + _activeOperations.clear(); +} + +void ServerlessOperationLockRegistry::clear() { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(6531504, + "Clearing serverless operation lock registry on shutdown", + "ns"_attr = _activeLockType); + + _activeOperations.clear(); + _activeLockType.reset(); +} + +void ServerlessOperationLockRegistry::recoverLocks(OperationContext* opCtx) { + if (skipRecoverServerlessOperationLock.shouldFail()) { + return; + } + + auto& 
registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + registry.clear(); + + PersistentTaskStore<TenantMigrationDonorDocument> donorStore( + NamespaceString::kTenantMigrationDonorsNamespace); + donorStore.forEach(opCtx, {}, [&](const TenantMigrationDonorDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, doc.getId()); + + return true; + }); + + PersistentTaskStore<TenantMigrationRecipientDocument> recipientStore( + NamespaceString::kTenantMigrationRecipientsNamespace); + recipientStore.forEach(opCtx, {}, [&](const TenantMigrationRecipientDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + doc.getId()); + + return true; + }); + + PersistentTaskStore<ShardSplitDonorDocument> splitStore( + NamespaceString::kShardSplitDonorsNamespace); + splitStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) { + // Do not acquire a lock for garbage-collectable documents + if (doc.getExpireAt()) { + return true; + } + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, doc.getId()); + + return true; + }); +} + +const std::string kOperationLockFieldName = "operationLock"; +void ServerlessOperationLockRegistry::appendInfoForServerStatus(BSONObjBuilder* builder) const { + stdx::lock_guard<Latch> lg(_mutex); + + if (!_activeLockType) { + builder->append(kOperationLockFieldName, 0); + return; + } + + switch (_activeLockType.value()) { + case ServerlessOperationLockRegistry::LockType::kShardSplit: + builder->append(kOperationLockFieldName, 1); + break; + case ServerlessOperationLockRegistry::LockType::kTenantDonor: + builder->append(kOperationLockFieldName, 2); + break; + case 
ServerlessOperationLockRegistry::LockType::kTenantRecipient: + builder->append(kOperationLockFieldName, 3); + break; + } +} + +boost::optional<ServerlessOperationLockRegistry::LockType> +ServerlessOperationLockRegistry::getActiveOperationType_forTest() { + stdx::lock_guard<Latch> lg(_mutex); + + return _activeLockType; +} + + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry.h b/src/mongo/db/serverless/serverless_operation_lock_registry.h new file mode 100644 index 00000000000..d9ac07393f4 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry.h @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/uuid.h" + +#include <set> + +namespace mongo { + +/** + * Registry to allow only one type of active serverless operation at a time. It allows multiple + * simultaneous operations of the same type. + */ +class ServerlessOperationLockRegistry { + ServerlessOperationLockRegistry(const ServerlessOperationLockRegistry&) = delete; + ServerlessOperationLockRegistry& operator=(const ServerlessOperationLockRegistry&) = delete; + +public: + ServerlessOperationLockRegistry() = default; + + static const ServiceContext::Decoration<ServerlessOperationLockRegistry> get; + + enum LockType { kShardSplit, kTenantDonor, kTenantRecipient }; + + /** + * Acquires the serverless lock for lockType and adds operationId to the set of tracked + * operations. Throws ConflictingServerlessOperation if the lock is currently held by a + * different LockType. Invariants if operationId has already acquired the lock. + */ + void acquireLock(LockType lockType, const UUID& operationId); + + /** + * Removes the given operationId from the set of active operations and releases the lock + * once that set becomes empty. Invariants if lockType or operationId does not own the + * lock. + */ + void releaseLock(LockType lockType, const UUID& operationId); + + /** + * Called when a state document collection is dropped. If the collection's lockType currently + * holds the lock, it releases the lock. If it does not own the lock, the function does nothing. + */ + void onDropStateCollection(LockType lockType); + + void clear(); + + /** + * Scan serverless state documents and acquire the serverless mutual exclusion lock if needed. 
+ */ + static void recoverLocks(OperationContext* opCtx); + + /** + * Appends the exclusion status to the BSONObjBuilder. + */ + void appendInfoForServerStatus(BSONObjBuilder* builder) const; + + boost::optional<ServerlessOperationLockRegistry::LockType> getActiveOperationType_forTest(); + +private: + mutable Mutex _mutex = MONGO_MAKE_LATCH("ServerlessMutualExclusionRegistry::_mutex"); + boost::optional<LockType> _activeLockType; + std::set<UUID> _activeOperations; +}; + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp new file mode 100644 index 00000000000..9d95b3b7bc7 --- /dev/null +++ b/src/mongo/db/serverless/serverless_operation_lock_registry_test.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/serverless/serverless_operation_lock_registry.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/log_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +TEST(ServerlessOperationLockRegistryTest, InsertRemoveOne) { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + + ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +DEATH_TEST(ServerlessOperationLockRegistryTest, + InsertSameIdTwice, + "Cannot acquire the serverless lock twice for the same operationId.") { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); +} + +TEST(ServerlessOperationLockRegistryTest, AcquireDifferentNamespaceFail) { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + + ASSERT_THROWS_CODE( + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, UUID::gen()), + DBException, + ErrorCodes::ConflictingServerlessOperation); +} + +DEATH_TEST(ServerlessOperationLockRegistryTest, + ReleaseDifferentNsTriggersInvariant, + "Cannot release a serverless lock that is not owned by the given lock type.") { + ServerlessOperationLockRegistry registry; + + auto id = UUID::gen(); + 
registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kTenantDonor, id); +} + + +DEATH_TEST(ServerlessOperationLockRegistryTest, + ReleaseDifferentIdTriggersInvariant, + "Cannot release a serverless lock if the given operationId does not own the lock.") { + ServerlessOperationLockRegistry registry; + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); +} + +TEST(ServerlessOperationLockRegistryTest, ClearReleasesAllLocks) { + ServerlessOperationLockRegistry registry; + + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + + registry.clear(); + + // Verify the lock has been released. + ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +TEST(ServerlessOperationLockRegistryTest, LockIsReleasedWhenAllInstanceAreRemoved) { + ServerlessOperationLockRegistry registry; + + std::vector<UUID> ids; + for (int i = 0; i < 5; ++i) { + ids.push_back(UUID::gen()); + } + + for (auto& id : ids) { + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + + + for (auto& id : ids) { + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock has been released. 
+ ASSERT_FALSE(registry.getActiveOperationType_forTest()); +} + +TEST(ServerlessOperationLockRegistryTest, LockIsNotReleasedWhenNotAllInstanceAreRemoved) { + ServerlessOperationLockRegistry registry; + + std::vector<UUID> ids; + for (int i = 0; i < 5; ++i) { + ids.push_back(UUID::gen()); + } + + for (auto& id : ids) { + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + // Add an additional id; + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, UUID::gen()); + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + + for (auto& id : ids) { + registry.releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, id); + } + + // Verify the lock is held; + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); +} + + +} // namespace mongo diff --git a/src/mongo/db/serverless/serverless_server_status.cpp b/src/mongo/db/serverless/serverless_server_status.cpp new file mode 100644 index 00000000000..8d0d4658dc3 --- /dev/null +++ b/src/mongo/db/serverless/serverless_server_status.cpp @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" + +namespace mongo { +namespace { + +class ServerlessServerStatus final : public ServerStatusSection { +public: + ServerlessServerStatus() : ServerStatusSection("serverless") {} + + bool includeByDefault() const override { + return false; + } + + BSONObj generateSection(OperationContext* opCtx, + const BSONElement& configElement) const override { + BSONObjBuilder result; + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .appendInfoForServerStatus(&result); + return result.obj(); + } +} serverlessServerStatus; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp index b0f0e6c6cab..5ce6c9c0307 100644 --- a/src/mongo/db/serverless/shard_split_commands.cpp +++ b/src/mongo/db/serverless/shard_split_commands.cpp @@ -114,7 +114,7 @@ public: }; std::string help() const { - return "Start an opereation to split a shard into its own slice."; + return "Start an operation to split a 
shard into its own slice."; } bool adminOnly() const override { diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 2d67495c431..b2470d07854 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_utils.h" @@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) { const auto tenantIdsToDeleteDecoration = OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>(); +const auto shardSplitIdToDeleteDecoration = + OperationContext::declareDecoration<boost::optional<UUID>>(); ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc); @@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, invariant(donorStateDoc.getTenantIds()); invariant(donorStateDoc.getRecipientConnectionString()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId()); + auto tenantIds = *donorStateDoc.getTenantIds(); for (const auto& tenantId : tenantIds) { auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(), @@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, if (isPrimary(opCtx)) { // onRollback is not registered on secondaries since secondaries should not fail to // apply the write. 
- opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] { + opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, migrationId = donorStateDoc.getId()] { for (const auto& tenantId : tenantIds) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, migrationId); }); } } @@ -250,6 +258,10 @@ public: void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, + _donorStateDoc.getId()); + if (_donorStateDoc.getTenantIds()) { auto tenantIds = _donorStateDoc.getTenantIds().value(); for (auto&& tenantId : tenantIds) { @@ -376,12 +388,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, } auto donorStateDoc = parseAndValidateDonorDocument(doc); + const bool shouldRemoveOnRecipient = + serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc); uassert(ErrorCodes::IllegalOperation, str::stream() << "cannot delete a donor's state document " << doc << " since it has not been marked as garbage collectable and is not a" << " recipient garbage collectable.", - donorStateDoc.getExpireAt() || - serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc)); + donorStateDoc.getExpireAt() || shouldRemoveOnRecipient); // To support back-to-back split retries, when a split is aborted, we remove its // TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage @@ -397,6 +410,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result); } + + if (shouldRemoveOnRecipient) { + shardSplitIdToDeleteDecoration(opCtx) = 
boost::make_optional(donorStateDoc.getId()); + } } void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, @@ -419,6 +436,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) { registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + + const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx); + if (idToDelete) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete); + } }); } @@ -431,6 +454,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit); }); } diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index 97c923524a3..f30ad593951 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_test_utils.h" @@ -446,7 +447,13 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) { ASSERT_FALSE(mtab); }; + 
ServerlessOperationLockRegistry::get(_opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, _uuid); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); + + ASSERT_FALSE(ServerlessOperationLockRegistry::get(_opCtx->getServiceContext()) + .getActiveOperationType_forTest()); } TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers) { diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 4c3fdabb39f..fa977ae0c7d 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -1005,7 +1005,18 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) + .until([&](StatusWith<repl::OpTime> swOpTime) { + if (swOpTime.getStatus().code() == ErrorCodes::ConflictingServerlessOperation) { + LOGV2(6531509, + "Shard split completed due to serverless lock error", + "id"_attr = _migrationId, + "status"_attr = swOpTime.getStatus()); + stdx::lock_guard<Latch> lg(_mutex); + + uassertStatusOK(swOpTime); + } + return swOpTime.getStatus().isOK(); + }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -1067,7 +1078,8 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState( } } - if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status)) { + if (ErrorCodes::isNotPrimaryError(status) || ErrorCodes::isShutdownError(status) || + status.code() == ErrorCodes::ConflictingServerlessOperation) { // Don't abort the split on retriable errors that may have been generated by the local // server shutting/stepping down because it can be resumed when the client retries. 
return ExecutorFuture(**executor, StatusWith<DurableState>{status}); diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 463031b66bc..f46c908059b 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_donor_access_blocker.h" #include "mongo/db/repl/wait_for_majority_service.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_donor_service.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" @@ -511,6 +512,11 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) waitForReplSetStepUp(Status(ErrorCodes::OK, "")); waitForRecipientPrimaryMajorityWrite(); + // Verify the serverless lock has been acquired for split. + auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + ASSERT_EQ(*registry.getActiveOperationType_forTest(), + ServerlessOperationLockRegistry::LockType::kShardSplit); + auto result = serviceInstance->decisionFuture().get(); ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds)); ASSERT(!result.abortReason); @@ -520,10 +526,32 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) auto completionFuture = serviceInstance->completionFuture(); completionFuture.wait(); + // The lock has been released. 
+ ASSERT_FALSE(registry.getActiveOperationType_forTest()); + ASSERT_OK(serviceInstance->completionFuture().getNoThrow()); ASSERT_TRUE(serviceInstance->isGarbageCollectable()); } +TEST_F(ShardSplitDonorServiceTest, ShardSplitFailsWhenLockIsHeld) { + auto opCtx = makeOperationContext(); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + + auto& registry = ServerlessOperationLockRegistry::get(opCtx->getServiceContext()); + registry.acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, UUID::gen()); + + // Create and start the instance. + auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( + opCtx.get(), _service, defaultStateDocument().toBSON()); + ASSERT(serviceInstance.get()); + + auto decisionFuture = serviceInstance->decisionFuture(); + + auto result = decisionFuture.getNoThrow(); + ASSERT_EQ(result.getStatus().code(), ErrorCodes::ConflictingServerlessOperation); +} + TEST_F(ShardSplitDonorServiceTest, ReplSetStepUpRetryable) { auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); @@ -1015,6 +1043,10 @@ public: stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); stateDocument.setRecipientConnectionString(ConnectionString::forLocal()); + ServerlessOperationLockRegistry::get(getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, + stateDocument.getId()); + return stateDocument; } }; |