diff options
author | XueruiFa <xuerui.fa@mongodb.com> | 2021-02-03 21:35:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-09 16:00:46 +0000 |
commit | 2a1c62250ab5c9f089a2e4699d454ed326057b82 (patch) | |
tree | b17059a8883a57a4ef58634b02cacede429bcc69 | |
parent | b8a9330fe1591763ef40d26595b5668e581a1248 (diff) | |
download | mongo-2a1c62250ab5c9f089a2e4699d454ed326057b82.tar.gz |
SERVER-53996: Avoid holding mutex while reading/writing in tenant migration recipient
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 139 |
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 2 |
2 files changed, 84 insertions, 57 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 19473158f53..922c4bd5742 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -605,9 +605,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( return SemiFuture<void>::makeReady(); } - auto uniqueOpCtx = cc().makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); - LOGV2_DEBUG(5081400, 2, "Recipient migration service initializing state document", @@ -618,25 +615,34 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc( // Persist the state doc before starting the data sync. _stateDoc.setState(TenantMigrationRecipientStateEnum::kStarted); - { - Lock::ExclusiveLock stateDocInsertLock( - opCtx, opCtx->lockState(), _recipientService->_stateDocInsertMutex); - uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc)); - } - if (MONGO_unlikely(failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) { - LOGV2(4878500, "Persisting state doc failed due to fail point enabled."); - uassert(ErrorCodes::NotWritablePrimary, - "Persisting state doc failed - " - "'failWhilePersistingTenantMigrationRecipientInstanceStateDoc' fail point active", - false); - } - // Wait for the state doc to be majority replicated to make sure that the state doc doesn't - // rollback. 
- auto insertOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(insertOpTime) + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = _stateDoc] { + auto opCtx = cc().makeOperationContext(); + { + Lock::ExclusiveLock stateDocInsertLock( + opCtx.get(), opCtx->lockState(), _recipientService->_stateDocInsertMutex); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx.get(), stateDoc)); + } + + if (MONGO_unlikely( + failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) { + LOGV2(4878500, "Persisting state doc failed due to fail point enabled."); + uasserted( + ErrorCodes::NotWritablePrimary, + "Persisting state doc failed - " + "'failWhilePersistingTenantMigrationRecipientInstanceStateDoc' fail point " + "active"); + } + + // Wait for the state doc to be majority replicated to make sure that the state doc + // doesn't rollback. 
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime); + }) .semi(); } @@ -976,9 +982,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() { _stateDoc.setCloneFinishedRecipientOpTime( repl::ReplicationCoordinator::get(opCtx.get())->getMyLastAppliedOpTime()); - uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()) + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = _stateDoc] { + auto opCtx = cc().makeOperationContext(); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); + + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime); + }) .semi(); } @@ -995,17 +1008,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu .thenRunOn(**_scopedExecutor) .then( [this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) { - auto opCtx = cc().makeOperationContext(); - stdx::lock_guard lk(_mutex); // Persist the state that tenant migration instance has reached // consistent state. 
_stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent); - uassertStatusOK( - tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); + return _stateDoc; }) + .then([this, self = shared_from_this()](TenantMigrationRecipientDocument stateDoc) { + auto opCtx = cc().makeOperationContext(); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()); + }) .semi(); } @@ -1068,31 +1083,36 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_markStateDocAsGarba return SemiFuture<void>::makeReady(); } - auto uniqueOpCtx = cc().makeOperationContext(); - auto opCtx = uniqueOpCtx.get(); _stateDoc.setState(TenantMigrationRecipientStateEnum::kDone); - _stateDoc.setExpireAt(opCtx->getServiceContext()->getFastClockSource()->now() + + _stateDoc.setExpireAt(getGlobalServiceContext()->getFastClockSource()->now() + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); - auto status = [&]() { - try { - // Update the state doc with the expireAt set. - return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc); - } catch (DBException& ex) { - return ex.toStatus(); - } - }(); - if (!status.isOK()) { - // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers). - // Otherwise, the whole chain would stop running without marking the state doc garbage - // collectable while we are still the primary. 
- invariant(ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status)); - uassertStatusOK(status); - } + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = _stateDoc] { + auto opCtx = cc().makeOperationContext(); + auto status = [&]() { + try { + // Update the state doc with the expireAt set. + return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), + stateDoc); + } catch (DBException& ex) { + return ex.toStatus(); + } + }(); + + if (!status.isOK()) { + // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers). + // Otherwise, the whole chain would stop running without marking the state doc + // garbage collectable while we are still the primary. + invariant(ErrorCodes::isShutdownError(status) || + ErrorCodes::isNotPrimaryError(status)); + uassertStatusOK(status); + } - auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(writeOpTime) + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime); + }) .semi(); } @@ -1240,12 +1260,19 @@ BSONObj TenantMigrationRecipientService::Instance::_getOplogFetcherFilter() cons << "o.applyOps.0.ns" << namespaceRegex))); } -SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMajority( +SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMajority( WithLock lk) const { - auto opCtx = cc().makeOperationContext(); - uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc)); - return WaitForMajorityService::get(opCtx->getServiceContext()) - .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp()); + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = 
_stateDoc] { + auto opCtx = cc().makeOperationContext(); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); + + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + return WaitForMajorityService::get(opCtx->getServiceContext()) + .waitUntilMajority(writeOpTime); + }) + .semi(); } void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs( @@ -1383,7 +1410,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( uasserted(5356201, "Detected FCV change from last migration attempt."); } - return SemiFuture<void>::makeReady().share(); + return SemiFuture<void>::makeReady(); }) .then([this, self = shared_from_this()] { _stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 2fec9e6bf55..fb4d7ef5163 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -414,7 +414,7 @@ public: /** * Updates the state doc in the database and waits for that to be propagated to a majority. */ - SharedSemiFuture<void> _updateStateDocForMajority(WithLock lk) const; + SemiFuture<void> _updateStateDocForMajority(WithLock lk) const; /* * Returns the majority OpTime on the donor node that 'client' is connected to. |