diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index e2a047876bf..19f5f6ab71c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/tenant_migration_shard_merge_util.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_migration_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/logv2/log.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication @@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts( std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { + if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace && + !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { + for (auto it = first; it != last; it++) { + auto recipientStateDoc = TenantMigrationRecipientDocument::parse( + IDLParserContext("recipientStateDoc"), it->doc); + if (!recipientStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + } + } + } if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) { return; @@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, repl::TenantFileImporterService::get(opCtx->getServiceContext()) ->interrupt(recipientStateDoc.getId()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, + recipientStateDoc.getId()); + std::vector<std::string> tenantIdsToRemove; auto cleanUpBlockerIfGarbage = [&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) { @@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection( repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll(); TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient); }); } return {}; |