summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: XueruiFa <xuerui.fa@mongodb.com> 2021-02-03 21:35:24 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-02-09 16:00:46 +0000
commit: 2a1c62250ab5c9f089a2e4699d454ed326057b82 (patch)
tree: b17059a8883a57a4ef58634b02cacede429bcc69
parent: b8a9330fe1591763ef40d26595b5668e581a1248 (diff)
download: mongo-2a1c62250ab5c9f089a2e4699d454ed326057b82.tar.gz
SERVER-53996: Avoid holding mutex while reading/writing in tenant migration recipient
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.cpp | 139
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_service.h | 2
2 files changed, 84 insertions, 57 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 19473158f53..922c4bd5742 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -605,9 +605,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
return SemiFuture<void>::makeReady();
}
- auto uniqueOpCtx = cc().makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
-
LOGV2_DEBUG(5081400,
2,
"Recipient migration service initializing state document",
@@ -618,25 +615,34 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_initializeStateDoc(
// Persist the state doc before starting the data sync.
_stateDoc.setState(TenantMigrationRecipientStateEnum::kStarted);
- {
- Lock::ExclusiveLock stateDocInsertLock(
- opCtx, opCtx->lockState(), _recipientService->_stateDocInsertMutex);
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx, _stateDoc));
- }
- if (MONGO_unlikely(failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) {
- LOGV2(4878500, "Persisting state doc failed due to fail point enabled.");
- uassert(ErrorCodes::NotWritablePrimary,
- "Persisting state doc failed - "
- "'failWhilePersistingTenantMigrationRecipientInstanceStateDoc' fail point active",
- false);
- }
- // Wait for the state doc to be majority replicated to make sure that the state doc doesn't
- // rollback.
- auto insertOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(insertOpTime)
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
+ auto opCtx = cc().makeOperationContext();
+ {
+ Lock::ExclusiveLock stateDocInsertLock(
+ opCtx.get(), opCtx->lockState(), _recipientService->_stateDocInsertMutex);
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx.get(), stateDoc));
+ }
+
+ if (MONGO_unlikely(
+ failWhilePersistingTenantMigrationRecipientInstanceStateDoc.shouldFail())) {
+ LOGV2(4878500, "Persisting state doc failed due to fail point enabled.");
+ uasserted(
+ ErrorCodes::NotWritablePrimary,
+ "Persisting state doc failed - "
+ "'failWhilePersistingTenantMigrationRecipientInstanceStateDoc' fail point "
+ "active");
+ }
+
+ // Wait for the state doc to be majority replicated to make sure that the state doc
+ // doesn't rollback.
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(writeOpTime);
+ })
.semi();
}
@@ -976,9 +982,16 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() {
_stateDoc.setCloneFinishedRecipientOpTime(
repl::ReplicationCoordinator::get(opCtx.get())->getMyLastAppliedOpTime());
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp())
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
+
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(writeOpTime);
+ })
.semi();
}
@@ -995,17 +1008,19 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFu
.thenRunOn(**_scopedExecutor)
.then(
[this, self = shared_from_this()](TenantOplogApplier::OpTimePair donorRecipientOpTime) {
- auto opCtx = cc().makeOperationContext();
-
stdx::lock_guard lk(_mutex);
// Persist the state that tenant migration instance has reached
// consistent state.
_stateDoc.setState(TenantMigrationRecipientStateEnum::kConsistent);
- uassertStatusOK(
- tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
+ return _stateDoc;
})
+ .then([this, self = shared_from_this()](TenantMigrationRecipientDocument stateDoc) {
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp());
+ })
.semi();
}
@@ -1068,31 +1083,36 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_markStateDocAsGarba
return SemiFuture<void>::makeReady();
}
- auto uniqueOpCtx = cc().makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
_stateDoc.setState(TenantMigrationRecipientStateEnum::kDone);
- _stateDoc.setExpireAt(opCtx->getServiceContext()->getFastClockSource()->now() +
+ _stateDoc.setExpireAt(getGlobalServiceContext()->getFastClockSource()->now() +
Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
- auto status = [&]() {
- try {
- // Update the state doc with the expireAt set.
- return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx, _stateDoc);
- } catch (DBException& ex) {
- return ex.toStatus();
- }
- }();
- if (!status.isOK()) {
- // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers).
- // Otherwise, the whole chain would stop running without marking the state doc garbage
- // collectable while we are still the primary.
- invariant(ErrorCodes::isShutdownError(status) || ErrorCodes::isNotPrimaryError(status));
- uassertStatusOK(status);
- }
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
+ auto opCtx = cc().makeOperationContext();
+ auto status = [&]() {
+ try {
+ // Update the state doc with the expireAt set.
+ return tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(),
+ stateDoc);
+ } catch (DBException& ex) {
+ return ex.toStatus();
+ }
+ }();
+
+ if (!status.isOK()) {
+ // We assume that we only fail with shutDown/stepDown errors (i.e. for failovers).
+ // Otherwise, the whole chain would stop running without marking the state doc
+ // garbage collectable while we are still the primary.
+ invariant(ErrorCodes::isShutdownError(status) ||
+ ErrorCodes::isNotPrimaryError(status));
+ uassertStatusOK(status);
+ }
- auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(writeOpTime)
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(writeOpTime);
+ })
.semi();
}
@@ -1240,12 +1260,19 @@ BSONObj TenantMigrationRecipientService::Instance::_getOplogFetcherFilter() cons
<< "o.applyOps.0.ns" << namespaceRegex)));
}
-SharedSemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMajority(
+SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMajority(
WithLock lk) const {
- auto opCtx = cc().makeOperationContext();
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), _stateDoc));
- return WaitForMajorityService::get(opCtx->getServiceContext())
- .waitUntilMajority(repl::ReplClientInfo::forClient(cc()).getLastOp());
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
+
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ return WaitForMajorityService::get(opCtx->getServiceContext())
+ .waitUntilMajority(writeOpTime);
+ })
+ .semi();
}
void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs(
@@ -1383,7 +1410,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
uasserted(5356201, "Detected FCV change from last migration attempt.");
}
- return SemiFuture<void>::makeReady().share();
+ return SemiFuture<void>::makeReady();
})
.then([this, self = shared_from_this()] {
_stopOrHangOnFailPoint(&fpAfterRecordingRecipientPrimaryStartingFCV);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 2fec9e6bf55..fb4d7ef5163 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -414,7 +414,7 @@ public:
/**
* Updates the state doc in the database and waits for that to be propagated to a majority.
*/
- SharedSemiFuture<void> _updateStateDocForMajority(WithLock lk) const;
+ SemiFuture<void> _updateStateDocForMajority(WithLock lk) const;
/*
* Returns the majority OpTime on the donor node that 'client' is connected to.