summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp20
1 files changed, 20 insertions, 0 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index e2a047876bf..19f5f6ab71c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
+#include "mongo/db/serverless/serverless_operation_lock_registry.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
@@ -179,6 +180,18 @@ void TenantMigrationRecipientOpObserver::onInserts(
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
+ if (coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ !tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
+ for (auto it = first; it != last; it++) {
+ auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
+ IDLParserContext("recipientStateDoc"), it->doc);
+ if (!recipientStateDoc.getExpireAt()) {
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+ }
+ }
+ }
if (!shard_merge_utils::isDonatedFilesCollection(coll->ns())) {
return;
@@ -204,6 +217,10 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
repl::TenantFileImporterService::get(opCtx->getServiceContext())
->interrupt(recipientStateDoc.getId());
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .releaseLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient,
+ recipientStateDoc.getId());
+
std::vector<std::string> tenantIdsToRemove;
auto cleanUpBlockerIfGarbage =
[&](std::string tenantId, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) {
@@ -312,6 +329,9 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ ServerlessOperationLockRegistry::get(opCtx->getServiceContext())
+ .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kTenantRecipient);
});
}
return {};