diff options
Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_op_observer.cpp')
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_op_observer.cpp | 32 |
1 files changed, 29 insertions, 3 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 2d67495c431..b2470d07854 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" +#include "mongo/db/serverless/serverless_operation_lock_registry.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_utils.h" @@ -48,6 +49,8 @@ bool isPrimary(const OperationContext* opCtx) { const auto tenantIdsToDeleteDecoration = OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>(); +const auto shardSplitIdToDeleteDecoration = + OperationContext::declareDecoration<boost::optional<UUID>>(); ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { auto donorStateDoc = ShardSplitDonorDocument::parse(IDLParserContext("donorStateDoc"), doc); @@ -146,6 +149,9 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, invariant(donorStateDoc.getTenantIds()); invariant(donorStateDoc.getRecipientConnectionString()); + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .acquireLock(ServerlessOperationLockRegistry::LockType::kShardSplit, donorStateDoc.getId()); + auto tenantIds = *donorStateDoc.getTenantIds(); for (const auto& tenantId : tenantIds) { auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(opCtx->getServiceContext(), @@ -157,11 +163,13 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, if (isPrimary(opCtx)) { // onRollback is not registered on secondaries since secondaries should not fail to // apply the write. - opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] { + opCtx->recoveryUnit()->onRollback([opCtx, tenantIds, migrationId = donorStateDoc.getId()] { for (const auto& tenantId : tenantIds) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, migrationId); }); } } @@ -250,6 +258,10 @@ public: void commit(OperationContext* opCtx, boost::optional<Timestamp>) override { if (_donorStateDoc.getExpireAt()) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, + _donorStateDoc.getId()); + if (_donorStateDoc.getTenantIds()) { auto tenantIds = _donorStateDoc.getTenantIds().value(); for (auto&& tenantId : tenantIds) { @@ -376,12 +388,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, } auto donorStateDoc = parseAndValidateDonorDocument(doc); + const bool shouldRemoveOnRecipient = + serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc); uassert(ErrorCodes::IllegalOperation, str::stream() << "cannot delete a donor's state document " << doc << " since it has not been marked as garbage collectable and is not a" << " recipient garbage collectable.", - donorStateDoc.getExpireAt() || - serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc)); + donorStateDoc.getExpireAt() || shouldRemoveOnRecipient); // To support back-to-back split retries, when a split is aborted, we remove its // TenantMigrationDonorAccessBlockers as soon as its donor state doc is marked as garbage @@ -397,6 +410,10 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, tenantIdsToDeleteDecoration(opCtx) = boost::make_optional(result); } + + if (shouldRemoveOnRecipient) { + shardSplitIdToDeleteDecoration(opCtx) = boost::make_optional(donorStateDoc.getId()); + } } void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, @@ -419,6 +436,12 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, for (auto&& tenantId : *tenantIdsToDeleteDecoration(opCtx)) { registry.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } + + const auto idToDelete = shardSplitIdToDeleteDecoration(opCtx); + if (idToDelete) { + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .releaseLock(ServerlessOperationLockRegistry::LockType::kShardSplit, *idToDelete); + } }); } @@ -431,6 +454,9 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); + + ServerlessOperationLockRegistry::get(opCtx->getServiceContext()) + .onDropStateCollection(ServerlessOperationLockRegistry::LockType::kShardSplit); }); } |